服务器集群消费了kafka的同一条数据

问题来源:目前项目需求下:作为kafka消息consumer方,如果因为某些原因造成服务不可用,期间产生的历史数据,服务器重启之后希望从最新的offset开始消废。比如服务宕机前消废到了offset值是100,服务宕机一段时间,产生了100条数据,当服务重启之后希望从offset 201开始消废,从101到200之间的消息丢弃。

思路:目前有两种解决方案

1.每次重启生成新的groupId消废改topic,可以达到此目的

2.通过消息监听器,获取每个分区上最后的offset,设置从该offset开始消废

 

经过考虑采用第二种解决方案,通过配置ConcurrentMessageListenerContainer类,获取指定topic下的分区,得到当前的offset,然后设置消费者从该位置消废。此时发布测试产生了问题

 

测试环境有4台服务器a  b  c  d ,主题为test-topic , 并且都在同一个group中,groupid为test-group。当来了一条消息,abcd四台服务器都进行了消废。理论上来说应该只会有一台服务器消废该消息。目前该问题尚未解决

希望有大佬能帮我答疑,有长

不知道你这个问题是否已经解决, 如果还没有解决的话:

如果你已经解决了该问题, 非常希望你能够分享一下解决方案, 以帮助更多的人 ^-^