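The business needs to load about 100k rows of a dimension table from MySQL, and the dimension data changes in real time. My plan was to use a CDC broadcast stream to load the rows dynamically into map state. The problem is that processElement starts running before processBroadcastElement has finished loading the broadcast data. Lookups on the main stream then find no matching dimension row, and those records are lost.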
The code is as follows:
@Override
public void processBroadcastElement(String value,
                                    BroadcastProcessFunction<JSONObject, String, String>.Context context,
                                    Collector<String> collector) throws Exception {
    // "after" is null for CDC delete events, so guard against it
    JSONObject jsonObject = JSON.parseObject(value).getJSONObject("after");
    if (jsonObject != null && jsonObject.containsKey(Constants.PRODUCT_NAME)) {
        // Build the product-config bean
        String productName = jsonObject.getString(Constants.PRODUCT_NAME);
        String symbolName = jsonObject.getString(Constants.SYMBOL_NAME);
        String splitCode = jsonObject.getString(Constants.SPLIT_CODE);
        String manufactorShortName = jsonObject.getString(Constants.MANUFACTOR_SHORT_NAME);
        String[] productType = getProductType(jsonObject.getInteger(Constants.PRODUCT_FRONT_TYPE));
        BroadcastState<String, ProductConfig> productState = context.getBroadcastState(productMapStateDescriptor);
        productState.put(productName, new ProductConfig(symbolName, splitCode, manufactorShortName, productType, productName));
    }
}

@Override
public void processElement(JSONObject jsonObject,
                           BroadcastProcessFunction<JSONObject, String, String>.ReadOnlyContext context,
                           Collector<String> collector) throws Exception {
    // Read the broadcast config data
    // (epKeyword, epSymbolName, epSplitCode, etc. are defined outside this snippet)
    ReadOnlyBroadcastState<String, ProductConfig> productState = context.getBroadcastState(productMapStateDescriptor);
    if (productState.contains(epKeyword)) {
        ProductConfig productConfig = productState.get(epKeyword);
        epSymbolName = productConfig.getSymbolName();
        epSplitCode = productConfig.getSplitCode();
        epManufactorShortName = productConfig.getManufactorShortName();
        epSupplyType = productConfig.getProductType();
        commodityName = productConfig.getProductName();
        Product product = new Product(xxx);
        context.output(productTag, product);
    }
}
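The broadcast handler above only reads the `after` image of each change event, so every event is treated as an upsert. In the Debezium-style JSON that Flink CDC emits, a delete carries `"op": "d"` with `"after": null`, so `getJSONObject("after")` returns null for deletes and deleted products are never removed from the state. The plain-Java sketch below uses a hypothetical `DimensionTable` class as a stand-in for the broadcast state to show one way of applying each operation code; inside `processBroadcastElement` the `put`/`remove` calls would map to `productState.put(...)` and `productState.remove(...)`. Reading the `before` key on updates and deletes assumes MySQL's full row image (`binlog_row_image=FULL`).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the broadcast state: product name -> config.
class DimensionTable {
    private final Map<String, String> rows = new HashMap<>();

    // op: Debezium operation code ("r" snapshot read, "c" insert, "u" update, "d" delete).
    // beforeKey / afterKey: product name from the "before" / "after" images (null if absent).
    void apply(String op, String beforeKey, String afterKey, String afterValue) {
        switch (op) {
            case "r":
            case "c":
                rows.put(afterKey, afterValue);
                break;
            case "u":
                // If the key column itself changed, drop the stale entry first.
                if (beforeKey != null && !beforeKey.equals(afterKey)) {
                    rows.remove(beforeKey);
                }
                rows.put(afterKey, afterValue);
                break;
            case "d":
                // Deletes have no "after" image; the key comes from "before".
                rows.remove(beforeKey);
                break;
            default:
                throw new IllegalArgumentException("unknown op: " + op);
        }
    }

    String get(String key) {
        return rows.get(key);
    }

    int size() {
        return rows.size();
    }
}
```

Handling `"u"` this way also covers the case where the product name itself is edited in MySQL, which a plain upsert on the new name would leave behind as a stale entry.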
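The job usually produces no output for the first three or four minutes, until the map state has been fully loaded. I'm not sure whether that is because the dimension table is large, because I'm testing locally, or both.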
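I know where the problem lies and have searched for solutions. A comment on this blog post suggests deciding based on the state, but I still have no idea how to make that check without losing data: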
https://blog.csdn.net/wangpei1949/article/details/99698978
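One common way to act on the "check the state" idea without dropping records is buffer-and-replay: when a main-stream record finds no dimension row, park it instead of discarding it, and re-process it once the matching CDC row arrives. The sketch below models only that logic in plain Java so it runs without Flink; the class `EnrichmentBuffer` and its methods are hypothetical. Mapping it onto Flink is an assumption to verify against your version: the main stream would be keyed by the lookup key and processed in a `KeyedBroadcastProcessFunction`, `pending` would be keyed `ListState` (so buffered records survive checkpoints and restarts, unlike a plain Java field), and the replay would run either from `processBroadcastElement` via `ctx.applyToKeyedState(...)` (which visits every key, so it can be costly) or from a processing-time timer registered in `processElement`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of buffer-and-replay enrichment (no Flink involved).
class EnrichmentBuffer {
    // Stands in for the broadcast state: lookup key -> dimension row.
    private final Map<String, String> dims = new HashMap<>();
    // Stands in for keyed ListState: records that arrived before their dimension row.
    private final Map<String, List<String>> pending = new HashMap<>();
    // Stands in for the collector / side output.
    private final List<String> output = new ArrayList<>();

    // Main-stream record: enrich immediately if possible, otherwise buffer it.
    void onMain(String key, String record) {
        String dim = dims.get(key);
        if (dim != null) {
            emit(record, dim);
        } else {
            pending.computeIfAbsent(key, k -> new ArrayList<>()).add(record);
        }
    }

    // Broadcast (CDC) row: store it, then replay everything that was waiting for this key.
    void onDimension(String key, String dim) {
        dims.put(key, dim);
        List<String> waiting = pending.remove(key);
        if (waiting != null) {
            for (String record : waiting) {
                emit(record, dim);
            }
        }
    }

    // Total number of records still waiting for a dimension row.
    int pendingCount() {
        int n = 0;
        for (List<String> records : pending.values()) {
            n += records.size();
        }
        return n;
    }

    List<String> output() {
        return output;
    }

    private void emit(String record, String dim) {
        output.add(record + "|" + dim);
    }
}
```

Records whose key never appears in the dimension table would stay buffered forever, so a real version also needs a timeout, for example a timer that sends still-unmatched records to a side output.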