首页 > 编程语言 >面试官:Java 多线程怎么做事务控制?一半人答不上来。。

面试官:Java 多线程怎么做事务控制?一半人答不上来。。

时间:2022-10-13 09:13:10浏览次数:115  
标签:面试官 Java 事务 threadCount 线程 java 多线程 teacher

项目代码基于:MySql 数据,开发框架为:SpringBoot、Mybatis

开发语言为:Java8

前言

公司业务中遇到一个需求,需要同时修改最多约5万条数据,而且还不支持批量或异步修改操作。于是只能写个for循环操作,但操作耗时太长,只能一步一步寻找其他解决方案。

具体操作如下:

一、循环操作的代码

先写一个最简单的for循环代码,看看耗时情况怎么样。

/***
 * 一条一条依次对50000条数据进行更新操作
 * 耗时:2m27s,1m54s
 */
@Test
void updateStudent() {
    List<Student> allStudents = studentMapper.getAll();
    allStudents.forEach(s -> {
        //更新教师信息
        String teacher = s.getTeacher();
        String newTeacher = "TNO_" + new Random().nextInt(100);
        s.setTeacher(newTeacher);
        studentMapper.update(s);
    });
}

循环修改整体耗时约 1分54秒,且代码中没有手动事务控制应该是自动事务提交,所以每次操作事务都会提交所以操作比较慢,我们先对代码中添加手动事务控制,看查询效率怎样。

最新面试题整理:https://www.javastack.cn/mst/

二、使用手动事务的操作代码

修改后的代码如下:

@Autowired
private DataSourceTransactionManager dataSourceTransactionManager;

@Autowired
private TransactionDefinition transactionDefinition;

/**
 * 由于希望更新操作 一次性完成,需要手动控制添加事务
 * 耗时:24s
 * 从测试结果可以看出,添加事务后插入数据的效率有明显的提升
 */
@Test
void updateStudentWithTrans() {
    List<Student> allStudents = studentMapper.getAll();
    TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
    try {
        allStudents.forEach(s -> {
            //更新教师信息
            String teacher = s.getTeacher();
            String newTeacher = "TNO_" + new Random().nextInt(100);
            s.setTeacher(newTeacher);
            studentMapper.update(s);
        });
        dataSourceTransactionManager.commit(transactionStatus);
    } catch (Throwable e) {
        dataSourceTransactionManager.rollback(transactionStatus);
        throw e;
    }
}

添加手动事务操控制后,整体耗时约 24秒,这相对于自动事务提交的代码,快了约5倍,对于大量循环数据库提交操作,添加手动事务可以有效提高操作效率。

三、尝试多线程进行数据修改

添加数据库手动事务后操作效率有明细提高,但还是比较长,接下来尝试多线程提交看是不是能够再快一些。

先添加一个Service将批量修改操作整合一下,具体代码如下:

StudentServiceImpl.java
@Service
public class StudentServiceImpl implements StudentService {
    @Autowired
    private StudentMapper studentMapper;

    @Autowired
    private DataSourceTransactionManager dataSourceTransactionManager;

    @Autowired
    private TransactionDefinition transactionDefinition;

    @Override
    public void updateStudents(List<Student> students, CountDownLatch threadLatch) {
        TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
        System.out.println("子线程:" + Thread.currentThread().getName());
        try {
            students.forEach(s -> {
                // 更新教师信息
                // String teacher = s.getTeacher();
                String newTeacher = "TNO_" + new Random().nextInt(100);
                s.setTeacher(newTeacher);
                studentMapper.update(s);
            });
            dataSourceTransactionManager.commit(transactionStatus);
            threadLatch.countDown();
        } catch (Throwable e) {
            e.printStackTrace();
            dataSourceTransactionManager.rollback(transactionStatus);
        }
    }
}

批量测试代码,我们采用了多线程进行提交,修改后测试代码如下:

@Autowired
private DataSourceTransactionManager dataSourceTransactionManager;

@Autowired
private TransactionDefinition transactionDefinition;

@Autowired
private StudentService studentService;

/**
 * 对用户而言,27s 任是一个较长的时间,我们尝试用多线程的方式来经行修改操作看能否加快处理速度
 * 预计创建10个线程,每个线程进行5000条数据修改操作
 * 耗时统计
 * 1 线程数:1      耗时:25s
 * 2 线程数:2      耗时:14s
 * 3 线程数:5      耗时:15s
 * 4 线程数:10     耗时:15s
 * 5 线程数:100    耗时:15s
 * 6 线程数:200    耗时:15s
 * 7 线程数:500    耗时:17s
 * 8 线程数:1000    耗时:19s
 * 8 线程数:2000    耗时:23s
 * 8 线程数:5000    耗时:29s
 */
