如何使用Sarama Go Kafka Consumer从最新的抵消量消耗?

我有三个问题:

  1. “oldest offset”是什么意思? Oldest offset并不意味着偏移量为0?

// OffsetOldest stands for the oldest offset available on the broker for a
// partition.
OffsetOldest int64 = -2

  1. 假设:

    A. 三个broker运行在一台机器上
    B. 使用者群体只有一个使用者线程
    C. 使用者信任OffsetOldest标志
    D. 已经产生了100条消息,目前使用者线程已经消耗了90条消息

    因此,如果使用者线程重新启动,那么该使用者将从哪个偏移量开始?是91还是0?

  2. 在下面的代码中,似乎每次启动使用者时都会重新消耗消息,但实际上这种情况不应该发生。为什么重新消费会紧接着重新启动后发生(不是全部) ?

     func (this *consumerGroupHandler) ConsumeClaim(session 
     sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
              for message := range claim.Messages() {
              this.handler(message)
             session.MarkMessage(message, "")
        }
    
        return nil
    }
    
    ctx := context.Background()
    conf := sarama.NewConfig()
    
    conf.Version = sarama.V2_0_0_0
    conf.Consumer.Offsets.Initial = sarama.OffsetOldest
    conf.Consumer.Return.Errors = true
    
    consumer, err := sarama.NewConsumerGroup(strings.Split(app.Config().KafkaBrokers, ","), groupId, conf)
    if err != nil {
        logger.Error("NewConsumerGroupFromClient(%s) error: %v", groupId, err)
        return
    }
    
  1. No. When the retention policy is applied, older messages are deleted from the topics. Therefore, the oldest offset might not be the first-ever offset (i.e. 0).

  2. It depends on your configuration. Essentially, you have 3 options:

    • Start consuming from the earliest offset
    • Start consuming from the latest offset
    • Start consuming from a specific offset
  3. You have to use sarama.OffsetOldest. From the documentation,

 const (
        // OffsetNewest stands for the log head offset, i.e. the offset that will be
        // assigned to the next message that will be produced to the partition. You
        // can send this to a client's GetOffset method to get this offset, or when
        // calling ConsumePartition to start consuming new messages.
        OffsetNewest int64 = -1
        // OffsetOldest stands for the oldest offset available on the broker for a
        // partition. You can send this to a client's GetOffset method to get this
        // offset, or when calling ConsumePartition to start consuming from the
        // oldest offset that is still available on the broker.
        OffsetOldest int64 = -2
    )