I'm new to Go and starting to learn about channels. I'm using the confluent-kafka-go consumer to create a functional consumer. What I want to accomplish is to send the messages into a buffered channel (2,000) and then write the messages in the channel to Redis using a pipeline. I've gotten the consumer part to work by just doing a Println of the messages one by one until it reaches the end of the offsets, but when I try to add a channel, it seems to hit the default: case in the switch and then just freeze.

It also doesn't look like I'm filling the channel correctly: fmt.Println("count is: ", len(redisChnl)) always prints 0.

Here is what I have so far:
// Example function-based high-level Apache Kafka consumer
package main

import (
	"encoding/binary"
	"encoding/json"
	"fmt"
	"os"
	"os/signal"
	"regexp"
	"syscall"
	"time"

	"github.com/confluentinc/confluent-kafka-go/kafka"
	"github.com/go-redis/redis"
)
var client *redis.Client

func init() {
	client = redis.NewClient(&redis.Options{
		Addr:         ":6379",
		DialTimeout:  10 * time.Second,
		ReadTimeout:  30 * time.Second,
		WriteTimeout: 30 * time.Second,
		PoolSize:     10,
		PoolTimeout:  30 * time.Second,
	})

	client.FlushDB()
}
type MessageFormat struct {
	MetricValueNumber float64   `json:"metric_value_number"`
	Path              string    `json:"path"`
	Cluster           string    `json:"cluster"`
	Timestamp         time.Time `json:"@timestamp"`
	Version           string    `json:"@version"`
	Host              string    `json:"host"`
	MetricPath        string    `json:"metric_path"`
	Type              string    `json:"string"`
	Region            string    `json:"region"`
}
//func redis_pipeline(ky string, vl string) {
//	pipe := client.Pipeline()
//
//	exec := pipe.Set(ky, vl, time.Hour)
//
//	incr := pipe.Incr("pipeline_counter")
//	pipe.Expire("pipeline_counter", time.Hour)
//
//	// Execute
//	//
//	//	INCR pipeline_counter
//	//	EXPIRE pipeline_counter 3600
//	//
//	// using one client-server roundtrip.
//	_, err := pipe.Exec()
//	fmt.Println(incr.Val(), err)
//	// Output: 1 <nil>
//}
func main() {

	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":        "kafka.com:9093",
		"group.id":                 "testehb",
		"security.protocol":        "ssl",
		"ssl.key.location":         "/Users/key.key",
		"ssl.certificate.location": "/Users/cert.cert",
		"ssl.ca.location":          "/Users/ca.pem",
	})

	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
		os.Exit(1)
	}

	fmt.Printf("Created Consumer %v\n", c)

	err = c.SubscribeTopics([]string{"jmx"}, nil)

	redisMap := make(map[string]string)

	redisChnl := make(chan []byte, 2000)

	run := true

	for run == true {
		select {
		case sig := <-sigchan:
			fmt.Printf("Caught signal %v: terminating\n", sig)
			run = false
		default:
			ev := c.Poll(100)
			if ev == nil {
				continue
			}

			switch e := ev.(type) {
			case *kafka.Message:
				//fmt.Printf("%% Message on %s:\n%s\n",
				//	e.TopicPartition, string(e.Value))
				if e.Headers != nil {
					fmt.Printf("%% Headers: %v\n", e.Headers)
				}

				str := e.Value
				res := MessageFormat{}
				json.Unmarshal([]byte(str), &res)

				fmt.Println("size", binary.Size([]byte(str)))

				host := regexp.MustCompile(`^([^.]+)`).FindString(res.MetricPath)

				redisMap[host] = string(str)
				fmt.Println("count is: ", len(redisChnl)) // this always prints "count is: 0"
				redisChnl <- e.Value                      // I think this is the right way to put the messages in the channel?

			case kafka.PartitionEOF:
				fmt.Printf("%% Reached %v\n", e)
			case kafka.Error:
				fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
				run = false
			default:
				fmt.Printf("Ignored %v\n", e)
			}

			<-redisChnl // I thought I could just empty the channel like this once the buffer is full?
		}
	}

	fmt.Printf("Closing consumer\n")
	c.Close()
}
-------EDIT-------
Ok, I think I got it to work by moving the <-redisChnl inside default, but now I see that the count before read and count after read inside the default always print 2,000... I would have thought that the first count before read would be 2,000 and then count after read would be 0, since the channel would be empty by then??
select {
case sig := <-sigchan:
	fmt.Printf("Caught signal %v: terminating\n", sig)
	run = false
default:
	ev := c.Poll(100)
	if ev == nil {
		continue
	}

	switch e := ev.(type) {
	case *kafka.Message:
		//fmt.Printf("%% Message on %s:\n%s\n",
		//	e.TopicPartition, string(e.Value))
		if e.Headers != nil {
			fmt.Printf("%% Headers: %v\n", e.Headers)
		}

		str := e.Value
		res := MessageFormat{}
		json.Unmarshal([]byte(str), &res)

		//fmt.Println("size", binary.Size([]byte(str)))

		host := regexp.MustCompile(`^([^.]+)`).FindString(res.MetricPath)

		redisMap[host] = string(str)

		go func() {
			redisChnl <- e.Value
		}()

	case kafka.PartitionEOF:
		fmt.Printf("%% Reached %v\n", e)
	case kafka.Error:
		fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
		run = false
	default:
		fmt.Println("count before read: ", len(redisChnl))
		fmt.Printf("Ignored %v\n", e)

		<-redisChnl

		fmt.Println("count after read: ", len(redisChnl)) // would've expected this to be 0
	}
}
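Both behaviors come straight from how channel sends and receives block. In the first version, the unconditional <-redisChnl after the switch blocks forever as soon as an event that isn't a *kafka.Message arrives, because nothing was sent for it to receive; that is the freeze. It is also why "count is: 0" always prints: the channel is drained at the end of every iteration, and the print happens before the send. In the edited version, every go func() { redisChnl <- e.Value }() that finds the buffer full simply blocks; the instant <-redisChnl frees a slot, one of those blocked senders refills it, so len reads 2,000 both before and after. Here is a minimal sketch of that second effect, with made-up sizes and no Kafka or Redis involved:

package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan int, 3)

	// Launch more senders than the buffer holds; the extras block on send.
	for i := 0; i < 10; i++ {
		go func(v int) { ch <- v }(i)
	}
	time.Sleep(100 * time.Millisecond) // give the buffer time to fill

	fmt.Println("count before read:", len(ch)) // 3: the buffer is full
	<-ch                                       // frees one slot...
	time.Sleep(100 * time.Millisecond)         // ...and a blocked sender refills it
	fmt.Println("count after read:", len(ch))  // 3 again
}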
I think the biggest simplification for this code is to separate the pipeline into multiple goroutines.

The advantage of channels is that multiple goroutines can be writing and reading on them at the same time. In this example, that might mean having a single goroutine enqueueing onto the channel and another pulling off of the channel and putting things into Redis.
Something like this:
c := make(chan Message, bufferLen)
go pollKafka(c)
go pushToRedis(c)
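Fleshed out, those two stages might look roughly like this. This is a sketch, not drop-in code: it assumes the consumer, the client, MessageFormat, and the host-key regexp from the question, and it uses the question's []byte values instead of a Message type:

func pollKafka(c *kafka.Consumer, out chan<- []byte) {
	defer close(out) // lets the reader drain the buffer and then exit
	for {
		ev := c.Poll(100)
		if ev == nil {
			continue
		}
		switch e := ev.(type) {
		case *kafka.Message:
			out <- e.Value // blocks only while the buffer is full
		case kafka.Error:
			fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
			return
		}
	}
}

func pushToRedis(in <-chan []byte) {
	for msg := range in {
		res := MessageFormat{}
		if err := json.Unmarshal(msg, &res); err != nil {
			continue // skip messages that don't parse
		}
		host := regexp.MustCompile(`^([^.]+)`).FindString(res.MetricPath)
		client.Set(host, string(msg), time.Hour)
	}
}

Because only pushToRedis ever receives from the channel, the buffer now actually absorbs bursts instead of being drained in the same loop that fills it.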
If you want to add batching, you could add a middle stage that reads from the kafka channel and appends to a slice until the slice is full, then enqueues that slice onto a second channel for redis.
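That middle stage could look something like this (again a sketch: the batch size, the stage names, and pushBatchToRedis are all made up here, with the pipelined write reusing the question's go-redis client):

// batch collects up to size messages into a slice, then forwards the slice.
func batch(in <-chan []byte, out chan<- [][]byte, size int) {
	defer close(out)
	buf := make([][]byte, 0, size)
	for msg := range in {
		buf = append(buf, msg)
		if len(buf) == size {
			out <- buf
			buf = make([][]byte, 0, size)
		}
	}
	if len(buf) > 0 {
		out <- buf // flush whatever is left when the input closes
	}
}

// pushBatchToRedis writes each batch in a single pipelined round trip.
func pushBatchToRedis(in <-chan [][]byte) {
	for msgs := range in {
		pipe := client.Pipeline()
		for _, msg := range msgs {
			res := MessageFormat{}
			if err := json.Unmarshal(msg, &res); err != nil {
				continue
			}
			host := regexp.MustCompile(`^([^.]+)`).FindString(res.MetricPath)
			pipe.Set(host, string(msg), time.Hour)
		}
		if _, err := pipe.Exec(); err != nil {
			fmt.Fprintf(os.Stderr, "pipeline exec: %v\n", err)
		}
	}
}

Wired together it becomes a three-stage pipeline:

raw := make(chan []byte, 2000)
batched := make(chan [][]byte, 4)
go pollKafka(c, raw)
go batch(raw, batched, 500)
pushBatchToRedis(batched) // or run this in a goroutine too and block some other way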
If concurrency like this isn't a goal, it might just be easier to replace the channel in your code with a slice. If there is only ever one goroutine acting on an object, it's not a good idea to try to use a channel.
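In the question's loop, that would amount to something like the following, where pending is a hypothetical [][]byte created before the loop with pending := make([][]byte, 0, 2000), and flushToRedis is a stand-in for a pipelined write like pushBatchToRedis above:

case *kafka.Message:
	pending = append(pending, e.Value)
	if len(pending) >= 2000 {
		flushToRedis(pending) // hypothetical helper: one pipelined write for the whole batch
		pending = pending[:0] // reuse the backing array for the next batch
	}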