I need to process thousands of messages coming from multiple RabbitMQ queues (between 5 and 10) and push the processed messages in batches into ELK.
What would be the best generic way to process n queues with the streadway/amqp library?
What exactly should each goroutine own, in terms of amqp.Connection, amqp.Channel and consumer?
I mainly see 3 designs:
A) 1 Connection - 1 Channel - n Consumers
B) 1 Connection - n Channels - 1 Consumer per Channel
C) n Connections - 1 Channel - 1 Consumer per Connection
A) does not work for me:
Failed to register a consumer: Exception (504) Reason: "channel/connection is not open"
Each of those goroutines will then buffer x messages and send a BatchRequest to ELK independently of the others.
For now, starting 1 connection per queue (C) seems to work, although I have to deal with high memory consumption on the server. Is it really the most efficient design, or should I keep 1 connection per worker that handles all 5 to 10 channels?
Here is (C) with one connection per queue.
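package main

import (
	"log"

	"github.com/streadway/amqp"
)

func main() {
	queues := []string{"q1", "q2", "q3", "q4"}
	forever := make(chan bool)
	for _, queue := range queues {
		go processQueue(queue)
	}
	<-forever // block so the consumer goroutines keep running
}

func processQueue(name string) {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		log.Printf("[%s] dial: %v", name, err)
		return
	}
	defer conn.Close()
	ch, err := conn.Channel()
	if err != nil {
		log.Printf("[%s] channel: %v", name, err)
		return
	}
	defer ch.Close()
	msgs, err := ch.Consume(name, "test-dev", false, false, false, false, nil)
	if err != nil {
		log.Printf("[%s] consume: %v", name, err)
		return
	}
	// Consume in this goroutine: returning early would run the deferred Close calls.
	for d := range msgs {
		log.Printf("[%s] %s", name, d.RoutingKey)
		d.Ack(true)
	}
}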