首页 > 其他分享 >spring声明式事务提交工具类

spring声明式事务提交工具类

时间:2023-03-04 14:35:35浏览次数:51  
标签:事务 int spring batchSize new 提交 CountDownLatch import isError

package com.talkweb.modou;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/**
 * @author : 苏文致
 * @date Date : 2023年03月04日 13:46
 * @Description:
 */
@Component
public class TransactionMultipartExecutor {

    @Autowired
    private DataSourceTransactionManager transactionManager;

    private static final Logger log = LoggerFactory.getLogger(TransactionMultipartExecutor.class);

    /**
     * 多线程事务提交执行
     *
     * @param threadPool:          线程池
     * @param datas:               数据
     * @param batchSize            :单批次执行的大小
     * @param propagationBehavior: 见TransactionDefinition常量类
     * @param consumer:            批量执行逻辑 <T> 单批次的数据
     */
    public <T> boolean execute (ExecutorService threadPool,
                                List<T> datas,
                                int batchSize,
                                int propagationBehavior,
                                Consumer<List<T>> consumer){
        int total = datas.size();
        int batch = total % batchSize == 0 ? total / batchSize : total / batchSize + 1; //执行的批次
        CountDownLatch startCountDown = new CountDownLatch(1); //开始执行线程任务
        CountDownLatch subCountDown = new CountDownLatch(batch); //控制子线程业务执行(控制全局事务开关)
        CountDownLatch mainCountDown = new CountDownLatch(batch); //子线程执行完成(控制主线程是否执行完成)
        AtomicBoolean isError = new AtomicBoolean(false); //全局事务提交回滚开关
        for (int i = 0; i < batch; i++) { //创建线程执行任务
            List<T> signalBatchData = splitSignalData(datas, i, batchSize);
            TransactionMultipartRunnable runnable = new TransactionMultipartRunnable(
                    transactionManager,
                    propagationBehavior,
                    startCountDown,
                    subCountDown,
                    mainCountDown,
                    isError) {
                @Override
                public void operation (){
                    consumer.accept(signalBatchData);
                }
            };
            threadPool.submit(runnable);
        }
        startCountDown.countDown(); //开始执行
        handleAwaitDownWithException(mainCountDown);//主线程执行
        return isError.get();
    }

    /**
     * 切分单次插入的数据
     */
    public <T> List<T> splitSignalData (List<T> datas, int currentBatch, int batchSize){
        int size = datas.size();
        int left = currentBatch * batchSize;
        int right = Math.min(left + batchSize, size);
        return datas.subList(left, right);
    }


    public abstract static class TransactionMultipartRunnable implements Runnable {
        private PlatformTransactionManager transactionManager;
        private int propagationBehavior;
        private CountDownLatch startCountDown;
        private CountDownLatch subCountDown;
        private CountDownLatch mainCountDown;
        private AtomicBoolean isError;

        protected TransactionMultipartRunnable (PlatformTransactionManager transactionManager, int propagationBehavior, CountDownLatch startCountDown, CountDownLatch subCountDown, CountDownLatch mainCountDown, AtomicBoolean isError){
            this.transactionManager = transactionManager;
            this.propagationBehavior = propagationBehavior;
            this.startCountDown = startCountDown;
            this.subCountDown = subCountDown;
            this.mainCountDown = mainCountDown;
            this.isError = isError;
        }

        public abstract void operation ();

        @Override
        public void run (){
            handleAwaitDownWithException(startCountDown);
            //传播属性
            //TransactionDefinition.PROPAGATION_REQUIRES_NEW
            DefaultTransactionDefinition def = new DefaultTransactionDefinition(propagationBehavior);
            //事务的状态
            TransactionStatus status = transactionManager.getTransaction(def);

            try {
                if (!isError.get()) {//如果已经存在异常的线程,直接条转
                    log.debug("执行业务逻辑...");
                    operation();
                }
            } catch (Exception e) {
                isError.set(true);
            }

            subCountDown.countDown(); //子线程 --

            handleAwaitDownWithException(subCountDown);

            try {
                if (isError.get()) {
                    transactionManager.rollback(status);
                    log.debug("回滚...");
                } else {
                    transactionManager.commit(status);
                    log.debug("提交...");
                }
            } catch (TransactionException e) {
                e.printStackTrace();
            }
            mainCountDown.countDown();

        }
    }

