spring boot kafka

spring boot kafka
实现发布订阅 代码如下
问题为 KafkaConsumer 客户端类 的process 方法会实时收到消息处理?

发布生产类
@RestController
@RequestMapping("/kafka/producer")
public class KafkaProducerController {

private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerController.class);

private static final String TOPIC = "topic-test";

@Resource
private KafkaTemplate kafkaTemplate;

public KafkaProducerController(KafkaTemplate kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
}


@PostMapping("/push")
public ResponseEntity<String> pushMessage(@RequestBody Message message) {
    Date time = new Date();
    message.setSendTime(time);
    kafkaTemplate.send(TOPIC, JSON.toJSONString(message)).addCallback(success
            -> LOGGER.info("{}-生产者发送消息成功:{},时间:{}", TOPIC, success, time), failure
            -> LOGGER.error("{}-生产者发送消息失败:{}", failure.getMessage()));
    return new ResponseEntity<>("success", HttpStatus.OK);
}

}

消费类
订阅类
@Component
public class KafkaConsumer {

@Autowired
private static KafkaTemplate kafkaTemplate;

/**
 * 单个topic订阅
 */
@KafkaListener(topics = "topic-test")
public void onDeviceSubStatusMessageDevice(List<ConsumerRecord<String, ?>> listRecord) {
    process(listRecord);
}

private void process(List<ConsumerRecord<String, ?>> listRecord) {
    System.out.println("订阅者接收到消息");
    listRecord.forEach(record -> {
        System.out.println(record.key()+record.value());

    });
}


//发送数据到kafka
private void sendKafka(String abcd, String efg) {
    Map<String, Object> body = new HashMap<>(8);
    body.put("time", System.currentTimeMillis());
    body.put("abcd", abcd);
    body.put("efg", efg);
    kafkaTemplate.send("test-topic", JSON.toJSONString(body));
}

}

【以下回答由 GPT 生成】

在KafkaConsumer类中,你可以使用@EnableKafka注解启动Kafka监听器的自动配置。在配置类上添加@EnableKafka注解后,可以通过设置KafkaListenerContainerFactory来配置监听器的一些属性,例如并发数量,消费组ID等。

首先,你需要在配置类上添加@EnableKafka注解:

@Configuration
@EnableKafka
public class KafkaConfig {

    // 配置其他相关的配置

}

然后,你可以创建一个KafkaListenerContainerFactory的bean,并指定一些属性,如下所示:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(3); // 设置并发消费者数量
    factory.getContainerProperties().setPollTimeout(3000); // 设置等待时间
    return factory;
}

最后,在KafkaConsumer类中的方法上添加@KafkaListener注解,指定要订阅的topic即可:

@Component
public class KafkaConsumer {

    @KafkaListener(topics = "topic-test")
    public void onMessage(String message) {
        process(message);
    }

    private void process(String message) {
        System.out.println("收到消息: " + message);
    }
}

通过以上步骤,你就可以实现实时收到消息的功能了。



【相关推荐】



如果你已经解决了该问题, 非常希望你能够分享一下解决方案, 写成博客, 将相关链接放在评论区, 以帮助更多的人 ^-^