一个关于线程的问题,生产者放入队列中后,不打断点的话,消费者线程会执行两次,打断点的话是执行一次,贴一下代码,求解答。
缓冲队列:
package psplat.unicorn.model;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
/**
* @author L
* 缓冲队列
*/
public class CacheQueue<T> {
private LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<T>();
public CacheQueue() {
}
protected void add(T t){
queue.add(t);
}
protected T remove(){
return queue.poll();
}
protected T get(){
return queue.peek();
}
protected boolean isEmpty(){
return queue.isEmpty();
}
protected void addAll(List<T> list){
queue.addAll(list);
}
protected int getLength() {
return queue.size();
}
}
缓冲队列管理类:
package psplat.unicorn.model;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author L
* 联通队列管理类
*/
public class UnicornQueueManager {
private CacheQueue<MessageSendInfo> queueCache;
private Lock lock;
private Condition condition;
private static volatile UnicornQueueManager instance;
private UnicornQueueManager(){
queueCache = new CacheQueue<MessageSendInfo>();
this.lock = new ReentrantLock();
this.condition = lock.newCondition();
}
public static UnicornQueueManager getInstance(){
if(instance == null){
synchronized(UnicornQueueManager.class){
if(instance == null){
instance = new UnicornQueueManager();
}
}
}
return instance;
}
public void add(MessageSendInfo messageInfo){
queueCache.add(messageInfo);
lock.lock();
condition.signalAll();
lock.unlock();
}
public MessageSendInfo removeOne(){
return queueCache.remove();
}
public MessageSendInfo get(){
return queueCache.get();
}
public boolean isEmpty(){
return queueCache.isEmpty();
}
public Lock getLock() {
return lock;
}
public Condition getCondition() {
return condition;
}
public void addAll(List<MessageSendInfo> list){
queueCache.addAll(list);
}
public int getLength() {
return queueCache.getLength();
}
}
消费者线程:
package psplat.unicorn.consumer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.jeecgframework.web.system.service.SystemService;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import psplat.https.HttpClientUtil;
import psplat.unicorn.entity.UnicornLogEntity;
import psplat.unicorn.model.MessageSendInfo;
import psplat.unicorn.model.MessageStatus;
import psplat.unicorn.model.ReportMapManager;
import psplat.unicorn.model.UnicornConstants;
import psplat.unicorn.model.UnicornQueueManager;
import psplat.util.ConvertMapUtil;
@Component
public class UnicordSendConsumer implements InitializingBean {
private ExecutorService consumerRunnable;
@Autowired
private SystemService systemService;
//用于发送计数,发送三次,失败,则丢弃数据,否则队列后的消息全部阻塞
private static int count = 0;
@Override
public void afterPropertiesSet() throws Exception {
consumerRunnable = Executors.newSingleThreadExecutor();
consumerRunnable.execute(new Runnable() {
@Override
public void run() {
while(true) {
if(UnicornQueueManager.getInstance().isEmpty()) {
UnicornQueueManager.getInstance().getLock().lock();
try {
UnicornQueueManager.getInstance().getCondition().await();
} catch (InterruptedException e) {
e.printStackTrace();
}
UnicornQueueManager.getInstance().getLock().unlock();
} else {
MessageSendInfo sendInfo = UnicornQueueManager.getInstance().get();
//组装参数并发送
String url = UnicornConstants.COMMON_URL.getSuffix() + UnicornConstants.SEND.getSuffix();
Map<String, String> argsMap = new HashMap<String, String>();
argsMap.put("SpCode", UnicornConstants.SpCode.getSuffix());
argsMap.put("LoginName", UnicornConstants.LoginName.getSuffix());
argsMap.put("Password", UnicornConstants.Password.getSuffix());
argsMap.put("MessageContent", sendInfo.getMessageContent());
argsMap.put("UserNumber", sendInfo.getNumbers());
argsMap.put("SerialNumber", sendInfo.getSerialNumber());
String result = (new HttpClientUtil()).doPost(url, argsMap, "GBK");
String resultCode = ConvertMapUtil.convertMapFromArgs(result).get("result");
if(!resultCode.equals("0")) {
//发送失败,发三次,移除
count++;
if(count != 3) {
//失败了,1秒后重新发送
this.threadSleep(1000);
continue;
}
}
count = 0;
UnicornQueueManager.getInstance().removeOne();
System.out.println("执行,此时队列长度" + UnicornQueueManager.getInstance().getLength());
List<UnicornLogEntity> logs = sendInfo.inrichUnicornLogs(MessageStatus.COMMIT.getCode(), resultCode);
systemService.batchSave(logs);
ReportMapManager.getInstance().addAll(ReportConsumer.convertUnicornLogEntityMap(logs));
}
}
}
private void threadSleep(int i) {
try {
Thread.sleep(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
有大神看出来为何这个线程会执行两次么?加了打印之后,队列长度已经为0为什么不进入等待?
如下是打印:
调用结束,返回值:result=0&description=发送短信成功&taskid=208754669702&faillist=&task_id=208754669702
执行,此时队列长度0
调用结束,返回值:result=0&description=发送短信成功&taskid=208753973775&faillist=&task_id=208753973775
执行,此时队列长度0
求指导!
https://blog.csdn.net/qq_22271479/article/details/78495042
昨天太匆忙,说一下逻辑
这里spring加载完毕后,消费者线程处于等待状态,当生产者放入队列中即调用add方法时(addAll不用,所以没有操作),消费者被唤醒,发送消息
在发送的过程中,发送成功则移除队列中的数据,发送失败会尝试发送三次,即count计数直到3时,该条数据会被丢弃,直到队列为空,消费者继续等待
这里哪里的逻辑出了问题,为什么当队列有一条数据的时候会发送两次,是因为我的同步不对么?
我思考了一下应该是线程的问题吧,因为每次打断点的话都是执行一次,放开断点的话,就会执行两次
有大神看出来哪里的问题了吗?以前也这样写,都没有出现这个问题。。。
在进入死循环处加入了打印,打印出if条件和刚进入循环时的队列长度,发现每一次消费者被唤醒,死循环的打印先出现
false(if条件值)
false(if条件值)
被唤醒,此时队列长度1(刚进入else时队列的长度)
被唤醒,此时队列长度1(刚进入else时队列的长度)
------------------模拟调用(模拟成功的情况)
------------------模拟调用(模拟成功的情况)
发送成功,此时队列长度0(执行成功后,队列删除后长度)
发送成功,此时队列长度0(执行成功后,队列删除后长度)
这里打印出了两个false值先出来,为什么?在我理解中,这个线程应该是独立顺序执行,为什么第一次执行没有完成,第二次再次调用打印出一个false?
有些理解不了,求解答
解决了,add方法中原来是signal(),我改成了signalAll(),改回signal()后,就不会重复调用,但是还是有点想不明白,主线程为生产者,这个线程是消费者,为什么signalAll()会导致消费者执行两次?