I'm new to Golang and Kafa so this might seem like a silly question.
After my Kafka consumer first connects to the Kafka server, why is there a delay (~ 20 secs) between establishing connection to the Kafka server, and receiving the first message?
It prints a message right before consumer.Messages()
and print another message for each message received. The ~20 sec delay is between the first fmt.Println
and second fmt.Println
.
package main
import (
"fmt"
"github.com/Shopify/sarama"
cluster "github.com/bsm/sarama-cluster"
)
func main() {
// Create the consumer and listen for new messages
consumer := createConsumer()
// Create a signal channel to know when we are done
done := make(chan bool)
// Start processing messages
go func() {
fmt.Println("Start consuming Kafka messages")
for msg := range consumer.Messages() {
s := string(msg.Value[:])
fmt.Println("Msg: ", s)
}
}()
<-done
}
func createConsumer() *cluster.Consumer {
// Define our configuration to the cluster
config := cluster.NewConfig()
config.Consumer.Return.Errors = false
config.Group.Return.Notifications = false
config.Consumer.Offsets.Initial = sarama.OffsetOldest
// Create the consumer
brokers := []string{"127.0.0.1:9092"}
topics := []string{"orders"}
consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
if err != nil {
log.Fatal("Unable to connect consumer to Kafka")
}
go handleErrors(consumer)
go handleNotifications(consumer)
return consumer
}
docker-compose.yml
version: '2'
services:
zookeeper:
image: "confluentinc/cp-zookeeper:5.0.1"
hostname: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker-1:
image: "confluentinc/cp-enterprise-kafka:5.0.1"
hostname: broker-1
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_BROKER_RACK: rack-a
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://127.0.0.1:9092'
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_DELETE_TOPIC_ENABLE: "true"
KAFKA_JMX_PORT: 9999
KAFKA_JMX_HOSTNAME: 'broker-1'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker-1:9092
CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
CONFLUENT_METRICS_ENABLE: 'true'
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
KAFKA_CREATE_TOPICS: "orders:1:1"
After my Kafka consumer first connects to the Kafka server, why is there a delay (~ 20 secs) between establishing connection to the Kafka server, and receiving the first message?
There can not be that much delay because consumer used message channel which receive messages from kafka. As soon as the message is available in kafka queue it will be sent to message channel which consumer can receive.
Coming to you code implementation :-
for msg := range consumer.Messages() {
s := string(msg.Value[:])
fmt.Println("Msg: ", s)
}
consumer.Messages()
returns a channel and for
loops over the channel which returns a message whenever it is available inside channel.
Refer to this question How to create a kafka consumer group in Golang? to connect using sarama. you don't need sarama-cluster for connection.