客户端调用服务异步接口,可以正常完成调用,但是服务内存一直在上涨,添加了多线程异步处理,内存还是会上涨
FileMonitorProtoServiceImpl.h如下
using grpc::Server;
using grpc::ServerAsyncResponseWriter;
using grpc::ServerBuilder;
using grpc::ServerCompletionQueue;
using grpc::ServerContext;
using grpc::Status;
using LWICKTool::Proto::FileMonitor::FileMonitorService;
using LWICKTool::Proto::FileMonitor::SynchronizationRequestA;
using LWICKTool::Proto::FileMonitor::SynchronizationReplyA;
using LWICKTool::Proto::FileMonitor::SynchronizationRequestB;
using LWICKTool::Proto::FileMonitor::SynchronizationReplyB;
using LWICKTool::Proto::FileMonitor::AsynchronizationRequestA;
using LWICKTool::Proto::FileMonitor::AsynchronizationReplyA;
using LWICKTool::Proto::FileMonitor::AsynchronizationRequestB;
using LWICKTool::Proto::FileMonitor::AsynchronizationReplyB;
class FileMonitorProtoServiceImpl final :public FileMonitorService::Service {
public:
~FileMonitorProtoServiceImpl();
private:
// Class encompasing the state and logic needed to serve a request.
class CallData {
public:
enum RpcType {
RPC_TYPE_ASYNCHRONIZATION_A = 0,
RPC_TYPE_ASYNCHRONIZATION_B
};
// Let's implement a tiny state machine with the following states.
enum CallStatus { CREATE, PROCESS, FINISH };
// Take in the "service" instance (in this case representing an asynchronous
// server) and the completion queue "cq" used for asynchronous communication
// with the gRPC runtime.
CallData(FileMonitorService::AsyncService* service, ServerCompletionQueue* cq, RpcType rpcType);
private:
// The means of communication with the gRPC runtime for an asynchronous
// server.
FileMonitorService::AsyncService* service_;
// The producer-consumer queue where for asynchronous server notifications.
ServerCompletionQueue* cq_;
// Context for the rpc, allowing to tweak aspects of it such as the use
// of compression, authentication, as well as to send metadata back to the
// client.
std::shared_ptr<ServerContext> ctx_;
RpcType rpcType_;
// What we get from the client.
AsynchronizationRequestA asynchronizationRequestA_;
// What we send back to the client.
std::shared_ptr<AsynchronizationReplyA> asynchronizationReplyA_;
// The means to get back to the client.
ServerAsyncResponseWriter<AsynchronizationReplyA> asynchronizationResponderA_;
// What we get from the client.
AsynchronizationRequestB asynchronizationRequestB_;
// What we send back to the client.
std::shared_ptr<AsynchronizationReplyB> asynchronizationReplyB_;
// The means to get back to the client.
ServerAsyncResponseWriter<AsynchronizationReplyB> asynchronizationResponderB_;
CallStatus status_; // The current serving state.
public:
void Proceed();
RpcType getRpcType();
CallStatus getStatus();
//异步方法A
void AsynchronizationMessageA();
//异步方法B
void AsynchronizationMessageB();
};
private:
std::unique_ptr<ServerCompletionQueue> cqAsynchronizationMessageA_;
std::unique_ptr<ServerCompletionQueue> cqAsynchronizationMessageB_;
FileMonitorService::AsyncService asyncService_;
std::unique_ptr<Server> server_;
std::unique_ptr<Server> asyncServer_;
std::vector<pthread_t> threadPool_;
std::recursive_mutex mutexAsynchronizationMessageA_;
std::recursive_mutex mutexAsynchronizationMessageB_;
public:
// There is no shutdown handling in this code.
void Run();
//同步方法A
Status SynchronizationMessageA(ServerContext* context, const SynchronizationRequestA* request, SynchronizationReplyA* response) override;
//同步方法B
Status SynchronizationMessageB(ServerContext* context, const SynchronizationRequestB* request, SynchronizationReplyB* response) override;
void HandleRpcsAsynchronizationMessageA();
void HandleRpcsAsynchronizationMessageB();
static void* ThreadStartRoutineAsynchronizationMessageA(void* parameter);
static void* ThreadStartRoutineAsynchronizationMessageB(void* parameter);
};
FileMonitorProtoServiceImpl.cpp如下
FileMonitorProtoServiceImpl::~FileMonitorProtoServiceImpl(){
server_->Shutdown();
asyncServer_->Shutdown();
// Always shutdown the completion queue after the server.
cqAsynchronizationMessageA_->Shutdown();
// uniquely identifies a request.
void* tagAsynchronizationMessageAag;
bool okAsynchronizationMessageA;
cqAsynchronizationMessageA_->Next(&tagAsynchronizationMessageAag, &okAsynchronizationMessageA);
// Always shutdown the completion queue after the server.
cqAsynchronizationMessageB_->Shutdown();
// uniquely identifies a request.
void* tagAsynchronizationMessageB;
bool okAsynchronizationMessageB;
cqAsynchronizationMessageB_->Next(&tagAsynchronizationMessageB, &okAsynchronizationMessageB);
}
FileMonitorProtoServiceImpl::CallData::CallData(FileMonitorService::AsyncService* service, ServerCompletionQueue* cq, RpcType rpcType)
: service_(service), cq_(cq), ctx_(std::make_shared<ServerContext>()), rpcType_(rpcType), asynchronizationResponderA_(ctx_.get()), asynchronizationResponderB_(ctx_.get()), status_(CREATE){
// Invoke the serving logic right away.
Proceed();
}
void FileMonitorProtoServiceImpl::CallData::Proceed(){
switch(status_){
case CREATE:
// Make this instance progress to the PROCESS state.
status_ = PROCESS;
// As part of the initial CREATE state, we *request* that the system
// start processing SayHello requests. In this request, "this" acts are
// the tag uniquely identifying the request (so that different CallData
// instances can serve different requests concurrently), in this case
// the memory address of this CallData instance.
switch (rpcType_)
{
case FileMonitorProtoServiceImpl::CallData::RPC_TYPE_ASYNCHRONIZATION_A:
service_->RequestAsynchronizationMessageA(ctx_.get(), &asynchronizationRequestA_, &asynchronizationResponderA_, cq_, cq_, this);
break;
case FileMonitorProtoServiceImpl::CallData::RPC_TYPE_ASYNCHRONIZATION_B:
service_->RequestAsynchronizationMessageB(ctx_.get(), &asynchronizationRequestB_, &asynchronizationResponderB_, cq_, cq_, this);
break;
default:
break;
}
break;
case PROCESS:
// Spawn a new CallData instance to serve new clients while we process
// the one for this CallData. The instance will deallocate itself as
// part of its FINISH state.
new CallData(service_, cq_, this->rpcType_);
switch (rpcType_)
{
case FileMonitorProtoServiceImpl::CallData::RPC_TYPE_ASYNCHRONIZATION_A:
AsynchronizationMessageA();
break;
case FileMonitorProtoServiceImpl::CallData::RPC_TYPE_ASYNCHRONIZATION_B:
AsynchronizationMessageB();
break;
default:
break;
}
break;
case FINISH:
GPR_ASSERT(status_ == FINISH);
// Once in the FINISH state, deallocate ourselves (CallData).
delete this;
// gMutesCount.lock();
// gCount++;
// gMutesCount.unlock();
break;
}
}
FileMonitorProtoServiceImpl::CallData::RpcType FileMonitorProtoServiceImpl::CallData::getRpcType(){
return rpcType_;
}
FileMonitorProtoServiceImpl::CallData::CallStatus FileMonitorProtoServiceImpl::CallData::getStatus(){
return status_;
}
void FileMonitorProtoServiceImpl::CallData::AsynchronizationMessageA(){
// The actual processing.
sleep(2);
std::stringstream ss;
std::string prefix("Asynchronization A Hello ");
std::string messages = asynchronizationRequestA_.message();
long int threadId = gettid();
ss<<" thread id "<<threadId;
std::string replay = prefix + messages + ss.str();
asynchronizationReplyA_ = std::make_shared<AsynchronizationReplyA>();
asynchronizationReplyA_.get()->set_message(replay);
// And we are done! Let the gRPC runtime know we've finished, using the
// memory address of this instance as the uniquely identifying tag for
// the event.
status_ = FINISH;
asynchronizationResponderA_.Finish(*(asynchronizationReplyA_.get()), Status::OK, this);
}
void FileMonitorProtoServiceImpl::CallData::AsynchronizationMessageB(){
// The actual processing.
sleep(5);
std::stringstream ss;
std::string prefix("Asynchronization B Hello ");
std::string messages = asynchronizationRequestB_.message();
long int threadId = gettid();
ss<<" thread id "<<threadId;
std::string replay = prefix + messages + ss.str();
asynchronizationReplyB_ = std::make_shared<AsynchronizationReplyB>();
asynchronizationReplyB_.get()->set_message(replay);
// And we are done! Let the gRPC runtime know we've finished, using the
// memory address of this instance as the uniquely identifying tag for
// the event.
status_ = FINISH;
asynchronizationResponderB_.Finish(*(asynchronizationReplyB_.get()), Status::OK, this);
}
void FileMonitorProtoServiceImpl::Run(){
std::string asyncServerAddress("127.0.0.1:60061");
std::string serverAddress("127.0.0.1:60062");
// grpc::EnableDefaultHealthCheckService(true);
// grpc::reflection::InitProtoReflectionServerBuilderPlugin();
std::shared_ptr<ServerBuilder> asyncBuilder = std::make_shared<ServerBuilder>();
std::shared_ptr<ServerBuilder> builder = std::make_shared<ServerBuilder>();
// Listen on the given address without any authentication mechanism.
asyncBuilder.get()->AddListeningPort(asyncServerAddress, grpc::InsecureServerCredentials());
builder.get()->AddListeningPort(serverAddress, grpc::InsecureServerCredentials());
// asyncBuilder.AddChannelArgument(grpc::GRPC_ARG_KEEPALIVE_TIME_MS,2000);
// asyncBuilder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIMEOUT_MS,3000);
// asyncBuilder.AddChannelArgument(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS,1);
// Register "service_" as the instance through which we'll communicate with
// clients. In this case it corresponds to an *asynchronous* service.
asyncBuilder.get()->RegisterService(&asyncService_);
builder.get()->RegisterService(this);
// Get hold of the completion queue used for the asynchronous communication
// with the gRPC runtime.
cqAsynchronizationMessageA_ = asyncBuilder.get()->AddCompletionQueue();
cqAsynchronizationMessageB_ = asyncBuilder.get()->AddCompletionQueue();
// Finally assemble the server.
asyncServer_ = asyncBuilder.get()->BuildAndStart();
server_ = builder.get()->BuildAndStart();
std::string message;
message = CStdStringUtil::format("Server listening on %s asynchronization message", asyncServerAddress.c_str());
Helper::syslogex(message.c_str());
message = CStdStringUtil::format("Server listening on %s asynchronization message", serverAddress.c_str());
Helper::syslogex(message.c_str());
new CallData(&asyncService_, cqAsynchronizationMessageA_.get(), FileMonitorProtoServiceImpl::CallData::RPC_TYPE_ASYNCHRONIZATION_A);
new CallData(&asyncService_, cqAsynchronizationMessageB_.get(), FileMonitorProtoServiceImpl::CallData::RPC_TYPE_ASYNCHRONIZATION_B);
for(int i=0; i<256; ++i) {
pthread_t threadTemp;
pthread_attr_t threadAttrTemp;
pthread_attr_init(&threadAttrTemp);
int nRet = pthread_create(&threadTemp, &threadAttrTemp, ThreadStartRoutineAsynchronizationMessageA, (void*)this);
if(0 != nRet){
Helper::syslogex("Failed to pthread_create");
break;
}
threadPool_.push_back(threadTemp);
pthread_attr_destroy(&threadAttrTemp);
}
Helper::syslogex("threadStartRoutineAsynchronizationMessageA");
for(int i=0; i<256; ++i) {
pthread_t threadTemp;
pthread_attr_t threadAttrTemp;
pthread_attr_init(&threadAttrTemp);
int nRet = pthread_create(&threadTemp, &threadAttrTemp, ThreadStartRoutineAsynchronizationMessageB, (void*)this);
if(0 != nRet){
Helper::syslogex("Failed to pthread_create");
break;
}
threadPool_.push_back(threadTemp);
pthread_attr_destroy(&threadAttrTemp);
}
Helper::syslogex("threadStartRoutineAsynchronizationMessageB");
// Wait for the server to shutdown. Note that some other thread must be
// responsible for shutting down the server for this call to ever return.
// server_->Wait();
// asyncServer_->Wait();
for(auto &threadItem : threadPool_) {
pthread_join(threadItem, NULL);
}
}
Status FileMonitorProtoServiceImpl::SynchronizationMessageA(ServerContext* context, const SynchronizationRequestA* request, SynchronizationReplyA* response){
(void)context;
sleep(1);
std::stringstream ss;
std::string prefix("Synchronization A Hello ");
std::string messages = request->message();
long int threadId = gettid();
ss<<" thread id "<<threadId;
std::string replay = prefix + messages + ss.str();
response->set_message(replay);
return Status::OK;
}
Status FileMonitorProtoServiceImpl::SynchronizationMessageB(ServerContext* context, const SynchronizationRequestB* request, SynchronizationReplyB* response){
(void)context;
sleep(1);
std::stringstream ss;
std::string prefix("Synchronization B Hello ");
std::string messages = request->message();
long int threadId = gettid();
ss<<" thread id "<<threadId;
std::string replay = prefix + messages + ss.str();
response->set_message(replay);
return Status::OK;
}
// Proceed to the server's main loop.
void FileMonitorProtoServiceImpl::HandleRpcsAsynchronizationMessageA(){
// Spawn a new CallData instance to serve new clients.
//new CallData(&service_, cq_.get());
void* tag; // uniquely identifies a request.
bool ok;
while(true){
// Block waiting to read the next event from the completion queue. The
// event is uniquely identified by its tag, which in this case is the
// memory address of a CallData instance.
// The return value of Next should always be checked. This return value
// tells us whether there is any kind of event or cq_ is shutting down.
mutexAsynchronizationMessageA_.lock();
bool bRes = cqAsynchronizationMessageA_->Next(&tag, &ok);
if(!bRes){
Helper::syslogex("Failed to Function cqAsynchronizationMessageA_->Next");
mutexAsynchronizationMessageA_.unlock();
break;
}
if(!ok){
CallData *pCallData = static_cast<CallData*>(tag);
if(NULL != pCallData){
delete pCallData;
}
Helper::syslogex("ok returned by function cqAsynchronizationMessageA_.Next is false");
mutexAsynchronizationMessageA_.unlock();
continue;
};
mutexAsynchronizationMessageA_.unlock();
static_cast<CallData*>(tag)->Proceed();
// if(400 == gCount){
// mutex_.lock();
// server_->Shutdown();
// asyncServer_->Shutdown();
// // Always shutdown the completion queue after the server.
// cq_->Shutdown();
// void* tag; // uniquely identifies a request.
// bool ok;
// cq_->Next(&tag, &ok);
// mutex_.unlock();
// break;
// }
}
}
// Proceed to the server's main loop.
void FileMonitorProtoServiceImpl::HandleRpcsAsynchronizationMessageB(){
// Spawn a new CallData instance to serve new clients.
//new CallData(&service_, cq_.get());
void* tag; // uniquely identifies a request.
bool ok;
while(true){
// Block waiting to read the next event from the completion queue. The
// event is uniquely identified by its tag, which in this case is the
// memory address of a CallData instance.
// The return value of Next should always be checked. This return value
// tells us whether there is any kind of event or cq_ is shutting down.
mutexAsynchronizationMessageB_.lock();
bool bRes = cqAsynchronizationMessageB_->Next(&tag, &ok);
if(!bRes){
Helper::syslogex("Failed to Function cqAsynchronizationMessageB_->Next");
mutexAsynchronizationMessageB_.unlock();
break;
}
if(!ok){
CallData *pCallData = static_cast<CallData*>(tag);
if(NULL != pCallData){
delete pCallData;
}
Helper::syslogex("ok returned by function cqAsynchronizationMessageB_.Next is false");
mutexAsynchronizationMessageB_.unlock();
continue;
};
mutexAsynchronizationMessageB_.unlock();
static_cast<CallData*>(tag)->Proceed();
// if(400 == gCount){
// mutex_.lock();
// server_->Shutdown();
// asyncServer_->Shutdown();
// // Always shutdown the completion queue after the server.
// cq_->Shutdown();
// void* tag; // uniquely identifies a request.
// bool ok;
// cq_->Next(&tag, &ok);
// mutex_.unlock();
// break;
// }
}
}
void* FileMonitorProtoServiceImpl::ThreadStartRoutineAsynchronizationMessageA(void* parameter){
do{
if(NULL == parameter){
Helper::syslogex("Pointer variable parameter is null in function threadStartRoutineAsynchronizationMessageA");
break;
}
FileMonitorProtoServiceImpl *FileMonitorProtoServiceImplObj = (FileMonitorProtoServiceImpl*)(parameter);
FileMonitorProtoServiceImplObj->HandleRpcsAsynchronizationMessageA();
}while(0);
return NULL;
}
void* FileMonitorProtoServiceImpl::ThreadStartRoutineAsynchronizationMessageB(void* parameter){
do{
if(NULL == parameter){
Helper::syslogex("Pointer variable parameter is null in function threadStartRoutineAsynchronizationMessageB");
break;
}
FileMonitorProtoServiceImpl *FileMonitorProtoServiceImplObj = (FileMonitorProtoServiceImpl*)(parameter);
FileMonitorProtoServiceImplObj->HandleRpcsAsynchronizationMessageB();
}while(0);
return NULL;
}
你好,我是有问必答小助手,非常抱歉,本次您提出的有问必答问题,技术专家团超时未为您做出解答
本次提问扣除的有问必答次数,将会以问答VIP体验卡(1次有问必答机会、商城购买实体图书享受95折优惠)的形式为您补发到账户。
因为有问必答VIP体验卡有效期仅有1天,您在需要使用的时候【私信】联系我,我会为您补发。