我有三个问题:
// OffsetOldest stands for the oldest offset available on the broker for a
// partition.
OffsetOldest int64 = -2
假设:
A. 三个broker运行在一台机器上
B. 使用者群体只有一个使用者线程
C. 使用者信任OffsetOldest标志
D. 已经产生了100条消息,目前使用者线程已经消耗了90条消息
因此,如果使用者线程重新启动,那么该使用者将从哪个偏移量开始?是91还是0?
在下面的代码中,似乎每次启动使用者时都会重新消耗消息,但实际上这种情况不应该发生。为什么重新消费会紧接着重新启动后发生(不是全部) ?
func (this *consumerGroupHandler) ConsumeClaim(session
sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() {
this.handler(message)
session.MarkMessage(message, "")
}
return nil
}
ctx := context.Background()
conf := sarama.NewConfig()
conf.Version = sarama.V2_0_0_0
conf.Consumer.Offsets.Initial = sarama.OffsetOldest
conf.Consumer.Return.Errors = true
consumer, err := sarama.NewConsumerGroup(strings.Split(app.Config().KafkaBrokers, ","), groupId, conf)
if err != nil {
logger.Error("NewConsumerGroupFromClient(%s) error: %v", groupId, err)
return
}
No. When the retention policy is applied, older messages are deleted from the topics. Therefore, the oldest offset might not be the first-ever offset (i.e. 0
).
It depends on your configuration. Essentially, you have 3 options:
earliest
offsetlatest
offsetYou have to use sarama.OffsetOldest
. From the documentation,
const ( // OffsetNewest stands for the log head offset, i.e. the offset that will be // assigned to the next message that will be produced to the partition. You // can send this to a client's GetOffset method to get this offset, or when // calling ConsumePartition to start consuming new messages. OffsetNewest int64 = -1 // OffsetOldest stands for the oldest offset available on the broker for a // partition. You can send this to a client's GetOffset method to get this // offset, or when calling ConsumePartition to start consuming from the // oldest offset that is still available on the broker. OffsetOldest int64 = -2 )