暂时暂停RabbitMQ消费者

我在用Go编写RabbitMQ消费者(consumer)时,需要让它暂停消费消息一段时间,之后再恢复消费队列中的消息。我阅读了文档https://godoc.org/github.com/streadway/amqp,但没能确定应该在代码中使用哪种机制来实现。

这样做可行吗?能否给出一个例子?

我的代码片段:

// Register a consumer on queue "TestQ" and keep the resulting delivery channel.
// Argument roles below follow the streadway/amqp Channel.Consume signature
// (queue, consumer, autoAck, exclusive, noLocal, noWait, args).
rabbitMQMessages, err = ch.Consume(
        "TestQ",        // queue
        "testConsumer", // consumer tag
        false,          // autoAck: off, the delivery is acked manually below
        true,           // exclusive
        false,          // noLocal
        false,          // noWait
        nil,            // args
    )
    failOnError(err, "Failed to register a consumer")

    // Never written to or closed in this snippet; receiving from it at the
    // end blocks the calling goroutine indefinitely.
    forever := make(chan bool)

    go func() {
        // NOTE(review): there is no loop around this select, and it has a
        // default case, so it does not block: if no delivery is ready at the
        // moment the goroutine runs, the default branch executes and the
        // goroutine returns. At most one delivery is ever handled here.
        select {
        case d := <-rabbitMQMessages: // Checking if a message was received
            log.Printf("Received a message: %s", d.Body)
            // The number of '.' bytes in the body determines how many
            // seconds of work are simulated below.
            dotcount := bytes.Count(d.Body, []byte("."))

            // Attempt to pause delivery on the channel while "working".
            err = ch.Flow(false) // Returns error: Exception (540) Reason: "NOT_IMPLEMENTED - active=false
            failOnError(err, "Failed to close channel")

            t := time.Duration(dotcount)
            time.Sleep(t * time.Second)
            log.Printf("Done")

            // Attempt to resume delivery. NOTE(review): the error assigned
            // here is never checked.
            err = ch.Flow(true)

            // Acknowledge this delivery. NOTE(review): the value returned by
            // Ack is discarded.
            d.Ack(false)
        default:
            log.Println("Default section")
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever

You should cancel the consumer, then re-run ch.Consume at the time you wish to resume consuming messages.


NOTE: the RabbitMQ team monitors the rabbitmq-users mailing list and only sometimes answers questions on StackOverflow.

I was able to figure it out. I need to close the channel and then reopen it. This prevents messages from being read in up front. I am not sure if it is the right way, but it worked for me. Adding a snippet of my test code.

// main connects to RabbitMQ, registers a consumer, and then handles deliveries
// one at a time, pausing consumption while each message is being "processed".
//
// The pause works by closing the AMQP channel right after a delivery has been
// acknowledged. The next receive then observes the delivery channel as closed
// (ok == false), at which point a new channel is set up and the consumer is
// registered again. The simulated processing time is one second per '.' byte
// in the message body.
//
// Every setup/recovery error is passed to failOnError (defined elsewhere;
// presumably it aborts the program on a non-nil error — confirm).
func main() {
    var rabbitMQMessages <-chan amqp.Delivery
    var err error
    var rabbitMQ RabbitMQ

    err = rabbitMQ.dial()
    failOnError(err, "Failed to connect to RabbitMQ")
    // One deferred Close for the lifetime of main. It must NOT be repeated
    // inside the loop below: defers only run when main unwinds, so a defer per
    // recovery would just accumulate duplicate Close calls.
    defer rabbitMQ.Close()

    err = rabbitMQ.setUpChannel()
    failOnError(err, "Failed to open a channel")

    err = rabbitMQ.Consumme()
    failOnError(err, "Failed to consume")

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

    rabbitMQMessages = rabbitMQ.GetChan()

    for {
        // Block until a delivery arrives or the delivery channel is closed.
        d, ok := <-rabbitMQMessages
        log.Printf("Chan status at start of function %t", ok)

        if !ok {
            // The delivery channel was closed (we close the AMQP channel
            // ourselves after each message): resume by opening a fresh
            // channel and registering the consumer again.
            err = rabbitMQ.setUpChannel()
            failOnError(err, "Unable to open channel")

            err = rabbitMQ.Consumme()
            failOnError(err, "Recover. Failed to register a consumer")

            rabbitMQMessages = rabbitMQ.GetChan()

            continue
        }

        log.Printf("Chan status at later of function %t", ok)

        log.Printf("Received a message: %s", d.Body)
        dotcount := bytes.Count(d.Body, []byte("."))

        // Ack before closing the channel; an ack cannot be sent once the
        // channel is gone. A failed ack is reported rather than silently
        // dropped, but is not treated as fatal.
        if ackErr := d.Ack(false); ackErr != nil {
            log.Printf("Failed to ack message: %s", ackErr)
        }

        // Pause consumption: closing the channel stops further deliveries
        // from being pushed to this consumer while we sleep.
        err = rabbitMQ.CloseChannel()
        failOnError(err, "Failed to close channel")

        t := time.Duration(dotcount)
        time.Sleep(t * time.Second)
        log.Printf("Done")
    }
}