@Test
void updateStudentWithThreads() {
    //查询总数据
    List<Student> allStudents = studentMapper.getAll();
    // 线程数量
    final Integer threadCount = 100;

    //每个线程处理的数据量
    final Integer dataPartionLength = (allStudents.size() + threadCount - 1) / threadCount;

    // 创建多线程处理任务
    ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount);
    CountDownLatch threadLatchs = new CountDownLatch(threadCount);

    for (int i = 0; i < threadCount; i++) {
        // 每个线程处理的数据
        List<Student> threadDatas = allStudents.stream()
                .skip(i * dataPartionLength).limit(dataPartionLength).collect(Collectors.toList());
        studentThreadPool.execute(() -> {
            studentService.updateStudents(threadDatas, threadLatchs);
        });
    }
    try {
        // 倒计时锁设置超时时间 30s
        threadLatchs.await(30, TimeUnit.SECONDS);
    } catch (Throwable e) {
        e.printStackTrace();
    }

    System.out.println("主线程完成");
}

多线程提交修改时,我们尝试了不同线程数对提交速度的影响,具体可以看下面表格,

多线程修改50000条数据时 不同线程数耗时对比(秒)

根据表格,我们线程数增大提交速度并非一直增大,在当前情况下约在2-5个线程数时,提交速度最快(实际线程数还是需要根据服务器配置实际测试)。

另外,MySQL 系列面试题和答案全部整理好了,微信搜索Java技术栈,在后台发送:面试,可以在线阅读。

四、基于两个CountDownLatch控制多线程事务提交

由于多线程提交时,每个线程事务时单独的,无法保证一致性,我们尝试给多线程添加事务控制,来保证每个线程都是在插入数据完成后在提交事务,

这里我们使用两个 CountDownLatch 来控制主线程与子线程事务提交,并设置了超时时间为 30 秒。我们对代码进行了一点修改:

@Override
public void updateStudentsThread(List<Student> students, CountDownLatch threadLatch, CountDownLatch mainLatch, StudentTaskError taskStatus) {
    TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
    System.out.println("子线程:" + Thread.currentThread().getName());
    try {
        students.forEach(s -> {
            // 更新教师信息
            // String teacher = s.getTeacher();
            String newTeacher = "TNO_" + new Random().nextInt(100);
            s.setTeacher(newTeacher);
            studentMapper.update(s);
        });
    } catch (Throwable e) {
        taskStatus.setIsError();
    } finally {
        threadLatch.countDown(); // 切换到主线程执行
    }
    try {
        mainLatch.await();  //等待主线程执行
    } catch (Throwable e) {
        taskStatus.setIsError();
    }
    // 判断是否有错误,如有错误 就回滚事务
    if (taskStatus.getIsError()) {
        dataSourceTransactionManager.rollback(transactionStatus);
    } else {
        dataSourceTransactionManager.commit(transactionStatus);
    }
}
/**
 * 由于每个线程都是单独的事务,需要添加对线程事务的统一控制
 * 我们这边使用两个 CountDownLatch 对子线程的事务进行控制
 */
@Test
void updateStudentWithThreadsAndTrans() {
    //查询总数据
    List<Student> allStudents = studentMapper.getAll();
    // 线程数量
    final Integer threadCount = 4;

    //每个线程处理的数据量
    final Integer dataPartionLength = (allStudents.size() + threadCount - 1) / threadCount;

    // 创建多线程处理任务
    ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount);
    CountDownLatch threadLatchs = new CountDownLatch(threadCount); // 用于计算子线程提交数量
    CountDownLatch mainLatch = new CountDownLatch(1); // 用于判断主线程是否提交
    StudentTaskError taskStatus = new StudentTaskError(); // 用于判断子线程任务是否有错误

    for (int i = 0; i < threadCount; i++) {
        // 每个线程处理的数据
        List<Student> threadDatas = allStudents.stream()
                .skip(i * dataPartionLength).limit(dataPartionLength)
                .collect(Collectors.toList());
        studentThreadPool.execute(() -> {
            studentService.updateStudentsThread(threadDatas, threadLatchs, mainLatch, taskStatus);
        });
    }
    try {
        // 倒计时锁设置超时时间 30s
        boolean await = threadLatchs.await(30, TimeUnit.SECONDS);
        if (!await) { // 等待超时,事务回滚
            taskStatus.setIsError();
        }
    } catch (Throwable e) {
        e.printStackTrace();
        taskStatus.setIsError();
    }
    mainLatch.countDown(); // 切换到子线程执行
    studentThreadPool.shutdown(); //关闭线程池

    System.out.println("主线程完成");
}

本想再次测试一下不同线程数对执行效率的影响时,发现当线程数超过10个时,执行时就报错。具体错误内容如下:

