是每秒钟从库里提取300个请求进行发送。没有设定流速的时候是每秒最高3000个,但是用了.newScheduledThreadPool后只能处理30个不到的请求
public SendMsgHandler(int threadSize)
{
if (threadSize > 0) {
this.threadSize = threadSize;
}
this.threadPool = Executors.newScheduledThreadPool(threadSize);
}
public void sendMsgs(String userName)
{
try
{
List list = getMsgs(userName);
do
{
if (list.size()== 300)
{
CountDownLatch cdl = new CountDownLatch(3);
long b = System.currentTimeMillis();
for (int i = 0; i < 3; i++)
{
SendMsgHandler.logger.info("-----------" + "thisi301"+i);
List<DBObject> list1 = list.subList(i * 100, (i + 1) * 100 - 1);
SendMsgThread st = new SendMsgThread(list1, cdl);
this.threadPool.schedule(st, 1, TimeUnit.SECONDS);
}
long d = System.currentTimeMillis() - b;
cdl.await();
}
else
{
SendMsgHandler.logger.info("start200"+format.format(System.currentTimeMillis()));
int result = pushMsgs(list);
if (result == 0)
{
long b = System.currentTimeMillis();
for (int i = 0; i < list.size(); i++)
{
DBObject obj = (DBObject)list.get(i);
deleteMsg(obj);
}
long d = System.currentTimeMillis() - b;
}
}
list = getMsgs(userName);
if (list == null) {
break;
}
} while (list.size() > 0);
}
class SendMsgThread
implements Runnable
{
private List<DBObject> list;
private CountDownLatch cdl;
public SendMsgThread(List<DBObject> list1,CountDownLatch cd2)
{
this.list = list1;
this.cdl = cd2;
}
public void run()
{
try
{
int result = SendMsgHandler.pushMsgs(this.list);
if (result == 0)
{
long b = System.currentTimeMillis();
for (int i = 0; i < this.list.size(); i++)
{
DBObject obj = (DBObject)this.list.get(i);
SendMsgHandler.deleteMsg(obj);
}
long d = System.currentTimeMillis() - b;
SendMsgHandler.logger.info("delete report300-----------" + d);
}
}
catch (Exception e)
{
e.printStackTrace();
SendMsgHandler.logger.warn("process line error: " + e.getMessage(), e);
}
finally
{
if (this.cdl != null) {
this.cdl.countDown();
}
}
}
}