已知kafka服务器:n9-kkt.test001.com:30902以及
topic:test.info.tst
如何实现topic数据实时同步写入mysql?
拉取一個EvaluateJsonPath 处理器,用来将kakfa内的json数据做简单的处理,主要是得到数据具体项目,具体配置如下:
此操作的意义是 : 将json串中的数据条目,使用 $ 表达式的方式获取数据值,在数据走到这个处理器的时候,就可以拿到对应的数值。
使用Kafka Connect将Kafka主题的数据实时同步写入MySQL的方法如下:
首先,下载kafka-connect-jdbc插件并解压。可以从https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc 下载插件。
将插件中lib目录下的jar文件提取出来,并将其放入Kafka的libs目录。
复制Java的MySQL驱动包到Kafka的libs目录。
在kafka-connect-jdbc插件的etc目录中找到source.properties文件,并将其复制到Kafka的config目录下,并将文件名修改为connect-mysql-source.properties。
修改connect-mysql-source.properties文件,设置以下属性:
除了上述属性,还可以根据需要设置其他属性,例如mode、timestamp.column.name等。
value.converter:设置值转换器的类名。
启动ZooKeeper和Kafka服务器。
启动Kafka Connect,执行以下命令: bin/connect-standalone.sh config/connect-standalone.properties config/connect-mysql-source.properties
注意:如果使用集群模式,需要修改connect-distributed.properties。
使用命令行工具消费Kafka主题,查看数据是否成功导入MySQL。 ./kafka-console-consumer.sh --bootstrap-server <kafka-server>:<kafka-port> --topic <mysql-topic> --from-beginning
其中,和是Kafka服务器的地址和端口,是导入MySQL的主题。
这样,就可以实现将Kafka主题的数据实时同步写入MySQL。具体的配置和连接器属性可以根据实际情况进行调整和修改。