微信公众号访问地址:SpringBoot用线程池ThreadPoolTaskExecutor异步处理百万级数据
一、背景:
利用ThreadPoolTaskExecutor多线程异步批量插入,提高百万级数据插入效率。ThreadPoolTaskExecutor是Spring对JDK的ThreadPoolExecutor的封装,配置更方便,也更容易与Spring的@Async等机制集成,因此推荐使用ThreadPoolTaskExecutor。
二、具体细节:
2.1、配置application.yml
# Async thread-pool settings: custom properties, read by ThreadPoolConfig via @Value.
# The nesting below must be preserved so the keys resolve as async.executor.thread.*
async:
  executor:
    thread:
      core_pool_size: 10 # core thread count (the article cites a default of 8 and the rule of thumb: CPU cores * 2 + 2)
      max_pool_size: 100 # maximum thread count
      queue_capacity: 99988 # task queue capacity
      keep_alive_seconds: 20 # idle-thread keep-alive time, in seconds
      name:
        prefix: async-thread- # name prefix for threads created by the pool
2.2、ThreadPoolConfig配置注入Bean
package com.wonders.common.config;
import cn.hutool.core.thread.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @Description: TODO:利用ThreadPoolTaskExecutor多线程批量执行相关配置
* 自定义线程池
* 发现不是线程数越多越好,具体多少合适,网上有一个不成文的算法:CPU核心数量*2 +2 个线程。
* @Author: yyalin
* @CreateDate: 2022/11/6 11:56
* @Version: V1.0
*/
@Configuration
@EnableAsync
@Slf4j
public class ThreadPoolConfig {
//自定义使用参数
@Value("${async.executor.thread.core_pool_size}")
private int corePoolSize; //配置核心线程数
@Value("${async.executor.thread.max_pool_size}")
private int maxPoolSize; //配置最大线程数
@Value("${async.executor.thread.queue_capacity}")
private int queueCapacity;
@Value("${async.executor.thread.name.prefix}")
private String namePrefix;
@Value("${async.executor.thread.keep_alive_seconds}")
private int keepAliveSeconds;
//1、自定义asyncServiceExecutor线程池
@Bean(name = "asyncServiceExecutor")
public ThreadPoolTaskExecutor asyncServiceExecutor() {
log.info("start asyncServiceExecutor......");
//在这里修改
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//配置核心线程数
executor.setCorePoolSize(corePoolSize);
//配置最大线程数
executor.setMaxPoolSize(maxPoolSize);
//设置线程空闲等待时间 s
executor.setKeepAliveSeconds(keepAliveSeconds);
//配置队列大小 设置任务等待队列的大小
executor.setQueueCapacity(queueCapacity);
//配置线程池中的线程的名称前缀
//设置线程池内线程名称的前缀-------阿里编码规约推荐--方便出错后进行调试
executor.setThreadNamePrefix(namePrefix);
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
//执行初始化
executor.initialize();
return executor;
}
/**
* 2、公共线程池,利用系统availableProcessors线程数量进行计算
*/
@Bean(name = "commonThreadPoolTaskExecutor")
public ThreadPoolTaskExecutor commonThreadPoolTaskExecutor() {
ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
int processNum = Runtime.getRuntime().availableProcessors(); // 返回可用处理器的Java虚拟机的数量
int corePoolSize = (int) (processNum / (1 - 0.2));
int maxPoolSize = (int) (processNum / (1 - 0.5));
pool.setCorePoolSize(corePoolSize); // 核心池大小
pool.setMaxPoolSize(maxPoolSize); // 最大线程数
pool.setQueueCapacity(maxPoolSize * 1000); // 队列程度
pool.setThreadPriority(Thread.MAX_PRIORITY);
pool.setDaemon(false);
pool.setKeepAliveSeconds(300);// 线程空闲时间
return pool;
}
//3自定义defaultThreadPoolExecutor线程池
@Bean(name = "defaultThreadPoolExecutor", destroyMethod = "shutdown")
public ThreadPoolExecutor systemCheckPoolExecutorService() {
int maxNumPool=Runtime.getRuntime().availableProcessors();
return new ThreadPoolExecutor(3,
maxNumPool,
60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable" data-textnode-index-1692583966621="268" data-index-1692583966621="3909" data-index-len-1692583966621="3909" class="character" style="margin: 0px; padding: 0px; box-sizing: border-box; max-width: 100%; display: inline-block; text-indent: initial;">>(10000),
//置线程名前缀,例如设置前缀为hutool-thread-,则线程名为hutool-thread-1之类。
new ThreadFactoryBuilder().setNamePrefix("default-executor-thread-%d").build(),
(r, executor) -" data-textnode-index-1692583966621="276" data-index-1692583966621="4114" data-index-len-1692583966621="4114" class="character" style="margin: 0px; padding: 0px; box-sizing: border-box; max-width: 100%; display: inline-block; text-indent: initial;">> log.error("system pool is full! "));
}
}
2.3、创建异步线程,业务类
//1、自定义asyncServiceExecutor线程池
@Override
@Async("asyncServiceExecutor")
public void executeAsync(List<Student" data-textnode-index-1692583966621="297" data-index-1692583966621="4291" data-index-len-1692583966621="4291" class="character" style="margin: 0px; padding: 0px; box-sizing: border-box; max-width: 100%; display: inline-block; text-indent: initial;">> students,
StudentService studentService,
CountDownLatch countDownLatch) {
try{
log.info("start executeAsync");
//异步线程要做的事情
studentService.saveBatch(students);
log.info("end executeAsync");
}finally {
countDownLatch.countDown();// 很关键, 无论上面程序是否异常必须执行countDown,否则await无法释放
}
}
2.4、拆分集合工具类
package com.wonders.threads;
import com.google.common.collect.Lists;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
/**
 * Utility for splitting a large list into smaller lists of a fixed maximum size,
 * e.g. so that each part can be written to the database as one batch.
 *
 * <p>Intended workflow described by the original author:
 * <ol>
 *   <li>take the large collection A and split it into N smaller collections A-1 .. A-N;</li>
 *   <li>start a thread pool, tuned to the collection size, and batch-process each part;</li>
 *   <li>coordinate the flow so the caller can wait for all parts to finish.</li>
 * </ol>
 *
 * @author yyalin
 * @version V1.0 (created 2022/5/6 14:43)
 */
public class SplitListUtils {

