gRPC async service memory keeps growing

A client calls the service's asynchronous interface and the calls complete normally, but the server's memory keeps growing. I added multi-threaded asynchronous handling, and memory still grows.
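
For context, here is a minimal client-side sketch of how such an asynchronous call might be driven. This is a hypothetical reconstruction (the original client code was not posted): the AsyncAsynchronizationMessageA stub method and the generated header are assumed from gRPC's code-generation conventions and the proto types used in the server code below.

// Hypothetical async client sketch -- not the original client code.
// Assumes the generated FileMonitorService stub; AsyncAsynchronizationMessageA
// follows gRPC's naming convention for an RPC named AsynchronizationMessageA.
#include <grpcpp/grpcpp.h>
// ... plus the generated *.grpc.pb.h header for FileMonitorService

using LWICKTool::Proto::FileMonitor::FileMonitorService;
using LWICKTool::Proto::FileMonitor::AsynchronizationRequestA;
using LWICKTool::Proto::FileMonitor::AsynchronizationReplyA;

void CallAsynchronizationMessageA() {
    // Connect to the async server address used in Run().
    auto channel = grpc::CreateChannel("127.0.0.1:60061", grpc::InsecureChannelCredentials());
    std::unique_ptr<FileMonitorService::Stub> stub = FileMonitorService::NewStub(channel);

    grpc::ClientContext context;
    grpc::CompletionQueue cq;
    grpc::Status status;

    AsynchronizationRequestA request;
    request.set_message("hello");
    AsynchronizationReplyA reply;

    // Start the call; reply and status are filled in when the tagged event
    // (here the arbitrary tag 1) appears on the completion queue.
    std::unique_ptr<grpc::ClientAsyncResponseReader<AsynchronizationReplyA>> rpc(
        stub->AsyncAsynchronizationMessageA(&context, request, &cq));
    rpc->Finish(&reply, &status, reinterpret_cast<void*>(1));

    void* tag = nullptr;
    bool ok = false;
    cq.Next(&tag, &ok);  // blocks until the RPC completes
    if (ok && status.ok()) {
        // reply.message() now holds the server's response.
    }
}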

FileMonitorProtoServiceImpl.h is as follows:

using grpc::Server;
using grpc::ServerAsyncResponseWriter;
using grpc::ServerBuilder;
using grpc::ServerCompletionQueue;
using grpc::ServerContext;
using grpc::Status;
using LWICKTool::Proto::FileMonitor::FileMonitorService;
using LWICKTool::Proto::FileMonitor::SynchronizationRequestA;
using LWICKTool::Proto::FileMonitor::SynchronizationReplyA;
using LWICKTool::Proto::FileMonitor::SynchronizationRequestB;
using LWICKTool::Proto::FileMonitor::SynchronizationReplyB;
using LWICKTool::Proto::FileMonitor::AsynchronizationRequestA;
using LWICKTool::Proto::FileMonitor::AsynchronizationReplyA;
using LWICKTool::Proto::FileMonitor::AsynchronizationRequestB;
using LWICKTool::Proto::FileMonitor::AsynchronizationReplyB;

class FileMonitorProtoServiceImpl final : public FileMonitorService::Service {
public:
    ~FileMonitorProtoServiceImpl();
private:
    // Class encompassing the state and logic needed to serve a request.
    class CallData {
    public:
        enum RpcType {
            RPC_TYPE_ASYNCHRONIZATION_A = 0,
            RPC_TYPE_ASYNCHRONIZATION_B
        };
        // Let's implement a tiny state machine with the following states.
        enum CallStatus { CREATE, PROCESS, FINISH };
        // Take in the "service" instance (in this case representing an asynchronous
        // server) and the completion queue "cq" used for asynchronous communication
        // with the gRPC runtime.
        CallData(FileMonitorService::AsyncService* service, ServerCompletionQueue* cq, RpcType rpcType);
    private:
        // The means of communication with the gRPC runtime for an asynchronous
        // server.
        FileMonitorService::AsyncService* service_;
        // The producer-consumer queue for asynchronous server notifications.
        ServerCompletionQueue* cq_;
        // Context for the rpc, allowing us to tweak aspects of it such as the
        // use of compression and authentication, as well as to send metadata
        // back to the client.
        std::shared_ptr<ServerContext> ctx_;

        RpcType rpcType_;

        // What we get from the client.
        AsynchronizationRequestA asynchronizationRequestA_;
        // What we send back to the client.
        std::shared_ptr<AsynchronizationReplyA> asynchronizationReplyA_;

        // The means to get back to the client.
        ServerAsyncResponseWriter<AsynchronizationReplyA> asynchronizationResponderA_;

        // What we get from the client.
        AsynchronizationRequestB asynchronizationRequestB_;
        // What we send back to the client.
        std::shared_ptr<AsynchronizationReplyB> asynchronizationReplyB_;

        // The means to get back to the client.
        ServerAsyncResponseWriter<AsynchronizationReplyB> asynchronizationResponderB_;

        CallStatus status_;  // The current serving state.

    public:
        void Proceed();
        RpcType getRpcType();
        CallStatus getStatus();
        // Asynchronous method A
        void AsynchronizationMessageA();
        // Asynchronous method B
        void AsynchronizationMessageB();
    };

private:
    std::unique_ptr<ServerCompletionQueue> cqAsynchronizationMessageA_;
    std::unique_ptr<ServerCompletionQueue> cqAsynchronizationMessageB_;
    FileMonitorService::AsyncService asyncService_;
    std::unique_ptr<Server> server_;
    std::unique_ptr<Server> asyncServer_;

    std::vector<pthread_t> threadPool_;
    std::recursive_mutex mutexAsynchronizationMessageA_;
    std::recursive_mutex mutexAsynchronizationMessageB_;

public:
    // There is no shutdown handling in this code.
    void Run();

    // Synchronous method A
    Status SynchronizationMessageA(ServerContext* context, const SynchronizationRequestA* request, SynchronizationReplyA* response) override;
    // Synchronous method B
    Status SynchronizationMessageB(ServerContext* context, const SynchronizationRequestB* request, SynchronizationReplyB* response) override;

    void HandleRpcsAsynchronizationMessageA();
    void HandleRpcsAsynchronizationMessageB();
    static void* ThreadStartRoutineAsynchronizationMessageA(void* parameter);
    static void* ThreadStartRoutineAsynchronizationMessageB(void* parameter);
};

FileMonitorProtoServiceImpl.cpp is as follows:

FileMonitorProtoServiceImpl::~FileMonitorProtoServiceImpl(){
    server_->Shutdown();
    asyncServer_->Shutdown();

    // Always shutdown the completion queues after the server, then drain any
    // remaining events so pending tags are not left behind.
    cqAsynchronizationMessageA_->Shutdown();
    void* tagAsynchronizationMessageA;  // uniquely identifies a request.
    bool okAsynchronizationMessageA;
    while (cqAsynchronizationMessageA_->Next(&tagAsynchronizationMessageA, &okAsynchronizationMessageA)) {
        // Drain until the queue reports shutdown.
    }

    cqAsynchronizationMessageB_->Shutdown();
    void* tagAsynchronizationMessageB;  // uniquely identifies a request.
    bool okAsynchronizationMessageB;
    while (cqAsynchronizationMessageB_->Next(&tagAsynchronizationMessageB, &okAsynchronizationMessageB)) {
        // Drain until the queue reports shutdown.
    }
}

FileMonitorProtoServiceImpl::CallData::CallData(FileMonitorService::AsyncService* service, ServerCompletionQueue* cq, RpcType rpcType)
    : service_(service), cq_(cq), ctx_(std::make_shared<ServerContext>()), rpcType_(rpcType), asynchronizationResponderA_(ctx_.get()), asynchronizationResponderB_(ctx_.get()), status_(CREATE){
    // Invoke the serving logic right away.
    Proceed();
}

void FileMonitorProtoServiceImpl::CallData::Proceed(){
    switch(status_){
    case CREATE:
        // Make this instance progress to the PROCESS state.
        status_ = PROCESS;

        // As part of the initial CREATE state, we *request* that the system
        // start processing incoming AsynchronizationMessage requests. Here
        // "this" acts as the tag uniquely identifying the request (so that
        // different CallData instances can serve different requests
        // concurrently); in this case it is the memory address of this
        // CallData instance.
        switch (rpcType_)
        {
        case FileMonitorProtoServiceImpl::CallData::RPC_TYPE_ASYNCHRONIZATION_A:
            service_->RequestAsynchronizationMessageA(ctx_.get(), &asynchronizationRequestA_, &asynchronizationResponderA_, cq_, cq_, this);
            break;
        case FileMonitorProtoServiceImpl::CallData::RPC_TYPE_ASYNCHRONIZATION_B:
            service_->RequestAsynchronizationMessageB(ctx_.get(), &asynchronizationRequestB_, &asynchronizationResponderB_, cq_, cq_, this);
            break;
        default:
            break;
        }
        break;
    case PROCESS:
        // Spawn a new CallData instance to serve new clients while we process
        // the one for this CallData. The instance will deallocate itself as
        // part of its FINISH state.
        new CallData(service_, cq_, this->rpcType_);
        switch (rpcType_)
        {
        case FileMonitorProtoServiceImpl::CallData::RPC_TYPE_ASYNCHRONIZATION_A:
            AsynchronizationMessageA();
            break;
        case FileMonitorProtoServiceImpl::CallData::RPC_TYPE_ASYNCHRONIZATION_B:
            AsynchronizationMessageB();
            break;
        default:
            break;
        }
        break;
    case FINISH:
        GPR_ASSERT(status_ == FINISH);
        // Once in the FINISH state, deallocate ourselves (CallData).
        delete this;
//        gMutesCount.lock();
//        gCount++;
//        gMutesCount.unlock();
        break;
    }
}

FileMonitorProtoServiceImpl::CallData::RpcType FileMonitorProtoServiceImpl::CallData::getRpcType(){
    return rpcType_;
}
FileMonitorProtoServiceImpl::CallData::CallStatus FileMonitorProtoServiceImpl::CallData::getStatus(){
    return status_;
}

void FileMonitorProtoServiceImpl::CallData::AsynchronizationMessageA(){

    // The actual processing.
    sleep(2);
    std::stringstream ss;
    std::string prefix("Asynchronization A Hello ");
    std::string messages = asynchronizationRequestA_.message();
    long int threadId = gettid();
    ss<<" thread id "<<threadId;
    std::string reply = prefix + messages + ss.str();
    asynchronizationReplyA_ = std::make_shared<AsynchronizationReplyA>();
    asynchronizationReplyA_->set_message(reply);

    // And we are done! Let the gRPC runtime know we've finished, using the
    // memory address of this instance as the uniquely identifying tag for
    // the event.
    status_ = FINISH;
    asynchronizationResponderA_.Finish(*asynchronizationReplyA_, Status::OK, this);
}

void FileMonitorProtoServiceImpl::CallData::AsynchronizationMessageB(){
    // The actual processing.
    sleep(5);
    std::stringstream ss;
    std::string prefix("Asynchronization B Hello ");
    std::string messages = asynchronizationRequestB_.message();
    long int threadId = gettid();
    ss<<" thread id "<<threadId;
    std::string reply = prefix + messages + ss.str();
    asynchronizationReplyB_ = std::make_shared<AsynchronizationReplyB>();
    asynchronizationReplyB_->set_message(reply);

    // And we are done! Let the gRPC runtime know we've finished, using the
    // memory address of this instance as the uniquely identifying tag for
    // the event.
    status_ = FINISH;
    asynchronizationResponderB_.Finish(*asynchronizationReplyB_, Status::OK, this);
}

void FileMonitorProtoServiceImpl::Run(){
    std::string asyncServerAddress("127.0.0.1:60061");
    std::string serverAddress("127.0.0.1:60062");

//    grpc::EnableDefaultHealthCheckService(true);
//    grpc::reflection::InitProtoReflectionServerBuilderPlugin();
    std::shared_ptr<ServerBuilder> asyncBuilder = std::make_shared<ServerBuilder>();
    std::shared_ptr<ServerBuilder> builder = std::make_shared<ServerBuilder>();
    // Listen on the given addresses without any authentication mechanism.
    asyncBuilder->AddListeningPort(asyncServerAddress, grpc::InsecureServerCredentials());
    builder->AddListeningPort(serverAddress, grpc::InsecureServerCredentials());
//    asyncBuilder->AddChannelArgument(GRPC_ARG_KEEPALIVE_TIME_MS,2000);
//    asyncBuilder->AddChannelArgument(GRPC_ARG_KEEPALIVE_TIMEOUT_MS,3000);
//    asyncBuilder->AddChannelArgument(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS,1);
    // Register "asyncService_" as the instance through which we'll communicate
    // with clients. In this case it corresponds to an *asynchronous* service.
    asyncBuilder->RegisterService(&asyncService_);
    builder->RegisterService(this);
    // Get hold of the completion queues used for the asynchronous communication
    // with the gRPC runtime.
    cqAsynchronizationMessageA_ = asyncBuilder->AddCompletionQueue();
    cqAsynchronizationMessageB_ = asyncBuilder->AddCompletionQueue();
    // Finally assemble the servers.
    asyncServer_ = asyncBuilder->BuildAndStart();
    server_ = builder->BuildAndStart();

    std::string message;
    message = CStdStringUtil::format("Server listening on %s asynchronization message", asyncServerAddress.c_str());
    Helper::syslogex(message.c_str());
    message = CStdStringUtil::format("Server listening on %s asynchronization message", serverAddress.c_str());
    Helper::syslogex(message.c_str());

    new CallData(&asyncService_, cqAsynchronizationMessageA_.get(), FileMonitorProtoServiceImpl::CallData::RPC_TYPE_ASYNCHRONIZATION_A);
    new CallData(&asyncService_, cqAsynchronizationMessageB_.get(), FileMonitorProtoServiceImpl::CallData::RPC_TYPE_ASYNCHRONIZATION_B);
    for(int i=0; i<256; ++i) {
        pthread_t threadTemp;
        pthread_attr_t threadAttrTemp;
        pthread_attr_init(&threadAttrTemp);
        int nRet = pthread_create(&threadTemp, &threadAttrTemp, ThreadStartRoutineAsynchronizationMessageA, (void*)this);
        if(0 != nRet){
            Helper::syslogex("Failed to pthread_create");
            break;
        }
        threadPool_.push_back(threadTemp);
        pthread_attr_destroy(&threadAttrTemp);
    }
    Helper::syslogex("threadStartRoutineAsynchronizationMessageA");

    for(int i=0; i<256; ++i) {
        pthread_t threadTemp;
        pthread_attr_t threadAttrTemp;
        pthread_attr_init(&threadAttrTemp);
        int nRet = pthread_create(&threadTemp, &threadAttrTemp, ThreadStartRoutineAsynchronizationMessageB, (void*)this);
        if(0 != nRet){
            Helper::syslogex("Failed to pthread_create");
            break;
        }
        threadPool_.push_back(threadTemp);
        pthread_attr_destroy(&threadAttrTemp);
    }
    Helper::syslogex("threadStartRoutineAsynchronizationMessageB");

    // Wait for the server to shutdown. Note that some other thread must be
    // responsible for shutting down the server for this call to ever return.
//    server_->Wait();
//    asyncServer_->Wait();
    for(auto &threadItem : threadPool_) {
        pthread_join(threadItem, NULL);
    }
}

Status FileMonitorProtoServiceImpl::SynchronizationMessageA(ServerContext* context, const SynchronizationRequestA* request, SynchronizationReplyA* response){
    (void)context;
    sleep(1);
    std::stringstream ss;
    std::string prefix("Synchronization A Hello ");
    std::string messages = request->message();
    long int threadId = gettid();
    ss<<" thread id "<<threadId;
    std::string reply = prefix + messages + ss.str();
    response->set_message(reply);
    return Status::OK;
}

Status FileMonitorProtoServiceImpl::SynchronizationMessageB(ServerContext* context, const SynchronizationRequestB* request, SynchronizationReplyB* response){
    (void)context;
    sleep(1);
    std::stringstream ss;
    std::string prefix("Synchronization B Hello ");
    std::string messages = request->message();
    long int threadId = gettid();
    ss<<" thread id "<<threadId;
    std::string reply = prefix + messages + ss.str();
    response->set_message(reply);
    return Status::OK;
}

// Proceed to the server's main loop.
void FileMonitorProtoServiceImpl::HandleRpcsAsynchronizationMessageA(){
    // Spawn a new CallData instance to serve new clients.
    //new CallData(&service_, cq_.get());
    void* tag;  // uniquely identifies a request.
    bool ok;
    while(true){
        // Block waiting to read the next event from the completion queue. The
        // event is uniquely identified by its tag, which in this case is the
        // memory address of a CallData instance.
        // The return value of Next should always be checked. This return value
        // tells us whether there is any kind of event or cq_ is shutting down.
        mutexAsynchronizationMessageA_.lock();
        bool bRes = cqAsynchronizationMessageA_->Next(&tag, &ok);
        if(!bRes){
            Helper::syslogex("Failed to Function cqAsynchronizationMessageA_->Next");
            mutexAsynchronizationMessageA_.unlock();
            break;
        }

        if(!ok){
            CallData *pCallData = static_cast<CallData*>(tag);
            if(NULL != pCallData){
                delete pCallData;
            }
            Helper::syslogex("ok returned by function cqAsynchronizationMessageA_.Next is false");
            mutexAsynchronizationMessageA_.unlock();
            continue;
        };
        mutexAsynchronizationMessageA_.unlock();
        static_cast<CallData*>(tag)->Proceed();
//        if(400 == gCount){
//            mutex_.lock();
//            server_->Shutdown();
//            asyncServer_->Shutdown();
//            // Always shutdown the completion queue after the server.
//            cq_->Shutdown();
//            void* tag;  // uniquely identifies a request.
//            bool ok;
//            cq_->Next(&tag, &ok);
//            mutex_.unlock();
//            break;
//        }
    }
}

// Proceed to the server's main loop.
void FileMonitorProtoServiceImpl::HandleRpcsAsynchronizationMessageB(){
    // Spawn a new CallData instance to serve new clients.
    //new CallData(&service_, cq_.get());
    void* tag;  // uniquely identifies a request.
    bool ok;
    while(true){
        // Block waiting to read the next event from the completion queue. The
        // event is uniquely identified by its tag, which in this case is the
        // memory address of a CallData instance.
        // The return value of Next should always be checked. This return value
        // tells us whether there is any kind of event or cq_ is shutting down.
        mutexAsynchronizationMessageB_.lock();
        bool bRes = cqAsynchronizationMessageB_->Next(&tag, &ok);
        if(!bRes){
            Helper::syslogex("Failed to Function cqAsynchronizationMessageB_->Next");
            mutexAsynchronizationMessageB_.unlock();
            break;
        }

        if(!ok){
            CallData *pCallData = static_cast<CallData*>(tag);
            if(NULL != pCallData){
                delete pCallData;
            }
            Helper::syslogex("ok returned by function cqAsynchronizationMessageB_.Next is false");
            mutexAsynchronizationMessageB_.unlock();
            continue;
        };
        mutexAsynchronizationMessageB_.unlock();
        static_cast<CallData*>(tag)->Proceed();
//        if(400 == gCount){
//            mutex_.lock();
//            server_->Shutdown();
//            asyncServer_->Shutdown();
//            // Always shutdown the completion queue after the server.
//            cq_->Shutdown();
//            void* tag;  // uniquely identifies a request.
//            bool ok;
//            cq_->Next(&tag, &ok);
//            mutex_.unlock();
//            break;
//        }
    }
}

void* FileMonitorProtoServiceImpl::ThreadStartRoutineAsynchronizationMessageA(void* parameter){
    do{
        if(NULL == parameter){
            Helper::syslogex("Pointer variable parameter is null in function threadStartRoutineAsynchronizationMessageA");
            break;
        }
        FileMonitorProtoServiceImpl *FileMonitorProtoServiceImplObj = static_cast<FileMonitorProtoServiceImpl*>(parameter);
        FileMonitorProtoServiceImplObj->HandleRpcsAsynchronizationMessageA();
    }while(0);
    return NULL;
}

void* FileMonitorProtoServiceImpl::ThreadStartRoutineAsynchronizationMessageB(void* parameter){
    do{
        if(NULL == parameter){
            Helper::syslogex("Pointer variable parameter is null in function threadStartRoutineAsynchronizationMessageB");
            break;
        }
        FileMonitorProtoServiceImpl *FileMonitorProtoServiceImplObj = static_cast<FileMonitorProtoServiceImpl*>(parameter);
        FileMonitorProtoServiceImplObj->HandleRpcsAsynchronizationMessageB();
    }while(0);
    return NULL;
}
