关于Flink广播流中主流和广播流执行顺序的问题

业务需要从mysql导入10w维表数据(维表数据实时会变化),我想着通过cdc广播流实时动态加载到mapstate里。但存在processBroadcastElement 中流没有完全初始化processElement便开始执行了,导致主流中查询匹配不到维表数据从而造成数据丢失

代码如下

@Override
    public void processBroadcastElement(String value,
                                        BroadcastProcessFunction<JSONObject, String, String>.Context context,
                                        Collector<String> collector) throws Exception {

        JSONObject jsonObject = JSON.parseObject(value).getJSONObject("after");

        if (jsonObject.containsKey(Constants.PRODUCT_NAME)) {
            //获取产品名称bean对象
            String productName = jsonObject.getString(Constants.PRODUCT_NAME);
            String symbolName = jsonObject.getString(Constants.SYMBOL_NAME);
            String splitCode = jsonObject.getString(Constants.SPLIT_CODE);
            String manufactorShortName = jsonObject.getString(Constants.MANUFACTOR_SHORT_NAME);
            String[] productType = getProductType(jsonObject.getInteger(Constants.PRODUCT_FRONT_TYPE));

            BroadcastState<String, ProductConfig> productState = context.getBroadcastState(productMapStateDescriptor);

            productState.put(productName, new ProductConfig(symbolName, splitCode, manufactorShortName, productType, productName));
        } 
    }


    @Override
    public void processElement(JSONObject jsonObject,
                               BroadcastProcessFunction<JSONObject, String, String>.ReadOnlyContext context,
                               Collector<String> collector) throws Exception {
        //获取广播的配置数据
       ReadOnlyBroadcastState<String, ProductConfig> productState = context.getBroadcastState(productMapStateDescriptor);

                if (productState.contains(epKeyword)) {
                            ProductConfig productConfig = productState.get(epKeyword);

                            epSymbolName = productConfig.getSymbolName();
                            epSplitCode = productConfig.getSplitCode();
                            epManufactorShortName = productConfig.getManufactorShortName();
                            epSupplyType = productConfig.getProductType();

                            commodityName = productConfig.getProductName();

                            Product product = new Product(xxx);

                            context.output(productTag, product);
                        }
  }

代码执行一般得等个三四分钟,等mapstate加载完才会有数据,不知是不是维表数据太多再加上本地测试的缘故。
知道问题所在,也尝试找过解决办法,这条blog有个评论说根据state状态判断,但该怎么判断而不造成数据丢失我还是没有思路

https://blog.csdn.net/wangpei1949/article/details/99698978