在使用kafka时,遇到了业务需求。需要在消费一条消息后,暂停消费。等回调接口触发时,恢复消费。
我使用的是Spring-kafka注解@KafkaListener。
在网上搜索一大堆后,得到了一种方式:
你可以在消费的逻辑里同步等待回调接口,然后再提交offset。
KafkaConsumer倒是有接口,只是暂停某些分区的消费
修改应答机制吧,设置手动应答,你只要不应答就不会消费下一条数据,等回调了再应答
消费一条后停止消费,可以一个线程逐条消费,只提交第一条消息的offset,第一条消费成功后停止监听器,可参考https://blog.csdn.net/weixin_43847283/article/details/124720492
我找到方法了。
private int start = 0;
指定一个变量。
在监听过程中,如果状态为0则执行提交操作
if (start == 0){
//手动提交消费
ack.acknowledge();
start = 1;
}
在消费完一条数据后,继续消费但不提交。
关闭监听,现在消息队列中其实只消费了一条。
System.out.println("关闭监听器..." + DateUtil.date());
registry.getListenerContainer("add_1").stop();
在一个事件触发的地方,加上监听的启动
// 启动kafka监听
System.out.println("启动监听器..." + DateUtil.date());
// "timingConsumer"是@KafkaListener注解后面设置的监听器ID,标识这个监听器
if (!registry.getListenerContainer("add_1").isRunning()) {
registry.getListenerContainer("add_1").start();
}
当条件达到以后,重新启动监听器,将从头开始。重复上面的操作。
最终达成,消费一条数据后关闭,条件达成后继续消费的效果。