断开连接后重新连接到RabbitMQ

I am using the recommending streadyway/amqp library to interact with RabbitMQ inside Go.

I want my service to gracefully fail when it cannot connect to RabbitMQ. This means noticing that it couldn't connect, waiting n seconds, and trying to reconnect (loop forever).

I'm pretty sure I need use the Channel.NotifyClose method, but I can't work it out:

func (ch *Channel) NotifyClose(c chan *Error) chan *Error

NotifyClose registers a listener for when the server sends a channel or connection exception in the form of a Connection.Close or Channel.Close method. Connection exceptions will be broadcast to all open channels and all channels will be closed, where channel exceptions will only be broadcast to listeners to this channel.

The chan provided will be closed when the Channel is closed and on a graceful close, no error will be sent.

This is what I attempted:

graceful := make(chan *amqp.Error)
errs := channel.NotifyClose(graceful)
for {
    case <-graceful:
        fmt.Println("Graceful close!")
        reconnect()
    case <-errs:
        fmt.Println("Not graceful close")
        reconnect()
}

Sometimes, this works! Othertimes, after reconnecting, it will repeatedly print out:

2018/11/04 15:29:26 Other close
2018/11/04 15:29:26 Connected!
2018/11/04 15:29:26 Graceful close!
2018/11/04 15:29:26 Connected!
2018/11/04 15:29:26 Other close
2018/11/04 15:29:26 Connected!
2018/11/04 15:29:26 Graceful close!
...

Very rapidly.

I want to be able to run the service in one terminal, and rabbit in the other. I should be able to stop & restart rabbit whenever I like, with the service reconnecting consistently.

I'm a bit confused about the NotifyClose method - is the c chan just closed when the connection is closed? Why does it return another channel?

Cheers.


My entire code. This doesn't have a push or pop function, because this is a minimal example to demonstrate re-connecting when a connection fails. Push and pop implementations would depend upon how reconnection is implemented.

Any code review comments also welcome.

package main

import (
    "github.com/streadway/amqp"
    "io"
    "log"
    "sync"
    "time"
)

// RabbitMQ ...
type RabbitMQ struct {
    Logger      *log.Logger
    IsConnected bool
    addr        string
    name        string
    connection  *amqp.Connection
    channel     *amqp.Channel
    queue       *amqp.Queue
    wg          *sync.WaitGroup
    done        chan bool
}

const retryDelay = 5 * time.Second

// NewQueue creates a new queue instance.
func NewQueue(logOut io.Writer, name string, addr string) *RabbitMQ {
    rabbit := RabbitMQ{
        IsConnected: false,
        addr:        addr,
        name:        name,
        wg:          new(sync.WaitGroup),
        done:        make(chan bool),
        Logger:      log.New(logOut, "", log.LstdFlags),
    }
    rabbit.wg.Add(1)
    rabbit.Connect()
    go rabbit.reconnect()
    return &rabbit
}

// reconnect waits to be notified about a connection
// error, and then attempts to reconnect to RabbitMQ.
func (rabbit *RabbitMQ) reconnect() {
    defer rabbit.wg.Done()
    graceful := make(chan *amqp.Error)
    errs := rabbit.channel.NotifyClose(graceful)
    for {
        select {
        case <-rabbit.done:
            return
        case <-graceful:
            graceful = make(chan *amqp.Error)
            rabbit.Logger.Println("Graceful close!")
            rabbit.IsConnected = false
            rabbit.Connect()
            rabbit.IsConnected = true
            errs = rabbit.channel.NotifyClose(graceful)
        case <-errs:
            graceful = make(chan *amqp.Error)
            rabbit.Logger.Println("Normal close")
            rabbit.IsConnected = false
            rabbit.Connect()
            errs = rabbit.channel.NotifyClose(graceful)
        }
    }
}

// Connect will block until a new connection to
// RabbitMQ is formed.
func (rabbit *RabbitMQ) Connect() {
    for {
        conn, err := amqp.Dial(rabbit.addr)
        if err != nil {
            rabbit.Logger.Println("Failed to establish connection")
            time.Sleep(retryDelay)
            continue
        }
        ch, err := conn.Channel()
        if err != nil {
            rabbit.Logger.Println("Failed to create a channel")
            time.Sleep(retryDelay)
            continue
        }
        queue, err := ch.QueueDeclare(
            name,
            false, // Durable
            false, // Delete when unused
            false, // Exclusive
            false, // No-wait
            nil,   // Arguments
        )
        if err != nil {
            rabbit.Logger.Println("Failed to publish a queue")
            time.Sleep(retryDelay)
            continue
        }
        rabbit.Logger.Println("Connected!")
        rabbit.IsConnected = true
        rabbit.connection = conn
        rabbit.channel = ch
        rabbit.queue = &queue
        return
    }
}

// Close the connection to RabbitMQ and stop
// checking for reconnections.
func (rabbit *RabbitMQ) Close() error {
    close(rabbit.done)
    rabbit.wg.Wait()
    return rabbit.connection.Close()
}

And how this is used:

package main

import (
    "fmt"
    "os"
)

const (
    name = "job_queue"
    addr = "amqp://guest:guest@localhost:5672/"
)

func main() {
    fmt.Println("Starting...")
    NewQueue(os.Stdout, name, addr)
    for {}
}