我定义了一个concurrentHashSet,然后使用线程池开启十个线程并行对redis做1000次incr操作,然后存进set内,最后打印set,结果是redis内部数据没问题,但是打印出来的数量没有1000个。
/**
* @author zwy
* @date 2022/1/20
*/
@RestController
@RequestMapping("/redis")
public class RedisTestController {
@Resource
private DefaultRedisService defaultRedisService;
private static final int CORE_POOL_SIZE = 5;
private static final int MAX_POOL_SIZE = 10;
private static final int QUEUE_CAPACITY = 100;
private static final long KEEP_ALIVE_TIME = 1L;
@GetMapping("/t1")
public MultiResponse<Long> incrId() {
ConcurrentHashSet<Long> concurrentHashSet = new ConcurrentHashSet<>();
// 消费者线程池
final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(QUEUE_CAPACITY),
new ThreadPoolExecutor.CallerRunsPolicy()
);
for (int i = 0; i < 1000; i++) {
threadPoolExecutor.execute(() -> {
// Long testId = defaultRedisService.incr("test_id");
// System.out.println("currentThread: " + Thread.currentThread().getName() + " id: " + testId);
// concurrentHashSet.add(testId);
concurrentHashSet.add(defaultRedisService.incr("test_id"));
});
// concurrentHashSet.add(defaultRedisService.incr("test_id"));
}
// threadPoolExecutor.shutdown();
return MultiResponse.ofWithCollectionSize(concurrentHashSet);
}
}
RedisService的incr实现
@Override
public Long incr(Object key) {
if (isAllStringType(key)) {
return stringRedisTemplate.opsForValue().increment((String) key);
}
return redisTemplate.opsForValue().increment(key);
}
开启1000个线程来执行则没问题,但是我的抛弃策略是CallerRunsPolicy,理论来说应该会全部执行才是
结果正常
这个问题很简单,因为子线程确实会执行1000次,ConcurrentHashSet也是线程安全的,并且redis也会进行自增到1000,但是你有没有考虑过主线程不等待子线程执行完成就返回了
解决方式:1.在返回前sleep一段时间就能看出来
2.采用CountDownLatch来计数
一键三连吧
public MultiResponse<Long> incrId() throws InterruptedException {
ConcurrentHashSet<Long> concurrentHashSet = new ConcurrentHashSet<>();
final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(QUEUE_CAPACITY),
new ThreadPoolExecutor.CallerRunsPolicy()
);
CountDownLatch cdl = new CountDownLatch(1000); // TODO
for (int i = 0; i < 1000; i++) {
threadPoolExecutor.execute(() -> {
concurrentHashSet.add(defaultRedisService.incr("test_id"));
cdl.countDown(); // TODO
});
}
cdl.await(); // TODO
return MultiResponse.ofWithCollectionSize(concurrentHashSet);
}