ThreadPoolExecutor?
ThreadPoolExecutor是什么,先拆开来看,Thread Pool And Executor?那Thread Pool是什么?Executor又是什么?
Executor:任务执行者,只定义了一个execute方法,接收一个Runable参数。
public interface Executor {
void execute(Runnable command);
}
Thread Pool:可以缓存一定数量线程的池子。
List<WorkerThread> workers = new ArrayList<>();
那结合在一起的ThreadPoolExecutor是什么?
ThreadPoolExecutor就是一个Executor,用于执行任务。那为什么前面有ThreadPool呢?因为执行任务时,所使用的线程是缓存在一个ThreadPool中的,结合了ThreadPool和Executor的概念,所以叫ThreadPoolExecutor。
Executor为什么需要Thread Pool?
因为创建线程所需要的开销较大,进程需要向系统内核申请,然后由系统内核创建线程后分配给进程,这个过程所需的时间可能比实际任务执行的时间还要久。如果每次任务执行都创建一个新的线程,既影响系统吞吐量,也浪费了系统资源。创建一定数量的线程,将其缓存到线程池中是更好的使用方式。
Executor实例
以java.util.concurrent.ThreadPoolExecutor为例,其构造方法如下,方法中的参数就是一个Executor的核心参数。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
构造参数
- int corePoolSize: 线程池中核心线程数据数量
- int maximumPoolSize: 在同步队列写满之后,线程池中最大可创建的线程数量
- long keepAliveTime: 非核心线程的最长存活时间
- TimeUnit unit: 时间单位
- BlockingQueue
workQueue: 保存待执行任务的同步队列 - ThreadFactory threadFactory: 创建线程的工厂
- RejectedExecutionHandler handler: 当同步队列满载之后,添加新的任务时执行的拒绝策略
核心配置
在配置时,需要重点关注的是corePoolSize和BlockingQueue
- corePoolSize:针对不同的任务类型,核心线程数量的配置不同。当任务类型为CPU密集型时,可以配置为(CPU核心 - 1);当为IO密集型时,根据单个任务的执行完成时间来设置,执行时间愈长时,配置的核心线程数可以愈多,例如(CPU核心*2)。
- BlockingQueue:在提交任务时,如果线程数小于corePoolSize配置,则新建一个线程执行任务。如果核心线程已经全部创建,新提交的任务将会被添加到同步队列中暂存,然后等待线程从队列中获取任务进行执行。这涉及到队列长度问题,队列过长或者过短都会影响使用。需要根据实际业务情况进行设置,例如Executor每秒的任务处理能力、每秒的任务提交数量和持续时间。
需要注意的是在一个ThreadPoolExecutor中,不管有多少个生产者(调用者),不管有多少个消费者(线程),其都共用了一个阻塞队列。而阻塞队列中都是加锁的,ArrayBlockingQueue不管是take和put操作,都使用了同一把锁;LinkedBlockingQueue则是对take和put操作分别设置了一把锁。
应用场景
最常使用的场景应该是多任务并行处理和异步处理的时候
并行处理
在使用ThreadPoolExecutor并行处理任务时,需要先计量一下单个任务的执行时间和线程间的切换时间。如果是CPU密集型,并且任务量不多的话,直接单线程就可以了。使用多线程未必有多大的提升,除非该处理步骤确实很耗时,在排除代码原因后可以尝试多线程处理。
如果是IO密集型,并且任务量较多时,应尽量使用多线程的方式。因为不管是磁盘IO还是网络IO,处理时间一般都远大于线程切换时间和同步队列的锁开销。
异步处理
在异步处理的时候,往往是主线程提交任务后立即返回,交由Executor异步执行。这种处理方式性能极佳,但可靠性不足,适用于在允许少量数据丢失的场景下,例如在logback中,启用异步日志记录。
使用实例
ThreadPoolTaskExecutor
在SpringBoot项目中,一般是创建一个ThreadPoolTaskExecutor。其是java中ThreadPoolExecutor的封装,提供了一些功能扩展和更便捷的配置和管理,使用更方便,功能更强大。
自定义ThreadPoolTaskExecutor
@Bean("taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(7);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(20000);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("taskExecutor-");
// 添加一个任务装饰器
executor.setTaskDecorator(new ContextDecorator());
// 线程池对拒绝任务的处理策略
executor.setRejectedExecutionHandler(newThreadPoolExecutor.AbortPolicy());
executor.initialize();
return executor;
}
TaskDecorator
ThreadPoolTaskExecutor中提供的一个任务装饰器,可以在任务执行前后进行一些自定义操作,适用于需要在线程间传递消息的场景。
例如,现在有上百个REST API请求任务,为了更快的获取,将这些任务放到ThreadPoolTaskExecutor中去执行。但是在Executor中执行时发现,每个请求任务都没有成功获取到数据,而使用主线程遍历获取是没问题的。
原因是在调用REST API接口时,需要用到当前线程中的RequestContext信息,主线程中存在RequestContext信息(Auth信息),但Executor中的线程是没有的。这时就需要实现一个TaskDecorator,将主线程中的RequestContext复制到执行的子线程中了。
实现一个TaskDecorator
package com.sugon.common.config;
import org.springframework.core.task.TaskDecorator;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;
public class ContextDecorator implements TaskDecorator {
@Override
public Runnable decorate(Runnable runnable) {
RequestAttributes context;
try{
context = RequestContextHolder.currentRequestAttributes();
} catch (IllegalStateException e) {
// 不存在请求参数时会抛出异常,这里返回一个空对象
// NonWebRequestAttributes为实现RequestAttributes接口的空对象
context = new NonWebRequestAttributes();
}
RequestAttributes finalContext = context;
return () -> {
try {
RequestContextHolder.setRequestAttributes(finalContext);
runnable.run();
} finally {
RequestContextHolder.resetRequestAttributes();
}
};
}
}
将其设置到ThreadPoolTaskExecutor中
executor.setTaskDecorator(new ContextDecorator());
并行处理
使用java8中的CompletableFuture和Executor,进行多任务的并行处理
@Autowired
@Qualifier("taskExecutor")
Executor executor;
public void parallelRun(List<Runnable> runnables) {
CompletableFuture[] futures = runnables.stream()
.map(runnable -> CompletableFuture.runAsync(runnable, executor))
.toArray(CompletableFuture[]::new);
// 等待所有任务执行成功
CompletableFuture.allOf(futures).join();
}
异步处理
提交任务后,不关心返回结果
@Autowired
@Qualifier("taskExecutor")
Executor executor;
public void asyncRun(Runnable runnable) {
// 提交后立即返回
CompletableFuture.runAsync(runnable, executor);
}
标签:Java,executor,任务,线程,Executor,ThreadPoolTaskExecutor,ThreadPoolExecutor
From: https://www.cnblogs.com/cd-along/p/18211113