I am currently testing RabbitMQ with Go (github.com/streadway/amqp). I have three programs: two send messages to the queue with different priorities, and one reads from the queue.
The problem is that after sending a few messages with the two producers, I launch the program that reads from the queue, and it outputs the messages in FIFO order.
I expected it to output the high-priority message first, followed by the lower-priority messages.
Did I misunderstand how RabbitMQ works, or am I doing something wrong?
Sending to queues:
package main

import (
	"log"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	args := make(amqp.Table)
	args["x-max-priority"] = int64(9)

	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	q, err := ch.QueueDeclare(
		"test", // name
		false,  // durable
		false,  // delete when unused
		false,  // exclusive
		false,  // no-wait
		args,   // arguments
	)
	failOnError(err, "Failed to declare a queue")

	body := "low"
	err = ch.Publish(
		"",     // exchange
		q.Name, // routing key
		false,  // mandatory
		false,  // immediate
		amqp.Publishing{
			Headers:         amqp.Table{},
			ContentType:     "text/plain",
			ContentEncoding: "",
			Body:            []byte(body),
			DeliveryMode:    amqp.Transient, // 1=non-persistent, 2=persistent
			Priority:        0,
		})
	log.Printf(" [x] Sent %s", body)
	failOnError(err, "Failed to publish a message")

	body = "low"
	err = ch.Publish(
		"",     // exchange
		q.Name, // routing key
		false,  // mandatory
		false,  // immediate
		amqp.Publishing{
			Headers:         amqp.Table{},
			ContentType:     "text/plain",
			ContentEncoding: "",
			Body:            []byte(body),
			DeliveryMode:    amqp.Transient, // 1=non-persistent, 2=persistent
			Priority:        0,
		})
	log.Printf(" [x] Sent %s", body)
	failOnError(err, "Failed to publish a message")

	body = "low"
	err = ch.Publish(
		"",     // exchange
		q.Name, // routing key
		false,  // mandatory
		false,  // immediate
		amqp.Publishing{
			Headers:         amqp.Table{},
			ContentType:     "text/plain",
			ContentEncoding: "",
			Body:            []byte(body),
			DeliveryMode:    amqp.Transient, // 1=non-persistent, 2=persistent
			Priority:        0,
		})
	log.Printf(" [x] Sent %s", body)
	failOnError(err, "Failed to publish a message")

	body = "high"
	err = ch.Publish(
		"",     // exchange
		q.Name, // routing key
		false,  // mandatory
		false,  // immediate
		amqp.Publishing{
			Headers:         amqp.Table{},
			ContentType:     "text/plain",
			ContentEncoding: "",
			Body:            []byte(body),
			DeliveryMode:    amqp.Transient, // 1=non-persistent, 2=persistent
			Priority:        9,
		})
	log.Printf(" [x] Sent %s", body)
	failOnError(err, "Failed to publish a message")

	body = "low"
	err = ch.Publish(
		"",     // exchange
		q.Name, // routing key
		false,  // mandatory
		false,  // immediate
		amqp.Publishing{
			Headers:         amqp.Table{},
			ContentType:     "text/plain",
			ContentEncoding: "",
			Body:            []byte(body),
			DeliveryMode:    amqp.Transient, // 1=non-persistent, 2=persistent
			Priority:        0,
		})
	log.Printf(" [x] Sent %s", body)
	failOnError(err, "Failed to publish a message")

	body = "low"
	err = ch.Publish(
		"",     // exchange
		q.Name, // routing key
		false,  // mandatory
		false,  // immediate
		amqp.Publishing{
			Headers:         amqp.Table{},
			ContentType:     "text/plain",
			ContentEncoding: "",
			Body:            []byte(body),
			DeliveryMode:    amqp.Transient, // 1=non-persistent, 2=persistent
			Priority:        0,
		})
	log.Printf(" [x] Sent %s", body)
	failOnError(err, "Failed to publish a message")

	body = "low"
	err = ch.Publish(
		"",     // exchange
		q.Name, // routing key
		false,  // mandatory
		false,  // immediate
		amqp.Publishing{
			Headers:         amqp.Table{},
			ContentType:     "text/plain",
			ContentEncoding: "",
			Body:            []byte(body),
			DeliveryMode:    amqp.Transient, // 1=non-persistent, 2=persistent
			Priority:        0,
		})
	log.Printf(" [x] Sent %s", body)
	failOnError(err, "Failed to publish a message")
}

Reading queue:
package main

import (
	"log"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	args := make(amqp.Table)
	args["x-max-priority"] = int64(9)

	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")

	q, err := ch.QueueDeclare(
		"test", // name
		false,  // durable
		false,  // delete when unused
		false,  // exclusive
		false,  // no-wait
		args,   // arguments
	)
	failOnError(err, "Failed to declare a queue")

	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		true,   // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		args,   // args
	)
	failOnError(err, "Failed to register a consumer")
	defer ch.Close()
	defer conn.Close()

	forever := make(chan bool)
	go func() {
		for d := range msgs {
			log.Printf("Received a message: %s", d.Body)
		}
	}()
	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
	<-forever
}

First I launch the code that sends the messages to the queue (three low, one high, then three more low, as in the code above), with the low messages having a priority of 0 and the high message a priority of 9.
Then I launch the program that will receive the queue.
Expected output:
Actual output:
Thank you
I updated to the most recent RabbitMQ and it worked; I do not know if this is the only solution.
As well as calling ch.Qos(1, 0, false), I also needed to remove the auto-ack and acknowledge each message manually afterwards, so that the consumer doesn't acknowledge all the messages in the queue at once.
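To make the effect of the prefetch limit concrete, here is a small in-memory model written with only the standard library. It is not RabbitMQ and does not use the amqp package: toyBroker, msg, and processingOrder are hypothetical names invented for this sketch, and the numbered bodies (low-1, low-2, ...) are only there to make the order visible. It assumes the consumer is already attached while the messages arrive and is slower than the publisher. With no prefetch limit (the auto-ack case), every message is handed to the consumer as it arrives, so priority never gets a chance to reorder anything. With a limit of 1 and manual acknowledgement, the remaining messages wait in the queue, where they can be ordered by priority.

```go
package main

import (
	"fmt"
	"sort"
)

// msg is a hypothetical stand-in for a published message.
type msg struct {
	body     string
	priority int
}

// toyBroker models one priority queue with a single attached consumer.
// It is an illustration only, not how RabbitMQ is implemented.
type toyBroker struct {
	prefetch int   // max unacknowledged deliveries; 0 means unlimited
	backlog  []msg // messages still waiting in the queue, highest priority first
	inFlight []msg // messages already handed to the consumer, not yet acked
}

// hasRoom reports whether the consumer may receive another delivery.
func (b *toyBroker) hasRoom() bool {
	return b.prefetch == 0 || len(b.inFlight) < b.prefetch
}

// publish hands the message straight to the consumer when it has room.
// Otherwise the message waits in the backlog, ordered by priority
// (the stable sort keeps publish order among equal priorities).
func (b *toyBroker) publish(m msg) {
	if b.hasRoom() {
		b.inFlight = append(b.inFlight, m)
		return
	}
	b.backlog = append(b.backlog, m)
	sort.SliceStable(b.backlog, func(i, j int) bool {
		return b.backlog[i].priority > b.backlog[j].priority
	})
}

// ack completes the oldest in-flight message and, if the consumer has
// room again, delivers the highest-priority message from the backlog.
func (b *toyBroker) ack() (string, bool) {
	if len(b.inFlight) == 0 {
		return "", false
	}
	done := b.inFlight[0]
	b.inFlight = b.inFlight[1:]
	if len(b.backlog) > 0 && b.hasRoom() {
		b.inFlight = append(b.inFlight, b.backlog[0])
		b.backlog = b.backlog[1:]
	}
	return done.body, true
}

// processingOrder publishes the sequence from the question to a slow
// consumer and returns the order in which the consumer handles it.
func processingOrder(prefetch int) []string {
	b := &toyBroker{prefetch: prefetch}
	for _, m := range []msg{
		{"low-1", 0}, {"low-2", 0}, {"low-3", 0}, {"high", 9},
		{"low-4", 0}, {"low-5", 0}, {"low-6", 0},
	} {
		b.publish(m)
	}
	var order []string
	for {
		body, ok := b.ack()
		if !ok {
			break
		}
		order = append(order, body)
	}
	return order
}

func main() {
	fmt.Println("unlimited prefetch:", processingOrder(0))
	fmt.Println("prefetch of 1:     ", processingOrder(1))
}
```

In this model the first message is already in flight when the high-priority one arrives, so with a prefetch of 1 the high message is handled second rather than first. In the real consumer, the corresponding changes would be calling ch.Qos(1, 0, false) before ch.Consume, passing false for auto-ack, and acknowledging each delivery after it has been handled.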
This is what I can figure out from your code and the description of the steps you follow.
It seems the issue is that you do not create the queue the same way in each piece of code: the low- and high-priority producers create the queue without the x-max-priority argument.
Because a queue is defined as a priority queue upon creation, and only at that time, you actually created a "standard" queue.
You should be able to confirm this easily by checking the created queue and its arguments.
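The difference between a "standard" queue and a priority queue can be sketched with a toy model that uses only the standard library. This is not RabbitMQ: toyQueue, message, and runSequence are hypothetical names for illustration, and the numbered bodies are an assumption added so the order is visible. The model assumes that a queue created without x-max-priority ignores the priority field, and that a queue created with it orders waiting messages by priority, keeping publish order among equal priorities and capping priorities at the declared maximum.

```go
package main

import (
	"fmt"
	"sort"
)

// message is a hypothetical stand-in for a published message.
type message struct {
	body     string
	priority int
}

// toyQueue models only the ordering rule of a queue; it is not RabbitMQ.
// maxPriority is fixed when the queue is created. A value of 0 stands for
// a queue declared without the x-max-priority argument.
type toyQueue struct {
	maxPriority int
	msgs        []message
}

// publish stores the message, capping its priority at the queue's maximum.
// On a queue with maxPriority 0 every message therefore ends up at 0.
func (q *toyQueue) publish(body string, priority int) {
	if priority > q.maxPriority {
		priority = q.maxPriority
	}
	q.msgs = append(q.msgs, message{body, priority})
}

// drain returns all waiting bodies, highest priority first. The stable
// sort keeps publish order among messages with the same priority.
func (q *toyQueue) drain() []string {
	sort.SliceStable(q.msgs, func(i, j int) bool {
		return q.msgs[i].priority > q.msgs[j].priority
	})
	out := make([]string, 0, len(q.msgs))
	for _, m := range q.msgs {
		out = append(out, m.body)
	}
	q.msgs = nil
	return out
}

// runSequence publishes the sequence from the question before any
// consumer is attached, then drains the queue.
func runSequence(maxPriority int) []string {
	q := &toyQueue{maxPriority: maxPriority}
	q.publish("low-1", 0)
	q.publish("low-2", 0)
	q.publish("low-3", 0)
	q.publish("high", 9)
	q.publish("low-4", 0)
	q.publish("low-5", 0)
	q.publish("low-6", 0)
	return q.drain()
}

func main() {
	fmt.Println("without x-max-priority:", runSequence(0))
	fmt.Println("with x-max-priority=9: ", runSequence(9))
}
```

Under these assumptions the first run keeps the FIFO order described in the question, while the second moves the high message to the front. Because the maximum is fixed at creation, a queue that was first created without the argument would need to be deleted and declared again to behave like the second case.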