Kafka Version: kafka_2.12-1.1.0.tgz Go Version: 1.9.1
package kafka
import (
"flag"
"fmt"
"log"
"strings"
"github.com/Shopify/sarama"
)
// partition is the -partition command-line flag: the Kafka partition to
// produce to. It defaults to 12.
// NOTE(review): Kafka partition IDs are zero-based, so a topic with 12
// partitions only has IDs 0-11 — confirm the default is valid for the topic.
var partition = flag.Int("partition", 12, "The partition to produce to.")
func Start_producer(payload []byte) {
flag.Parse()
s := "mydata"
topic := &s
// brokers := &[]string{"172.25.33.175:9092,172.25.33.176:9092,172.25.33.177:9092"}
// brokers := []string{"172.25.33.175:9092,172.25.33.176:9092,172.25.33.177:9092"}
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Max = 5
config.Producer.Return.Successes = true
producer, err := sarama.NewSyncProducer(strings.Split("172.25.33.175:9092,172.25.33.176:9092,172.25.33.177:9092", ","), config) //default port
if err != nil {
log.Println("ERRR")
panic(err)
}
defer func() {
if err := producer.Close(); err != nil {
panic(err)
}
}()
msg := &sarama.ProducerMessage{
Topic: *topic,
Value: sarama.StringEncoder(payload),
Partition: int32(*partition),
}
fmt.Println("XXXX: ", msg.Partition)
partition, offset, err := producer.SendMessage(msg)
if err != nil {
panic(err)
}
fmt.Println()
fmt.Printf("Message is stored in topic(%s)/partition(%d)/offset(%d)
", *topic, partition, offset)
fmt.Println("--------------------------------------------------")
fmt.Println(partition)
}
I am trying to send a stream of messages to my Kafka cluster. I have 3 broker nodes, as specified above, and I want the messages to be spread across 12 partitions, but when I call SendMessage(msg) the partition it returns is 0. Why is that? Why can't I send my messages to the partition I specify?
You have not specified the ManualPartitioner in your config, so the producer
uses its default partitioner instead of honoring the value you set in the message's Partition field.
Add this to your config:
config.Producer.Partitioner = sarama.NewManualPartitioner
Your partition value should now be obeyed.
(Docs : https://godoc.org/github.com/Shopify/sarama#example-Partitioner--Manual)