Kafka consumer group loses uncommitted messages

I am using a consumer group with just one consumer and just one broker (Docker wurstmeister image). The code decides whether to commit an offset: if it returns an error, the message is not committed. I need to ensure that the system does not lose any message, even if that means retrying the same message forever (for now ;)). To test this, I have created a simple handler that does not commit the offset when the string 'error' is sent as a message to Kafka. All other strings are committed.

kafka-console-producer --broker-list localhost:9092 --topic test
>this will be committed

Now running

kafka-run-class kafka.admin.ConsumerGroupCommand --bootstrap-server localhost:9092 --group michalgrupa --describe

returns

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
test                           0          13              13              0

So that's OK: there is no lag. Now we send the string 'error' to simulate a failure, so the message is not committed:

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
test                           0          13              14              1

The current offset stays at the right position and there is 1 lagged message. Now, if we send a valid message again, the offset moves on to 15:

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
test                           0          15              15              0

and message number 14 (the one at offset 13) will never be picked up again. Is this the default behaviour? Do I need to track the last committed offset and manually load the message at that offset + 1? I have set CommitInterval to 0, hoping to avoid any auto-commit mechanism.

fetch/commit code:

go func() {
    for {
        ctx := context.Background()

        m, err := mr.brokerReader.FetchMessage(ctx)
        if err != nil {
            break
        }

        if err := msgFunc(m); err != nil {
            log.Errorf("# messaging # cannot commit a message: %v", err)
            continue
        }

        // commit message if no error
        if err := mr.brokerReader.CommitMessages(ctx, m); err != nil {
            // should we do something other than just logging the uncommitted message?
            log.Errorf("cannot commit message [%s] %v/%v: %s = %s; with error: %v", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value), err)
        }
    }
}()

reader configuration:

kafkaReader := kafka.NewReader(kafka.ReaderConfig{
    Brokers:        brokers,
    GroupID:        groupID,
    Topic:          topic,
    CommitInterval: 0,    // 0 = commits are sent synchronously by CommitMessages
    MinBytes:       10e3, // 10KB
    MaxBytes:       10e6, // 10MB
})

library used: https://github.com/segmentio/kafka-go

In Kafka you commit offsets, not individual messages. If I understand your code correctly (I'm not a Go developer), you just continue after you hit an invalid message. If a valid message arrives after the invalid one, you commit that message's offset, which moves the group's position past the invalid message as well. I guess that was not your intention.
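To make this concrete, here is a minimal in-memory sketch (no broker, no kafka-go) that replays the question's loop over the two messages from the example. The message type and the handle and consumeLikeQuestion functions are hypothetical names for illustration; commits follow Kafka's convention of storing the offset of the next message to read (last processed offset + 1).

```go
package main

import (
	"errors"
	"fmt"
)

// message is a hypothetical stand-in for a fetched Kafka record.
type message struct {
	Offset int64
	Value  string
}

// handle mimics the test handler from the question: it fails on "error".
func handle(m message) error {
	if m.Value == "error" {
		return errors.New("handler failed")
	}
	return nil
}

// consumeLikeQuestion replays the question's loop over an in-memory partition:
// on a handler error it records the offset and continues; otherwise it commits
// m.Offset+1 ("next offset to read"). It returns the final committed offset
// and the offsets whose handler failed.
func consumeLikeQuestion(records []message, committed int64) (int64, []int64) {
	var failed []int64
	for _, m := range records {
		if err := handle(m); err != nil {
			failed = append(failed, m.Offset)
			continue // nothing is committed, but the loop still moves on
		}
		committed = m.Offset + 1
	}
	return committed, failed
}

func main() {
	partition := []message{
		{Offset: 13, Value: "error"},
		{Offset: 14, Value: "this will be committed"},
	}
	committed, failed := consumeLikeQuestion(partition, 13)
	fmt.Println(committed, failed) // 15 [13]
}
```

The committed position ends at 15, matching the CURRENT-OFFSET in the question, while offset 13 is already behind it.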

To be clear about what committing an offset means: your consumer group stores the offset in a dedicated internal Kafka topic (__consumer_offsets), or in ZooKeeper on older Kafka versions. An offset identifies a single position within a topic (more precisely, within one partition of a given topic). This means you can only consume a partition in a linear fashion.
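A heavily simplified model of that storage may help. This sketch assumes nothing about Kafka's on-disk format; topicPartition and offsetStore are hypothetical names. It only shows that the group keeps exactly one number per partition, so a commit overwrites the previous position instead of recording per-message success or failure.

```go
package main

import "fmt"

// topicPartition identifies one partition of one topic.
type topicPartition struct {
	Topic     string
	Partition int
}

// offsetStore is a hypothetical model of what one consumer group keeps:
// exactly one committed offset per topic/partition.
type offsetStore map[topicPartition]int64

// commit overwrites the single stored position; there is no per-message
// record, so there is no way to say "13 failed but 14 succeeded".
func (s offsetStore) commit(tp topicPartition, nextOffset int64) {
	s[tp] = nextOffset
}

func main() {
	store := offsetStore{}
	tp := topicPartition{Topic: "test", Partition: 0}
	store.commit(tp, 13)
	store.commit(tp, 15) // committing after offset 14 replaces the old value
	fmt.Println(store[tp], len(store)) // 15 1
}
```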

Here you can see what happens on the Kafka consumer side:

[Figure: New Kafka Consumer]

You are consuming from one (most likely several) stacks of messages, and you commit your position (a.k.a. offset) in each topic/partition. So you cannot say "I want to consume this specific message again". What you can do is stop consuming once you hit an invalid message. Your problem then becomes: how do I get rid of this message? Deleting a single message from a Kafka topic is tricky. A common pattern is to write such messages to some kind of dead-letter topic and handle them with a different consumer.

Hope that made things a little bit clearer to you.

It looks like your Kafka consumer is set up to commit offsets automatically (that's the default setting).
If so, that's probably why your app skips over the erroneous message: even though you skip the CommitMessages call, the commit is performed on a background thread.
Please check the enable.auto.commit property in the docs: https://kafka.apache.org/documentation/#newconsumerconfigs
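The mechanism described here can be modelled in a few lines. This is a hypothetical sketch (autoCommitConsumer, fetch and autoCommitTick are invented names, not a client API) that abstracts away exactly when a real client fires its auto-commit: fetching advances an in-memory position, and a periodic task commits that position whether or not the application's handler succeeded.

```go
package main

import (
	"fmt"
	"sync"
)

// autoCommitConsumer is a hypothetical model of auto-commit behaviour.
type autoCommitConsumer struct {
	mu        sync.Mutex
	position  int64 // next offset to fetch, held in memory
	committed int64 // what a restart would resume from
}

// fetch returns the offset of the next message and advances the position.
func (c *autoCommitConsumer) fetch() int64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	o := c.position
	c.position++
	return o
}

// autoCommitTick is what a periodic background task would call each interval:
// it commits the fetch position, ignoring whether handling succeeded.
func (c *autoCommitConsumer) autoCommitTick() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.committed = c.position
}

func main() {
	c := &autoCommitConsumer{position: 13, committed: 13}
	failed := c.fetch() // offset 13: the handler fails, the app does not commit
	c.autoCommitTick()  // ...but the commit interval elapses anyway
	fmt.Println(failed, c.committed) // 13 14
}
```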

It helps to understand the concept of the consumer offset here. A running consumer app keeps the offset of the messages it has consumed in memory, regardless of whether those offsets were committed. Only when the consumer app restarts does it retrieve the committed offset (the CURRENT-OFFSET column) and continue consuming from there.