代码如下
/**
 * Handles one record from the broadcast stream and, when it carries a product name,
 * stores a {@link ProductConfig} for that product in the broadcast state.
 *
 * <p>The record is parsed as JSON and only its {@code "after"} object is inspected.
 * Records that parse to nothing, have no {@code "after"} object, or whose
 * {@code "after"} object lacks {@code Constants.PRODUCT_NAME} are skipped without
 * touching the state.
 *
 * @param value     raw JSON text of the broadcast record
 * @param context   read/write context giving access to the broadcast state
 * @param collector main output collector; not used by this method
 * @throws Exception if JSON parsing or the broadcast-state update fails
 */
@Override
public void processBroadcastElement(String value,
        BroadcastProcessFunction<JSONObject, String, String>.Context context,
        Collector<String> collector) throws Exception {
    JSONObject root = JSON.parseObject(value);
    if (root == null) {
        // Nothing was parsed (e.g. null/empty payload): there is no config to register.
        return;
    }

    // "after" can be absent or null; presumably this happens for delete-type change
    // events -- confirm against the upstream record format. Dereferencing it
    // unguarded would throw a NullPointerException.
    JSONObject jsonObject = root.getJSONObject("after");
    if (jsonObject == null || !jsonObject.containsKey(Constants.PRODUCT_NAME)) {
        return;
    }

    // Collect the fields needed to build the product config bean.
    String productName = jsonObject.getString(Constants.PRODUCT_NAME);
    String symbolName = jsonObject.getString(Constants.SYMBOL_NAME);
    String splitCode = jsonObject.getString(Constants.SPLIT_CODE);
    String manufactorShortName = jsonObject.getString(Constants.MANUFACTOR_SHORT_NAME);
    // NOTE(review): getInteger returns null when the key is missing; verify that
    // getProductType tolerates a null argument.
    String[] productType = getProductType(jsonObject.getInteger(Constants.PRODUCT_FRONT_TYPE));

    // The broadcast state is keyed by product name; a later record for the same
    // name overwrites the earlier entry.
    BroadcastState<String, ProductConfig> productState =
            context.getBroadcastState(productMapStateDescriptor);
    productState.put(productName,
            new ProductConfig(symbolName, splitCode, manufactorShortName, productType, productName));
}
/**
 * Handles one record from the non-broadcast stream by looking up {@code epKeyword}
 * in the broadcast state and, on a hit, emitting a {@code Product} through
 * {@code context.output(productTag, ...)}.
 *
 * <p>On a hit, the matched config's symbol name, split code, manufacturer short name,
 * product type and product name are copied into {@code epSymbolName},
 * {@code epSplitCode}, {@code epManufactorShortName}, {@code epSupplyType} and
 * {@code commodityName}. Those variables, {@code epKeyword} and {@code productTag}
 * are declared outside this method. When the key is not in the broadcast state,
 * nothing is emitted.
 *
 * @param jsonObject the incoming element; not referenced in the visible code
 * @param context    read-only context giving access to the broadcast state and outputs
 * @param collector  main output collector; not used by this method
 * @throws Exception if reading the broadcast state fails
 */
@Override
public void processElement(JSONObject jsonObject,
        BroadcastProcessFunction<JSONObject, String, String>.ReadOnlyContext context,
        Collector<String> collector) throws Exception {
    // Fetch the broadcast configuration data.
    ReadOnlyBroadcastState<String, ProductConfig> configByProduct =
            context.getBroadcastState(productMapStateDescriptor);

    // No configuration registered for this keyword: nothing to emit.
    if (!configByProduct.contains(epKeyword)) {
        return;
    }

    ProductConfig matched = configByProduct.get(epKeyword);
    epSymbolName = matched.getSymbolName();
    epSplitCode = matched.getSplitCode();
    epManufactorShortName = matched.getManufactorShortName();
    epSupplyType = matched.getProductType();
    commodityName = matched.getProductName();

    // `xxx` stands in for the constructor arguments elided in the original snippet.
    Product result = new Product(xxx);
    context.output(productTag, result);
}