springboots环境下,如果线程池中某一个task回滚怎么让主线程的事物也跟着回滚?

项目需要主线程配合线程池执行批量合并数据的业务,目前遇到的问题是线程池中某一个异步task执行失败,主线程中的事物并不知晓,希望zimuge能解答。
相关代码示例:

    /**
     * 主线程处理器执行类
     * @param dataMergeBO merge业务bo对象
     */
    @Transactional(rollbackFor = RuntimeException.class)
    public void startMerge(DataMergeBO dataMergeBO) {
        Preconditions.checkArgument(Objects.nonNull(dataMergeBO.getMergeObjectEnum()), "The current parameters are incorrect, and the scope of business execution cannot be confirmed");
        // 初始化流程dto数据
        startMergeOptions(TenantIdOrUserIdMergeDTO.convertFrom(dataMergeBO), dataMergeBO.getProcessTypeEnum());
    }
/**
     * 主线程并行处理
     * @param mergePageDTO 分页dto
     * @param sumPages 总页数
     * @return 影响行数
     */
    private Integer parallelProcessing(MergePageDTO mergePageDTO, Integer sumPages) {
        log.info("parllel processing ..... threads counts: {}", sumPages);
        List<Future<Integer>> resultList = new CopyOnWriteArrayList<>();
        CountDownLatch countDownLatch = new CountDownLatch(sumPages);
        // 获取task
        MergePageTask<M> mergePageTask = new MergePageTask<>(mMergeBaseService, transactionTemplate)
                .setSumPages(new AtomicInteger(sumPages)).setIndex(new AtomicInteger(0));
        // 定义固定线程池
        ExecutorService mergeExecutorJob = getMergeExecutor(sumPages);
        try {
            AtomicReference<MergePageDTO> atomicpageDTO = new AtomicReference<>();
            atomicpageDTO.set(mergePageDTO);
            mergePageTask.setCountDownLatch(countDownLatch).setMergPageDTO(atomicpageDTO);
            Future<Integer> submit = null;
            for (int i = 0; i < sumPages; i++) {
                submit = mergeExecutorJob.submit(mergePageTask);
                resultList.add(submit);
            }
            countDownLatch.await();
        }
        catch (InterruptedException exception) {
            Thread.currentThread().interrupt();
            throw new MergeDataExistProblemException(
                    "the merge page thread pool exist problem: " + exception.getMessage());
        }
        finally {
            // 线程池释放资源
        }
        return resultList.stream().mapToInt(this::applyAsInt).sum();
    }


    /**
     * 获取线程池
     * @param unitThreads
     * @return
     */
    private ExecutorService getMergeExecutor(int unitThreads) {
        return Executors.newFixedThreadPool(unitThreads);
    }
@Slf4j
@Service
@RequiredArgsConstructor
public class MergePageTask<M> implements Callable<Integer> {

    private final MergeBaseService<M> mMergeBaseService;

    private CountDownLatch countDownLatch;

    private AtomicReference<MergePageDTO> atomicReference;

    private  AtomicInteger atomIndex;

    private AtomicInteger atomSumPages;

    /**
     * @return 影响行数
     */
    @Override
    public Integer call() {
        Integer nums = null;
                // 分页SQL
        influenceRows = mMergeBaseService.mergeParallelPage(mergePageDTO.supportForParallel(atomIndex.get()));
        log.info("区间:[{} -{}], 当前影响行数:[{}]", mergePageDTO.getSize() * atomIndex.get(),
                 mergePageDTO.getSize() * (atomIndex.get() + 1), influenceRows);
        return influenceRows;
        countDownLatch.countDown();
        return nums;
    }

    // set 方法...
}

那你主线程得阻塞住,用共享变量或者回调去处理

submit以后,你将future存到了list中,那就自己去循环遍历list 通过Future.get让主线程去获取子线程的异常,你这就能处理了

可以考虑一下 CountDownLatch + List。子线程将结果保存到list中,然后countdown。主线程判断list中的状态,全部为TRUE提交事务,否则让子线程回滚。

https://blog.csdn.net/weixin_48484941/article/details/110237853