I connected to Kafka from Go using the sarama library. It worked at first, but later the build started failing with this error: "# github.com/Shopify/sarama ........\src\github.com\Shopify\sarama\sync_producer.go:35:6: invalid receiver type SyncProducer (SyncProducer is an interface type)"
In the beginning, I connected to Kafka with the synchronous producer, and that worked. Then I tried connecting with the asynchronous producer, and this error was displayed. When I went back to the synchronous version, it failed with the same error as well.
// 同步消息模式
func SynchronizeKafkaProducer(address ...string){
config := sarama.NewConfig()
config.Producer.Return.Successes = true
config.Producer.Timeout = 5 * time.Second
client, errClient:= sarama.NewClient(address, config)
if errClient != nil {
log.Printf("sarama.NewClient err, message=%s
", errClient)
return
}
producer, errPro := sarama.NewSyncProducerFromClient(client)
if errPro != nil {
log.Printf("sarama.NewSyncProducerFromClient err, message=%s
", errPro)
return
}
defer producer.Close()
srcValue := "sync: this is a message . index = %d"
for i:=0; i <10 ; i++ {
value := fmt.Sprintf(srcValue, i)
message := &sarama.ProducerMessage{
Topic: "flink_topic",
Value: sarama.ByteEncoder(value),
}
partition, offset, err := producer.SendMessage(message)
if err!=nil {
log.Printf("send message(%s) err=%s
", value, err)
}else {
fmt.Fprintf(os.Stdout, value + "发送成功,partition=%d, offset=%d
", partition, offset)
}
time.Sleep(2*time.Second)
}
}
// 异步的Kafka发送数据
func AsynchronousKafkaProducer(address ...string){
config := sarama.NewConfig()
config.Producer.Return.Successes = true
config.Producer.Timeout = 5 * time.Second
client, errClient := sarama.NewClient(address, config)
if errClient!= nil{
log.Printf("sarama.NewClient err, message=%s
", errClient)
return
}
asyncProducer, errPro := sarama.NewAsyncProducerFromClient(client)
if errPro != nil {
log.Printf("sarama.NewSyncProducerFromClient err, message=%s
", errPro)
return
}
defer asyncProducer.AsyncClose()
go func(asyncProducer sarama.AsyncProducer) {
for {
select {
case err := <- asyncProducer.Errors():
if err != nil {
glog.Errorln(err)
}
case <- asyncProducer.Successes():
}
}
}(asyncProducer)
v := "async: " + strconv.Itoa(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(10000))
fmt.Fprintln(os.Stdout, v)
message := &sarama.ProducerMessage{
Topic: "flink_topic",
Value: sarama.ByteEncoder(v),
}
asyncProducer.Input() <- message
time.Sleep(2*time.Second)
}
I was hoping this would work, but reality is always cruel. The full error output is posted below, and I would like to know why it happens:
# github.com/Shopify/sarama
..\..\..\..\src\github.com\Shopify\sarama\sync_producer.go:35:6: invalid receiver type SyncProducer (SyncProducer is an interface type)
..\..\..\..\src\github.com\Shopify\sarama\sync_producer.go:39:6: invalid receiver type SyncProducer (SyncProducer is an interface type)
..\..\..\..\src\github.com\Shopify\sarama\sync_producer.go:43:6: invalid receiver type SyncProducer (SyncProducer is an interface type)
..\..\..\..\src\github.com\Shopify\sarama\sync_producer.go:47:6: invalid receiver type SyncProducer (SyncProducer is an interface type)