项目需要主线程配合线程池执行批量合并数据的业务,目前遇到的问题是线程池中某一个异步task执行失败,主线程中的事物并不知晓,希望zimuge能解答。
相关代码示例:
/**
* 主线程处理器执行类
* @param dataMergeBO merge业务bo对象
*/
@Transactional(rollbackFor = RuntimeException.class)
public void startMerge(DataMergeBO dataMergeBO) {
Preconditions.checkArgument(Objects.nonNull(dataMergeBO.getMergeObjectEnum()), "The current parameters are incorrect, and the scope of business execution cannot be confirmed");
// 初始化流程dto数据
startMergeOptions(TenantIdOrUserIdMergeDTO.convertFrom(dataMergeBO), dataMergeBO.getProcessTypeEnum());
}
/**
* 主线程并行处理
* @param mergePageDTO 分页dto
* @param sumPages 总页数
* @return 影响行数
*/
private Integer parallelProcessing(MergePageDTO mergePageDTO, Integer sumPages) {
log.info("parllel processing ..... threads counts: {}", sumPages);
List<Future<Integer>> resultList = new CopyOnWriteArrayList<>();
CountDownLatch countDownLatch = new CountDownLatch(sumPages);
// 获取task
MergePageTask<M> mergePageTask = new MergePageTask<>(mMergeBaseService, transactionTemplate)
.setSumPages(new AtomicInteger(sumPages)).setIndex(new AtomicInteger(0));
// 定义固定线程池
ExecutorService mergeExecutorJob = getMergeExecutor(sumPages);
try {
AtomicReference<MergePageDTO> atomicpageDTO = new AtomicReference<>();
atomicpageDTO.set(mergePageDTO);
mergePageTask.setCountDownLatch(countDownLatch).setMergPageDTO(atomicpageDTO);
Future<Integer> submit = null;
for (int i = 0; i < sumPages; i++) {
submit = mergeExecutorJob.submit(mergePageTask);
resultList.add(submit);
}
countDownLatch.await();
}
catch (InterruptedException exception) {
Thread.currentThread().interrupt();
throw new MergeDataExistProblemException(
"the merge page thread pool exist problem: " + exception.getMessage());
}
finally {
// 线程池释放资源
}
return resultList.stream().mapToInt(this::applyAsInt).sum();
}
/**
* 获取线程池
* @param unitThreads
* @return
*/
private ExecutorService getMergeExecutor(int unitThreads) {
return Executors.newFixedThreadPool(unitThreads);
}
@Slf4j
@Service
@RequiredArgsConstructor
public class MergePageTask<M> implements Callable<Integer> {
private final MergeBaseService<M> mMergeBaseService;
private CountDownLatch countDownLatch;
private AtomicReference<MergePageDTO> atomicReference;
private AtomicInteger atomIndex;
private AtomicInteger atomSumPages;
/**
* @return 影响行数
*/
@Override
public Integer call() {
Integer nums = null;
// 分页SQL
influenceRows = mMergeBaseService.mergeParallelPage(mergePageDTO.supportForParallel(atomIndex.get()));
log.info("区间:[{} -{}], 当前影响行数:[{}]", mergePageDTO.getSize() * atomIndex.get(),
mergePageDTO.getSize() * (atomIndex.get() + 1), influenceRows);
return influenceRows;
countDownLatch.countDown();
return nums;
}
// set 方法...
}
那你主线程得阻塞住,用共享变量或者回调去处理
submit以后,你将future存到了list中,那就自己去循环遍历list 通过Future.get让主线程去获取子线程的异常,你这就能处理了
可以考虑一下 CountDownLatch + List。子线程将结果保存到list中,然后countdown。主线程判断list中的状态,全部为TRUE提交事务,否则让子线程回滚。
https://blog.csdn.net/weixin_48484941/article/details/110237853