I am using the Confluent Kafka Go library in my Golang application. I use AdminClient.GetMetadata
to list all topics in the Kafka cluster. Here is my test code:
topic1 := fmt.Sprintf("test-get-all-topic-%d-%d", 1, randomInt())
err = adminClient.CreateTopic(topic1, numPartition, replicationFactor, map[string]string{})
topic2 := fmt.Sprintf("test-get-all-topic-%d-%d", 2, randomInt())
err = adminClient.CreateTopic(topic2, numPartition, replicationFactor, map[string]string{})
topics, err = adminClient.GetAllTopics()
The function that I use to get the list of all topics is:
admin.GetMetadata(nil, true, 2000)
The problem with the above code is that when I call GetAllTopics, topic2 is sometimes not available yet, which makes my unit test flaky. If I sleep for about 1 second before calling GetAllTopics, everything works.
But this workaround is hacky. I have also set "acks=all" on the admin client, but that does not help either. Is there a safer way to get all topics right after a topic has been created successfully in Kafka?