I am trying to build a system where a master service creates a bunch of messages and routes them to the correct queue based on the routing key. I have that working and the messages are being consumed on the worker/slave side. However I want the worker to only take one message from the queue and not all 15 that get put into it.
I was looking at this https://github.com/streadway/amqp/blob/master/channel.go#L616 however I am not sure how to set it up so only one event is taken.
Any ideas?
Disclaimer: I haven't worked with the Go AMQP client, but the AMQP protocol semantics should be universal across client implementations, so I'll take a shot.
You've already correctly identified the prefetch-size
and prefetch-count
parameters as the configuration variables to adjust. The RabbitMQ documentation says the following in this regard:1
AMQP specifies the
basic.qos
method to allow you to limit the number of unacknowledged messages on a channel (or connection) when consuming (aka "prefetch count").
If you want each consumer to be sent only one message at a time, you should set prefetch-count
to 1, and leave prefetch-size
undefined (i.e. 0):
err := channel.Qos(1, 0, false)
if err != nil {
// ...
}
1 There's also a longer description in RabbitMQ's AMQP reference on this one:
long prefetch-size
The client can request that messages be sent in advance so that when the client finishes processing a message, the following message is already held locally, rather than needing to be sent down the channel. [...] The server will send a message in advance if it is equal to or smaller in size than the available prefetch size (and also falls into other prefetch limits). May be set to zero, meaning "no specific limit", although other prefetch limits may still apply. The prefetch-size is ignored if the no-ack option is set.
[...]
short prefetch-count
Specifies a prefetch window in terms of whole messages. This field may be used in combination with the prefetch-size field; a message will only be sent in advance if both prefetch windows (and those at the channel and connection level) allow it. [...]