首页 > 其他分享 >支付宝:多线程事务怎么回滚?说用 @Transactional 可以回去等通知了!

支付宝:多线程事务怎么回滚?说用 @Transactional 可以回去等通知了!

时间:2024-01-24 20:31:40浏览次数:36  
标签:回滚 int employeeDOList Transactional List 线程 new 多线程 public

1,最近有一个大数据量插入的操作入库的业务场景,需要先做一些其他修改操作,然后在执行插入操作,由于插入数据可能会很多,用到多线程去拆分数据并行处理来提高响应时间,如果有一个线程执行失败,则全部回滚。

2,在spring中可以使用@Transactional注解去控制事务,使出现异常时会进行回滚,在多线程中,这个注解则不会生效,如果主线程需要先执行一些修改数据库的操作,当子线程在进行处理出现异常时,主线程修改的数据则不会回滚,导致数据错误。

3,下面用一个简单示例演示多线程事务。

公用的类和方法

/**
 * 平均拆分list方法.
 * @param source
 * @param n
 * @param <T>
 * @return
 */
public static <T> List<List<T>> averageAssign(List<T> source,int n){
    List<List<T>> result=new ArrayList<List<T>>();
    int remaider=source.size()%n; 
    int number=source.size()/n; 
    int offset=0;//偏移量
    for(int i=0;i<n;i++){
        List<T> value=null;
        if(remaider>0){
            value=source.subList(i*number+offset, (i+1)*number+offset+1);
            remaider--;
            offset++;
        }else{
            value=source.subList(i*number+offset, (i+1)*number+offset);
        }
        result.add(value);
    }
    return result;
}
/**  线程池配置
 * @version V1.0
 */
public class ExecutorConfig {
    private static int maxPoolSize = Runtime.getRuntime().availableProcessors();
    private volatile static ExecutorService executorService;
    public static ExecutorService getThreadPool() {
        if (executorService == null){
            synchronized (ExecutorConfig.class){
                if (executorService == null){
                    executorService =  newThreadPool();
                }
            }
        }
        return executorService;
    }

    private static  ExecutorService newThreadPool(){
        int queueSize = 500;
        int corePool = Math.min(5, maxPoolSize);
        return new ThreadPoolExecutor(corePool, maxPoolSize, 10000L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(queueSize),new ThreadPoolExecutor.AbortPolicy());
    }
    private ExecutorConfig(){}
}
/** 获取sqlSession
 * @author 86182
 * @version V1.0
 */
@Component
public class SqlContext {
    @Resource
    private SqlSessionTemplate sqlSessionTemplate;

    public SqlSession getSqlSession(){
        SqlSessionFactory sqlSessionFactory = sqlSessionTemplate.getSqlSessionFactory();
        return sqlSessionFactory.openSession();
    }
}

示例事务不成功操作

/**
 * 测试多线程事务.
 * @param employeeDOList
 */
@Override
@Transactional
public void saveThread(List<EmployeeDO> employeeDOList) {
    try {
        //先做删除操作,如果子线程出现异常,此操作不会回滚
        this.getBaseMapper().delete(null);
        //获取线程池
        ExecutorService service = ExecutorConfig.getThreadPool();
        //拆分数据,拆分5份
        List<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5);
        //执行的线程
        Thread []threadArray = new Thread[lists.size()];
        //监控子线程执行完毕,再执行主线程,要不然会导致主线程关闭,子线程也会随着关闭
        CountDownLatch countDownLatch = new CountDownLatch(lists.size());
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        for (int i =0;i<lists.size();i++){
            if (i==lists.size()-1){
                atomicBoolean.set(false);
            }
            List<EmployeeDO> list  = lists.get(i);
            threadArray[i] =  new Thread(() -> {
                try {
                 //最后一个线程抛出异常
                    if (!atomicBoolean.get()){
                        throw new ServiceException("001","出现异常");
                    }
                    //批量添加,mybatisPlus中自带的batch方法
                    this.saveBatch(list);
                }finally {
                    countDownLatch.countDown();
                }

            });
        }
        for (int i = 0; i <lists.size(); i++){
            service.execute(threadArray[i]);
        }
        //当子线程执行完毕时,主线程再往下执行
        countDownLatch.await();
        System.out.println("添加完毕");
    }catch (Exception e){
        log.info("error",e);
        throw new ServiceException("002","出现异常");
    }finally {
         connection.close();
     }
}

数据库中存在一条数据:

支付宝:多线程事务怎么回滚?说用 @Transactional 可以回去等通知了!_多线程事务

图片

//测试用例
@RunWith(SpringRunner.class)
@SpringBootTest(classes = { ThreadTest01.class, MainApplication.class})
public class ThreadTest01 {

    @Resource
    private EmployeeBO employeeBO;

