//新建线程池
ThreadPoolExecutor cpuThreadPoolExecutor = ThreadUtil.getCpuThreadPoolExecutor();
//使用CountdoLatch final CountDownLatch countDownLatch = new CountDownLatch((int) (selectIpInfoParam.getIpValue2() - selectIpInfoParam.getIpValue1() + 1));
//循环的时候使用线程池执行 for (long i = selectIpInfoParam.getIpValue1(); i <= selectIpInfoParam.getIpValue2(); i++) { Ip finalIp = ip; long finalI = i; cpuThreadPoolExecutor.execute(() -> { getIpDetail(finalIp, ipDetails, item00, item01, netType, finalI, map); countDownLatch.countDown(); }); } try {
//使用countdoLatch 来停顿主线程,直到数值达到零为止 countDownLatch.await(); if (!CollectionUtils.isEmpty(ipDetails)) {
//批量保存可以设置每次存储更多,默认是1000 ipDetailService.saveBatch(ipDetails, 10000); } } catch (Exception e) { throw new ApiException(500, "服务异常!"); }
提示:在 MySQL 的 JDBC 连接 URL 末尾追加参数 &rewriteBatchedStatements=true,驱动会把批量插入改写为多值 INSERT 语句一次发送,从而使 saveBatch 的批量保存明显更快。
package com.byd.plm.common.util;
import org.springframework.util.CollectionUtils;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import java.util.Collection;
import java.util.concurrent.*;
import java.util.function.Consumer;
/**
 * ThreadUtil
 * Thread utility class: factory methods for {@link ThreadPoolExecutor} instances and
 * helpers that fan the elements of a collection out to worker tasks and wait for them.
 *
 * @author ZeXin.Chen
 * @since 2022/9/30 17:51
 */
public final class ThreadUtil {

    private ThreadUtil() {
    }

    /**
     * Upper bound on the worker count chosen automatically by
     * {@link #execute(ThreadPoolExecutor, Collection, Consumer)}.
     * A larger value is not automatically faster; tune it with real measurements.
     */
    private static final int MAX_THREAD_COUNT = 15;

    /**
     * Number of processors reported by the runtime when this class is loaded.
     */
    private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();

