flink cdc 使用debezium 参数不生效

因为有需求要记录一张表删除的数据,计划使用flink cdc来做,因为需要过滤,所以使用了debezium.skipped.operations参数,但是这个参数并未生效,还是会把insert update 记录同步到sink,想问下是哪里出问题了呢?下面是代码。

package org.example.sqlcdc;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.Properties;

public class SqlCdc {
    public static void main(String[] args) throws Exception {
//        Properties debeziumProperties = new Properties();
//        debeziumProperties.put("skipped.operations", "c,u,t");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.executeSql("create table demo1(" +
                " id INT," +
                " name STRING," +
                " primary key (id) not enforced" +
                ")WITH(" +
                " 'connector' = 'mysql-cdc'," +
                " 'hostname' = 'localhost'," +
                " 'port' = '3306'," +
                " 'username' = 'root'," +
                " 'password' = 'root@root'," +
                " 'database-name' = 'local'," +
                " 'table-name' = 'from_table'," +
                " 'debezium.skipped.operations' = 'c,u,t'" +
                ")");

        tableEnv.executeSql("create table demo2(" +
                " id INT," +
                " name STRING," +
                " primary key (id) not enforced" +
                ")WITH(" +
                " 'connector' = 'jdbc'," +
                " 'url' = 'jdbc:mysql://localhost:3306/local?serverTimezone=UTC'," +
                " 'username' = 'root'," +
                " 'password' = 'root@root'," +
                " 'table-name' = 'to_table'," +
                " 'driver' = 'com.mysql.cj.jdbc.Driver'," +
                " 'scan.fetch-size' = '200'" +
                ")");
        tableEnv.executeSql("insert into demo2 select id,name from demo1");
        tableEnv.executeSql("select * from demo1").print();
        env.execute("FLinkSql Cdc");

    }
}