Exception in thread "pool-1-thread-2" org.springframework.transaction.CannotCreateTransactionException: Could not open JDBC Connection for transaction; nested exception is java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30055ms.
 at org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:309)
 at org.springframework.transaction.support.AbstractPlatformTransactionManager.startTransaction(AbstractPlatformTransactionManager.java:400)
 at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:373)
 at com.example.springbootmybatis.service.Impl.StudentServiceImpl.updateStudentsThread(StudentServiceImpl.java:58)
 at com.example.springbootmybatis.StudentTest.lambda$updateStudentWithThreadsAndTrans$3(StudentTest.java:164)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30055ms.
 at com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:696)
 at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:197)
 at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:162)
 at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:128)
 at org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:265)
 ... 7 more

错误的大致意思时,不能为数据库事务打开 jdbc Connection,连接在30s的时候超时了。由于前面启动的十个线程需要等待主线程完成后才能提交,所以一直占用连接未释放,造成后面的进程创建连接超时。

看错误日志中错误的来源是 HikariPool ,我们来重新配置一下这个连接池的参数,将最大连接数修改为100,具体配置如下:

# 连接池中允许的最小连接数。缺省值:10
spring.datasource.hikari.minimum-idle=10
# 连接池中允许的最大连接数。缺省值:10
spring.datasource.hikari.maximum-pool-size=100
# 自动提交
spring.datasource.hikari.auto-commit=true
# 一个连接idle状态的最大时长(毫秒),超时则被释放(retired),缺省:10分钟
spring.datasource.hikari.idle-timeout=30000
# 一个连接的生命时长(毫秒),超时而且没被使用则被释放(retired),缺省:30分钟,建议设置比数据库超时时长少30秒
spring.datasource.hikari.max-lifetime=1800000
# 等待连接池分配连接的最大时长(毫秒),超过这个时长还没可用的连接则发生SQLException, 缺省:30秒

再次执行测试发现没有报错,修改线程数为20又执行了一下,同样执行成功了。另外,关注公众号Java技术栈,在后台回复:面试,可以获取我整理的 Java 系列面试题和答案,非常齐全。

五、基于TransactionStatus集合来控制多线程事务提交

在同事推荐下我们使用事务集合来进行多线程事务控制,主要代码如下

@Service
public class StudentsTransactionThread {

    @Autowired
    private StudentMapper studentMapper;
    @Autowired
    private StudentService studentService;
    @Autowired
    private PlatformTransactionManager transactionManager;

    List<TransactionStatus> transactionStatuses = Collections.synchronizedList(new ArrayList<TransactionStatus>());

    @Transactional(propagation = Propagation.REQUIRED, rollbackFor = {Exception.class})
    public void updateStudentWithThreadsAndTrans() throws InterruptedException {

        //查询总数据
        List<Student> allStudents = studentMapper.getAll();

        // 线程数量
        final Integer threadCount = 2;

        //每个线程处理的数据量
        final Integer dataPartionLength = (allStudents.size() + threadCount - 1) / threadCount;

        // 创建多线程处理任务
        ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount);
        CountDownLatch threadLatchs = new CountDownLatch(threadCount);
        AtomicBoolean isError = new AtomicBoolean(false);
        try {
            for (int i = 0; i < threadCount; i++) {
                // 每个线程处理的数据
                List<Student> threadDatas = allStudents.stream()
                        .skip(i * dataPartionLength).limit(dataPartionLength).collect(Collectors.toList());
                studentThreadPool.execute(() -> {
                    try {
                        try {
                            studentService.updateStudentsTransaction(transactionManager, transactionStatuses, threadDatas);
                        } catch (Throwable e) {
                            e.printStackTrace();
                            isError.set(true);
                        }finally {
                            threadLatchs.countDown();
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        isError.set(true);
                    }
                });
            }

            // 倒计时锁设置超时时间 30s
            boolean await = threadLatchs.await(30, TimeUnit.SECONDS);
            // 判断是否超时
            if (!await) {
                isError.set(true);
            }
        } catch (Throwable e) {
            e.printStackTrace();
            isError.set(true);
        }

        if (!transactionStatuses.isEmpty()) {
            if (isError.get()) {
                transactionStatuses.forEach(s -> transactionManager.rollback(s));
            } else {
                transactionStatuses.forEach(s -> transactionManager.commit(s));
            }
        }

        System.out.println("主线程完成");
    }
}
@Override
@Transactional(propagation = Propagation.REQUIRED, rollbackFor = {Exception.class})
public void updateStudentsTransaction(PlatformTransactionManager transactionManager, List<TransactionStatus> transactionStatuses, List<Student> students) {
    // 使用这种方式将事务状态都放在同一个事务里面
    DefaultTransactionDefinition def = new DefaultTransactionDefinition();
    def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); // 事物隔离级别,开启新事务,这样会比较安全些。
    TransactionStatus status = transactionManager.getTransaction(def); // 获得事务状态
    transactionStatuses.add(status);

    students.forEach(s -> {
        // 更新教师信息
        // String teacher = s.getTeacher();
        String newTeacher = "TNO_" + new Random().nextInt(100);
        s.setTeacher(newTeacher);
        studentMapper.update(s);
    });
    System.out.println("子线程:" + Thread.currentThread().getName());
}

