I have written a basic channel-based producer in Go using confluent-kafka-go, running against a single-partition, single-broker Kafka setup from Confluent. When I iterate over the acks after producing,
the offset of a successfully produced message is sometimes reported as 0.
Sample logs:
{"level":"info","msg":"Published successfully to topic test, offset 0, Partition 0 and Length 1316","time":"2017-10-03T17:03:43+05:30"}
{"level":"info","msg":"Published successfully to topic test, offset 0, Partition 0 and Length 1316","time":"2017-10-03T17:03:43+05:30"}
This is the function that iterates over the Kafka acks:
func (kc *KafkaClient) HandleAcknowledgements() {
    for event := range kc.AckChannel {
        // Skip anything that is not a delivery report instead of panicking.
        message, ok := event.(*kafka.Message)
        if !ok {
            continue
        }
        if message.TopicPartition.Error != nil {
            log.Errorf("Delivery Failed: Partition: %d Reason: %v", message.TopicPartition.Partition, message.TopicPartition.Error)
        } else if message.TopicPartition.Offset == 0 {
            // Log only the acks whose reported offset is 0.
            bodyLen := len(message.Value)
            log.Infof("Published successfully to topic %s, offset %s, Partition %d and Length %d", *message.TopicPartition.Topic, message.TopicPartition.Offset, message.TopicPartition.Partition, bodyLen)
        }
    }
}
My retention policy is as follows:
log.retention.hours=168
Any help in understanding why this happens would be appreciated!
Thanks.
Try setting produce.offset.report to true in the config.
Try setting produce.offset.report to true for the producer; otherwise it will only report the offset of the last message in each batch/messageSet.
The Go client should really do this by default.
https://github.com/confluentinc/confluent-kafka-go/issues/95#issuecomment-330837647
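To make the batch behaviour concrete, here is a toy model in plain Go. It does not use the client library, and the ack type and deliverBatch function are hypothetical names invented for illustration. Unreported offsets are shown as 0 to mirror the logs in the question; what the real client stores in that case is an assumption here.

```go
package main

import "fmt"

// ack is a hypothetical stand-in for a delivery report; it is not a
// confluent-kafka-go type.
type ack struct {
	Payload string
	Offset  int64
}

// deliverBatch models the acknowledgement of one batch whose first message
// landed at baseOffset. With reportAll set, every message gets its own
// offset; otherwise only the last message in the batch does and the others
// keep the zero value.
func deliverBatch(payloads []string, baseOffset int64, reportAll bool) []ack {
	acks := make([]ack, len(payloads))
	last := len(payloads) - 1
	for i, p := range payloads {
		acks[i].Payload = p
		if reportAll || i == last {
			acks[i].Offset = baseOffset + int64(i)
		}
	}
	return acks
}

func main() {
	batch := []string{"a", "b", "c"}
	fmt.Println(deliverBatch(batch, 40, false)) // [{a 0} {b 0} {c 42}]
	fmt.Println(deliverBatch(batch, 40, true))  // [{a 40} {b 41} {c 42}]
}
```

Under this model a batch containing a single message always reports its real offset, which would fit the offset being 0 only sometimes.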
If this does not solve your problem, could you post the code and config you use for producing?