使用flink1.13.6版本消费kafka,在idea端测试没有问题,打包后再linux的提交flink'job报错

#遇到的问题
使用flink1.13.6版本消费kafka,在idea端测试没有问题,打包后再linux的提交flink'job报错
#报错信息

org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:666)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:646)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:55)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:574)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:442)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:585)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:565)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.kafka.common.KafkaException: class org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:374)
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:392)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:713)
    ... 16 more

#程序运行的情况
已经可以打开flink的webui了

img

#试过的方法
不管是使用flink的datastream还是flinksql,在idea端测试都没有问题,打包到linux上运行就报上面的错误;
但是如果启动flink的session模式,在flinksql客户端用sql的方式就可以正常消费kafka的数据
#想知道应该怎么解决

jar包冲突了吧 ,可以看看liunx flink lib目录下是否有flink-sql-connector-kafka_2.11-1.12.0.jar,或者是flink-connector-kafka_2.11-1.12.0.jar如果有,你再打包的时候需要将pom里面的flink-sql-connector-kafka_2.11-1.12.0.jar包去掉

引用 皆我百晓生 小程序回复内容作答:
该异常信息表明在构造KafkaConsumer时发生了错误,导致无法连接Kafka。

具体的错误信息是:org.apache.kafka.common.KafkaException: class org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer

错误的原因是,Flink未能正确加载Kafka的序列化器org.apache.kafka.common.serialization.ByteArrayDeserializer

解决该问题的方法是,在Flink的作业中配置正确的Kafka序列化器。您可以通过在代码中手动指定序列化器来解决该问题,如下所示:

import org.apache.flink.api.common.serialization.SimpleStringSchema;

...

Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
kafkaProps.setProperty("group.id", "flink-consumer-group");

FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
    "topic-name",
    new SimpleStringSchema(), // 使用正确的序列化器,比如SimpleStringSchema
    kafkaProps
);

请注意,您需要使用正确的序列化器,以使其与您在Kafka服务器上配置的序列化器匹配。

另外,还需要保证在打包后的项目中包含了正确的依赖。您可以通过使用mvn dependency:tree命令来检查项目的依赖关系是否正确。

如果仍然遇到问题,请提供更多的上下文信息,例如完整的异常堆栈跟踪、部署脚本或命令和项目的依赖关系,以便更好地帮助您解决问题。

【以下回答由 GPT 生成】

问题回答:

很抱歉您遇到了这个问题。根据报错信息,问题出现在构造Kafka消费者时,可能是由于配置参数或依赖问题导致。为了帮助您解决问题,我需要进一步了解一些细节信息,请提供以下信息:

  1. 您在本地使用Idea测试时的配置参数和代码片段。
  2. 在Linux上提交Flink job时的命令行参数和操作步骤。
  3. 您是否已经确认Kafka集群正常运行,并且在Linux服务器上能够访问到Kafka集群。

请提供以上信息,我将尽力帮助您解决问题。



【相关推荐】



如果你已经解决了该问题, 非常希望你能够分享一下解决方案, 写成博客, 将相关链接放在评论区, 以帮助更多的人 ^-^

1.序列化检查 2.带有依赖打包