首页 > 其他分享 >多线程的事务回滚问题

多线程的事务回滚问题

时间:2023-01-08 11:00:25浏览次数:33  
标签:回滚 java int List 事务 util new import 多线程

多线程的事务回滚问题

环境要求

1.mybatis-plus或mybatis

2.支持单表的增删改查

3.书写工具类将集合平分,获取线程池

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadUtil {
    /**
     * 平均拆分list方法.
     * @param source
     * @param n
     * @param <T>
     * @return
     */
    public static <T> List<List<T>> averageAssign(List<T> source, int n){
        List<List<T>> result=new ArrayList<List<T>>();
        int remaider=source.size()%n;
        int number=source.size()/n;
        int offset=0;//偏移量
        for(int i=0;i<n;i++){
            List<T> value=null;
            if(remaider>0){
                value=source.subList(i*number+offset, (i+1)*number+offset+1);
                remaider--;
                offset++;
            }else{
                value=source.subList(i*number+offset, (i+1)*number+offset);
            }
            result.add(value);
        }
        return result;
    }
    /** 线程池配置
     * @version V1.0
     */
    public static class ExecutorConfig {
        private static int maxPoolSize = Runtime.getRuntime().availableProcessors();
        private volatile static ExecutorService executorService;
        public static ExecutorService getThreadPool() {
            if (executorService == null){
                synchronized (ExecutorConfig.class){
                    if (executorService == null){
                        executorService = newThreadPool();
                    }
                }
            }
            return executorService;
        }

        private static  ExecutorService newThreadPool(){
            int queueSize = 500;
            int corePool = Math.min(5, maxPoolSize);
            return new ThreadPoolExecutor(corePool, maxPoolSize, 10000L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<>(queueSize),new ThreadPoolExecutor.AbortPolicy());
        }
        private ExecutorConfig(){}
    }

}

4.书写服务类接口

/**
 * <p>
 *  服务类
 * </p>
 *
 * @author wjj
 * @since 2023-01-07
 */
public interface DeptService extends IService<Dept> {
    //使用@Transactional测试多线程回滚问题
    void saveThread(List<Dept> employeeDOList);
    //使用手动操作失误测试多线程回滚问题
    void saveThread2(List<Dept> employeeDOList) throws SQLException;
    //使用手动操作事务正常测试多线程
    void saveThread3(List<Dept> employeeDOList) throws SQLException;
}

5.书写服务类实现

import com.zhuoyue.model.Dept;
import com.zhuoyue.mapper.DeptMapper;
import com.zhuoyue.service.DeptService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.zhuoyue.util.ThreadUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.session.SqlSession;
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.zhuoyue.util.ThreadUtil.averageAssign;

/**
 * <p>
 *  服务实现类
 * </p>
 *
 * @author myl
 * @since 2023-01-07
 */
@Service
@Slf4j
public class DeptServiceImpl extends ServiceImpl<DeptMapper, Dept> implements DeptService {

    /**
     * 测试多线程事务.
     * @param employeeDOList
     */
    @Resource
    private SqlSessionTemplate sqlSessionTemplate;