由于这个中方式去前面方式相同,需要等待线程执行完成后才会提交事务,所有任会占用Jdbc连接池,如果线程数量超过连接池最大数量会产生连接超时。所以在使用过程中任要控制线程数量,

六、使用union连接多个select实现批量update

有些情况写不支持,批量update,但支持insert 多条数据,这个时候可尝试将需要更新的数据拼接成多条select 语句,然后使用union 连接起来,再使用update 关联这个数据进行update,具体代码演示如下:

update student,(
 (select  1 as id,'teacher_A' as teacher) union
 (select  2 as id,'teacher_A' as teacher) union
 (select  3 as id,'teacher_A' as teacher) union
 (select  4 as id,'teacher_A' as teacher)
    /* ....more data ... */
    ) as new_teacher
set
 student.teacher=new_teacher.teacher
where
 student.id=new_teacher.id

这种方式在Mysql 数据库没有配置 allowMultiQueries=true 也可以实现批量更新。

总结

  • 对于大批量数据库操作,使用手动事务提交可以很多程度上提高操作效率
  • 多线程对数据库进行操作时,并非线程数越多操作时间越快,按上述示例大约在2-5个线程时操作时间最快。
  • 对于多线程阻塞事务提交时,线程数量不能过多。
  • 如果能有办法实现批量更新那是最好

版权声明:本文为CSDN博主「圣心」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。原文链接:https://blog.csdn.net/qq273766764/article/details/119972911

近期热文推荐:

1.1,000+ 道 Java面试题及答案整理(2022最新版)

2.劲爆!Java 协程要来了。。。

3.Spring Boot 2.x 教程,太全了!

4.别再写满屏的爆爆爆炸类了,试试装饰器模式,这才是优雅的方式!!

5.《Java开发手册(嵩山版)》最新发布,速速下载!

觉得不错,别忘了随手点赞+转发哦!

标签:面试官,Java,事务,threadCount,线程,java,多线程,teacher
From: https://www.cnblogs.com/javastack/p/16786885.html

相关文章

  • Java查漏补缺——四种引用
    今天复习到一种没看到过的Java知识点,是关于内存不足时Java对象的建立的。引用Java对每一个对象的使用被称为Java的引用,类似于C和C++的指针,引用则是调用对象和对象方......
  • heimaJava-网络编程
    Java网络编程概念网络编程可以让程序与网络上的其他设备中的程序进行数据交互网络通信基本模式常见的通信模式有如下两种形式,Client-Server(CS),Browser/Server(BS......
  • ideajava快捷键
    idea:java快捷键快捷键一:psvm——快速生成一个main()函数Eg:输入psvm后回车,会直接生成“publicstaticvoidmain(String[]args){}”。快捷键二:sout——用来快......
  • java学习心得——Linux服务器操作命令整理
     基础操作:cd/  返回根目录mkdir wwwroot创建文件夹cdwwwroot跳转到文件夹bashstart.sh运行脚本命令cd..返回上级目录ls查看当前目录文件ll查看当......
  • Java8集合通过流排序,Stream<T> sorted(Comparator<? super T> comparator)
    Java8中,可以通过流的sorted操作对流中的元素排序,sorted操作的参数是Comparator接口,通过传入一个比较函数来实现排序操作,最直接的,就是通过形如(a,b)->{in......
  • java基础练习
    练习1:判断输入的值是否是偶数,另外,要处理输入错误(目的:熟悉输入、输出,特别是Scanner对象的方法)importjava.util.InputMismatchException;importjava.util.Scanner;pu......
  • Java 如何将 List 转换为 MAP
    有时候我们需要将给定的List转换为Map。如果你使用的是Java8以后版本的话,Stream是你的好朋友。Java8publicMap<Integer,Animal>convertListAfterJava8(Lis......
  • Java基础__学习笔记__常用API-Object
    对象类用equals比较的是地址值packageStudyAPIofString;publicclasstest{publicstaticvoidmain(String[]args){students1=newstudent("张三",......
  • Java Web 06
    约束:非空约束:notnull保证列中所有数据不能有null值唯一约束:unique保证列中所有数据各不相同主键约束:primarykey主键是一行数据的唯一标识,要求非空且唯一检查约......
  • 解决springBoot启动报错Failed to obtain JDBC Connection; nested exception is java
    FailedtoobtainJDBCConnection;nestedexceptionisjava.sql.SQLNonTransientConnectionException:CLIENT_PLUGIN_AUTHisrequired意思是获取JDBC连接失败,导致的......