首页 > 其他分享 >线程分批处理数据及MyBatis的批量插入

线程分批处理数据及MyBatis的批量插入

时间:2024-01-29 17:01:31浏览次数:29  
标签:待处理 int 分批 处理 线程 activity MyBatis 数据 id

文章目录
一、背景
二、代码实现:
三、分页查询下游批次处理场景
四、MyBatis的批量插入
1、活动表简单表结构:
2、业务层组装数据:

一、背景
数据量较多时,我们常常遇到需要分批处理的情况,比如上千上万数据需要需要操作数据库时(入库或者更新),我们想到分批处理,或者解析文件数据量较多,我们可能多线程分批处理,每个线程执行一定的批次数据等…
不管何种场景,当我们想到分批处理时,那如何分批,怎么计算批次数呢?


tip:此处以小数据量为例:假设待处理数据总条数75,每10条数据一处理

我常用的方法总结两种供参考:

方法一:循环遍历待处理的处理,当到达待处理的批次或批次倍数,或最后不足一次剩余的,满足条件即执行一次分批处理操作。eg:10,20,30…7
方法二:根据总数计算总批次数(75/10+1=8次),每次从待处理数据集中截取要处理的数据,最后一个批次特殊处理。eg:0-9,10-19…70-74

二、代码实现:

public static void main(String[] args) {
        List sourList = new ArrayList();
        //1、将list中放进去75个数据--模拟待处理数据集
        for (int i = 0; i < 75; i++) {
            sourList.add(i);
        }

        //假设每N个(10个)数据一处理(实际应用中可能每N个开一个线程,或者每N个调一次数据库操作)
        int batchSize = 10;
        //tempList存放批次数据
        List<Object> tempList = new ArrayList<Object>();

        //方法一:循环遍历待处理的处理(可能多线程,可能分批次入数据库,总之需要按每次处理的大小,计算需要处理的批次数)
        for (int i = 0; i < sourList.size(); i++) {
            tempList.add(sourList.get(i));
            //判断批次,够N个数开始处理一波
            if ((i + 1) % batchSize == 0 || (i + 1) == sourList.size()) {
                log.info("处理数据集:{}", tempList.toString());
                //todo 你的线程处理,或者数据库操作等
                tempList.clear();
            }
        }
        log.info("************* 方法一结束 *************");

        //方法二:根据总数计算总批次数,每次从待处理数据集中截取要处理的数据
        int count = sourList.size();
        int totalPages = (count % batchSize > 0) ? (count / batchSize + 1) : (count / batchSize);
        log.info("待处理数据总条数:{},总批次数:{}", count, totalPages);

        int startIndex = 0;
        int endIndex = 0;
        if (totalPages > 0) {
            //循环批次数,计算待处理数据下标
            for (int pageNum = 1; pageNum <= totalPages; pageNum++) {
                //每批次计算要处理的数据起始位置。终止位置
                List subList = new ArrayList();
                startIndex = ((pageNum - 1) * batchSize);
                if (pageNum == totalPages) {
                    //考虑最后一个批次特殊处理
                    subList = sourList.subList(startIndex, count);
                } else {
                    endIndex = startIndex + batchSize;
                    subList = sourList.subList(startIndex, endIndex);
                }
                //todo 你的线程处理,或者数据库操作等
                log.info("处理第{}批次:{}", pageNum, subList.toString());
            }
        }
        log.info("************* 方法二结束 *************");
    }

========================打印结果:

