I'm using the golang RabbitMQ library in a project, and I have a Connect function in a separate package. I'm calling Connect in my main function. However, because I connect to RabbitMQ in a separate function, the deferred conn.Close() call runs as soon as Connect returns, which closes the connection within the Connect function. That makes perfect sense, but it raises the question: where, then, do I call conn.Close()?
package drivers

import (
	// Core
	"log"
	"os"
	"time"

	// Third party
	"github.com/streadway/amqp"
)

type Queue struct {
	Channel *amqp.Channel
}

func NewQueue() *Queue {
	return &Queue{}
}

// Queue interface
type IQueue interface {
	Connect(args ...interface{})
	Publish(queue string, payload []byte) error
	Listen(queue string) (<-chan amqp.Delivery, error)
	Declare(queue string) (amqp.Queue, error)
}

// Connect - Connects to RabbitMQ
func (queue *Queue) Connect(args ...interface{}) {
	var uri string
	if args == nil {
		// Get from env vars
		uri = os.Getenv("RABBIT_MQ_URI")
		if uri == "" {
			log.Panic("No uri for queue given")
		}
	} else {
		uri = args[0].(string)
	}
	// Make max 5 connection attempts, with a 1 second timeout
	for i := 0; i < 5; i++ {
		log.Println("Connecting to:", uri)
		// If connection is successful, return new instance
		conn, err := amqp.Dial(uri)
		defer conn.Close()
		if err == nil {
			log.Println("Successfully connected to queue!")
			channel, _ := conn.Channel()
			queue.Channel = channel
			return
		}
		log.Println("Failed to connect to queue, retrying...", err)
		// Wait 1 second
		time.Sleep(5 * time.Second)
	}
}

// Declare a new queue
func (queue *Queue) Declare(queueName string) (amqp.Queue, error) {
	return queue.Channel.QueueDeclare(
		queueName,
		true,
		false,
		false,
		false,
		nil,
	)
}

// Publish a message
func (queue *Queue) Publish(queueName string, payload []byte) error {
	return queue.Channel.Publish(
		"",
		queueName,
		false,
		false,
		amqp.Publishing{
			DeliveryMode: amqp.Persistent,
			ContentType:  "application/json",
			Body:         payload,
		},
	)
}

// Listen for a new message
func (queue *Queue) Listen(queueName string) (<-chan amqp.Delivery, error) {
	return queue.Channel.Consume(
		queueName,
		"",
		true,
		false,
		false,
		false,
		nil,
	)
}
As you can see in the code above, I'm calling defer conn.Close() right after making a connection. However, the deferred call runs when Connect returns, so the connection is closed again before I can use it.
Here's a Go Playground example reproducing what I'm talking about: https://play.golang.org/p/5cz2D4gDgn
The simple solution is to call conn.Close() from elsewhere. This might just be me, but I think it's kind of odd that you don't expose the connection anywhere, e.g. as a field in Queue. Exposing the ability to close the connection from the Queue would solve this and give you more flexibility.
So this:
type Queue struct {
	// your original fields
	// amqp.Dial returns a *amqp.Connection, so store the pointer
	Conn *amqp.Connection
}
// Somewhere else
queue.Conn.Close()
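To make the idea concrete, here is a minimal sketch of a Queue that keeps its connection and exposes a Close method. It is not the library's API: fakeConn is a hypothetical stand-in for *amqp.Connection so the example runs without a broker, and Connect here simply takes an already opened connection instead of dialing.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// fakeConn is a hypothetical stand-in for *amqp.Connection so the sketch
// runs without a broker; it only records whether Close was called.
type fakeConn struct{ closed bool }

func (c *fakeConn) Close() error {
	if c.closed {
		return errors.New("connection already closed")
	}
	c.closed = true
	return nil
}

// Queue keeps the connection it was given so callers can close it later.
type Queue struct {
	Conn io.Closer
}

// Connect stores the connection instead of deferring Close inside Connect.
// (Assumption: a real version would call amqp.Dial here.)
func (q *Queue) Connect(conn io.Closer) {
	q.Conn = conn
}

// Close releases the connection; it does nothing if Connect was never called.
func (q *Queue) Close() error {
	if q.Conn == nil {
		return nil
	}
	return q.Conn.Close()
}

func main() {
	conn := &fakeConn{}
	q := &Queue{}
	q.Connect(conn)
	defer q.Close() // runs when main returns, not when Connect returns

	fmt.Println("closed after Connect:", conn.closed) // prints "closed after Connect: false"
}
```

The owner of the Queue now decides when the connection goes away, typically with defer q.Close() in main.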
Your other option is to connect, do everything you want with that connection, and then close it. I'm thinking something like:
func action(conn *amqp.Connection, args ...interface{}) <-chan bool {
	done := make(chan bool)
	go func(amqpConn *amqp.Connection, dChan chan<- bool) {
		// Do what you want with the connection
		dChan <- true
	}(conn, done)
	return done
}

func (queue *Queue) Connect(args ...interface{}) {
	// your connection code (this is where uri gets set)
	doneChans := make([]<-chan bool, 0, 5)
	for i := 0; i < 5; i++ {
		conn, err := amqp.Dial(uri)
		if err != nil {
			// handle error
			continue
		}
		// Defer only after the error check: conn is not usable if Dial failed
		defer conn.Close()
		doneChans = append(doneChans, action(conn))
	}
	// This loop blocks until every started action call is done
	for _, done := range doneChans {
		if isFinished := <-done; !isFinished {
			// handle bad state
		}
	}
}
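A simpler variant of the same "connect, work, close" idea is a helper that takes the work as a callback. This is only a sketch under stated assumptions: conn and dial are hypothetical stand-ins for *amqp.Connection and amqp.Dial, and withConn is a name made up for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// conn is a hypothetical stand-in for *amqp.Connection.
type conn struct{ open bool }

func (c *conn) Close() error {
	c.open = false
	return nil
}

// dial is a hypothetical stand-in for amqp.Dial.
func dial(uri string) (*conn, error) {
	if uri == "" {
		return nil, errors.New("no uri given")
	}
	return &conn{open: true}, nil
}

// withConn opens a connection, runs work with it, and closes it afterwards.
// The defer is registered only after the error check, so Close is never
// called on a nil connection.
func withConn(uri string, work func(*conn) error) error {
	c, err := dial(uri)
	if err != nil {
		return err
	}
	defer c.Close()
	return work(c)
}

func main() {
	err := withConn("amqp://example", func(c *conn) error {
		fmt.Println("open during work:", c.open) // prints "open during work: true"
		return nil
	})
	if err != nil {
		fmt.Println("error:", err)
	}
}
```

The connection stays open for exactly as long as the callback runs, which suits short jobs such as publishing a batch but not a long-lived consumer.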
One option is to have Connect return conn, and call defer conn.Close() in the caller.
package driver
// imports, etc
// Multiple return values must be parenthesized, and amqp.Dial returns a pointer
func (queue *Queue) Connect(args ...interface{}) (*amqp.Connection, error) {
	// ...
	conn, err := amqp.Dial(uri)
	if err != nil {
		return nil, err
	}
	// ...
	return conn, nil
}
Then in another package:
package stuff
// imports, etc
func doStuff() {
	queue := driver.NewQueue()
	conn, err := queue.Connect(args...)
	if err != nil {
		log.Fatalf("oh no! %v!", err)
	}
	// The connection now stays open until doStuff returns
	defer conn.Close()
	// Do stuff
}
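To see the difference between the two placements of defer side by side, here is a small runnable sketch. It does not talk to RabbitMQ: conn and dial are hypothetical stand-ins for *amqp.Connection and amqp.Dial, and connectDeferred and connect are names made up for the comparison.

```go
package main

import (
	"errors"
	"fmt"
)

// conn is a hypothetical stand-in for *amqp.Connection.
type conn struct{ open bool }

func (c *conn) Close() error {
	c.open = false
	return nil
}

// dial is a hypothetical stand-in for amqp.Dial.
func dial(uri string) (*conn, error) {
	if uri == "" {
		return nil, errors.New("no uri given")
	}
	return &conn{open: true}, nil
}

// connectDeferred mirrors the question: the deferred Close runs as soon as
// this function returns, so the caller receives an already closed connection.
func connectDeferred(uri string) (*conn, error) {
	c, err := dial(uri)
	if err != nil {
		return nil, err
	}
	defer c.Close()
	return c, nil
}

// connect hands the open connection to the caller, who decides when to close it.
func connect(uri string) (*conn, error) {
	return dial(uri)
}

func main() {
	a, err := connectDeferred("amqp://example")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("defer inside Connect, open:", a.open) // prints "defer inside Connect, open: false"

	b, err := connect("amqp://example")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	defer b.Close()
	fmt.Println("defer in caller, open:", b.open) // prints "defer in caller, open: true"
}
```

A deferred call is tied to the function that registers it, so moving the defer to the caller ties the connection's lifetime to the caller instead.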