最近学习FlinkCDC遇到一个问题,在连MySQL时明明设置了'scan.startup.mode' = 'initial',并且biglog也开启了,但是就是获取不到增量数据,求高人指点
tableEnv.executeSql("CREATE TABLE waybill ( " +
" id Integer primary key, " +
" PrimaryWayBillCode STRING, " +
" CarrierCode STRING, " +
" Channel STRING, " +
" WarehouseCode STRING, " +
" StoreCode_OMS STRING, " +
" ShipDate TIMESTAMP(0) " +
") WITH ( " +
" 'connector' = 'mysql-cdc', " +
" 'scan.startup.mode' = 'initial', " +
" 'hostname' = 'localhost', " +
" 'port' = '3306', " +
" 'username' = 'root', " +
" 'password' = 'root', " +
" 'database-name' = 'flink_test', " +
" 'table-name' = 'waybill' " +
")");
在操作数据库数据时Flink程序可以实时监听到
解决了,没有开启checkpoint导致,加上下面代码即可
env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointStorage("file:///E:/data/flink_checkpoint");