    /**
     * Splits a list into consecutive parts of {@code subListLength} elements; the last part
     * holds the remainder.
     *
     * <p>When the whole input fits into a single part, the returned list contains
     * {@code resList} itself (not a copy). Otherwise every part is an independent copy.
     *
     * @param <T>           element type
     * @param resList       the list to split; may be {@code null} or empty
     * @param subListLength maximum number of elements per part
     * @return the parts in original order; an empty list if {@code resList} is
     *         {@code null}/empty or {@code subListLength <= 0}
     */
    public static <T> List<List<T>> split(List<T> resList, int subListLength) {
        if (resList == null || resList.isEmpty() || subListLength <= 0) {
            return new ArrayList<>();
        }
        List<List<T>> ret = new ArrayList<>();
        int size = resList.size();
        if (size <= subListLength) {
            // Not enough data to fill even one part: hand back the input as the only part.
            ret.add(resList);
            return ret;
        }
        int from = 0;
        while (from < size) {
            // long arithmetic so from + subListLength cannot overflow int
            int to = (int) Math.min((long) from + subListLength, (long) size);
            ret.add(new ArrayList<>(resList.subList(from, to)));
            from = to;
        }
        return ret;
    }

    /**
     * Variant two (recommended by the original author): cuts a large list into pages of at
     * most {@code pageSize} elements, convenient for batched database inserts.
     *
     * <p>The pages are {@link List#subList(int, int)} views of {@code resList}; no elements
     * are copied.
     *
     * @param <T>      element type
     * @param resList  the list to split; may be {@code null} or empty
     * @param pageSize maximum number of elements per page
     * @return the pages in original order; an empty list if {@code resList} is
     *         {@code null}/empty or {@code pageSize <= 0}
     */
    public static <T> List<List<T>> pagingList(List<T> resList, int pageSize) {
        if (resList == null || resList.isEmpty() || pageSize <= 0) {
            return new ArrayList<>();
        }
        int length = resList.size();
        List<List<T>> newList = new ArrayList<>();
        int fromIndex = 0;
        while (fromIndex < length) {
            // long arithmetic: with a very large pageSize, int addition would overflow and
            // the page boundaries (and hence the result) would be wrong.
            int toIndex = (int) Math.min((long) fromIndex + pageSize, (long) length);
            newList.add(resList.subList(fromIndex, toIndex));
            fromIndex = toIndex;
        }
        return newList;
    }

