I'm using sarama (https://github.com/Shopify/sarama/) with Kafka 0.8.0 for my consumers. This is what my code looks like:
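```go
consumerLoop:
	for {
		select {
		case event := <-consumer.Events():
			if event.Err != nil {
				// Stop consuming on the first error.
				break consumerLoop
			}
			// Take a token from the semaphore; this blocks while all
			// processJob slots are in use (c.sem is presumably pre-filled
			// with tokens, and processJob puts its token back when done).
			<-c.sem
			go c.processJob(event.Value)
		}
	}
```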
I'm using a buffered channel (c.sem) as a semaphore to limit how many processJob goroutines can run at once. That is how I control the concurrency/speed of my consumers.
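A minimal, self-contained sketch of that buffered-channel semaphore pattern follows. The function runJobs, its parameters and the peak-concurrency counter are hypothetical and exist only for illustration. Unlike the code above, it acquires a slot by sending into the channel and releases it by receiving, which avoids having to pre-fill the channel with tokens. The channel's capacity is fixed when it is created, which is why changing the limit this way requires a restart.

```go
package main

import (
	"fmt"
	"sync"
)

// runJobs runs process on every job, never allowing more than maxConcurrent
// goroutines to run at once. It returns the highest number of jobs observed
// running simultaneously.
func runJobs(jobs []int, maxConcurrent int, process func(int)) int {
	sem := make(chan struct{}, maxConcurrent) // buffer size = concurrency limit
	var wg sync.WaitGroup
	var mu sync.Mutex
	running, peak := 0, 0

	for _, job := range jobs {
		sem <- struct{}{} // acquire: blocks while maxConcurrent slots are taken
		wg.Add(1)
		go func(j int) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when this job returns

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			process(j)

			mu.Lock()
			running--
			mu.Unlock()
		}(job)
	}
	wg.Wait() // all goroutines are done, so peak is safe to read
	return peak
}

func main() {
	peak := runJobs([]int{1, 2, 3, 4, 5, 6, 7, 8}, 3, func(int) {})
	fmt.Println(peak <= 3) // true
}
```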
The problem with this approach is that if I need to change the concurrency, I have to shut the consumer down and restart it, because the channel's buffer size comes from a command-line flag. I log the offsets that have been processed, so after a restart I have to search my logs to work out which offsets were processed and where the consumer should resume. I want a more hands-free way to manage these offsets.
I've set autocommit.enabled to true in consumer.properties, but I don't see anything change in ZooKeeper. I think that is because the Kafka 0.8.0 protocol doesn't support the offset API yet: https://issues.apache.org/jira/browse/KAFKA-993
I could store the offset in ZooKeeper manually after each job finishes, but I don't know how that would work with several processJob goroutines running concurrently. This is where Kafka is supposed to store the offsets: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper
Is this supposed to hold a single value? If so, I can't run processJob asynchronously, since the goroutines can finish in a different order than they started and would overwrite each other's values. Is the consumer supposed to run in a single thread and process one event at a time? Is the right approach to start more consumers to speed things up, instead of using goroutines?
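One way to reconcile concurrent processJob goroutines with a single stored value per partition (the linked wiki page lists one offset under /consumers/[group_id]/offsets/[topic]/[partition_id]) is to record which offsets have completed and only advance the stored value past a contiguous run of finished jobs. This is a sketch under stated assumptions, not part of sarama or Kafka: offsetTracker and MarkDone are hypothetical names, it handles a single partition, it is not goroutine-safe on its own, and writing the result to ZooKeeper is left out. Whether you store the returned value or one less depends on the resume convention your consumer uses.

```go
package main

import "fmt"

// offsetTracker records which offsets of a single partition have finished
// processing, possibly out of order, and reports the offset a restarted
// consumer could safely resume from. Hypothetical helper; guard it with a
// mutex if several goroutines call MarkDone.
type offsetTracker struct {
	next int64          // lowest offset not yet known to be completed
	done map[int64]bool // completed offsets that are >= next
}

func newOffsetTracker(start int64) *offsetTracker {
	return &offsetTracker{next: start, done: make(map[int64]bool)}
}

// MarkDone records that offset has been processed and returns the first
// offset that is not yet covered by a contiguous run of completed offsets.
func (t *offsetTracker) MarkDone(offset int64) int64 {
	if offset >= t.next {
		t.done[offset] = true
	}
	// Advance past every completed offset directly after next.
	for t.done[t.next] {
		delete(t.done, t.next)
		t.next++
	}
	return t.next
}

func main() {
	t := newOffsetTracker(100)
	fmt.Println(t.MarkDone(102)) // 100: offsets 100 and 101 are still in flight
	fmt.Println(t.MarkDone(100)) // 101
	fmt.Println(t.MarkDone(101)) // 103: offsets 100 through 102 are all done
}
```

With this approach a crash can cause jobs that completed after a gap to be processed again on restart, so processJob would need to tolerate duplicates.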
I suspect the simplest answer is to not use a channel for your semaphore. Use an integer protected by a lock instead; then you can adjust the maximum number of concurrent goroutines without restarting.
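A minimal sketch of that idea using only Go's standard sync package: an int counter and limit guarded by a sync.Mutex, plus a sync.Cond so that blocked goroutines wake up when a slot is released or the limit is raised. The type resizableSem and its methods are hypothetical names chosen for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// resizableSem is a counting semaphore whose limit can change at runtime.
type resizableSem struct {
	mu    sync.Mutex
	cond  *sync.Cond
	inUse int // slots currently held
	max   int // current limit
}

func newResizableSem(max int) *resizableSem {
	s := &resizableSem{max: max}
	s.cond = sync.NewCond(&s.mu)
	return s
}

// Acquire blocks until fewer than max slots are in use, then takes one.
func (s *resizableSem) Acquire() {
	s.mu.Lock()
	for s.inUse >= s.max {
		s.cond.Wait()
	}
	s.inUse++
	s.mu.Unlock()
}

// Release gives a slot back and wakes one waiting goroutine.
func (s *resizableSem) Release() {
	s.mu.Lock()
	s.inUse--
	s.mu.Unlock()
	s.cond.Signal()
}

// SetMax changes the limit and wakes all waiters so they re-check it.
// Lowering the limit does not interrupt running jobs; new Acquire calls
// simply block until inUse drops below the new limit.
func (s *resizableSem) SetMax(max int) {
	s.mu.Lock()
	s.max = max
	s.mu.Unlock()
	s.cond.Broadcast()
}

// InUse reports how many slots are currently held.
func (s *resizableSem) InUse() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.inUse
}

func main() {
	sem := newResizableSem(1)
	sem.Acquire()
	done := make(chan struct{})
	go func() {
		sem.Acquire() // blocks while the limit is 1 and one slot is held
		close(done)
	}()
	sem.SetMax(2) // raise the limit; the waiting goroutine can proceed
	<-done
	fmt.Println(sem.InUse()) // 2
}
```

In the consumer loop, <-c.sem would become sem.Acquire(), processJob would defer sem.Release(), and something like a signal handler or admin endpoint (also hypothetical) could call SetMax to change the concurrency while the consumer keeps running.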
If you really want to keep using a channel for this, you could use a ResizableChannel from my channels package: https://godoc.org/github.com/eapache/channels#ResizableChannel