Reading messages with a specific ID in kafka-go

I'm building a request-response setup on Kafka with the kafka-go library, using the message Key as a correlation ID. My setup works fine without concurrency, but once the requests are sent from separate goroutines, the reader misses the keys it is waiting for (probably because another goroutine has already read them).

How can I read only a specific key from a topic, considering the connection is being shared by different goroutines?

Client example below (error handling was removed for brevity):

package main

import (
    "bytes"
    "context"
    "fmt"
    "sync"
    "time"

    "github.com/google/uuid"
    kafka "github.com/segmentio/kafka-go"
)

var wg sync.WaitGroup

func requestMessage(connR *kafka.Conn, connW *kafka.Conn, body []byte, index int) {
    currentUUID := uuid.New()
    byteUUID := []byte(currentUUID.String())
    connW.WriteMessages(kafka.Message{
        Key:   byteUUID,
        Value: body,
    })
    fmt.Println("Posted id " + string(byteUUID))
    for {
        m, _ := connR.ReadMessage(10e6)
        if bytes.Equal(m.Key, byteUUID) {
            break
        }
    }

    wg.Done()
    fmt.Println("Done " + string(byteUUID))

}

func main() {
    iterations := 100
    interval := 500 * time.Millisecond
    kafkaURL := "kafka:9092"
    topic := "benchmarktopic"
    partition := 0
    connW, _ := kafka.DialLeader(context.Background(), "tcp", kafkaURL, topic, partition)
    defer connW.Close()
    connR, _ := kafka.DialLeader(context.Background(), "tcp", kafkaURL, topic+"response", partition)
    defer connR.Close()
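    for i := 0; i < iterations; i++ {
        <-time.After(interval)
        // Register with the WaitGroup before starting the goroutine,
        // so wg.Done can never run ahead of wg.Add.
        wg.Add(1)
        go requestMessage(connR, connW, []byte("body"), i)
    }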
    wg.Wait()
}

You cannot really read only a specific key from a Kafka topic partition. Records are dispatched to partitions based on the hash of the key (the default behavior), so a single partition can hold several different keys. As soon as you have more keys than partitions, at least one partition will contain more than one key.
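To make the collision argument concrete, here is a minimal sketch of key-hash partitioning. It assumes FNV-1a modulo the partition count purely for illustration; the hash actually used depends on the client and its configuration. The helpers `partitionFor` and `keysPerPartition` and the sample keys are made up for this example.

```go
package main

import (
    "fmt"
    "hash/fnv"
)

// partitionFor maps a key to one of numPartitions partitions by hashing it.
// FNV-1a is an illustrative assumption; real clients may use another hash.
// numPartitions is assumed to be greater than zero.
func partitionFor(key []byte, numPartitions int) int {
    h := fnv.New32a()
    h.Write(key) // Write on a hash.Hash never returns an error
    return int(h.Sum32() % uint32(numPartitions))
}

// keysPerPartition counts how many of the given keys land on each partition.
func keysPerPartition(keys []string, numPartitions int) []int {
    counts := make([]int, numPartitions)
    for _, k := range keys {
        counts[partitionFor([]byte(k), numPartitions)]++
    }
    return counts
}

func main() {
    // Five keys spread over three partitions: some partition must hold
    // at least two of them, whatever the hash function is.
    keys := []string{"req-1", "req-2", "req-3", "req-4", "req-5"}
    for p, n := range keysPerPartition(keys, 3) {
        fmt.Printf("partition %d holds %d key(s)\n", p, n)
    }
}
```

Whichever hash is used, five keys over three partitions force at least one partition to carry two or more keys, so a reader of that partition always sees records it did not ask for.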

The only way I can think of would be to create N partitions for your topic, where N is the number of different keys you could have (a huge number if you use UUIDs as keys), and to assign partitions to your producers and consumers with a static mapping (key -> partition).
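Since one partition per UUID is rarely practical, a different technique (not the static mapping described above) is to filter on the client: keep a single goroutine reading the response partition and let it hand each message to whichever goroutine registered for that key. The sketch below uses only the standard library; `Message`, `Dispatcher` and the `read` callback are hypothetical stand-ins, and in a real program `read` would presumably wrap the one place that calls the kafka-go reader.

```go
package main

import (
    "fmt"
    "io"
    "sync"
)

// Message is a hypothetical stand-in carrying only a record's key and value.
type Message struct {
    Key   string
    Value []byte
}

// Dispatcher routes each incoming message to the goroutine waiting for its key.
type Dispatcher struct {
    mu      sync.Mutex
    waiters map[string]chan Message
}

func NewDispatcher() *Dispatcher {
    return &Dispatcher{waiters: make(map[string]chan Message)}
}

// Register should be called before the request is sent,
// so that a fast reply cannot arrive before its waiter exists.
func (d *Dispatcher) Register(key string) <-chan Message {
    ch := make(chan Message, 1) // buffered so Deliver never blocks
    d.mu.Lock()
    d.waiters[key] = ch
    d.mu.Unlock()
    return ch
}

// Deliver hands m to its waiter, if any, and reports whether one was found.
func (d *Dispatcher) Deliver(m Message) bool {
    d.mu.Lock()
    ch, ok := d.waiters[m.Key]
    delete(d.waiters, m.Key)
    d.mu.Unlock()
    if ok {
        ch <- m
    }
    return ok
}

// Run is the only reader loop; it stops at the first read error.
func (d *Dispatcher) Run(read func() (Message, error)) {
    for {
        m, err := read()
        if err != nil {
            return
        }
        d.Deliver(m) // messages nobody waits for are dropped
    }
}

func main() {
    d := NewDispatcher()
    replies := make(chan Message) // simulates the response partition
    go d.Run(func() (Message, error) {
        m, ok := <-replies
        if !ok {
            return Message{}, io.EOF
        }
        return m, nil
    })

    var wg sync.WaitGroup
    for i := 0; i < 3; i++ {
        key := fmt.Sprintf("id-%d", i)
        ch := d.Register(key)
        wg.Add(1)
        go func() {
            defer wg.Done()
            m := <-ch
            fmt.Printf("%s got %s\n", key, m.Value)
        }()
    }
    // Replies arrive in a different order than the requests were made.
    for i := 2; i >= 0; i-- {
        replies <- Message{Key: fmt.Sprintf("id-%d", i), Value: []byte("reply")}
    }
    wg.Wait()
    close(replies)
}
```

With this layout no goroutine can consume another goroutine's reply, because only the dispatcher ever reads. Waiters that never receive an answer would still need a timeout, which the sketch leaves out.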

BTW, you're already assigning partition 0 to your producer; I was wondering why?

Yannick