I want to test the restart connection to the rabbitmq
server. On wrote small script to test. http://play.golang.org/p/l3ZWzG0Qqb But it's not working.
In step 10, I close the channel and connection. And open them again. And re-create chan amqp.Confirmation ( :75) . And continue the cycle. But after that, from the chan confirms nothing return.
UPD: code here.
package main
import (
"fmt"
"github.com/streadway/amqp"
"log"
"os"
"time"
)
const SERVER = "amqp://user:pass@localhost:5672/"
const EXCHANGE_NAME = "publisher.test.1"
const EXCHANGE_TYPE = "direct"
const ROUTING_KEY = "publisher.test"
var Connection *amqp.Connection
var Channel *amqp.Channel
func setup(url string) (*amqp.Connection, *amqp.Channel, error) {
conn, err := amqp.Dial(url)
if err != nil {
return nil, nil, err
}
ch, err := conn.Channel()
if err != nil {
return nil, nil, err
}
return conn, ch, nil
}
func main() {
url := SERVER
Connection, Channel, err := setup(url)
if err != nil {
fmt.Println("err publisher setup:", err)
return
}
confirms := Channel.NotifyPublish(make(chan amqp.Confirmation, 1))
if err := Channel.Confirm(false); err != nil {
log.Fatalf("confirm.select destination: %s", err)
}
for i := 1; i <= 3000000; i++ {
log.Println(i)
if err != nil {
fmt.Println("err consume:", err)
return
}
if err := Channel.Publish(EXCHANGE_NAME, ROUTING_KEY, false, false, amqp.Publishing{
Body: []byte(fmt.Sprintf("%d", i)),
}); err != nil {
fmt.Println("err publish:", err)
log.Printf("%+v", err)
os.Exit(1)
return
}
// only ack the source delivery when the destination acks the publishing
confirmed := <-confirms
if confirmed.Ack {
log.Printf("confirmed delivery with delivery tag: %d", confirmed.DeliveryTag)
} else {
log.Printf("failed delivery of delivery tag: %d", confirmed.DeliveryTag)
// TODO. Reconnect will be here
}
if i == 10 {
Channel.Close()
Connection.Close()
while := true
for while {
log.Println("while")
time.Sleep(time.Second * 1)
Connection, Channel, err = setup(url)
if err == nil {
while = false
confirms = Channel.NotifyPublish(make(chan amqp.Confirmation, 1))
log.Printf("%+v", confirms)
}
}
}
time.Sleep(time.Millisecond * 300)
}
os.Exit(1)
}
You should put channel in confirm mode. by calling the channel.Confirm() method. After closing the connection and even after getting new channel on the same connection, you should call Confirm() method again, since the channel is different from the old channel, and the default for all new channel is not to send confirm.