@Configuration
@Slf4j
public class ThreadPoolConfig {
private static final int CORE_POOL_SIZE = 6;
private static final int MAX_POOL_SIZE = 12;
private static final int KEEP_ALIVE_TIME = 60;
private static final int QUEUE_CAPACITY = 200;
private static final String THREAD_NAME_PREFIX = "completeFutureTaskExecutor-";
private static final int AWAIT_TERMINATION_SECONDS = 120;
@Bean("completeFutureTaskExecutor")
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(CORE_POOL_SIZE);
executor.setMaxPoolSize(MAX_POOL_SIZE);
executor.setQueueCapacity(QUEUE_CAPACITY);
executor.setKeepAliveSeconds(KEEP_ALIVE_TIME);
executor.setThreadNamePrefix(THREAD_NAME_PREFIX);
executor.setAwaitTerminationSeconds(AWAIT_TERMINATION_SECONDS);
executor.setDaemon(Boolean.TRUE);
// 线程池对拒绝任务的处理策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
@Bean("pmCompletableFutureExecutor")
public Executor pmCompletableFutureExecutor() {
final int parallelism = ForkJoinPool.getCommonPoolParallelism() * 3;
log.info("加载自定义ForkJoinPool线程池, 线程池并行线程数量:{}, 线程数量为ForkJoinPool.getCommonPoolParallelism()*3", parallelism);
ForkJoinPool buyForkJoinPool = new ForkJoinPool(parallelism);
CompletableFutureUtils.load(buyForkJoinPool);
return buyForkJoinPool;
}
}
使用ForkJoinPool可以在有限的线程数下来完成非常多的具有父子关系的任务,比如使用4个线程来完成超过2000万个任务。但是使用ThreadPoolExecutor是不可能的,因为ThreadPoolExecutor中的线程无法选择优先执行子任务,要完成2000万个具有父子关系的任务时,就需要2000万个线程,这样会导致ThreadPoolExecutor的任务队列撑满或创建的最大线程数把内存撑爆直接gg。
ForkJoinPool最适合计算密集型任务,而且最好是非阻塞任务,之前的一篇文章:Java踩坑记系列之线程池 也说了线程池的不同使用场景和注意事项。
所以ForkJoinPool是ThreadPoolExecutor线程池的一种补充,是对计算密集型场景的加强。
工作窃取的实现原理
每个线程都有自己的双端队列
当调用fork方法时,将任务放进队列头部,线程以LIFO顺序,使用push/pop方式处理队列中的任务
如果自己队列里的任务处理完后,会从其他线程维护的队列尾部使用poll的方式窃取任务,以达到充分利用CPU资源的目的
从尾部窃取可以减少同原线程的竞争
当队列中剩最后一个任务时,通过cas解决原线程和窃取线程的竞争
-
工作窃取便是ForkJoinPool线程池的优势所在,在一般的线程池比如ThreadPoolExecutor中,如果一个线程正在执行的任务由于某种原因无法继续运行,那么该线程会处于等待状态,包括
singleThreadPool
,fixedThreadPool
,cachedThreadPool
这几种线程池。而在ForkJoinPool中,那么线程会主动寻找其他尚未被执行的任务然后窃取过来执行,减少线程等待时间。
JDK8中的并行流(parallelStream)功能是基于ForkJoinPool实现的,另外还有
java.util.concurrent.CompletableFuture
异步回调future,内部使用的线程池也是ForkJoinPool。