How to properly use ctx.Done() after the client disconnects?

If the client is disconnected by a network error, the server must close its connection (in my case, a pub/sub connection). I know about the ctx.Done() function, but I don't know how to use it properly in my case. Can somebody please explain?

grpc-go: 1.7.0

go version go1.8.4

func (a *API) Notifications(in *empty.Empty, stream pb.Service_NotificationsServer) error {
    ctx := stream.Context()
    _, ok := user.FromContext(ctx)
    if !ok {
        return grpc.Errorf(codes.Unauthenticated, "user not found")
    }

    pubsub := a.redisClient.Subscribe("notifications")
    defer pubsub.Close()

    for {
        msg, err := pubsub.ReceiveMessage()
        if err != nil {
            grpclog.Warningf("Notifications: pubsub error: %v", err)
            return grpc.Errorf(codes.Internal, "pubsub error %v", err)
        }

        notification := &pb.Notification{}
        err = json.Unmarshal([]byte(msg.Payload), notification)
        if err != nil {
            grpclog.Warningf("Notifications: parse error: %v", err)
            continue
        }
        if err := stream.Send(notification); err != nil {
            grpclog.Warningf("Notifications: %v", err)
            return err
        }
        grpclog.Infof("Notifications: send msg %v", notification)
    }
}

Cancel the context from the calling function (or from wherever the context is accessible), and take the appropriate action when the Done() channel fires in a select statement.

Done is provided for use in select statements

Done returns a channel that's closed when work done on behalf of this context should be canceled. Done may return nil if this context can never be canceled. Successive calls to Done return the same value.

And

WithCancel returns a copy of parent with a new Done channel. The returned context's Done channel is closed when the returned cancel function is called or when the parent context's Done channel is closed, whichever happens first.

Canceling this context releases resources associated with it, so code should call cancel as soon as the operations running in this Context complete.

go func() {
    for {
        select {
        case <-ctx.Done():
            return // returning not to leak the goroutine
        case dst <- n:
            n++
        }
    }
}()
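The fragment above is only the inner goroutine. As a minimal, self-contained sketch of how it fits together with WithCancel (the helper names gen and firstN are made up for illustration), the caller cancels the context once it has what it needs, and the goroutine exits through the ctx.Done() case:

```go
package main

import (
	"context"
	"fmt"
)

// gen emits 1, 2, 3, ... on the returned channel until ctx is canceled.
func gen(ctx context.Context) <-chan int {
	dst := make(chan int)
	go func() {
		n := 1
		for {
			select {
			case <-ctx.Done():
				return // returning so the goroutine is not leaked
			case dst <- n:
				n++
			}
		}
	}()
	return dst
}

// firstN (hypothetical helper) reads n values from gen and then cancels
// the context, which stops the generator goroutine.
func firstN(n int) []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // closes ctx.Done() when firstN returns

	ch := gen(ctx)
	out := make([]int, 0, n)
	for len(out) < n {
		out = append(out, <-ch)
	}
	return out
}

func main() {
	fmt.Println(firstN(5)) // prints [1 2 3 4 5]
}
```

In a gRPC handler you normally do not call cancel for the client side yourself; you only watch the Done() channel of stream.Context().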

You can use select. Instead of reading the data directly from the function call, receive it in a goroutine and pass it over a channel. Something like this:

func (a *API) Notifications(in *empty.Empty, stream pb.Service_NotificationsServer) error {
    // Derive a context that is also canceled when this handler returns,
    // so the receiving goroutine below always has a way to stop.
    ctx, cancel := context.WithCancel(stream.Context())
    defer cancel()

    _, ok := user.FromContext(ctx)
    if !ok {
        return grpc.Errorf(codes.Unauthenticated, "user not found")
    }

    pubsub := a.redisClient.Subscribe("notifications")
    defer pubsub.Close()

    // I can not build the code, so I assume ReceiveMessage returns a *redis.Message
    c := make(chan *redis.Message)
    go func() {
        // A goroutine can not return a value to the handler, so a closed
        // channel is the signal that the subscription failed.
        defer close(c)
        for {
            msg, err := pubsub.ReceiveMessage()
            if err != nil {
                grpclog.Warningf("Notifications: pubsub error: %v", err)
                return
            }
            select {
            case c <- msg:
            case <-ctx.Done():
                return // nobody is reading anymore
            }
        }
    }()

    for {
        select {
        case msg, ok := <-c:
            if !ok {
                // channel is closed: ReceiveMessage failed
                return grpc.Errorf(codes.Internal, "pubsub error")
            }
            notification := &pb.Notification{}
            if err := json.Unmarshal([]byte(msg.Payload), notification); err != nil {
                grpclog.Warningf("Notifications: parse error: %v", err)
                continue
            }
            if err := stream.Send(notification); err != nil {
                grpclog.Warningf("Notifications: %v", err)
                return err
            }
            grpclog.Infof("Notifications: send msg %v", notification)
        case <-ctx.Done():
            // The client is gone or the RPC was canceled. The deferred
            // pubsub.Close() should make the pending ReceiveMessage()
            // return an error. If that did not happen, the goroutine
            // would run until the end of main(), which I think is not
            // what you want.
            return ctx.Err()
        }
    }
}

Read the messages (I assume the type is Message) from the channel, and use the power of select.

Two other related things in this scenario:

  1. Make sure the goroutine ends when this function returns. I can't be sure, since I don't know the rest of the code, but I assume there is a Close() method that closes the pubsub so that the next ReceiveMessage() returns an error (the defer in your code should do that job, I hope).

  2. If ReceiveMessage() returns an error before ctx.Done() fires, you can close the channel and then break out of the loop.