利用Kafka设置某一时间消费数据

需要用到Kafka消费数据,但因为数据量过大,只想消费当前时间,前一分钟的数据。比如现在18.00,只想消费从17.59开始的数据,这个应该怎么实现呢?

你把过期时间设置为1分钟

参考
(1) kafka consumer指定时间戳消费 - 掘金. https://juejin.cn/post/7130499354192216077.
(2) Kafka 根据指定时间消费数据 - CSDN博客. https://blog.csdn.net/tianshishangxin1/article/details/120139470.
(3) Kafka——指定位移消费(回溯消费) - 曹伟雄 - 博客园. https://www.cnblogs.com/caoweixiong/p/11684370.html.

不知道你这个问题是否已经解决, 如果还没有解决的话:

如果你已经解决了该问题, 非常希望你能够分享一下解决方案, 写成博客, 将相关链接放在评论区, 以帮助更多的人 ^-^

要实现只消费当前时间和前一分钟的数据,您可以使用 Kafka 的时间戳(timestamp)来过滤消息。以下是一种实现方法:

  1. 在生产者端,确保在发送消息到 Kafka 时设置正确的时间戳。您可以使用 Kafka Producer 的 ProducerRecord 对象来设置时间戳,例如:

ProducerRecord<String, String> record = new ProducerRecord<>("topic-name", key, value);
record.timestamp(System.currentTimeMillis());
producer.send(record);
  1. 在消费者端,使用 Kafka 的高级消费者(High-Level Consumer)API 或新的消费者(New Consumer)API 来订阅主题并消费消息。在消费者代码中,可以使用 KafkaConsumer 对象的 assign 方法手动分配分区,并通过 seek 方法将消费者的偏移量(offset)设置为所需的时间戳。例如:

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
TopicPartition topicPartition = new TopicPartition("topic-name", 0);  // 假设只有一个分区
consumer.assign(Arrays.asList(topicPartition));

// 计算当前时间和前一分钟的时间戳
long currentTime = System.currentTimeMillis();
long previousMinute = currentTime - 60000;

// 设置消费者的偏移量为前一分钟的时间戳
consumer.seek(topicPartition, previousMinute);

// 开始消费消息
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
    // 处理消息
}

通过将消费者的偏移量设置为前一分钟的时间戳,消费者将从该时间戳开始消费消息,只获取当前时间和前一分钟之间的数据。

请注意,确保在生产者发送消息时正确设置时间戳,并且消费者的时间与 Kafka 集群的时间保持一致。另外,如果您的主题有多个分区,您需要根据分区的数量进行适当的分配和偏移量设置。

以上是一个基本的实现示例,您可以根据您的实际需求和项目架构进行适当调整。