LinkedBlockingQueue没能同步消费

使用 LinkedBlockingQueue 的 take 方法消费消息时,没有按预期串行消费(即处理完一条消息之后再处理下一条)。

以下是我的队列的代码(队列里的消息是通过put方法存入队列的)

@Component
public class IPOEQueue {

    private static Logger log = Logger.getLogger( "Logger_IPOEQueue" );

    public static LinkedBlockingQueue ipoeQueue = new LinkedBlockingQueue<>();

    @Resource
    private WsjyDao wsjyDao;

    @Resource
    private WsjyService wsjyService;

    @PostConstruct
    public void init() {
        Thread t =new Thread(new Runnable() {
            @Override
            public void run() {

                while (true) {
                    Map entityObject = new LinkedHashMap<>();
                    IPOELogParams insertLogParams = null;
                    Map params = null;
                    String entityRecordId = null;
                    try {
                        IPOEQueueParams data = ipoeQueue.take();

                        String url = data.getUrl();
                        List pairList = data.getPairList();
                        String orderId = data.getOrderId();

                        IPOELogParams ipoeLogParams = data.getIpoeLogParams();
                        insertLogParams = data.getInsertLogParams();
                        params = data.getParams();

                        entityRecordId = insertLogParams.getEntityRecordId();
                        log.info("IPOE日志:" + entityRecordId + ",开始执行队列逻辑");
                        entityObject = insertLogParams.getEntityObject();
                        String ipoeAccount = (String) params.get("user_name");
                        String ipoeId = wsjyDao.getCountFromIPOEEntity(ipoeAccount);    // IPOE表中的uuid
                        if (ipoeId != null && !"".equals(ipoeId)) {
                            // 已存在该账号
                            ipoeLogParams.setEntityRecordId(ipoeId);
                            BaseJson baseJson = IPOELogUtils.updatePushResult(ipoeLogParams);
                            log.info(ipoeId + "记录日志更新到IPOE表" + baseJson.isSuccess());

                            entityObject.put("iprzIpoeIpoeidFk", ipoeId);
                        }else {
                            // 不存在的IPOE账号,insert
                            String newUUID = CZUtils.getUUID();
                            ipoeLogParams.setEntityRecordId(newUUID);
                            BaseJson baseJson = IPOELogUtils.insertLog(ipoeLogParams);
                            log.info(ipoeId + "记录日志到IPOE表" + baseJson.isSuccess());

                            entityObject.put("iprzIpoeIpoeidFk", newUUID);
                        }

                        String result = HttpClientService.sendPost( url, pairList );
                        log.info("sendRequestToIPOE返回的结果:" + result);

                        try {
                            wsjyService.updateOrderServiceEndTime( result, params, orderId );
                        }catch (DeadlockLoserDataAccessException e) {
                            e.printStackTrace();
                            log.info("wsjyService.updateOrderServiceEndTime( result, params, orderId ); 出现DeadlockLoserDataAccessException,重试一次:");
                            try {
                                wsjyService.updateOrderServiceEndTime( result, params, orderId );
                            }catch (DeadlockLoserDataAccessException ex) {
                                e.printStackTrace();
                                log.info("wsjyService.updateOrderServiceEndTime( result, params, orderId ); 出现DeadlockLoserDataAccessException,重试第二次:");
                                wsjyService.updateOrderServiceEndTime( result, params, orderId );
                            }
                        }

                        // 推送IPOE结果-记录日志
                        if (StringUtils.isNotBlank(result)) {
                            Map map = JSON.parseObject(result, Map.class);
                            Integer code = (Integer) map.get("code");
                            if (code.equals(0)) {
                                // 推送成功
                                entityObject.put("iprzTuisongjieguo", "01");   // 01-推送成功
                            }else {
                                entityObject.put("iprzTuisongjieguo", "02");   // 02-推送失败
                            }

                        }else {
                            // 推送失败
                            entityObject.put("iprzTuisongjieguo", "02");   // 02-推送失败
                        }

                        entityObject.put("iprzFanhuinarong", result);
                        insertLogParams.setEntityObject(entityObject);

                    }catch (Throwable e) {
                        e.printStackTrace();
                        log.error(entityRecordId + "IPOE队列消费出现异常:" + JSON.toJSONString(e));

                        log.error( "向IPOE发送请求异常,请求参数:" + JSON.toJSONString( params ) );
                        log.error( "向IPOE发送请求异常,日志数据:" + JSON.toJSONString( insertLogParams ) );

                        entityObject.put("iprzTuisongjieguo", "02");
                        entityObject.put("iprzFanhuinarong", "推送过程中出现异常:" + JSON.toJSONString( e ));
                        if (insertLogParams != null)
                            insertLogParams.setEntityObject(entityObject);
                    }finally {
                        // 更新IPOE日志表
                        if (insertLogParams != null) {
                            BaseJson baseJson = IPOELogUtils.updatePushResult(insertLogParams);
                            log.info(insertLogParams.getEntityRecordId() + "记录日志到日志表" + baseJson.isSuccess());
                        }else {
                            log.error("insertLogParams为null");
                        }
                    }
                }
            }
        });
        t.setName("IPOEQueueConsumer");
        t.start();
    }

}


我在日志里看到的现象

img

有哪位道友遇到过这种问题吗?

可以在IPOEQueue.take方法中加个同步块试试

/** Shared queue of pending IPOE pushes. */
public static LinkedBlockingQueue<IPOEQueueParams> ipoeQueue = new LinkedBlockingQueue<>();

/**
 * Removes and returns the next queued item, blocking until one is available.
 *
 * <p>LinkedBlockingQueue is already thread-safe and its take() already blocks
 * while the queue is empty, so no extra synchronized/wait/notify is needed.
 * A separate monitor is actively harmful here: a consumer parked in
 * {@code lock.wait()} is never woken when a producer adds items by calling
 * {@code ipoeQueue.put(...)} directly instead of going through {@link #put}.
 *
 * @return the head of the queue, never {@code null}
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
public static IPOEQueueParams take() throws InterruptedException {
    return ipoeQueue.take();
}

/**
 * Appends an item to the queue. The queue is unbounded, so offer() never blocks
 * and always succeeds; waiting consumers are woken by the queue itself.
 *
 * @param data the item to enqueue; must not be {@code null}
 */
public static void put(IPOEQueueParams data) {
    ipoeQueue.offer(data);
}