Go channels with Kafka consumer

I'm new to Go and starting to learn about channels. I'm using the Confluent Kafka consumer to create a functional consumer. What I want to accomplish is to send the messages into a buffered channel (2,000) and then write the messages in the channel to Redis using a pipeline. I've gotten the consumer part to work by just doing a Println of each message one by one until it reaches the end of the offsets, but when I try to add a channel, it seems to hit the default: case in the switch and then just freeze.

It also doesn't look like I'm filling the channel correctly: fmt.Println("count is: ", len(redisChnl)) always prints 0.

here is what I have so far:

// Example function-based high-level Apache Kafka consumer
package main

import (
    "fmt"
    "github.com/confluentinc/confluent-kafka-go/kafka"
    "os"
    "os/signal"
    "syscall"
    "time"
    "encoding/json"
    "regexp"
    "github.com/go-redis/redis"
    "encoding/binary"
)

var client *redis.Client

func init() {
    client = redis.NewClient(&redis.Options{
        Addr:         ":6379",
        DialTimeout:  10 * time.Second,
        ReadTimeout:  30 * time.Second,
        WriteTimeout: 30 * time.Second,
        PoolSize:     10,
        PoolTimeout:  30 * time.Second,
    })
    client.FlushDB()
}

type MessageFormat struct {
    MetricValueNumber float64     `json:"metric_value_number"`
    Path              string      `json:"path"`
    Cluster           string      `json:"cluster"`
    Timestamp         time.Time   `json:"@timestamp"`
    Version           string      `json:"@version"`
    Host              string      `json:"host"`
    MetricPath        string      `json:"metric_path"`
    Type              string      `json:"string"`
    Region            string      `json:"region"`
}

//func redis_pipeline(ky string, vl string) {
//  pipe := client.Pipeline()
//
//  exec := pipe.Set(ky, vl, time.Hour)
//
//  incr := pipe.Incr("pipeline_counter")
//  pipe.Expire("pipeline_counter", time.Hour)
//
//  // Execute
//  //
//  //     INCR pipeline_counter
//  //     EXPIRE pipeline_counts 3600
//  //
//  // using one client-server roundtrip.
//  _, err := pipe.Exec()
//  fmt.Println(incr.Val(), err)
//  // Output: 1 <nil>
//}

func main() {


    sigchan := make(chan os.Signal, 1)
    signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers":               "kafka.com:9093",
        "group.id":                        "testehb",
        "security.protocol":               "ssl",
        "ssl.key.location":                "/Users/key.key",
        "ssl.certificate.location":        "/Users/cert.cert",
        "ssl.ca.location":                 "/Users/ca.pem",
    })

    if err != nil {
        fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
        os.Exit(1)
    }

    fmt.Printf("Created Consumer %v
", c)

    err = c.SubscribeTopics([]string{"jmx"}, nil)

    redisMap := make(map[string]string)

    redisChnl := make(chan []byte, 2000)

    run := true

    for run == true {
        select {
        case sig := <-sigchan:
            fmt.Printf("Caught signal %v: terminating
", sig)
            run = false
        default:
            ev := c.Poll(100)
            if ev == nil {
                continue
            }

            switch e := ev.(type) {
            case *kafka.Message:

                //fmt.Printf("%% Message on %s:
%s
",
                //  e.TopicPartition, string(e.Value))
                if e.Headers != nil {
                    fmt.Printf("%% Headers: %v
", e.Headers)
                }

                str := e.Value
                res := MessageFormat{}
                json.Unmarshal([]byte(str), &res)


                fmt.Println("size", binary.Size([]byte(str)))

                host:= regexp.MustCompile(`^([^.]+)`).FindString(res.MetricPath)

                redisMap[host] = string(str)
                fmt.Println("count is: ", len(redisChnl)) //this always prints "count is:  0"

                redisChnl <- e.Value //I think this is the right way to put the messages in the channel?

            case kafka.PartitionEOF:
                fmt.Printf("%% Reached %v
", e)
            case kafka.Error:
                fmt.Fprintf(os.Stderr, "%% Error: %v
", e)
                run = false
            default:
                fmt.Printf("Ignored %v
", e)
            }

            <- redisChnl // I thought I could just empty the channel like this once the buffer is full?


        }
    }

    fmt.Printf("Closing consumer
")
    c.Close()
}

-------EDIT-------

Ok, I think I got it to work by moving the <- redisChnl inside default, but now I see that the count before the read and the count after the read inside default always print 2,000... I would have thought that the count before the read would be 2,000 and the count after the read would be 0, since the channel would have been emptied by then??

    select {
    case sig := <-sigchan:
        fmt.Printf("Caught signal %v: terminating
", sig)
        run = false
    default:
        ev := c.Poll(100)
        if ev == nil {
            continue
        }

        switch e := ev.(type) {
        case *kafka.Message:

            //fmt.Printf("%% Message on %s:
%s
",
            //  e.TopicPartition, string(e.Value))
            if e.Headers != nil {
                fmt.Printf("%% Headers: %v
", e.Headers)
            }

            str := e.Value
            res := MessageFormat{}
            json.Unmarshal([]byte(str), &res)


            //fmt.Println("size", binary.Size([]byte(str)))

            host:= regexp.MustCompile(`^([^.]+)`).FindString(res.MetricPath)

            redisMap[host] = string(str)

            go func() {
                redisChnl <- e.Value
            }()


        case kafka.PartitionEOF:
            fmt.Printf("%% Reached %v
", e)
        case kafka.Error:
            fmt.Fprintf(os.Stderr, "%% Error: %v
", e)
            run = false
        default:
            fmt.Println("count before read: ", len(redisChnl))

            fmt.Printf("Ignored %v
", e)

            <-redisChnl

            fmt.Println("count after read: ", len(redisChnl)) //would've expected this to be 0

        }


    }

As for the edit: the count stays at 2,000 because every go func() { redisChnl <- e.Value }() you spawn blocks once the buffer is full, and the moment your <-redisChnl frees a slot, one of those blocked senders immediately refills it.

I think the biggest way to simplify this code is to separate the pipeline into multiple goroutines.

The advantage of channels is that multiple goroutines can be writing and reading on them at the same time. In this example, that might mean having a single goroutine enqueueing onto the channel and another pulling off of the channel and putting things into Redis.

Something like this:

    c := make(chan Message, bufferLen)
    go pollKafka(c)
    go pushToRedis(c)
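
Fleshed out, those two stages could look something like this. This is only a sketch under the question's setup (the confluent consumer c, the go-redis client, and the MessageFormat struct); pollKafka, pushToRedis, and the one-Set-per-message pipeline are illustrative, not a drop-in implementation:

    var hostRe = regexp.MustCompile(`^([^.]+)`) // compiled once instead of per message

    // pollKafka owns the channel: it enqueues message values and closes
    // the channel when the consumer hits an error, which lets the reader
    // exit cleanly.
    func pollKafka(c *kafka.Consumer, out chan<- []byte) {
        defer close(out)
        for {
            ev := c.Poll(100)
            if ev == nil {
                continue
            }
            switch e := ev.(type) {
            case *kafka.Message:
                out <- e.Value // blocks only while the buffer is full
            case kafka.Error:
                fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
                return
            }
        }
    }

    // pushToRedis drains the channel; range exits once pollKafka closes it.
    func pushToRedis(client *redis.Client, in <-chan []byte) {
        for val := range in {
            res := MessageFormat{}
            json.Unmarshal(val, &res)
            host := hostRe.FindString(res.MetricPath)

            pipe := client.Pipeline()
            pipe.Set(host, string(val), time.Hour)
            if _, err := pipe.Exec(); err != nil {
                fmt.Fprintf(os.Stderr, "redis pipeline error: %v\n", err)
            }
        }
    }

Splitting it this way also gives you backpressure for free: if Redis falls behind, the buffer fills and pollKafka simply blocks until pushToRedis catches up.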

If you want to add batching, you could add a middle stage that reads from the Kafka channel and appends to a slice until the slice is full, then enqueues that slice onto the channel for Redis.
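
For instance, a middle stage along these lines (batchAndForward and its batch size are illustrative names and choices, not a fixed API):

    // batchAndForward reads single messages and forwards slices of up to
    // batchSize to the Redis writer, flushing any remainder on shutdown.
    func batchAndForward(in <-chan []byte, out chan<- [][]byte, batchSize int) {
        batch := make([][]byte, 0, batchSize)
        for msg := range in {
            batch = append(batch, msg)
            if len(batch) == batchSize {
                out <- batch
                batch = make([][]byte, 0, batchSize) // fresh slice; the reader owns the old one now
            }
        }
        if len(batch) > 0 {
            out <- batch // flush the partial batch when the input closes
        }
        close(out)
    }

The Redis stage would then range over [][]byte batches and issue one pipe.Exec() per batch, which is where pipelining actually pays off.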

If concurrency like this isn't a goal, it might just be easier to replace the channel in your code with a slice. If only one goroutine is ever acting on an object, it's not a good idea to try and use a channel.
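
For completeness, a single-goroutine sketch of that: inside your *kafka.Message case you would append to a slice and flush through one pipeline whenever it fills (flushBatch is an illustrative helper, reusing hostRe and MessageFormat from above):

    // flushBatch writes every buffered message in one pipeline round trip
    // and hands back an emptied slice for reuse.
    func flushBatch(client *redis.Client, batch []*kafka.Message) []*kafka.Message {
        pipe := client.Pipeline()
        for _, m := range batch {
            res := MessageFormat{}
            json.Unmarshal(m.Value, &res)
            pipe.Set(hostRe.FindString(res.MetricPath), string(m.Value), time.Hour)
        }
        if _, err := pipe.Exec(); err != nil {
            fmt.Fprintf(os.Stderr, "redis pipeline error: %v\n", err)
        }
        return batch[:0] // keep the backing array, drop the contents
    }

In the consumer loop that becomes batch = append(batch, e) followed by if len(batch) == cap(batch) { batch = flushBatch(client, batch) }, with one final flush before c.Close().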