    @Override
    @Transactional
    public void saveThread(List<Dept> employeeDOList) {
        try {
            //先做删除操作,如果子线程出现异常,此操作不会回滚
            this.getBaseMapper().deleteById(8834L);
            //获取线程池
            ExecutorService service = ThreadUtil.ExecutorConfig.getThreadPool();
            //拆分数据,拆分5份
            List<List<Dept>> lists= averageAssign(employeeDOList, 5);
            //执行的线程
            Thread []threadArray = new Thread[lists.size()];
            //监控子线程执行完毕,再执行主线程,要不然会导致主线程关闭,子线程也会随着关闭
            CountDownLatch countDownLatch = new CountDownLatch(lists.size());
            AtomicBoolean atomicBoolean = new AtomicBoolean(true);
            for (int i =0;i<lists.size();i++){
                if (i==lists.size()-1){
                    atomicBoolean.set(false);
                }
                List<Dept> list = lists.get(i);
                threadArray[i] = new Thread(() -> {
                    try {
                        //最后一个线程抛出异常
                        if (!atomicBoolean.get()){
                            throw new RuntimeException("出现异常");
                        }
                        //批量添加,mybatisPlus中自带的batch方法
                        this.saveBatch(list);
                    }finally {
                        countDownLatch.countDown();
                    }

                });
            }
            for (int i = 0; i <lists.size(); i++){
                service.execute(threadArray[i]);
            }
            //当子线程执行完毕时,主线程再往下执行
            countDownLatch.await();
            System.out.println("添加完毕");
        }catch (Exception e){
            log.info("error",e);
            throw new RuntimeException("出现异常");
        }finally {
        }
    }
    /**
     * 测试多线程事务.
     * @param employeeDOList
     */
    @Override
    public void saveThread2(List<Dept> employeeDOList) throws SQLException {
        // 获取数据库连接,获取会话(内部自有事务)
        SqlSession sqlSession = sqlSessionTemplate.getSqlSessionFactory().openSession();
        Connection connection = sqlSession.getConnection();
        try {
            // 设置手动提交
            connection.setAutoCommit(false);
            //获取mapper
            DeptMapper mapper = sqlSession.getMapper(DeptMapper.class);
            //先做删除操作
            mapper.deleteById(8834L);
            //获取执行器
            ExecutorService service = ThreadUtil.ExecutorConfig.getThreadPool();
            List<Callable<Integer>> callableList = new ArrayList<>();
            //拆分list
            List<List<Dept>> lists=ThreadUtil.averageAssign(employeeDOList, 5);
            AtomicBoolean atomicBoolean = new AtomicBoolean(true);
            for (int i =0;i<lists.size();i++){
                if (i==lists.size()-1){
                    atomicBoolean.set(false);
                }
                List<Dept> list  = lists.get(i);
                //使用返回结果的callable去执行,
                Callable<Integer> callable = () -> {
                    //让最后一个线程抛出异常
                    if (!atomicBoolean.get()){
                        throw new RuntimeException("001出现异常");
                    }
                    return mapper.saveBatch(list);
                };
                callableList.add(callable);
            }
            //执行子线程
            List<Future<Integer>> futures = service.invokeAll(callableList);
            for (Future<Integer> future:futures) {
                //如果有一个执行不成功,则全部回滚
                if (future.get()<=0){
                    connection.rollback();
                    return;
                }
            }
            connection.commit();
            System.out.println("添加完毕");
        }catch (Exception e){
            connection.rollback();
            log.info("error",e);
            throw new RuntimeException("002出现异常");
        }finally {
            connection.close();
        }
    }
    /**
     * 测试多线程事务.
     * @param employeeDOList
     */
    @Override
    public void saveThread3(List<Dept> employeeDOList) throws SQLException {
        // 获取数据库连接,获取会话(内部自有事务)
        SqlSession sqlSession =  sqlSessionTemplate.getSqlSessionFactory().openSession();
        Connection connection = sqlSession.getConnection();
        try {
            // 设置手动提交
            connection.setAutoCommit(false);
            DeptMapper mapper = sqlSession.getMapper(DeptMapper.class);
            //先做删除操作
            this.getBaseMapper().deleteById(8834L);
            ExecutorService service = ThreadUtil.ExecutorConfig.getThreadPool();
            List<Callable<Integer>> callableList = new ArrayList<>();
            List<List<Dept>> lists=ThreadUtil.averageAssign(employeeDOList, 5);
            for (int i =0;i<lists.size();i++){
                List<Dept> list  = lists.get(i);
                Callable<Integer> callable = () -> mapper.saveBatch(list);
                callableList.add(callable);
            }
            //执行子线程
            List<Future<Integer>> futures = service.invokeAll(callableList);
            for (Future<Integer> future:futures) {
                if (future.get()<=0){
                    connection.rollback();
                    return;
                }
            }
            connection.commit();
            System.out.println("添加完毕");
        }catch (Exception e){
            connection.rollback();
            log.info("error",e);
            throw new RuntimeException("002出现异常");
            // throw new ServiceException(ExceptionCodeEnum.EMPLOYEE_SAVE_OR_UPDATE_ERROR);
        }
    }
}

测试

1.使用@Transactional测试多线程事务不回滚

2.手动操作事务多线程事务回滚

标签:回滚,java,int,List,事务,util,new,import,多线程
From: https://www.cnblogs.com/WangJingjun/p/17034250.html

相关文章

  • 一文掌握Spring事务的基本使用
    1依赖Spring事务的实际源码在spring-tx中:<dependency><groupId>org.springframework</groupId><artifactId>spring-tx</artifactId></dependency>在Spring体......
  • 分布式事务解决方案-后端分析
    一、什么是分布式事务在早期的单体架构时期,所有的数据操作都在同一个数据库里面进行,比如:A给B转100块钱,A的账户余额-100,B的账户余额+100,这两个操作放在同一个事务里面即可,......
  • 在微服务内部,调用另一个微服务,如何保证事务的一致性
    虽然,我们通常建议涉及到事务的情况下,不要在一个微服务里,调用另外一个微服务,但有时也会遇到无法避开的情况,那我们就来看看应该如何保证事务的一致性。我们先来看看微服务A......
  • java多线程创建一个简单的案例
    1、创建一个简单的线程,不需要去创建个RunnableThreadthread=newThread(newRunnable(){@Overridepublicvoidrun(){//todo你要执行的方法}......
  • [SQL Server] 循环游标执行-无事务
    From: https://blog.csdn.net/weixin_42609389/article/details/126955029begindeclare@aintdeclare@tempvarchar(50)set@a=1declareor......
  • 多线程
    一、多线程概述1.1进程和线程概述进程:操作系统中的应用程序,一个进程就是一个应用程序。进程A和进程B的内存独立不共享资源。线程:CPU调度的最小单元,进程的一个执行流/......
  • android基础02 - 多媒体、多线程与异步任务、Service、网络
    多媒体通知通知渠道:程序对自己发出的通知进行分类,用户可根据渠道对消息进行屏蔽或设置响铃振动。一个应用的通知渠道一旦创建就无法再修改,只能再创建新的可在Activity、......
  • java中的多线程
    一.线程的创建线程的创建方式有两种:一种是继承Thread类,重写run()方法【这里的run()方法只是普通的方法】,在主方法中,创建该类的对象,调用对象的start()方法。二种是实现R......
  • Redis事务
    参考资料:https://www.cnblogs.com/wkfvawl/p/15754956.html事务1.1事务定义Redis事务是一个单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。事务在执行的......
  • C# 多线程学习笔记
     ///进程:程序在服务器上运行是,占据的计算资源合集,称之为进程;///进程之间不会相互干扰--进程之间的通信比较困难(分布式)///线程:程序执行的最小单位,相应操作的最小执......