#遇到的问题
使用flink1.13.6版本消费kafka,在idea端测试没有问题,打包后再linux的提交flink'job报错
#报错信息
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:666)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:646)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:55)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:574)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:442)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:585)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:565)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.kafka.common.KafkaException: class org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:374)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:392)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:713)
... 16 more
#程序运行的情况
已经可以打开flink的webui了
#试过的方法
不管是使用flink的datastream还是flinksql,在idea端测试都没有问题,打包到linux上运行就报上面的错误;
但是如果启动flink的session模式,在flinksql客户端用sql的方式就可以正常消费kafka的数据
#想知道应该怎么解决
jar包冲突了吧 ,可以看看liunx flink lib目录下是否有flink-sql-connector-kafka_2.11-1.12.0.jar,或者是flink-connector-kafka_2.11-1.12.0.jar如果有,你再打包的时候需要将pom里面的flink-sql-connector-kafka_2.11-1.12.0.jar包去掉
引用 皆我百晓生 小程序回复内容作答:
该异常信息表明在构造KafkaConsumer时发生了错误,导致无法连接Kafka。
具体的错误信息是:org.apache.kafka.common.KafkaException: class org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer
。
错误的原因是,Flink未能正确加载Kafka的序列化器org.apache.kafka.common.serialization.ByteArrayDeserializer
。
解决该问题的方法是,在Flink的作业中配置正确的Kafka序列化器。您可以通过在代码中手动指定序列化器来解决该问题,如下所示:
import org.apache.flink.api.common.serialization.SimpleStringSchema;
...
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
kafkaProps.setProperty("group.id", "flink-consumer-group");
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
"topic-name",
new SimpleStringSchema(), // 使用正确的序列化器,比如SimpleStringSchema
kafkaProps
);
请注意,您需要使用正确的序列化器,以使其与您在Kafka服务器上配置的序列化器匹配。
另外,还需要保证在打包后的项目中包含了正确的依赖。您可以通过使用mvn dependency:tree
命令来检查项目的依赖关系是否正确。
如果仍然遇到问题,请提供更多的上下文信息,例如完整的异常堆栈跟踪、部署脚本或命令和项目的依赖关系,以便更好地帮助您解决问题。
【以下回答由 GPT 生成】
很抱歉您遇到了这个问题。根据报错信息,问题出现在构造Kafka消费者时,可能是由于配置参数或依赖问题导致。为了帮助您解决问题,我需要进一步了解一些细节信息,请提供以下信息:
请提供以上信息,我将尽力帮助您解决问题。
【相关推荐】
1.序列化检查 2.带有依赖打包