如标题
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(DuyanConstant.KAFKA_SERVER)
.setTopics(DuyanConstant.DEFAULT_TOPIC)
.setGroupId(DuyanConstant.CONSUMER)
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
.hostname(DuyanConstant.HOST_BI_REPORT)
.username(DuyanConstant.USER_FLINK_CDC)
.password(DuyanConstant.PASSWD_FLINK_CDC)
.databaseList("bi_report")
.tableList("bi_report.table_mapping")
.deserializer(new MyStringDeserializationSchema())
.startupOptions(StartupOptions.initial())
.build();
先执行BroadcastProcessFunction里面的processElement部分,然后再执行processBroadcastElement,导致数据流里面的内容在程序启动的时候无法关联到广播流的数据
网上查到有在BroadcastProcessFunction的open里面先加载一次配置表的数据,后面增量再通过广播流得分方式,暂时还没实现成功