我将Kafka与GO相连,起初还可以,但是后来出现错误

I connected Kafka with GO, it was ok at first, but there was an error later.And ERROR is "# github.com/Shopify/sarama ........\src\github.com\Shopify\sarama\sync_producer.go:35:6: invalid receiver type SyncProducer (SyncProducer is an interface type) "

In the beginning, I used Kafka to connect to Kafka in a synchronous manner. It was possible to connect Kafka asynchronously, and this error was displayed. I will try to sync again and find that the synchronization has also failed.

// 同步消息模式
func SynchronizeKafkaProducer(address ...string){
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true
    config.Producer.Timeout = 5 * time.Second
    client, errClient:= sarama.NewClient(address, config)
    if errClient != nil {
        log.Printf("sarama.NewClient err, message=%s 
", errClient)
        return
    }
    producer, errPro := sarama.NewSyncProducerFromClient(client)
    if errPro != nil {
        log.Printf("sarama.NewSyncProducerFromClient err, message=%s 
", errPro)
        return
    }
    defer producer.Close()
    srcValue := "sync: this is a message . index = %d"
    for i:=0; i <10 ; i++  {
        value := fmt.Sprintf(srcValue, i)
        message := &sarama.ProducerMessage{
            Topic: "flink_topic",
            Value: sarama.ByteEncoder(value),
        }
        partition, offset, err := producer.SendMessage(message)
        if err!=nil {
            log.Printf("send message(%s) err=%s 
", value, err)
        }else {
            fmt.Fprintf(os.Stdout, value + "发送成功,partition=%d, offset=%d 
", partition, offset)
        }
        time.Sleep(2*time.Second)
    }
}

// 异步的Kafka发送数据
func AsynchronousKafkaProducer(address ...string){
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true
    config.Producer.Timeout = 5 * time.Second
    client, errClient := sarama.NewClient(address, config)
    if errClient!= nil{
        log.Printf("sarama.NewClient err, message=%s 
", errClient)
        return
    }
    asyncProducer, errPro := sarama.NewAsyncProducerFromClient(client)
    if errPro != nil {
        log.Printf("sarama.NewSyncProducerFromClient err, message=%s 
", errPro)
        return
    }
    defer asyncProducer.AsyncClose()

    go func(asyncProducer sarama.AsyncProducer) {
        for  {
            select {
            case err := <- asyncProducer.Errors():
                if err != nil {
                    glog.Errorln(err)
                }
            case <- asyncProducer.Successes():
            }
        }
    }(asyncProducer)

    v := "async: " + strconv.Itoa(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(10000))
    fmt.Fprintln(os.Stdout, v)
    message := &sarama.ProducerMessage{
        Topic: "flink_topic",
        Value: sarama.ByteEncoder(v),
    }
    asyncProducer.Input() <- message
    time.Sleep(2*time.Second)
}

I hope that I can succeed, but the reality is always cruel.The error has been posted, so I want to know why?

# github.com/Shopify/sarama
..\..\..\..\src\github.com\Shopify\sarama\sync_producer.go:35:6: invalid receiver type SyncProducer (SyncProducer is an interface type)
..\..\..\..\src\github.com\Shopify\sarama\sync_producer.go:39:6: invalid receiver type SyncProducer (SyncProducer is an interface type)
..\..\..\..\src\github.com\Shopify\sarama\sync_producer.go:43:6: invalid receiver type SyncProducer (SyncProducer is an interface type)
..\..\..\..\src\github.com\Shopify\sarama\sync_producer.go:47:6: invalid receiver type SyncProducer (SyncProducer is an interface type)