如何在gRPC中从服务器广播到客户端?

我现在正在 gRPC 中创建一个小型聊天应用程序,我遇到了一个问题,如果一个用户希望作为客户端连接到 gRPC 服务器,我希望广播该事件已发生到所有其他已连接的客户端。

我正在考虑使用某种观察者,但是我不知道服务器如何知道谁被连接、以及我如何向所有客户机而不是仅仅一两个客户机广播事件。

我知道使用流是一种方法,但是因为每个客户机都在服务器上创建自己的流,所以我不确定它如何能够订阅其他服务器-客户机流。

Yup, I don't see any other way than keeping a global data structure containing all the connected streams and looping through them, telling each about the even that just occurred.

Another option would be to use a long-polling approach. That is try something like below (code in Python, since that is what I'm most familiar with, but go should be very similar). This was not tested, and is meant to just give you an idea of how to do long-polling in gRPC:

.PROTO defs
-------------------------------------------------
service Updater {
    rpc GetUpdates(GetUpdatesRequest) returns (GetUpdatesResponse);
}

message GetUpdatesRequest {
    int64 last_received_update = 1;
}

message GetUpdatesResponse {
    repeated Update updates = 1;
    int64 update_index = 2;
}

message Update {
    // your update structure
}


SERVER
-----------------------------------------------------------
class UpdaterServer(UpdaterServicer):
    def __init__(self):
        self.condition = threading.Condition()
        self.updates = []

    def post_update(self, update):
        """
        Used whenever the clients should be updated about something. It will
        trigger their long-poll calls to return
        """
        with self.condition:
            # TODO: You should probably remove old updates after some time
            self.updates.append(updates)
            self.condition.notify_all()

    def GetUpdates(self, req, context):
        with self.condition:
            while self.updates[req.last_received_update + 1:] == []:
                self.condition.wait()
            new_updates = self.updates[req.last_received_update + 1:]
            response = GetUpdatesResponse()
            for update in new_updates:
                response.updates.add().CopyFrom(update)
            response.update_index = req.last_received_update + len(new_updates)
            return response


SEPARATE THREAD IN THE CLIENT
----------------------------------------------
request = GetUpdatesRequest()
request.last_received_update = -1
while True:
    stub = UpdaterStub(channel)
    try:
        response = stub.GetUpdates(request, timeout=60*10)
        handle_updates(response.updates)
        request.last_received_update = response.update_index
    except grpc.FutureTimeoutError:
        pass

Another approach is to spawn a grpc-server on client side too. On app-level you have some handshake from client to server to exchange the clients grpc-server ip and port. You probably want to create a client for that address at this point and store the client in a list.

Now you can push messages to the clients from the list with default unary RPC calls. No [bidi] stream needed. Pros:

  • Possible to separate the clients "Push"-API from the server API.
  • Unary RPC push calls.

Cons:

  • Additional "server". Don't know if that is possible in every scenario.