springboot使用@KafkaListener接收消息报错,以下报错信息循环输出导致服务器日志打满
消息接收:
@KafkaListener(id = "${control.eventConsumerId}", topics = "${control.kafkaEventTopic}",
groupId = "${control.eventGroupId}")
kafka配置:
kafka:
# 以逗号分隔的host:port对列表,用于建立与Kafka集群的初始连接。适用于所有组件(生产、消费者),除非被覆盖。
# 例如:kafka服务地址1:端口,kafka服务地址2:端口,kafka服务地址3:端口
bootstrap-servers:
consumer:
# 标识此使用者所属的使用者组的唯一字符串。
# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
# earliest:从最早的offset开始消费,就是partition的起始位置开始消费
# latest:从最近的offset开始消费,就是新加入partition的消息才会被消费
auto-offset-reset: latest
# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
enable-auto-commit: false
# 如果将“ enable.auto.commit”设置为true,则将消费者偏移量自动提交给kafka的时间间隔
# auto-commit-interval: 1s
# 键的反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 值的反序列化方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#一次调用poll()操作时返回的最大记录数
max-poll-records: 1
listener:
# 在侦听器容器中运行的线程数。
concurrency: 3
# 指定ACK模式 消息消费确认机制
ack-mode: manual_immediate # 消费者立即手动处理
# 如果代理中没有至少一个配置的主题,则容器是否应无法启动。
missing-topics-fatal: false
2021-12-24 07:24:52,590 [consumer-0-C-1] ERROR o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer 149 - Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.common.KafkaException's; no record information is available
at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:193)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1121)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:912)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.KafkaException: Received exception when fetching the next record from damo-1. If needed, please seek past the record to continue consumption.
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1473)
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1600(Fetcher.java:1332)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:645)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:606)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1263)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:993)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:949)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:901)
... 3 common frames omitted
Caused by: org.apache.kafka.common.KafkaException: Record batch for partition damo-1 at offset 1378242 is invalid, cause: Record is corrupt (stored crc = 1886684963, computed crc = 139412280)
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.maybeEnsureValid(Fetcher.java:1386)
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.nextFetchedRecord(Fetcher.java:1430)
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1487)
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1600(Fetcher.java:1332)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:645)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:606)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1294)
... 8 common frames omitted
怀疑是kafka消息重试机制,添加了如下代码
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory, KafkaTemplate<Object, Object> template) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
// 最大重试次数1次
SeekToCurrentErrorHandler seekToCurrentErrorHandler = new SeekToCurrentErrorHandler((consumerRecord, e) -> {
logger.error("异常.抛弃这个消息============,{}", consumerRecord.toString(), e);
}, new FixedBackOff(5000, 1));
factory.setErrorHandler(seekToCurrentErrorHandler);
return factory;
}
但还是出现了上述报错
你好,我是有问必答小助手,非常抱歉,本次您提出的有问必答问题,技术专家团超时未为您做出解答
本次提问扣除的有问必答次数,将会以问答VIP体验卡(1次有问必答机会、商城购买实体图书享受95折优惠)的形式为您补发到账户。
因为有问必答VIP体验卡有效期仅有1天,您在需要使用的时候【私信】联系我,我会为您补发。