    public static void handleAwaitDownWithException (CountDownLatch countDownLatch){
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}    

测试

       TestService bean = run.getBean(TestService.class);
        bean.test();

        AppWidgetsFileService appWidgetsFileService = SpringUtils.getBean(AppWidgetsFileService.class);
        //构造假数据
        List<AppWidgetsFile> files = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            AppWidgetsFile appWidgetsFile = new AppWidgetsFile();
            appWidgetsFile.setId(IdUtils.simpleUUID());
            appWidgetsFile.setFileName("测试" + i);
            files.add(appWidgetsFile);
        }
        AppWidgetsFile appWidgetsFile = new AppWidgetsFile();
        appWidgetsFile.setId("38f3e3f73969575c6b838d2f059869e7"); //数据库主键 已经存在 (回滚)
        appWidgetsFile.setFileName("测试");
        files.add(appWidgetsFile);

        //构造线程池
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                9,
                9,
                30,
                TimeUnit.MINUTES,
                new LinkedBlockingDeque<>(50),
                (r, e) -> {
                    log.debug("任务拒绝...");
                    throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
                }
        );

        TransactionMultipartExecutor executor = SpringUtils.getBean(TransactionMultipartExecutor.class);
        boolean execute = executor.execute(threadPool, files, 3, TransactionDefinition.PROPAGATION_REQUIRED,
                (item) -> {
                    //TODO: 批量操作的执行
                    appWidgetsFileService.saveBatch(item);
                });
        if (!execute) log.debug("事务执行成功!");
        else log.debug("事务执行失败!");
        log.debug("主线程执行完成!");

标签:事务,int,spring,batchSize,new,提交,CountDownLatch,import,isError
From: https://www.cnblogs.com/swz123/p/17178238.html

相关文章

  • 【Spring Boot源码剖析之Spring Boot源码剖析】
    SpringBoot源码剖析SpringBoot依赖管理问题:(1)为什么导入dependency时不需要指定版本?SpringBoot项目的父项目依赖spring-boot-starter-parent<parent><groupId>org.......
  • springcloud-gateway配置跨域
    跨域有两种解决方式:一是将前端和后端通过nginx部署在同一域名下,后端通过location/admin这种方式转发,避免浏览器因为协议,主机和端口号不同造成的跨域二是非简单请求,浏览......
  • springcloud-引入gateway
    gateway其实和MVC框架路由逻辑相似,mvc路由是将请求url交给对应的控制器方法处理,gateway是将请求转发给对应的服务来处理。1,gateway需要引入nacos注册和配置中心2,gateway......
  • springcloud-alibaba接入nacos配置中心
    未采用动态配置前,如果集群的话,某个服务集群中的每台服务器配置都不一样,需要单个打包部署,工作量大,不易维护。1,引入依赖,和注册中心一样,因为子模块都有使用,放到公用模块commm......
  • 21_Spring_日志框架和测试支持
    ​ spring5框架自带了通用的日志封装,也可以整合自己的日志 1)spring移除了LOG4jConfigListener,官方建议使用log4j2 2)spring5整合log4j2导入log4j2依赖 <......
  • springmvc整合thymeleaf之helloword
    版本说明:代码地址:https://gitee.com/joy521125/ssm-senior.git  thymeleaf分支;基于https://gitee.com/joy521125/ssm-senior.gitmaster分支修改而来;1.加入jar包:1......
  • 21_Spring_日志框架和测试支持
     spring5框架自带了通用的日志封装,也可以整合自己的日志 1)spring移除了LOG4jConfigListener,官方建议使用log4j2 2)spring5整合log4j2导入log4j2依赖 <!--log4j2......
  • 21_Spring_日志框架和测试支持
     spring5框架自带了通用的日志封装,也可以整合自己的日志 1)spring移除了LOG4jConfigListener,官方建议使用log4j2 2)spring5整合log4j2导入log4j2依赖 <!--log4j2......
  • java——spring boot集成RabbitMQ——高级特效——死信代码示例
    首先,消息成为死信的条件:       首先看消息生产者,生产者和之前的一样,没什么变化(注意:后面统一把nomal改为normal了):          消费......
  • 21_Spring_日志框架和测试支持
    ​ spring5框架自带了通用的日志封装,也可以整合自己的日志 1)spring移除了LOG4jConfigListener,官方建议使用log4j2 2)spring5整合log4j2导入log4j2依赖 <......