需要用到Kafka消费数据,但因为数据量过大,只想消费当前时间,前一分钟的数据。比如现在18.00,只想消费从17.59开始的数据,这个应该怎么实现呢?
你把过期时间设置为1分钟
参考
(1) kafka consumer指定时间戳消费 - 掘金. https://juejin.cn/post/7130499354192216077.
(2) Kafka 根据指定时间消费数据 - CSDN博客. https://blog.csdn.net/tianshishangxin1/article/details/120139470.
(3) Kafka——指定位移消费(回溯消费) - 曹伟雄 - 博客园. https://www.cnblogs.com/caoweixiong/p/11684370.html.
要实现只消费当前时间和前一分钟的数据,您可以使用 Kafka 的时间戳(timestamp)来过滤消息。以下是一种实现方法:
ProducerRecord<String, String> record = new ProducerRecord<>("topic-name", key, value);
record.timestamp(System.currentTimeMillis());
producer.send(record);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
TopicPartition topicPartition = new TopicPartition("topic-name", 0); // 假设只有一个分区
consumer.assign(Arrays.asList(topicPartition));
// 计算当前时间和前一分钟的时间戳
long currentTime = System.currentTimeMillis();
long previousMinute = currentTime - 60000;
// 设置消费者的偏移量为前一分钟的时间戳
consumer.seek(topicPartition, previousMinute);
// 开始消费消息
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
// 处理消息
}
通过将消费者的偏移量设置为前一分钟的时间戳,消费者将从该时间戳开始消费消息,只获取当前时间和前一分钟之间的数据。
请注意,确保在生产者发送消息时正确设置时间戳,并且消费者的时间与 Kafka 集群的时间保持一致。另外,如果您的主题有多个分区,您需要根据分区的数量进行适当的分配和偏移量设置。
以上是一个基本的实现示例,您可以根据您的实际需求和项目架构进行适当调整。