项目需求,需要将大量实时数据从kafka拉下,并发送到第三方给定的url地址,并且第三方规定,只能用短连接。
目前是采用一个线程池共享一个httpclient实例来批量发送,但是总觉得这种短连接发挥不了httpclient的功能,并且在使用中还有大量的连接超时、读超时等异常,甚至比使用原生的java URLconnection的异常还要多。
虽说httpclient自带重发功能,但是时间一长,还发现程序有卡死的现象,通过jstack发现,死锁的出现正是在httpclient重试过程中写log4j的时候出现的:
"pool-4-thread-86" prio=10 tid=0x00007f5fe00cd000 nid=0x356d waiting for monitor entry [0x00007f5eec7c6000]
java.lang.Thread.State: BLOCKED (on object monitor)
at org.apache.log4j.Category.callAppenders(Category.java:204)
- waiting to lock (a org.apache.log4j.spi.RootLogger)
at org.apache.log4j.Category.forcedLog(Category.java:391)
at org.apache.log4j.Category.log(Category.java:856)
at org.apache.commons.logging.impl.Log4JLogger.info(Log4JLogger.java:176)
at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:96)
at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:184)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
at com.voole.ad.utils.HttpConnectionMgrUtils.invokeGetUrl(HttpConnectionMgrUtils.java:131)
at com.voole.ad.service.impl.AdTimeSendServiceImpl$2.run(AdTimeSendServiceImpl.java:130)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
** 所以现在不知道需要作何处理,有没有更好的方式来发送大量的数据?还请大神指导!**
大量、且实时的数据,不建议使用 HTTP。
HTTP 连接,其实也是 TCP 连接的。所以,是否可以考虑使用 Socket(HTTP)来实现。
(一)get方法和post方法简介
查看了MSDN,并且实际写了一番程序,才明白get方法和post方法的不同。其实二者实现的功能相同,都是客户端提交数据库给服务器端,只是实现的机制不同而已。get方法实际上就是通过网页的URL地址实现数据的传送,而服务器端实现从URL地址中解析数据;而post方法则是通过表单的方式提交,数据采取加密方式传送,服务器接到请求后解析数据。从安全角度上看,post方法更安全,get方法可以直接从浏览器直接提交数据,而post方法则必须从网页提交表单。
(二)get方法和post方法实现
啰嗦了一通,还是讲讲两种方法如何实现的吧。
1、get方法VC实现
闲话少说,直接看代码,VC6.0标准类库函数CInternetSession,更详细的用法,请查看MSDN。
CInternetSession tInternet;
CStdioFile* tFile;
char tChars[2056];
memset(tChars,0,2056);
sprintf(tChars,"%s","http://127.0.0.1:8080/app?username=123&password=123&data=111111111111");
tFile = tInternet.OpenURL(tChars,1, INTERNET_FLAG_TRANSFER_ASCII,NULL,0);
memset(tChars,0,2056);
tFile->Read(tChars,2056);
发送的超链接中,app是数据应用程序部署,在数据库端要有该程序的响应,数据库才会接收数据。程序的数据已经保存在了该URL中。
其实发送URL的方法有很多,API函数,C++ Builder中的空间都可以实现该功能。
2、post方法实现
(1)、仍然使用CInternetSession类
char chHeader[128];
char chData[128];
char tChars[128];
memset(tChars,0,128);
CInternetSession session;
CHttpConnection* pConnection=NULL;
CHttpFile* pFile=NULL;
INTERNET_PORT nPort=8080;
strcpy(chHeader,"Content-Type: application/x-www-form-urlencoded");
strcpy(chData,"username=123&password=123&data=123456789012345");
pConnection = session.GetHttpConnection("127.0.0.1",nPort);
pFile = pConnection->OpenRequest(CHttpConnection::HTTP_VERB_POST,"/app");
pFile->SendRequest(chHeader,(DWORD)strlen(chHeader),(LPVOID )chData,strlen(chData));
pFile->Read(tChars,128);
session.Close();
(2)、使用TCP/IP协议传输数据
SOCKADDR_IN saServer;
LPHOSTENT lphostent;
WSADATA wsadata;
SOCKET hsocket;
int nRet;
const char host_name="127.0.0.1";
char* req=
"POST http://127.0.0.1:8080/app?username=123&password=123&data=111111111111 HTTP/1.1\r\n"
"From: local\r\n"
"User-Agent: receiver/Receiver/1.1\r\n"
"Content-Type: application/x-www-form-urlencoded\r\n"
"Content-Length: 0\r\n\r\n";
char tChars[128];
if(WSAStartup(winsock_version,&wsadata))
{
MessageBox("can't initial socket");
}
lphostent=gethostbyname(host_name);
if(lphostent==NULL)
{
MessageBox("lphostent is null");
}
hsocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
saServer.sin_family = AF_INET;
saServer.sin_port = htons(8080);
saServer.sin_addr = *((LPIN_ADDR)*lphostent->h_addr_list);
nRet = connect(hsocket, (LPSOCKADDR)&saServer, sizeof(SOCKADDR_IN));
if(nRet == SOCKET_ERROR)
{
MessageBox("Can't connect");
closesocket(hsocket);
return;
}
else
{
MessageBox("connected with host");
}
nRet = send(hsocket, req, strlen(req), 0);
if(nRet == SOCKET_ERROR)
{
MessageBox("send() failed");
closesocket(hsocket);
}
else
{
MessageBox("send() OK");
}
char dest[1000];
nRet=0;
while(nRet>0)
{
nRet=recv(hsocket,(LPSTR)dest,sizeof(dest),0);
if(nRet>0)
{
dest[nRet]=0;
}
else
{
dest[0]=0;
}
sprintf(tChars,"Received bytes:%d",nRet);
MessageBox(tChars);
sprintf(tChars,"Result:%s",dest);
MessageBox(tChars);
}
(2)、使用API函数
HINTERNET hInternetOpen = InternetOpen("http://127.0.0.1:8080/app",
INTERNET_OPEN_TYPE_PRECONFIG,
NULL,
NULL,
0);
if(hInternetOpen < 0 )
{
MessageBox("Can't open the page");
}
HINTERNET hInternetConnect = InternetConnect(hInternetOpen,
"127.0.0.1",
8080,
NULL,
"HTTP/1.1",
INTERNET_SERVICE_HTTP,
0,
0);
if(hInternetConnect < 0 )
{
MessageBox("Can't connect to the server");
}
HINTERNET hHttpOpenRequest = HttpOpenRequest(hInternetConnect,
"POST",
"/IFCS/IMEICheck",
"HTTP/1.1",
NULL,
0,
INTERNET_FLAG_RELOAD,
0);
if(hHttpOpenRequest < 0 )
{
MessageBox("Open request fail");
}
char chData[128] = "username=123&password=123&data=123456789000001";
DWORD len = strlen(chData);
int bRet = HttpSendRequest(hHttpOpenRequest,
"Content-Type: application/x-www-form-urlencoded\r\n",
strlen("Content-Type: application/x-www-form-urlencoded\r\n"),
(LPVOID *)chData,
len);
char tChars[128];
memset(tChars,0,128);
DWORD dwReadLen;
InternetReadFile(hHttpOpenRequest,(LPVOID *)tChars,128,&dwReadLen);
if(dwReadLen > 0)
{
MessageBox(tChars);
}
else
{
MessageBox("Read the reply fail");
}
InternetCloseHandle(hHttpOpenRequest);
InternetCloseHandle(hInternetConnect);
InternetCloseHandle(hInternetOpen);
get发送不了大数据的,post也有数据大小限制的,但是post能发送的数据又比get多。
如果你能控制服务器,你可以get把地址发过去,让服务器自己去下载。
http本身发送大量数据能力有限 既然有kafka 为什么不在http服务器那端用consumer来直接从kafka拉取数据 然后再进行处理 这样跟http服务器在一个内网 数据发送就快多了
GET 方法向 URL 添加数据;URL 的长度是受限制的(URL 的最大长度是 2048 个字符)
所以,数据大了,还是用POST吧