So far I've been doing this:
import (
_redis "gopkg.in/redis.v3"
"strconv"
"time"
)
type Redis struct {
Connector *_redis.Client
PubSub *_redis.PubSub
}
var redis *Redis = nil
func NewRedis() bool {
if redis == nil {
redis = new(Redis)
redis.Connector = _redis.NewClient(&_redis.Options{
Addr: config.RedisHostname + ":" + strconv.FormatInt(config.RedisPort, 10),
Password: "",
DB: 0,
})
Logger.Log(nil, "Connected to Redis")
err := redis.Init()
if err != nil {
Logger.Fatal(nil, "Cannot setup Redis:", err.Error())
return false
}
return true
}
return false
}
func (this *Redis) Init() error {
pubsub, err := this.Connector.Subscribe("test")
if err != nil {
return err
}
defer pubsub.Close()
this.PubSub = pubsub
for {
msgi, err := this.PubSub.ReceiveTimeout(100 * time.Millisecond)
if err != nil {
Logger.Error(nil, "PubSub error:", err.Error())
err = this.PubSub.Ping("")
if err != nil {
Logger.Error(nil, "PubSub failure:", err.Error())
break
}
continue
}
switch msg := msgi.(type) {
case *_redis.Message:
Logger.Log(nil, "Received", msg.Payload, "on channel", msg.Channel)
}
}
return nil
}
My Connector is a redis.Client, it's working because I was able to publish messages as well.
When I run my program, I get the following error: PubSub error: WSARecv tcp 127.0.0.1:64505: i/o timeout
Do you have any idea of what I'm doing wrong ? I'm using this package: https://github.com/go-redis/redis
Some things to note:
Receive()
and ReceiveTimeout(duration)
methods, both of which return the next event on the wire; which can be subscribe/unsubscribe events and message events; (you don't necessarily know which) the only difference between them that Receive blocks forever until there's a new message, and ReceiveTimeout will error on timeout.With that in mind, unless you have messages far more than 10/second consistently (in other words, <100 milliseconds between messages), it's inefficient to use that short of a timeout; and I'd argue that due to golang having goroutines, you should almost never use ReceiveTimeout
for real applications, or use a sufficiently long timeout like a minute.
with that in mind, your receive loop should look like:
for {
msgi, err := this.PubSub.Receive()
if err != nil {
Logger.Error(nil, "PubSub error:", err.Error())
return err
}
switch msg := msgi.(type) {
case *_redis.Message:
Logger.Log(nil, "Received", msg.Payload, "on channel", msg.Channel)
default:
Logger.Log(nil, "Got control message", msg)
}
}
If your application really warranted using a timeout, then you should use a type assertion to assert the *net.OpError
that signifies a timeout and distinguish it from other more serious errors.