flinksql的方式通过flinkcdc将mysql的binlog写入kafka,一定要用upsert-kafka吗

#遇到的问题
flinksql的通过flinkcdc从mysql读取一张表,写入kafka,报错信息:

Exception in thread "main" org.apache.flink.table.api.TableException: Table sink 'default_catalog.default_database.kafka_t4' doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[default_catalog, default_database, cdc_t4]], fields=[id, name])

#运行的代码

tableEnv.executeSql(
                "CREATE TABLE cdc_t4 (\n" +
                        "    id bigint,\n" +
                        "    name varchar,\n" +
                        "    PRIMARY KEY (id) NOT ENFORCED\n" +
                        "  ) WITH (\n" +
                        "    'connector' = 'mysql-cdc',\n" +
                        "    'hostname' = 'cnap421',\n" +
                        "    'port' = '3306',\n" +
                        "    'server-time-zone' = 'Asia/Shanghai',\n" +
                        "    'debezium.snapshot.mode' = 'initial',\n" + //initial,earliest,latest
                        "    'username' = 'root',\n" +
                        "    'password' = 'sgs123654.',\n" +
                        "    'database-name' = 'test',\n" +
                        "    'table-name' = 't4'\n" +
                        "  )"
        );

        tableEnv.executeSql(
                "create table kafka_t4(id bigint, name varchar"
                        + ") with(\n"
                        + "'connector' = 'kafka',\n"
                        + "'topic' = 'test_output_sink',\n"
                        + "'properties.bootstrap.servers' = '10.168.136.31:9092,10.168.136.28:9092',\n"
                        + "'format' = 'json'\n"
                        + ")"

        );

        tableEnv.executeSql(
                "insert into kafka_t4 select * from cdc_t4"
        );


#解决思路
将connect的kafka换成upsert-kafka就可以了,我以前一直使用flink的datastream的api方式处理的,没注意这个。
#现在的疑问
flinksql的方式通过flinkcdc将mysql的binlog写入kafka,一定要用upsert-kafka吗

【以下回答由 GPT 生成】

问题中的报错信息是由于Table sink 'default_catalog.default_database.kafka_t4'不支持消费更新和删除的更改,而节点TableSourceScan(table=[['default_catalog', 'default_database', 'cdc_t4']], fields=[id, name])生成了此类更改。这意味着您所使用的Kafka表不支持CDC的更改日志格式。为了解决这个问题,您可以尝试使用upsert-kafka来正确处理更改数据。

upsert-kafka是Flink的一个表连接器,可以正确地处理来自CDC的更改数据并写入Kafka。下面是如何在您的代码中使用upsert-kafka的例子:

首先,确保您的项目的pom.xml文件包含了Flink对应版本的upsert-kafka依赖。例如:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>对应版本</version>
</dependency>

其次,替换您的代码中Kafka表的创建部分,使用upsert-kafka连接器,并为其指定主键字段。如下所示:

tableEnv.executeSql(
    "CREATE TABLE kafka_t4 (\n" +
    "    id bigint,\n" +
    "    name varchar,\n" +
    "    PRIMARY KEY (id) NOT ENFORCED\n" +
    ") WITH (\n" +
    "    'connector' = 'upsert-kafka',\n" +
    "    'topic' = 't4',\n" +
    "    'properties.bootstrap.servers' = 'localhost:9092',\n" +
    "    'properties.group.id' = 'testGroup',\n" +
    "    'format' = 'json',\n" +
    "    'key.fields' = 'id'  // 指定主键字段\n" +
    ")"
);

在这里,'connector' = 'upsert-kafka'表示使用upsert-kafka连接器,'key.fields' = 'id'指定了您的表的主键字段是'id'。使用upsert-kafka连接器,Flink将正确地处理来自CDC的更改数据,并将其写入Kafka。

请注意,您需要根据您使用的Flink和upsert-kafka版本来选择相应的依赖和配置。

希望这个解决方案对您有所帮助,如果还有其他问题请随时提问。



【相关推荐】



如果你已经解决了该问题, 非常希望你能够分享一下解决方案, 写成博客, 将相关链接放在评论区, 以帮助更多的人 ^-^

引用 皆我百晓生 小程序回复内容作答:

在Flink SQL中,如果要使用Flink CDC将MySQL的binlog写入Kafka,确实需要使用upsert-kafka连接器。这是因为Flink CDC会产生insert、update和delete操作,而传统的Kafka连接器无法处理这些操作。而upsert-kafka连接器可以将这些操作转换成Kafka消息的插入和更新操作。

如果您想使用普通的Kafka连接器,可以考虑在Flink SQL中先使用Flink CDC将MySQL的binlog写入到一个临时表中,然后再使用另外的Flink Job将临时表中的数据通过Flink DataStream API写入Kafka。这样可以实现将MySQL的binlog写入Kafka,但需要编写额外的代码来处理数据的转换和写入。

总之,使用upsert-kafka连接器是官方推荐的方式,可以更方便地将CDC数据写入Kafka。如果使用普通的Kafka连接器,需要额外的工作来处理数据转换和写入。