I have a gateway server, which can push message to client side by using websocket, A new client connected to my server, I will generate a cid
for it. And then I also subscribe a channel, which using cid
. If any message publish to this channel, My server will push it to client side. For now, all unit are working fine, but when I try to test with benchmark test by thor, it will crash, I fine the DeliverMessage
has some issue, it would never exit, since it has a die-loop. but since redis need to subscribe something, I don't know how to avoid loop.
func (h *Hub) DeliverMessage(pool *redis.Pool) {
conn := pool.Get()
defer conn.Close()
var gPubSubConn *redis.PubSubConn
gPubSubConn = &redis.PubSubConn{Conn: conn}
defer gPubSubConn.Close()
for {
switch v := gPubSubConn.Receive().(type) {
case redis.Message:
// fmt.Printf("Channel=%q | Data=%s
", v.Channel, string(v.Data))
h.Push(string(v.Data))
case redis.Subscription:
fmt.Printf("Subscription message: %s : %s %d
", v.Channel, v.Kind, v.Count)
case error:
fmt.Println("Error pub/sub, delivery has stopped", v)
panic("Error pub/sub")
}
}
}
In the main function, I have call the above function as:
go h.DeliverMessage(pool)
But when I test it with huge connection, it get me some error like:
ERR max number of clients reached
So, I change the redis pool size by change MaxIdle
:
func newPool(addr string) *redis.Pool {
return &redis.Pool{
MaxIdle: 5000,
IdleTimeout: 240 * time.Second,
Dial: func() (redis.Conn, error) { return redis.Dial("tcp", addr) },
}
}
But it still doesn't work, so I wonder to know, if there any good way to kill a goroutine after my websocket disconnected to my server on the below selection:
case client := <-h.Unregister:
if _, ok := h.Clients[client]; ok {
delete(h.Clients, client)
delete(h.Connections, client.CID)
close(client.Send)
if err := gPubSubConn.Unsubscribe(client.CID); err != nil {
panic(err)
}
// TODO kill subscribe goroutine if don't client-side disconnected ...
}
But How do I identify this goroutine? How can I do it like unix
way. kill -9 <PID>
?
Actually, I got wrong design architecture, I am going to explain what I want to do.
A client can connect to my websocket server;
The server have several handler of http, and the admin can post data via the handler, the structure of the data can be like:
{
"cid": "something",
"body": {
}
}
Since, I have several Nodes are running to service our client, and the Nginx can dispatch each request from
admin
to totally different Node, but only one Node has hold on the connection aboutcid
with "something", so I will need to publish this data toRedis
, if any Node has got the data, it's going to send this message to the client side.
3.Looking for the NodeID, which i am going to Publish
to by given an cid.
// redis code & golang
NodeID, err := conn.Do("HGET", "NODE_MAP", cid)
4.For now, I can publish any message from the admin
, and publish to the NodeID
, which we have got at step 3.
// redis code & golang
NodeID, err := conn.Do("PUBLISH", NodeID, data)
Time to show the core code, which related to this question. I am going to subscribe a channel, which name is NodeID. like the following.
go func(){
for {
switch v := gPubSubConn.Receive().(type) {
case redis.Message:
fmt.Println("Got a message", v.Data)
h.Broadcast <- v.Data
pipeline <- v.Data
case error:
panic(v)
}
}
}()
6.To manage your websocket, you do also need a goroutine to do that. like the following way:
go func () {
for {
select {
case client := <-h.Register:
h.Clients[client] = true
cid := client.CID
h.Connections[cid] = client
body := "something"
client.Send <- msg // greeting
case client := <-h.Unregister:
if _, ok := h.Clients[client]; ok {
delete(h.Clients, client)
delete(h.Connections, client.CID)
close(client.Send)
}
case message := <-h.Broadcast:
fmt.Println("message is", message)
}
}
}()
The last thing is manage a redis pool, you don't really need a connection pool right now. since we only have two goroutine
, one main process.
func newPool(addr string) *redis.Pool {
return &redis.Pool{
MaxIdle: 100,
IdleTimeout: 240 * time.Second,
Dial: func() (redis.Conn, error) { return redis.Dial("tcp", addr) },
}
}
var (
pool *redis.Pool
redisServer = flag.String("redisServer", ":6379", "")
)
pool = newPool(*redisServer)
conn := pool.Get()
defer conn.Close()
Look at the example here
You can make your goroutine exit by having a return statement inside your switch case in your DeliverMessage, once you're not receiving anything more. I'm guessing case error
, or as seen in the example, case 0
you'd want to return from that, and your goroutine will cancel. Or if I'm misunderstanding things, and case client := <-h.Unregister:
is inside the DeliverMessage, just return.
You're also closing your connection twice. defer gPubSubConn.Close()
simply calls conn.Close() so you don't need defer conn.Close()
Also take a look at the Pool and look at what all the parameters actually do. If you want to handle many connections, set MaxActive to 0 "When zero, there is no limit on the number of connections in the pool." (and do you actually want the idle timeout?)