如题,怎么用Java代码在consumer端,手动设置消息offset commit
在参数配置文件里配置
http://debugo.com/kafka-params/
auto.commit.enable=false
手动提交是有些问题的,参考:
consumer = new KafkaConsumer<>(props);
// 设置获取的表名
TopicPartition tp = new TopicPartition(tableName, 0);
consumer.assign(Collections.singletonList(tp));
//通过记录创建时间找到对应的id
// 从id为8开始获取 包含8
consumer.seek(tp, 8l);
long nextId = consumer.position(tp);