关于生产消费线程,消费者重复执行的问题

一个关于线程的问题,生产者放入队列中后,不打断点的话,消费者线程会执行两次,打断点的话是执行一次,贴一下代码,求解答。

缓冲队列:

 package psplat.unicorn.model;

import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * @author L
 *  缓冲队列
 */
public class CacheQueue<T> {

    private LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<T>();

    public CacheQueue() {
    }

    protected void add(T t){
        queue.add(t);
    }

    protected T remove(){
        return queue.poll();
    }

    protected T get(){
        return queue.peek();
    }

    protected boolean isEmpty(){
        return queue.isEmpty();
    }

    protected void addAll(List<T> list){
        queue.addAll(list);
    }

    protected int getLength() {
        return queue.size();
    }

}

缓冲队列管理类:

 package psplat.unicorn.model;

import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author L
 *  联通队列管理类
 */
public class UnicornQueueManager {

    private CacheQueue<MessageSendInfo> queueCache;
    private Lock lock;
    private Condition condition;

    private static volatile UnicornQueueManager instance;

    private UnicornQueueManager(){
        queueCache = new CacheQueue<MessageSendInfo>();
        this.lock = new ReentrantLock();
        this.condition = lock.newCondition();
    }

    public static UnicornQueueManager getInstance(){
        if(instance == null){
            synchronized(UnicornQueueManager.class){
                if(instance == null){
                    instance = new UnicornQueueManager();
                }
            }
        }
        return instance;
    }

    public void add(MessageSendInfo messageInfo){
        queueCache.add(messageInfo);
        lock.lock();
        condition.signalAll();
        lock.unlock();
    }

    public MessageSendInfo removeOne(){
        return queueCache.remove();
    }

    public MessageSendInfo get(){
        return queueCache.get();
    }

    public boolean isEmpty(){
        return queueCache.isEmpty();
    }

    public Lock getLock() {
        return lock;
    }

    public Condition getCondition() {
        return condition;
    }

    public void addAll(List<MessageSendInfo> list){
        queueCache.addAll(list);
    }

    public int getLength() {
        return queueCache.getLength();
    }

}

消费者线程:

 package psplat.unicorn.consumer;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.jeecgframework.web.system.service.SystemService;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import psplat.https.HttpClientUtil;
import psplat.unicorn.entity.UnicornLogEntity;
import psplat.unicorn.model.MessageSendInfo;
import psplat.unicorn.model.MessageStatus;
import psplat.unicorn.model.ReportMapManager;
import psplat.unicorn.model.UnicornConstants;
import psplat.unicorn.model.UnicornQueueManager;
import psplat.util.ConvertMapUtil;

@Component
public class UnicordSendConsumer implements InitializingBean {

    private ExecutorService consumerRunnable;

    @Autowired
    private SystemService systemService;

    //用于发送计数,发送三次,失败,则丢弃数据,否则队列后的消息全部阻塞
    private static int count = 0;

    @Override
    public void afterPropertiesSet() throws Exception {
        consumerRunnable = Executors.newSingleThreadExecutor();
        consumerRunnable.execute(new Runnable() {

            @Override
            public void run() {
                while(true) {
                    if(UnicornQueueManager.getInstance().isEmpty()) {
                        UnicornQueueManager.getInstance().getLock().lock();
                        try {
                            UnicornQueueManager.getInstance().getCondition().await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        UnicornQueueManager.getInstance().getLock().unlock();
                    } else {
                        MessageSendInfo sendInfo = UnicornQueueManager.getInstance().get();
                        //组装参数并发送
                        String url = UnicornConstants.COMMON_URL.getSuffix() + UnicornConstants.SEND.getSuffix();
                        Map<String, String> argsMap = new HashMap<String, String>();
                        argsMap.put("SpCode", UnicornConstants.SpCode.getSuffix());
                        argsMap.put("LoginName", UnicornConstants.LoginName.getSuffix());
                        argsMap.put("Password", UnicornConstants.Password.getSuffix());
                        argsMap.put("MessageContent", sendInfo.getMessageContent());
                        argsMap.put("UserNumber", sendInfo.getNumbers());
                        argsMap.put("SerialNumber", sendInfo.getSerialNumber());
                        String result = (new HttpClientUtil()).doPost(url, argsMap, "GBK");
                        String resultCode = ConvertMapUtil.convertMapFromArgs(result).get("result");
                        if(!resultCode.equals("0")) {
                            //发送失败,发三次,移除
                            count++;
                            if(count != 3) {
                                //失败了,1秒后重新发送
                                this.threadSleep(1000);
                                continue;
                            }
                        }
                        count = 0;
                        UnicornQueueManager.getInstance().removeOne();
                        System.out.println("执行,此时队列长度" + UnicornQueueManager.getInstance().getLength());
                        List<UnicornLogEntity> logs = sendInfo.inrichUnicornLogs(MessageStatus.COMMIT.getCode(), resultCode);
                        systemService.batchSave(logs);
                        ReportMapManager.getInstance().addAll(ReportConsumer.convertUnicornLogEntityMap(logs));
                    }
                }
            }

            private void threadSleep(int i) {
                try {
                    Thread.sleep(i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        });
    }

}

有大神看出来为何这个线程会执行两次么?加了打印之后,队列长度已经为0为什么不进入等待?
如下是打印:
调用结束,返回值:result=0&description=发送短信成功&taskid=208754669702&faillist=&task_id=208754669702
执行,此时队列长度0
调用结束,返回值:result=0&description=发送短信成功&taskid=208753973775&faillist=&task_id=208753973775
执行,此时队列长度0
求指导!

https://blog.csdn.net/qq_22271479/article/details/78495042

昨天太匆忙,说一下逻辑
这里spring加载完毕后,消费者线程处于等待状态,当生产者放入队列中即调用add方法时(addAll不用,所以没有操作),消费者被唤醒,发送消息
在发送的过程中,发送成功则移除队列中的数据,发送失败会尝试发送三次,即count计数直到3时,该条数据会被丢弃,直到队列为空,消费者继续等待
这里哪里的逻辑出了问题,为什么当队列有一条数据的时候会发送两次,是因为我的同步不对么?
我思考了一下应该是线程的问题吧,因为每次打断点的话都是执行一次,放开断点的话,就会执行两次
有大神看出来哪里的问题了吗?以前也这样写,都没有出现这个问题。。。
在进入死循环处加入了打印,打印出if条件和刚进入循环时的队列长度,发现每一次消费者被唤醒,死循环的打印先出现
false(if条件值)
false(if条件值)
被唤醒,此时队列长度1(刚进入else时队列的长度)
被唤醒,此时队列长度1(刚进入else时队列的长度)
------------------模拟调用(模拟成功的情况)
------------------模拟调用(模拟成功的情况)
发送成功,此时队列长度0(执行成功后,队列删除后长度)
发送成功,此时队列长度0(执行成功后,队列删除后长度)
这里打印出了两个false值先出来,为什么?在我理解中,这个线程应该是独立顺序执行,为什么第一次执行没有完成,第二次再次调用打印出一个false?
有些理解不了,求解答
解决了,add方法中原来是signal(),我改成了signalAll(),改回signal()后,就不会重复调用,但是还是有点想不明白,主线程为生产者,这个线程是消费者,为什么signalAll()会导致消费者执行两次?