concurrentHashSet + ThreadPoolExectuor + RedisTemplate 多线程获取自增ID的问题

问题遇到的现象和发生背景

我定义了一个concurrentHashSet,然后使用线程池开启十个线程并行对redis做1000次incr操作,然后存进set内,最后打印set,结果是redis内部数据没问题,但是打印出来的数量没有1000个。

问题相关代码
/**
 * @author zwy
 * @date 2022/1/20
 */
@RestController
@RequestMapping("/redis")
public class RedisTestController {

    @Resource
    private DefaultRedisService defaultRedisService;

    private static final int CORE_POOL_SIZE = 5;
    private static final int MAX_POOL_SIZE = 10;
    private static final int QUEUE_CAPACITY = 100;
    private static final long KEEP_ALIVE_TIME = 1L;

    @GetMapping("/t1")
    public MultiResponse<Long> incrId() {
        ConcurrentHashSet<Long> concurrentHashSet = new ConcurrentHashSet<>();
        // 消费者线程池
        final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(QUEUE_CAPACITY),
                new ThreadPoolExecutor.CallerRunsPolicy()
        );

        for (int i = 0; i < 1000; i++) {
            threadPoolExecutor.execute(() -> {
//                Long testId = defaultRedisService.incr("test_id");
//                System.out.println("currentThread: " + Thread.currentThread().getName() + " id: " + testId);
//                concurrentHashSet.add(testId);
                concurrentHashSet.add(defaultRedisService.incr("test_id"));
            });
//            concurrentHashSet.add(defaultRedisService.incr("test_id"));
        }

//        threadPoolExecutor.shutdown();
        return MultiResponse.ofWithCollectionSize(concurrentHashSet);
    }

}

RedisService的incr实现

@Override
    public Long incr(Object key) {
        if (isAllStringType(key)) {
            return stringRedisTemplate.opsForValue().increment((String) key);
        }
        return redisTemplate.opsForValue().increment(key);
    }

运行结果及报错内容

img


此外,redis的自增是正常的

我的解答思路和尝试过的方法

开启1000个线程来执行则没问题,但是我的抛弃策略是CallerRunsPolicy,理论来说应该会全部执行才是

我想要达到的结果

结果正常

这个问题很简单,因为子线程确实会执行1000次,ConcurrentHashSet也是线程安全的,并且redis也会进行自增到1000,但是你有没有考虑过主线程不等待子线程执行完成就返回了
解决方式:1.在返回前sleep一段时间就能看出来
2.采用CountDownLatch来计数
一键三连吧

public MultiResponse<Long> incrId() throws InterruptedException {
    ConcurrentHashSet<Long> concurrentHashSet = new ConcurrentHashSet<>();
    final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            CORE_POOL_SIZE,
            MAX_POOL_SIZE,
            KEEP_ALIVE_TIME,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(QUEUE_CAPACITY),
            new ThreadPoolExecutor.CallerRunsPolicy()
    );
    CountDownLatch cdl = new CountDownLatch(1000);  // TODO
    for (int i = 0; i < 1000; i++) {
        threadPoolExecutor.execute(() -> {
            concurrentHashSet.add(defaultRedisService.incr("test_id"));
            cdl.countDown();  // TODO
        });
    }
    cdl.await(); // TODO
    return MultiResponse.ofWithCollectionSize(concurrentHashSet);
}