package com.javacode2022.util;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* @author:lijinhao
* @date:2022/8/31 21:38
* @useful:
* @modify
* ===================================================
* modifier modifytime description
* ===================================================
*/
@Slf4j
public class TaskDisposeUtils {
public static final int POOL_SIZE;
static {
POOL_SIZE = Integer.max(Runtime.getRuntime().availableProcessors(), 5);
}
/**
* @author:lijinhao
* @date:2022/8/31 22:19
* @method:dispose: 并行处理,并等待结束
* @param taskList 任务列表
* @param consumer 消费者
* @class:com.javacode2022.util.TaskDisposeUtils
* @return:void
*/
public static <T> void dispose(List<T> taskList, Consumer<T> consumer) throws InterruptedException {
dispose(true, POOL_SIZE, taskList, consumer);
}
/**
* @author:lijinhao
* @date:2022/8/31 22:05
* @method:dispose: 并行处理,并等待结束
* @param moreThread 是否多线程执行
* @param poolSize 线程池大小
* @param taskList 任务列表
* @param consumer 消费者
* @class:com.javacode2022.util.TaskDisposeUtils
* @return:void
*/
public static <T> void dispose(boolean moreThread, int poolSize, List<T> taskList, Consumer<T> consumer) throws InterruptedException {
log.info("参数信息:moreThread = {},poolSize = {},taskList = {},consumer = {}", moreThread, poolSize, taskList,
consumer);
if (CollectionUtils.isEmpty(taskList)) {
return;
}
if (moreThread && poolSize > 1) {
poolSize = Math.min(poolSize, taskList.size());
ExecutorService executorService = null;
try {
executorService = Executors.newFixedThreadPool(poolSize);
CountDownLatch countDownLatch = new CountDownLatch(taskList.size());
for (T item : taskList
) {
executorService.execute(() -> {
try {
consumer.accept(item);
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await();
} finally {
if (executorService != null) {
executorService.shutdown();
}
}
} else {
for (T item : taskList
) {
consumer.accept(item);
}
}
}
public static void main(String[] args) throws InterruptedException {
// 生成1-10的10个数字,放到list中,相当于10个任务
List<Integer> list = Stream.iterate(1, a -> a + 1).limit(10).collect(Collectors.toList());
TaskDisposeUtils.dispose(list, item -> {
try {
long startTime = System.currentTimeMillis();
TimeUnit.SECONDS.sleep(item);
long endTime = System.currentTimeMillis();
log.info("任务【{}】执行完毕,耗时:{}", item, endTime - startTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
log.info("{}中的任务处理完毕!", list);
}
}
标签:taskList,JAVA,java,poolSize,util,线程,import,工具,consumer
From: https://www.cnblogs.com/shanzhidian/p/17023621.html