#遇到的问题
flinksql的通过flinkcdc从mysql读取一张表,写入kafka,报错信息:
Exception in thread "main" org.apache.flink.table.api.TableException: Table sink 'default_catalog.default_database.kafka_t4' doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[default_catalog, default_database, cdc_t4]], fields=[id, name])
#运行的代码
tableEnv.executeSql(
"CREATE TABLE cdc_t4 (\n" +
" id bigint,\n" +
" name varchar,\n" +
" PRIMARY KEY (id) NOT ENFORCED\n" +
" ) WITH (\n" +
" 'connector' = 'mysql-cdc',\n" +
" 'hostname' = 'cnap421',\n" +
" 'port' = '3306',\n" +
" 'server-time-zone' = 'Asia/Shanghai',\n" +
" 'debezium.snapshot.mode' = 'initial',\n" + //initial,earliest,latest
" 'username' = 'root',\n" +
" 'password' = 'sgs123654.',\n" +
" 'database-name' = 'test',\n" +
" 'table-name' = 't4'\n" +
" )"
);
tableEnv.executeSql(
"create table kafka_t4(id bigint, name varchar"
+ ") with(\n"
+ "'connector' = 'kafka',\n"
+ "'topic' = 'test_output_sink',\n"
+ "'properties.bootstrap.servers' = '10.168.136.31:9092,10.168.136.28:9092',\n"
+ "'format' = 'json'\n"
+ ")"
);
tableEnv.executeSql(
"insert into kafka_t4 select * from cdc_t4"
);
#解决思路
将connect的kafka换成upsert-kafka就可以了,我以前一直使用flink的datastream的api方式处理的,没注意这个。
#现在的疑问
flinksql的方式通过flinkcdc将mysql的binlog写入kafka,一定要用upsert-kafka吗
【以下回答由 GPT 生成】
问题中的报错信息是由于Table sink 'default_catalog.default_database.kafka_t4'不支持消费更新和删除的更改,而节点TableSourceScan(table=[['default_catalog', 'default_database', 'cdc_t4']], fields=[id, name])生成了此类更改。这意味着您所使用的Kafka表不支持CDC的更改日志格式。为了解决这个问题,您可以尝试使用upsert-kafka来正确处理更改数据。
upsert-kafka是Flink的一个表连接器,可以正确地处理来自CDC的更改数据并写入Kafka。下面是如何在您的代码中使用upsert-kafka的例子:
首先,确保您的项目的pom.xml文件包含了Flink对应版本的upsert-kafka依赖。例如:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>对应版本</version>
</dependency>
其次,替换您的代码中Kafka表的创建部分,使用upsert-kafka连接器,并为其指定主键字段。如下所示:
tableEnv.executeSql(
"CREATE TABLE kafka_t4 (\n" +
" id bigint,\n" +
" name varchar,\n" +
" PRIMARY KEY (id) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'upsert-kafka',\n" +
" 'topic' = 't4',\n" +
" 'properties.bootstrap.servers' = 'localhost:9092',\n" +
" 'properties.group.id' = 'testGroup',\n" +
" 'format' = 'json',\n" +
" 'key.fields' = 'id' // 指定主键字段\n" +
")"
);
在这里,'connector' = 'upsert-kafka'表示使用upsert-kafka连接器,'key.fields' = 'id'指定了您的表的主键字段是'id'。使用upsert-kafka连接器,Flink将正确地处理来自CDC的更改数据,并将其写入Kafka。
请注意,您需要根据您使用的Flink和upsert-kafka版本来选择相应的依赖和配置。
希望这个解决方案对您有所帮助,如果还有其他问题请随时提问。
【相关推荐】
引用 皆我百晓生 小程序回复内容作答:
在Flink SQL中,如果要使用Flink CDC将MySQL的binlog写入Kafka,确实需要使用upsert-kafka连接器。这是因为Flink CDC会产生insert、update和delete操作,而传统的Kafka连接器无法处理这些操作。而upsert-kafka连接器可以将这些操作转换成Kafka消息的插入和更新操作。
如果您想使用普通的Kafka连接器,可以考虑在Flink SQL中先使用Flink CDC将MySQL的binlog写入到一个临时表中,然后再使用另外的Flink Job将临时表中的数据通过Flink DataStream API写入Kafka。这样可以实现将MySQL的binlog写入Kafka,但需要编写额外的代码来处理数据的转换和写入。
总之,使用upsert-kafka连接器是官方推荐的方式,可以更方便地将CDC数据写入Kafka。如果使用普通的Kafka连接器,需要额外的工作来处理数据转换和写入。