    /**
     * Creates a new pool sized for I/O-bound work: core size {@code CPU_COUNT * 16},
     * maximum size {@code CPU_COUNT * 32}, 60 second keep-alive, unbounded queue.
     *
     * <p>Every call returns a new pool; the caller owns it and should shut it down.
     *
     * @return a newly created executor
     */
    public static ThreadPoolExecutor getIOThreadPoolExecutor() {
        return getThreadPoolExecutor(CPU_COUNT << 4, CPU_COUNT << 5, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), Executors.defaultThreadFactory());
    }

    /**
     * Creates a new pool sized for CPU-bound work: core size {@code CPU_COUNT + 1},
     * maximum size {@code CPU_COUNT * 2}, 60 second keep-alive, unbounded queue.
     *
     * <p>Every call returns a new pool; the caller owns it and should shut it down.
     *
     * @return a newly created executor
     */
    public static ThreadPoolExecutor getCpuThreadPoolExecutor() {
        return getThreadPoolExecutor(CPU_COUNT + 1, CPU_COUNT * 2, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), Executors.defaultThreadFactory());
    }

    /**
     * Builds the executor shared by the public factory methods.
     *
     * <p>Note: with an unbounded queue the pool never grows beyond {@code corePoolSize},
     * so {@code maximumPoolSize} has no practical effect for the pools created here.
     */
    private static ThreadPoolExecutor getThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit timeUnit, BlockingQueue<Runnable> queue, ThreadFactory threadFactory) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, queue, threadFactory);
        // Let idle core threads expire as well, so a pool that a caller forgets to shut
        // down does not pin its (non-daemon) threads for the lifetime of the JVM.
        // allowCoreThreadTimeOut(true) rejects a non-positive keep-alive, hence the guard.
        if (keepAliveTime > 0) {
            executor.allowCoreThreadTimeOut(true);
        }
        return executor;
    }

    /**
     * Applies {@code consumer} to every element of {@code collection} using up to
     * {@code threadCount} worker tasks on {@code poolExecutor}, and blocks until all
     * workers have finished.
     *
     * <p>A null or empty collection is a no-op. Collections of at most two elements are
     * processed sequentially on the calling thread. For larger collections the elements
     * are copied into a queue that the workers drain; the collection must therefore not
     * contain {@code null} elements. The caller's request attributes are made available
     * to each worker while it runs and are cleared again afterwards.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt flag is
     * restored and the method returns without waiting for the workers.
     *
     * @param poolExecutor executor that runs the worker tasks
     * @param collection   elements to process; may be null or empty
     * @param consumer     action applied to each element
     * @param threadCount  requested number of workers; clamped to the range
     *                     {@code [1, collection.size()]}
     * @param <T>          element type
     * @throws RuntimeException the first failure raised by {@code consumer} in a worker,
     *                          with any further worker failures attached as suppressed
     */
    public static <T> void execute(ThreadPoolExecutor poolExecutor, Collection<T> collection, Consumer<T> consumer, int threadCount) {
        // Empty collection: nothing to do.
        if (CollectionUtils.isEmpty(collection)) {
            return;
        }
        // Two elements or fewer: not worth starting worker tasks.
        if (collection.size() <= 2) {
            collection.forEach(consumer);
            return;
        }
        // A non-positive count would either make CountDownLatch throw or silently skip
        // all work; more workers than elements would only be idle.
        int workerCount = Math.max(1, Math.min(threadCount, collection.size()));

        // Capture the caller's request attributes so they can be shared with the workers.
        ServletRequestAttributes sra = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        CountDownLatch countDownLatch = new CountDownLatch(workerCount);
        BlockingQueue<T> queue = new LinkedBlockingQueue<>(collection);
        ConcurrentLinkedQueue<Throwable> failures = new ConcurrentLinkedQueue<>();

        for (int a = 0; a < workerCount; a++) {
            poolExecutor.execute(() -> {
                try {
                    RequestContextHolder.setRequestAttributes(sra);
                    // poll() returns null once the queue has been drained.
                    T item;
                    while ((item = queue.poll()) != null) {
                        consumer.accept(item);
                    }
                } catch (Exception e) {
                    // This worker stops; the remaining workers keep draining the queue.
                    failures.add(e);
                } finally {
                    RequestContextHolder.resetRequestAttributes();
                    countDownLatch.countDown();
                }
            });
        }

        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            // Restore the flag so the caller can react to the interruption.
            Thread.currentThread().interrupt();
            return;
        }
        rethrowFirstFailure(failures);
    }

    /**
     * Same as {@link #execute(ThreadPoolExecutor, Collection, Consumer, int)} with a
     * worker count of {@code size / 2 + 1}, capped at {@link #MAX_THREAD_COUNT}.
     *
     * @param poolExecutor executor that runs the worker tasks
     * @param collection   elements to process; may be null or empty
     * @param consumer     action applied to each element
     * @param <T>          element type
     */
    public static <T> void execute(ThreadPoolExecutor poolExecutor, Collection<T> collection, Consumer<T> consumer) {
        // Check before calling size(): a null collection is a no-op, not an NPE.
        if (CollectionUtils.isEmpty(collection)) {
            return;
        }
        // Number of workers to start.
        int threadCount = Math.min((collection.size() >>> 1) + 1, MAX_THREAD_COUNT);
        execute(poolExecutor, collection, consumer, threadCount);
    }

    /**
     * Throws the first recorded worker failure, attaching the rest as suppressed;
     * does nothing when no worker failed.
     */
    private static void rethrowFirstFailure(ConcurrentLinkedQueue<Throwable> failures) {
        Throwable first = failures.poll();
        if (first == null) {
            return;
        }
        Throwable other;
        while ((other = failures.poll()) != null) {
            first.addSuppressed(other);
        }
        if (first instanceof RuntimeException) {
            throw (RuntimeException) first;
        }
        throw new IllegalStateException("Worker task failed", first);
    }
}
标签:int,collection,插入,线程,countDownLatch,static,ThreadPoolExecutor From: https://www.cnblogs.com/orangeJuiceRain/p/17144369.html