    /**
     * Demo: builds 19 strings and pages them with a page size of 100, which yields a single
     * page holding all 19 elements, then prints each page.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Build the sample data.
        List<String> list = new ArrayList<>();
        int size = 19;
        for (int i = 0; i < size; i++) {
            list.add("hello-" + i);
        }
        // The outer list holds the smaller lists.
        List<List<String>> temps = pagingList(list, 100);
        int j = 0;
        // Process each smaller list in turn.
        for (List<String> obj : temps) {
            System.out.println(String.format("row:%s -> size:%s,data:%s", ++j, obj.size(), obj));
        }
    }
}
2.5、造数据,多线程异步插入
public int batchInsertWay() throws Exception {
log.info("开始批量操作.........");
Random rand = new Random();
List<Student" data-textnode-index-1692583966621="601" data-index-1692583966621="8110" data-index-len-1692583966621="8110" class="character" style="margin: 0px; padding: 0px; box-sizing: border-box; max-width: 100%; display: inline-block; text-indent: initial;">> list = new ArrayList<" data-textnode-index-1692583966621="605" data-index-1692583966621="8133" data-index-len-1692583966621="8133" class="character" style="margin: 0px; padding: 0px; box-sizing: border-box; max-width: 100%; display: inline-block; text-indent: initial;">>();
//造100万条数据
for (int i = 0; i < 1000003; i++) {
Student student=new Student();
student.setStudentName("大明:"+i);
student.setAddr("上海:"+rand.nextInt(9) * 1000);
student.setAge(rand.nextInt(1000));
student.setPhone("134"+rand.nextInt(9) * 1000);
list.add(student);
}
//2、开始多线程异步批量导入
long startTime = System.currentTimeMillis(); // 开始时间
//boolean a=studentService.batchInsert(list);
List<List<Student" data-textnode-index-1692583966621="652" data-index-1692583966621="8648" data-index-len-1692583966621="8648" class="character" style="margin: 0px; padding: 0px; box-sizing: border-box; max-width: 100%; display: inline-block; text-indent: initial;">>" data-textnode-index-1692583966621="652" data-index-1692583966621="8649" data-index-len-1692583966621="8649" class="character" style="margin: 0px; padding: 0px; box-sizing: border-box; max-width: 100%; display: inline-block; text-indent: initial;">> list1=SplitListUtils.pagingList(list,100); //拆分集合
CountDownLatch countDownLatch = new CountDownLatch(list1.size());
for (List<Student" data-textnode-index-1692583966621="663" data-index-1692583966621="8799" data-index-len-1692583966621="8799" class="character" style="margin: 0px; padding: 0px; box-sizing: border-box; max-width: 100%; display: inline-block; text-indent: initial;">> list2 : list1) {
asyncService.executeAsync(list2,studentService,countDownLatch);
}
try {
countDownLatch.await(); //保证之前的所有的线程都执行完成,才会走下面的;
long endTime = System.currentTimeMillis(); //结束时间
log.info("一共耗时time: " + (endTime - startTime) / 1000 + " s");
// 这样就可以在下面拿到所有线程执行完的集合结果
} catch (Exception e) {
log.error("阻塞异常:"+e.getMessage());
}
return list.size();
}
2.6、测试结果
10个核心线程:
20个核心线程:
50个核心线程:
汇总结果:
序号 | 核心线程(core_pool_size) | 插入数据(万) | 耗时(秒) |
1 | 10 | 100w | 31s |
2 | 15 | 100w | 28s |
3 | 50 | 100w | 27s |
结论:对不同线程数的测试,发现不是线程数越多越好,具体多少合适,网上有一个不成文的算法:CPU核心数量*2 +2 个线程。
个人推荐配置:
int processNum = Runtime.getRuntime().availableProcessors(); // number of processors available to the JVM
// Core pool size: processNum divided by (1 - 0.2), truncated to int.
int corePoolSize = (int) (processNum / (1 - 0.2));
// Maximum pool size: processNum divided by (1 - 0.5), truncated to int.
int maxPoolSize = (int) (processNum / (1 - 0.5));
如果大家对相关文章感兴趣,可以关注微信公众号"程序猿小杨",会持续更新优秀文章!欢迎大家 分享、收藏、点赞、在看,您的支持就是我坚持下去的最大动力!谢谢!
标签:SpringBoot,int,线程,ThreadPoolTaskExecutor From: https://blog.51cto.com/u_11866810/7171766