首页 > 其他分享 >异步编排多线程任务事务控制

异步编排多线程任务事务控制

时间:2022-12-27 15:01:43浏览次数:41  
标签:异步 LOG private 编排 platformTransactionManager supplier 多线程 public

/**
 * <p>
 * <B>Description: 异步编排多线程任务事务控制</B>
 * </P>
 * Revision Trail: (Date/Author/Description)
 * 2022/12/26 Ryan Huang CREATE
 * 多线程异步处理时的事务管理
 *      1. addFunction 添加要异步执行的方法
 *      2. execute 方法中使用全局的计数器和异常标记字段统计异步线程执行的结果,当所有的异步线程执行完后根据异常标记字段判断是否回滚还是提交事务。
 * @author Ryan Huang
 * @version 1.0
 */
public class ThreadTransaction {

    /**
     * 日志
     */
    private final Logger LOG = LoggerFactory.getLogger(ThreadTransaction.class);

    /**
     * 事务管理
     */
    private PlatformTransactionManager platformTransactionManager;

    /**
     * JUC线程池
     */
    private ThreadPoolExecutor threadPoolExecutor;

    /**
     * Spring线程池
     */
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;


    private List<Supplier> supplierList = new ArrayList<>();

    /**
     * 执行计数器
     */
    private volatile CountDownLatch countDownLatch;

    /**
     * 是否存在异常
     */
    AtomicReference<Boolean> isError = new AtomicReference<>(false);

    public ThreadTransaction(PlatformTransactionManager platformTransactionManager, ThreadPoolTaskExecutor threadPoolTaskExecutor) {
        this.platformTransactionManager = platformTransactionManager;
        this.threadPoolTaskExecutor = threadPoolTaskExecutor;
    }

    public ThreadTransaction(PlatformTransactionManager platformTransactionManager, ThreadPoolExecutor threadPoolExecutor) {
        this.platformTransactionManager = platformTransactionManager;
        this.threadPoolExecutor = threadPoolExecutor;
    }

    public ThreadTransaction(PlatformTransactionManager platformTransactionManager, ThreadPoolTaskExecutor threadPoolTaskExecutor, int size) {
        this.platformTransactionManager = platformTransactionManager;
        this.threadPoolTaskExecutor = threadPoolTaskExecutor;
        supplierList = new ArrayList<>(size);
    }

    /**
     * 添加要异步执行的方法程序
     */
    public boolean addFunction(Supplier supplier){
        return supplierList.add(supplier);
    }

    public void execute(){
        LOG.info("多线程事务开始...");
        countDownLatch = new CountDownLatch(supplierList.size());
        for (Supplier supplier : supplierList) {
            this.threadPoolTaskExecutor.submit(new TransactionRunnable(platformTransactionManager, supplier));
        }
        try {
            if (isError.get()) {
                LOG.error("多线程执行失败,事务已回滚!");
                throw new RuntimeException("多线程执行失败!");
            }
            LOG.info("多线程执行成功,事务已提交!");
        }catch (Exception e){
            LOG.error("多线程执行失败:" + e.getMessage());
            e.printStackTrace();
        }
    }

    class TransactionRunnable implements Runnable{

        /**
         * 事务管理
         */
        private PlatformTransactionManager platformTransactionManager;

        private Supplier supplier;

        public TransactionRunnable(PlatformTransactionManager platformTransactionManager, Supplier supplier) {
            this.platformTransactionManager = platformTransactionManager;
            this.supplier = supplier;
        }

        @Override
        public void run() {
            DefaultTransactionDefinition defaultTransactionDefinition = new DefaultTransactionDefinition();
            defaultTransactionDefinition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
            TransactionStatus transaction = this.platformTransactionManager.getTransaction(defaultTransactionDefinition);
            try {
                this.supplier.get();
            } catch (Exception e){
                isError.set(true);
                LOG.error("多线程事务执行失败{}", e.getMessage());
                e.printStackTrace();
            }
            countDownLatch.countDown();
            try {
                if (isError.get()) {
                    LOG.info("多线程事务(子线程)回滚");
                    platformTransactionManager.rollback(transaction);
                } else {
                    LOG.info("多线程事务(子线程)提交");
                    platformTransactionManager.commit(transaction);
                }
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }
}

标签:异步,LOG,private,编排,platformTransactionManager,supplier,多线程,public
From: https://www.cnblogs.com/IamHzc/p/17008074.html

相关文章

  • POSIX 多线程程序设计
     POSIX 多线程程序设计 目录表 ​​摘要​​  ​​译者序​​​Pthreads概述 ​​​什么是线程? ​​​​什么是Pthreads? ​​​​为什么使用Pthreads? ​......
  • python进程之进程池、线程池与异步回调机制
    fromconcurrent.futuresimportProcessPoolExecutor,ThreadPoolExecutorimportosimporttimeimportrandom#1.产生含有固定数量线程的线程池#t_pool=Thread......
  • 手撕一个异步任务通用组件
    目的取代linuxcrontab的计划任务,那玩儿意最小粒度一分钟,意味着服务器不管如何清闲都会有一分钟延迟实现原理while(TRUE){}没错,就是这么粗暴,一个永不停止的无限循环......
  • JS手写练习随笔-20221226.2 ---- 带并发限制的异步调度器
    最多保持特定数量任务执行的异步调度器classScheduler{//最大任务执行数目privatemaxCnt:number;//正在执行的任务数目privaterunningCnt:number;......
  • 同步、异步、阻塞、非阻塞---BIO、NIO、AIO的简单理解
    概念BIO:同步并阻塞,服务实现模式为一个连接对应一个线程,即客户端发送一个连接,服务端要有一个线程来处理。如果连接多了,线程数量不够,就只能等待,即会发生阻塞。NIO:同步非阻塞,服......
  • 8.多线程
    一、基本概念:程序、进程、线程程序(program)是为完成特定任务、用某种语言编写的一组指令的集合。即指一段静态的代码,静态对象。进程(process)是程序的一次执行过程,或是正......
  • 多线程
    多线程什么是进程?什么是线程?进程是一个应用程序(1个进程是一个软件)。线程是一个进程中的执行场景/执行单元。一个进程可以启动多个线程。对于java程序来说,当在DOS命令......
  • 多线程下载的神器
    如果你在linux下下载比较大的文件,网速也不太稳定是,用单线程下载就远远不如用多线程工具下载了。Manjaro中可以使用axel1、安装axelsudopacman-Saxel2、语法......
  • Java——多线程
    文章目录​​一.多线程概述​​​​1.什么是进程?什么是线程?​​​​2.进程和线程的关系​​​​3.多线程并发​​​​4.分析以下程序有几个线程​​​​5.Java实现线......
  • Redis 6.0 为什么要引入多线程呢?
    Reactor模式Redis是基于Reactor模式开发了网络事件处理器,这个处理器称为文件事件处理器。组成结构为4个部分:多套接字IO多路复用程序文件事件派发器事件处理器。在这里......