    /**
     *   测试多线程事务.
     * @throws InterruptedException
     */
    @Test
    public  void MoreThreadTest2() throws InterruptedException {
        int size = 10;
        List<EmployeeDO> employeeDOList = new ArrayList<>(size);
        for (int i = 0; i<size;i++){
            EmployeeDO employeeDO = new EmployeeDO();
            employeeDO.setEmployeeName("lol"+i);
            employeeDO.setAge(18);
            employeeDO.setGender(1);
            employeeDO.setIdNumber(i+"XX");
            employeeDO.setCreatTime(Calendar.getInstance().getTime());
            employeeDOList.add(employeeDO);
        }
        try {
            employeeBO.saveThread(employeeDOList);
            System.out.println("添加成功");
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

测试结果:

支付宝:多线程事务怎么回滚?说用 @Transactional 可以回去等通知了!_sql_02

图片

支付宝:多线程事务怎么回滚?说用 @Transactional 可以回去等通知了!_sql_03

图片

可以发现子线程组执行时,有一个线程执行失败,其他线程也会抛出异常,但是主线程中执行的删除操作,没有回滚,@Transactional注解没有生效。

使用sqlSession控制手动提交事务

@Resource
  SqlContext sqlContext;
 /**
 * 测试多线程事务.
 * @param employeeDOList
 */
@Override
public void saveThread(List<EmployeeDO> employeeDOList) throws SQLException {
    // 获取数据库连接,获取会话(内部自有事务)
    SqlSession sqlSession = sqlContext.getSqlSession();
    Connection connection = sqlSession.getConnection();
    try {
        // 设置手动提交
        connection.setAutoCommit(false);
        //获取mapper
        EmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class);
        //先做删除操作
        employeeMapper.delete(null);
        //获取执行器
        ExecutorService service = ExecutorConfig.getThreadPool();
        List<Callable<Integer>> callableList  = new ArrayList<>();
        //拆分list
        List<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5);
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        for (int i =0;i<lists.size();i++){
            if (i==lists.size()-1){
                atomicBoolean.set(false);
            }
            List<EmployeeDO> list  = lists.get(i);
            //使用返回结果的callable去执行,
            Callable<Integer> callable = () -> {
                //让最后一个线程抛出异常
                if (!atomicBoolean.get()){
                    throw new ServiceException("001","出现异常");
                }
              return employeeMapper.saveBatch(list);
            };
            callableList.add(callable);
        }
        //执行子线程
       List<Future<Integer>> futures = service.invokeAll(callableList);
        for (Future<Integer> future:futures) {
        //如果有一个执行不成功,则全部回滚
            if (future.get()<=0){
                connection.rollback();
                 return;
            }
        }
        connection.commit();
        System.out.println("添加完毕");
    }catch (Exception e){
        connection.rollback();
        log.info("error",e);
        throw new ServiceException("002","出现异常");
    }finally {
         connection.close();
     }
}
// sql
<insert id="saveBatch" parameterType="List">
 INSERT INTO
 employee (employee_id,age,employee_name,birth_date,gender,id_number,creat_time,update_time,status)
 values
     <foreach collectinotallow="list" item="item" index="index" separator=",">
     (
     #{item.employeeId},
     #{item.age},
     #{item.employeeName},
     #{item.birthDate},
     #{item.gender},
     #{item.idNumber},
     #{item.creatTime},
     #{item.updateTime},
     #{item.status}
         )
     </foreach>
 </insert>

数据库中一条数据:

支付宝:多线程事务怎么回滚?说用 @Transactional 可以回去等通知了!_子线程_04

图片

测试结果:抛出异常,

支付宝:多线程事务怎么回滚?说用 @Transactional 可以回去等通知了!_多线程事务_05

图片

删除操作的数据回滚了,数据库中的数据依旧存在,说明事务成功了。

支付宝:多线程事务怎么回滚?说用 @Transactional 可以回去等通知了!_子线程_06

图片

成功操作示例:

@Resource
SqlContext sqlContext;
/**
 * 测试多线程事务.
 * @param employeeDOList
 */
@Override
public void saveThread(List<EmployeeDO> employeeDOList) throws SQLException {
    // 获取数据库连接,获取会话(内部自有事务)
    SqlSession sqlSession = sqlContext.getSqlSession();
    Connection connection = sqlSession.getConnection();
    try {
        // 设置手动提交
        connection.setAutoCommit(false);
        EmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class);
        //先做删除操作
        employeeMapper.delete(null);
        ExecutorService service = ExecutorConfig.getThreadPool();
        List<Callable<Integer>> callableList  = new ArrayList<>();
        List<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5);
        for (int i =0;i<lists.size();i++){
            List<EmployeeDO> list  = lists.get(i);
            Callable<Integer> callable = () -> employeeMapper.saveBatch(list);
            callableList.add(callable);
        }
        //执行子线程
       List<Future<Integer>> futures = service.invokeAll(callableList);
        for (Future<Integer> future:futures) {
            if (future.get()<=0){
                connection.rollback();
                 return;
            }
        }
        connection.commit();
        System.out.println("添加完毕");
    }catch (Exception e){
        connection.rollback();
        log.info("error",e);
        throw new ServiceException("002","出现异常");
       // throw new ServiceException(ExceptionCodeEnum.EMPLOYEE_SAVE_OR_UPDATE_ERROR);
    }
}

