tail -f几个日志,然后使用sarama将每行换发到kafka

I'd like to tail -f several log files in /var/log (I thing one goroutine per log would be fine) and every goroutine will keep on "watching" forever on log and send every new line with kafka client sarama. Here is my code (does not work) :

package main

import (
    "flag"
    "github.com/Shopify/sarama"
    "log"
    "os"
    "fmt"
    "strings"
    "github.com/hpcloud/tail"
    "github.com/spf13/viper"
    //"io/ioutil"
    "reflect"
)



func produce(producer sarama.SyncProducer, cfg *sarama.Config, brokers *string, topic string, logger *log.Logger, log string, t *tail.Tail){
    logger.Printf("Entering produce")
    logger.Println(strings.Split(*brokers, ","))
    logger.Println(reflect.TypeOf(strings.Split(*brokers, ",")))
    logger.Println(log)
    /*t, err := tail.TailFile(log, tail.Config{Follow: true, ReOpen: true})
    if err != nil {
        fmt.Println(fmt.Errorf("Error with tail: %v
", err.Error()))
    }*/
    for line := range t.Lines {
        //logger.Println(line)
        //logger.Println(line.Text)
        msg := &sarama.ProducerMessage{Topic: topic, Value: sarama.StringEncoder(line.Text)}
        _, _, err := producer.SendMessage(msg)
        if err != nil {
            logger.Printf("FAILED to send message: %s
", err)
        }
    }

}


func main() {
    //Getting config file params
    viper.SetConfigName("config") 
    viper.AddConfigPath("/root/work/src/linux2kafka/")
    err := viper.ReadInConfig()
    if err != nil {
        panic(err)
    }
    viper.WatchConfig()
    logList := viper.Get("log_list")
    //logListString, err := ioutil.ReadFile(logList.(string))
    //fmt.Println(logList)
    //fmt.Println(reflect.TypeOf(logList))
    logsConfig := strings.Split(logList.(string),",")
    // print logs to watch
    /*for i := range logsConfig {
        fmt.Println(logsConfig[i])
    }*/
    brokerList := viper.Get("brokerList")
    brokers := flag.String("brokers", brokerList.(string), "Comma separated kafka brokers list") //must be set in config.toml
    topic := flag.String("topic", "test0", "Kafka topic to send messages to")
    flag.Parse()
    logger := log.New(os.Stdout, "producer ", log.Lmicroseconds)
    cfg := sarama.NewConfig()
    //Wait for replication
    cfg.Producer.RequiredAcks = -1
    cfg.Producer.Flush.Frequency = 333
    cfg.Producer.Flush.Messages = 1000
    cfg.Producer.Flush.MaxMessages = 3000
    producer, err := sarama.NewSyncProducer(strings.Split(*brokers, ","), cfg)
    if err != nil {
        logger.Fatalln(err)
    }

    defer func() {
        if err := producer.Close(); err != nil {
            logger.Fatalln(err)
        }
    }()
    for i := range logsConfig {
        fmt.Println("go")
        t, err := tail.TailFile(logsConfig[i], tail.Config{Follow: true, ReOpen: true})
        if err != nil {
            fmt.Println(fmt.Errorf("Error with tail: %v
", err.Error()))
        }
        go produce(producer, cfg, brokers, *topic, logger, logsConfig[i], t)
        }
}

And here are my errors (nothing is received by the consumer):

root@home:~/work/src/linux2kafka# go run main.go
go
producer 15:54:44.297745 Entering produce
root@home:~/work/src/linux2kafka# go run main.go
go
root@home:~/work/src/linux2kafka# go run main.go
go
root@home:~/work/src/linux2kafka# go run main.go
go
producer 15:55:01.951155 Entering produce
producer 15:55:01.951193 [localhost:9092 localhost:9092]
producer 15:55:01.951205 []string
producer 15:55:01.951214 /root/work/src/linux2kafka/test/log
panic: send on closed channel

goroutine 56 [running]:
panic(0x756440, 0xc820164290)
    /usr/local/go/src/runtime/panic.go:464 +0x3e6
github.com/Shopify/sarama.(*syncProducer).SendMessage(0xc8201742a0, 0xc820176300, 0x0, 0x0, 0x0, 0x0)
    /root/work/src/github.com/Shopify/sarama/sync_producer.go:66 +0x156
main.produce(0x7f65528661b8, 0xc8201742a0, 0xc82008ea20, 0xc82000b230, 0x8855c0, 0x5, 0xc8200789b0, 0xc820011320, 0x23, 0xc82017e000)
    /root/work/src/linux2kafka/main.go:31 +0x5d2
created by main.main
    /root/work/src/linux2kafka/main.go:85 +0x9f7
exit status 2

I cannot understand what I'm doing wrong. thx

Here is the modified code, is that ok ? :

package main

import (
    "flag"
    "github.com/Shopify/sarama"
    "log"
    "os"
    "fmt"
    "strings"
    "github.com/hpcloud/tail"
    "github.com/spf13/viper"
    //"io/ioutil"
    //"reflect"
)



//func produce(producer sarama.SyncProducer, cfg *sarama.Config, brokers *string, topic string, logger *log.Logger, log string, t *tail.Tail){
func produce(cfg *sarama.Config, brokers *string, topic string, logger *log.Logger, log string, t *tail.Tail){
    logger.Println("Entering produce")
    /*logger.Println(strings.Split(*brokers, ","))
    logger.Println(reflect.TypeOf(strings.Split(*brokers, ",")))
    logger.Println(log)*/
    logger.Printf("sarama.NewSyncProducer")
    producer, err := sarama.NewSyncProducer(strings.Split(*brokers, ","), cfg)
    if err != nil {
        logger.Fatalln(err)
    }
    defer func() {
        if err := producer.Close(); err != nil {
            logger.Fatalln(err)
        }
    }()

    /*t, err := tail.TailFile(log, tail.Config{Follow: true, ReOpen: true})
    if err != nil {
        fmt.Println(fmt.Errorf("Error with tail: %v
", err.Error()))
    }*/
    for line := range t.Lines {
        //logger.Println(line)
        //logger.Println(line.Text)
        logger.Printf("ProduceMessage")
        msg := &sarama.ProducerMessage{Topic: topic, Value: sarama.StringEncoder(line.Text)}
        _, _, err := producer.SendMessage(msg)
        if err != nil {
            logger.Printf("FAILED to send message: %s
", err)
        }
    }

}


func main() {
    //Getting config file params
    viper.SetConfigName("config") 
    viper.AddConfigPath("/root/work/src/linux2kafka/")
    err := viper.ReadInConfig()
    if err != nil {
        panic(err)
    }
    viper.WatchConfig()
    logList := viper.Get("log_list")
    //logListString, err := ioutil.ReadFile(logList.(string))
    //fmt.Println(logList)
    //fmt.Println(reflect.TypeOf(logList))
    logsConfig := strings.Split(logList.(string),",")
    // print logs to watch
    /*for i := range logsConfig {
        fmt.Println(logsConfig[i])
    }*/
    brokerList := viper.Get("brokerList")
    brokers := flag.String("brokers", brokerList.(string), "Comma separated kafka brokers list") //must be set in config.toml
    topic := flag.String("topic", "test0", "Kafka topic to send messages to")
    flag.Parse()
    logger := log.New(os.Stdout, "producer ", log.Lmicroseconds)
    cfg := sarama.NewConfig()
    //Wait for replication
    cfg.Producer.RequiredAcks = -1
    cfg.Producer.Flush.Frequency = 333
    cfg.Producer.Flush.Messages = 1000
    cfg.Producer.Flush.MaxMessages = 3000
    for i := range logsConfig {
        fmt.Println("go")
        t, err := tail.TailFile(logsConfig[i], tail.Config{Follow: true, ReOpen: true})
        if err != nil {
            fmt.Println(fmt.Errorf("Error with tail: %v
", err.Error()))
        }
        go produce(cfg, brokers, *topic, logger, logsConfig[i], t)
        }
}

But still not working... it's not printing the first Println

root@home:~/work/src/linux2kafka# go run main.go
go
root@home:~/work/src/linux2kafka# go run main.go
go
root@home:~/work/src/linux2kafka# go run main.go
go