@EnableScheduling
@Component
public class CronTimer {
@Autowired
private KafkaListenerEndpointRegistry registry;
@Autowired
private ConsumerFactory consumerFactory;
// 监听器容器工厂(设置禁止KafkaListener自启动)
@Bean
public ConcurrentKafkaListenerContainerFactory delayContainerFactory() {
ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();
container.setConsumerFactory(consumerFactory);
//禁止KafkaListener自启动
container.setAutoStartup(false);
return container;
}
// 监听器
@KafkaListener(id="timingConsumer",topics = "topic3",containerFactory = "delayContainerFactory")
public void onMessage1(ConsumerRecord<?, ?> record){
System.out.println("消费成功:"+record.topic()+"-"+record.partition()+"-"+record.value());
}
// 定时启动监听器
@Scheduled(cron = "0 38 14 * * ? ")
public void startListener() {
System.out.println("启动监听器...");
// "timingConsumer"是@KafkaListener注解后面设置的监听器ID,标识这个监听器
if (!registry.getListenerContainer("timingConsumer").isRunning()) {
registry.getListenerContainer("timingConsumer").start();
}
//registry.getListenerContainer("timingConsumer").resume();
}
// 定时停止监听器
@Scheduled(cron = "0 40 14 * * ? ")
public void shutDownListener() {
System.out.println("关闭监听器...");
registry.getListenerContainer("timingConsumer").pause();
}
}
会报循环依赖的错误,请问有什么解决办法吗
这种我建议你百度找Kafka动态监听的文章看看,这种应该是直接使用框架不会被支持的,只能去重写一些方法实现