测试结果:


删除的删除了,添加的添加成功了,测试成功。

支付宝:多线程事务怎么回滚?说用 @Transactional 可以回去等通知了!_多线程事务_07

最后说一句(求关注!别白嫖!)

如果这篇文章对您有所帮助,或者有所启发的话,求一键三连:点赞、转发、在看。

关注公众号:woniuxgg,在公众号中回复:笔记  就可以获得蜗牛为你精心准备的java实战语雀笔记,回复面试、开发手册、有超赞的粉丝福利!


标签:回滚,int,employeeDOList,Transactional,List,线程,new,多线程,public
From: https://blog.51cto.com/u_16502039/9370688

相关文章

  • python多线程id获取
    demoimportthreadingimporttimedefprint_thread_info(thread_name):"""线程函数,打印线程名称和ID以及一些文本"""foriinrange(3):time.sleep(1)thread_id=threading.current_thread().identprint(f"{thr......
  • 并发编程之多线程
    多线程1.什么是线程就是一条流水线工作的过程,一条流水线必须属于一个车间,一个车间的工作过程是一个进程车间负责把资源整合到一起,是一个资源单位,而一个车间内至少有一个流水线流水线的工作需要电源,电源就相当于cpu所以,进程只是用来把资源集中到一起(进程只是一个资源单位,或者......
  • QT笔记:多线程和信号槽
    QT笔记:多线程和信号槽多线程创建多线程有两种方法,一般推荐用moveToThread方法参考代码如下:mainwindow.h#ifndefMAINWINDOW_H#defineMAINWINDOW_H#include<QMainWindow>#include<QApplication>QT_BEGIN_NAMESPACEnamespaceUi{classMainWindow;}QT_END_NAMES......
  • python 多线程multiprocessing
    该多线程,简单计算结果可以使用,在django里想并行处理多个实体进行计算不行,请自行验证importmultiprocessing#要在进程池中并行执行的任务函数defprocess_data(data):#执行任务的逻辑result=data*2returnresultif__name__=='__main__':#创......
  • C#中Dictionary与ConcurrentDictionary解锁多线程操作安全之道
     使用C#中的Dictionary与ConcurrentDictionary进行多线程操作在C#中,Dictionary是一个常见的字典类型,但它不是线程安全的。为了在多线程环境中确保安全的操作,我们可以使用ConcurrentDictionary,这是一个专门设计用于多线程场景的线程安全字典。1.使用Dictionary进行非线程安......
  • 多线程笔记
    线程创建继承Thread类//线程创建方式1:继承thread1,重写run方法,调用start开启线程publicclassThread1extendsThread{@Overridepublicvoidrun(){//run方法线程体for(inti=0;i<20;i++){System.out.println("我在看代......
  • PHP学习第八天:扩展开发与多线程编程
    在PHP学习的第八天,我深入了解了扩展开发和多线程编程的概念。早上,我学习了如何编写PHP扩展。扩展是PHP的模块,可以提供额外的功能。我了解了扩展开发的基础知识,如C语言和PHPAPI。通过编写一个简单的扩展,我熟悉了扩展的结构和编写过程。了解扩展开发使我能够更深入地了解PHP的内部工......
  • 并发编程之多线程理论篇
    什么叫线程线程也叫轻量级进程,是操作系统能够进行运算调度的最小单位,它被包含在进程之中,是进程中的实际运作单位。线程自己不用有系统资源,只拥有一点在运行中必不可少的资源,但它可与同属一个一个进程的其他线程共享其所拥有的全部资源。一个线程可以创建和撤销另一个线程,同一个......
  • 并发编程之多线程操作篇
    多线程简单介绍多线程,或者说多任务,指的是操作系统同时运行多个任务。例如,听歌、洗衣服、看视频可以同时进行,这种就是多任务。单核CPU执行多任务:操作系统轮流让各个任务交替执行,任务1执行t1时间,切换到任务2,任务2执行t2时间,再切换到任务3,执行t3时间...如此反复执行,表面上看,每个任......
  • C++多线程3
    1利用栈特性自动释放锁RAII1.1什么是RAIIRAII(ResourceAcquisitionIsInitialization),使用局部对象管理资源的技术称为资源获取既初始化,它的生命周期由操作系统管理,无需人工干预。为什么要引入自动释放锁,因为我们有时会因为忘记释放锁,而造成死锁或内存泄漏。我们先来手动实......