If client will be disconnected by network error, server must close in my case pub/sub connection. I know about ctx.Done()
function, but don't know how to use it properly in my case. Can somebody explain please?
grpc-go: 1.7.0
go version go1.8.4
func (a *API) Notifications(in *empty.Empty, stream pb.Service_NotificationsServer) error {
ctx := stream.Context()
_, ok := user.FromContext(ctx)
if !ok {
return grpc.Errorf(codes.Unauthenticated, "user not found")
}
pubsub := a.redisClient.Subscribe("notifications")
defer pubsub.Close()
for {
msg, err := pubsub.ReceiveMessage()
if err != nil {
grpclog.Warningf("Notifications: pubsub error: %v", err)
return grpc.Errorf(codes.Internal, "pubsub error %v", err)
}
notification := &pb.Notification{}
err = json.Unmarshal([]byte(msg.Payload), notification)
if err != nil {
grpclog.Warningf("Notifications: parse error: %v", err)
continue
}
if err := stream.Send(notification); err != nil {
grpclog.Warningf("Notifications: %v", err)
return err
}
grpclog.Infof("Notifications: send msg %v", notification)
}
}
You should cancel the context from a caller function (or wherever the context could be accessed) and do appropriate action on Done()
check in a select
statement.
Done is provided for use in select statements
Done returns a channel that's closed when work done on behalf of this context should be canceled. Done may return nil if this context can never be canceled. Successive calls to Done return the same value.
And
WithCancel returns a copy of parent with a new Done channel. The returned context's Done channel is closed when the returned cancel function is called or when the parent context's Done channel is closed, whichever happens first.
Canceling this context releases resources associated with it, so code should call cancel as soon as the operations running in this Context complete.
go func() {
for {
select {
case <-ctx.Done():
return // returning not to leak the goroutine
case dst <- n:
n++
}
}
}()
You can use select
. instead of normal getting data from a function, use a channel to get data and a go routine to handle it. Some thing like this :
func (a *API) Notifications(in *empty.Empty, stream
pb.Service_NotificationsServer) error {
ctx := stream.Context()
_, ok := user.FromContext(ctx)
if !ok {
return grpc.Errorf(codes.Unauthenticated, "user not found")
}
pubsub := a.redisClient.Subscribe("notifications")
defer pubsub.Close()
// I can not build the code, so I assume the msg in your code Message struct
c := make(chan Message)
go func() {
for {
msg, err := pubsub.ReceiveMessage()
if err != nil {
grpclog.Warningf("Notifications: pubsub error: %v", err)
close(c)
return grpc.Errorf(codes.Internal, "pubsub error %v", err)
}
c<- msg
}
}()
for {
select {
case msg, ok := <-c:
if !ok {
// channel is closed handle it
}
notification := &pb.Notification{}
err = json.Unmarshal([]byte(msg.Payload), notification)
if err != nil {
grpclog.Warningf("Notifications: parse error: %v", err)
continue
}
if err := stream.Send(notification); err != nil {
grpclog.Warningf("Notifications: %v", err)
return err
}
grpclog.Infof("Notifications: send msg %v", notification)
case <- ctx.Done():
// do exit logic. some how close the pubsub, so next
// ReceiveMessage() return an error
// if forget to do that the go routine runs for ever
// until the end of main(), which I think its not what you wanted
pubsub.Close() // Its just pseudo code
return
}
}
}
read the message (I assume the type is Message) from the channel, and use power of select
.
two other related thing in this scenario:
Make sure the go routine end after finalizing this function. I can not guess, since I don't know about the code, but I assume there is a Close()
method for closing the pubsub
so the next ReceiveMessage
return an error. (which I see the defer that do the job I hope)
if there is an error in ReceiveMessage
before ctx.Done
you can close the channel and then break the loop.