I'm building a request-response setup in Kafka using the Kafka-go library using the message Key as a correlation ID. My setup works fine without concurrency, but when the messages start being sent in separate goroutines, the reader part skips the correct keys (since other routine probably read it already).
How can I read only a specific key from a topic, considering the connection is being shared by different goroutines?
Client example below (Error evaluation were removed for brevity):
package main
import (
"bytes"
"context"
"fmt"
"sync"
"time"
"github.com/google/uuid"
kafka "github.com/segmentio/kafka-go"
)
var wg sync.WaitGroup
func requestMessage(connR *kafka.Conn, connW *kafka.Conn, body []byte, index int) {
currentUUID := uuid.New()
byteUUID := []byte(fmt.Sprintf("%s", currentUUID))
connW.WriteMessages(kafka.Message{
Key: byteUUID,
Value: body,
})
fmt.Println("Posted id " + string(byteUUID))
for {
m, _ := connR.ReadMessage(10e6)
if bytes.Equal(m.Key, byteUUID) {
break
}
}
wg.Done()
fmt.Println("Done " + string(byteUUID))
}
func main() {
iterations := 100
interval := 500 * time.Millisecond
kafkaURL := "kafka:9092"
topic := "benchmarktopic"
partition := 0
connW, _ := kafka.DialLeader(context.Background(), "tcp", kafkaURL, topic, partition)
defer connW.Close()
connR, _ := kafka.DialLeader(context.Background(), "tcp", kafkaURL, topic+"response", partition)
defer connR.Close()
for i := 0; i < iterations; i++ {
<-time.After(interval)
go requestMessage(connR, connW, []byte("body"), i)
wg.Add(1)
}
wg.Wait()
}
You cannot really read only a specific key from a Kafka topic partition. The thing is that your records will be dispatched to specific partitions based on the hash of the key ( default behavior). So you might have different keys in a same partition. So as long as you have more keys than number of partitions, you'll find a partition containing different keys.
The only one way I have in mind would be to set N partitions for your topic where N is the number of different keys you could have ( quite a huge number if you use uuid as a key) and assign partition with a static mapping ( key - > partition) to your producers/consumers.
BTW, you're already assigning the part 0 to your producer, was wondering why?
Yannick