首页 > 数据库 >线程池使用、countDownLatch、以及数据库批量插入 添加配置优化插入与计算

线程池使用、countDownLatch、以及数据库批量插入 添加配置优化插入与计算

时间:2023-02-22 15:00:23浏览次数:36  
标签:int collection 插入 线程 countDownLatch static ThreadPoolExecutor

//新建线程池
ThreadPoolExecutor cpuThreadPoolExecutor = ThreadUtil.getCpuThreadPoolExecutor();
//使用CountdoLatch final CountDownLatch countDownLatch = new CountDownLatch((int) (selectIpInfoParam.getIpValue2() - selectIpInfoParam.getIpValue1() + 1));
//循环的时候使用线程池执行 for (long i = selectIpInfoParam.getIpValue1(); i <= selectIpInfoParam.getIpValue2(); i++) { Ip finalIp = ip; long finalI = i; cpuThreadPoolExecutor.execute(() -> { getIpDetail(finalIp, ipDetails, item00, item01, netType, finalI, map); countDownLatch.countDown(); }); } try {
    //使用countdoLatch 来停顿主线程,直到数值达到零为止 countDownLatch.await(); if (!CollectionUtils.isEmpty(ipDetails)) {
       //批量保存可以设置每次存储更多,默认是1000 ipDetailService.saveBatch(ipDetails, 10000); } } catch (Exception e) { throw new ApiException(500, "服务异常!"); }

&rewriteBatchedStatements=true :设置在数据库连接后面,可以使保存批量更快速






package com.byd.plm.common.util;

import org.springframework.util.CollectionUtils;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import java.util.Collection;
import java.util.concurrent.*;
import java.util.function.Consumer;

/**
* ThreadUtil
* 线程工具类
*
* @author ZeXin.Chen
* @since 2022/9/30 17:51
*/
public final class ThreadUtil {
private ThreadUtil() {
}

/**
* 最大线程上并非越大越快,别瞎整,需要实际调试
*/
private static final int MAX_THREAD_COUNT = 15;
/**
* CPU 数量
*/
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();

public static <T> ThreadPoolExecutor getIOThreadPoolExecutor() {
return getThreadPoolExecutor(CPU_COUNT << 4, CPU_COUNT << 5, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), Executors.defaultThreadFactory());
}

public static <T> ThreadPoolExecutor getCpuThreadPoolExecutor() {
return getThreadPoolExecutor(CPU_COUNT + 1, CPU_COUNT *2, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), Executors.defaultThreadFactory());
}

private static <T> ThreadPoolExecutor getThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit timeUnit, BlockingQueue<Runnable> queue, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, queue, threadFactory);
}

public static <T> void execute(ThreadPoolExecutor poolExecutor, Collection<T> collection, Consumer<T> consumer, int threadCount) {
// 空集合
if (CollectionUtils.isEmpty(collection)) {
return;
}

// 处理数量小于等于2 不开启子线程
if (collection.size() <= 2) {
collection.forEach(consumer);
return;
}

// 将request设置为子线程共享
ServletRequestAttributes sra = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();

CountDownLatch countDownLatch = new CountDownLatch(threadCount);
BlockingQueue<T> queue = new LinkedBlockingQueue<>(collection);

for (int a = 0; a < threadCount; a++) {
poolExecutor.execute(() -> {
try {
RequestContextHolder.setRequestAttributes(sra);
while (true) {
T t = queue.poll();
if (null == t) {
break;
}
consumer.accept(t);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
RequestContextHolder.resetRequestAttributes();
countDownLatch.countDown();
}
});
}

try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}

}


public static <T> void execute(ThreadPoolExecutor poolExecutor, Collection<T> collection, Consumer<T> consumer) {
// 启用线程数量
int threadCount = Math.min((collection.size() >>> 1) + 1, MAX_THREAD_COUNT);
execute(poolExecutor, collection, consumer, threadCount);
}
}
 

 

标签:int,collection,插入,线程,countDownLatch,static,ThreadPoolExecutor
From: https://www.cnblogs.com/orangeJuiceRain/p/17144369.html

相关文章

  • Mysql-存储函数-批量插入数据
    1千万数据插入数据库大概也就几分钟----------------------------------删除存储函数----------------------------------dropprocedureifexistscm_basedb.loo......
  • 算法随想Day20【二叉树】| LC235-二叉搜索树的最近公共祖先、LC701-二叉搜索树中的插
    LC235.二叉搜索树的最近公共祖先利用二叉搜索树的特性,中序遍历,如果当前节点的值大于q和p的值,公共祖先一定在当前节点的左子树中,同理小于q和p值时,公共祖先一定在当前节点......
  • 插入排序
    definsertion_sort(list):N=len(list)foriinrange(1,len(list)):forjinrange(i,0,-1):iflist[j]<list[j-1]:......
  • mybatis - 连接mysql数据库插入中文乱码
    对于mysql数据库的乱码问题,有两中情况:1.mysql数据库编码问题(建库时设定)。2.连接mysql数据库的url编码设置问题。对于第一个问题:目前个人发现只能通过重新建库解决,建库......
  • mybatis批量插入大量数据
    Mybatis内置的ExecutorType有3种,SIMPLE、REUSE、BATCH;默认的是simple,该模式下它为每个语句的执行创建一个新的预处理语句,单条提交sql;而batch模式重复使用已经预处理的语......
  • 线程不安全问题
    使用Runnable接口实现多线程,多个线程操作同一个资源时,线程不安全,出现并发问题。如多个人同时抢票,会出现多个人抢了同一张票的问题,代码如下//多个线程同时操作同一个对象......
  • ssm学习笔记23001-mybatis接入和数据库连接实现一个插入数据的操作
    mybatis:是什么,用来干嘛的,同类竞品中有何种优势?mybatis在idea中的引入:1、创建一个空工程2、创建一个空的maven文件:会自动生成一个pox文件,打包类型配置为jar,添加mybat......
  • 线程
    介绍api文档介绍     Thread是Runnable的实现类,也可以说是其子类进程是程序从开始到结束的过程线程是进程进一步划分,是进程不同功能的具体实现 构造方法......
  • Python 多线程中的 Join Lock 和 Event
    Join函数的作用Join函数的作用主要是提供当前线程阻塞,等待线程结束后,在执行下一个线程,保护线程通畅有序执行如下当没有使用join时,主线程结束了子线程还在运行defd......
  • python-基础:线程
    3.线程安全一个线程中可以有多个子线程,且线程可以共享进程中所有的资源多个线程去操作一份资源,高概率性的发生数据混乱的情况,如下:示例importthreadingloop=1000......