17:42:03.357  处理数据集:[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
17:42:03.368  处理数据集:[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
17:42:03.368  处理数据集:[20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
17:42:03.368  处理数据集:[30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
17:42:03.368  处理数据集:[40, 41, 42, 43, 44, 45, 46, 47, 48, 49]
17:42:03.368  处理数据集:[50, 51, 52, 53, 54, 55, 56, 57, 58, 59]
17:42:03.368  处理数据集:[60, 61, 62, 63, 64, 65, 66, 67, 68, 69]
17:42:03.368  处理数据集:[70, 71, 72, 73, 74]
17:42:03.368  ************* 方法一结束 *************
17:42:03.368  待处理数据总条数:75,总批次数:8
17:42:03.369  处理第1批次:[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
17:42:03.369  处理第2批次:[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
17:42:03.369  处理第3批次:[20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
17:42:03.369  处理第4批次:[30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
17:42:03.369  处理第5批次:[40, 41, 42, 43, 44, 45, 46, 47, 48, 49]
17:42:03.369  处理第6批次:[50, 51, 52, 53, 54, 55, 56, 57, 58, 59]
17:42:03.369  处理第7批次:[60, 61, 62, 63, 64, 65, 66, 67, 68, 69]
17:42:03.369  处理第8批次:[70, 71, 72, 73, 74]
17:42:03.369  ************* 方法二结束 *************

这样每一波分批待处理的数据集合,以及位置你都能get到了,也就可以进行你的业务逻辑操作了,入库或者开线程了。

三、分页查询下游批次处理场景
如果是分页查询下游或者其他接口循环处理场景:

//每次导出或者查询下游接口等分页页大小(10页一查)
public final static int  PERSIZE = 10;
//查询第一次数据(返回的分页信息page中包含总页数、总条数)
Page<OrderInfoDTO> firstBOList = orderRepository.queryPrescriptionByOrderId(param);
//处理第一页查询回来的数据
if (CollectionUtil.isNotEmpty(firstBOList)) {
    log.info("处理数据{}条 ", firstBOList.size());
}
//页数大于1继续查询
int totalPages=firstBOList.getTotalPage();
if (totalPages > 1) {
    for (int pageNum = 2; pageNum <= totalPages; pageNum++) {
        query.setPageSize(PERSIZE);
        query.setPageNo(pageNum);
        Page<OrderInfoDTO> list = orderRepository.queryPrescriptionByOrderId(param);
        //todo 处理结果集
        log.info("处理第{}批次数据{}条 ", pageNum, list.size());
    }
}

四、MyBatis的批量插入
当我们有很多数据需要通过数据库操作,例如很多数据的更新或者插入, 如果用for循环的方式,里面一条条数据去调数据库操作,我们知道那样开销是很大的,会频繁的创建链接,带来性能问题,为了减少连接,我们通常会批量操作。

批量插入代码样例,已mybatis的批量插入活动表为例:

1、活动表简单表结构:

CREATE TABLE `activity` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增id',
  `activity_id` bigint(20) DEFAULT NULL COMMENT '活动id',
  `user_id` bigint(20) DEFAULT NULL COMMENT '用户id',
  `create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT DEFAULT CHARSET=utf8 COMMENT='活动表';

2、业务层组装数据:
ActivityServiceImpl.java:


private void saveRecords() {
        List<Activity> recordList =  new ArrayList<>();
            Activity activity = new Activity();
            activity.setUserId(userId);
            activity.setActivityId(activityId);
            recordList.add(activity);
            //此处根据业务需求具体组装待处理集合list
           activityDao.insertBatchActivity(recordList);
    }

activityDao.xml中写法:

activityDao.xml:

 <!-- 批量插入活动表-->
    <insert id="insertBatchActivity">
        INSERT INTO activity
        (
        activity_id, user_id, create_time
        )
        VALUES
        <foreach collection="list" item="item" separator=",">
            (
            #{item.activityId},
            #{item.userId},
            now()
            )
        </foreach>
    </insert>

特别注意:mysql默认接受sql的大小是1048576(1M),即第三种方式若数据量超过1M会报如下异常:(可通过调整MySQL安装目录下的my.ini文件中[mysqld]段的"max_allowed_packet = 1M")

标签:待处理,int,分批,处理,线程,activity,MyBatis,数据,id
From: https://www.cnblogs.com/codeLearn/p/17994880

相关文章

  • 线程安全问题
    需求:某电影院目前正在上映国产大片,共100张票,而它有3个窗口卖票,请设计一个程序模拟该电影院卖票publicclassMyThreadextendsThread{//表示这个类的所有对象都共享ticketstaticintticket=0;//0~99@Overridepublicvoidrun(){while(true){if(tic......
  • 线程池参数千万不要这样设置,坑得我整篇文章都写错了,要注意!
    你好呀,我是歪歪。先给大家道个歉:上周不是发布了这篇文章嘛:《三个烂怂八股文,变成两个场景题,打得我一脸懵逼。》其中第一个关于线程池的场景,经过读者提醒可能有问题,我又一次用尽浑身解数分析了一波,发现之前确实分析的不对。这个案例真的是再一次深入的刷新了我对于线程池运行过......
  • 深入浅出Java多线程(二):Java多线程类和接口
    引言大家好,我是你们的老伙计秀才!今天带来的是[深入浅出Java多线程]系列的第二篇内容:Java多线程类和接口。大家觉得有用请点赞,喜欢请关注!秀才在此谢过大家了!!!在现代计算机系统中,多线程技术是提升程序性能、优化资源利用和实现并发处理的重要手段。特别是在Java编程语言中,多线程机......
  • 写一段rust代码,两个线程共享一个bool变量,一个写,一个读
    usestd::sync::{Arc,Mutex};usestd::thread;fnmain(){//创建一个布尔变量并用Arc和Mutex包装,使其可在多个线程间共享和修改letshared_bool=Arc::new(Mutex::new(false));//克隆Arc变量,以便在两个线程之间共享letwriter_shared_bool=Ar......
  • 利用Mybatis拦截器实现自定义的ID自增器
    原生的Mybatis框架是没有ID自增器,但例如国产的MybatisPlus却是支持,不过,MybatisPlus却是缺少了自定属性的填充;例如:我们需要自定义填充一些属性,updateDate、createDate等,这时MybatisPlus自带的ID自增器就无法满足需求;这种时候我们就需要自定义的ID增加器,可以自定义ID增长策略同时......
  • Qt/C++音视频开发64-共享解码线程/重复利用解码/极低CPU占用/画面同步/进度同步
    一、前言共享解码线程主要是为了降低CPU占用,重复利用解码,毕竟在一个监控系统中,很可能打开了同一个地址,需要在多个不同的窗口中播放,形成多屏渲染的效果,做到真正的完全的画面同步,在主解码线程中切换了播放进度,所有关联的同一个解码线程的播放窗体也会立即同步画面,使得感官上看起来......
  • C# 线程本地存储 为什么线程间值不一样
    一:背景1.讲故事有朋友在微信里面问我,为什么用ThreadStatic标记的字段,只有第一个线程拿到了初始值,其他线程都是默认值,让我能不能帮他解答一下,尼玛,我也不是神仙什么都懂,既然问了,那我试着帮他解答一下,也给后面类似疑问的朋友解个惑吧。二:为什么值不一样1.问题复现为了方便讲述,定义......
  • MyBatis 源码系列:MyBatis 体系结构、六大解析器
    体系结构MyBatis是一个持久层框架,其体系结构分为三层:基础支持层、核心处理层和接口层。基础支持层包括数据源模块、事务管理模块、缓存模块、Binding模块、反射模块、类型转换模块、日志模块、资源加载模块和解析器模块。这些模块为MyBatis提供了基础功能,为核心处理层提供了良好......
  • 多线程
    多线程理论(1)什么是线程在Python中,线程(Thread)是执行单元的最小单位。线程是进程内的一条执行路径,每个线程都有自己的执行序列、执行环境和栈空间,但它们共享同一个进程的地址空间。在多线程编程中,可以同时运行多个线程,每个线程执行不同的任务,从而实现并发执行。相比于多进......
  • C++多线程 第一章 你好,C++并发世界
    第一章你好,C++并发世界C++并发并发(concurrency):主要包括任务切换与硬件并发两类.并发(concurrency)实际上与多线程(multithreading)存在差异.并发的种类任务切换(taskswitching):计算机在某一时刻只可以真正执行一个任务,但它可以每秒切换任务许多次.通过做一......