目录
本人的源码阅读主要聚焦于类的使用场景,一般只在java层面进行分析,没有深入到一些native方法的实现。并且由于知识储备不完整,很可能出现疏漏甚至是谬误,欢迎指出共同学习
本文基于corretto-17.0.9源码,参考本文时请打开相应的源码对照,否则你会不知道我在说什么
简介
ThreadPoolExecutor是一个异步Executor,其背后使用线程池来执行用户提交的任务。ThreadPoolExecutor拥有很多可以配置的参数,用户可以在不同的使用场景中使用不同配置的线程池以便效率最大化。
继承结构
Executor
从上至下,ThreadPoolExeceutor首先是一个Executor,先看看Executor这个接口的说明:
Executor功能如其名字,执行器,我们可以向Executor提交任务,Executor可以保证这些任务的启动运行,但我们不需要管它背后是如何调度这些任务的执行,也就是Executor将任务的提交与任务运行解耦,使得我们使用Executor的时候只需要专注于任务本身(即专注于实现Runnable的run方法),而不需要考虑任务调度。因此,你可以实现一个最简单的同步Executor,提交后马上运行:
class DirectExecutor implements Executor {
public void execute(Runnable r) {
r.run();
}
}
也可以实现一个最简单的异步Executor,每次提交任务都开一个线程来执行:
class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) {
new Thread(r).start();
}
}
而ThreadPoolExecutor实现的是异步Executor,但它不是每个任务都开一个线程,这样无限制地创建线程开销肯定很大,因此ThreadPoolExecutor管理了一组线程,称为线程池,负责创建、复用、销毁线程,高效利用线程资源。
ExecutorService
在Executor的基础上,这个接口定义了更多的方法控制执行器和任务
-
关闭执行器
shutdown:关闭执行器,使其不能提交新任务。
shutdownNow:在shutdown的基础上尝试停止正在执行的任务,也称为terminate(shutdown指的是执行器不能提交新任务,terminate指的是已经shutdown并且停止正在运行的任务)。
-
等待执行器完全停止
awaitTermination:如果执行器被shutdown/shutdownNow后,依然有任务在执行,那么就会阻塞等待直到超时。
-
提供Future跟踪/控制单个任务
submit:提交单个任务并立即返回Future。
invokeAll:提交任务集合并等待所有任务执行结束,返回所有任务对应的Future。
invokeAny:提交任务集合并等待其中至少一个任务执行结束,返回该任务对应的Future。
ExecutorService定义了方法isTerminated检测执行器是否已经完全停止,需要注意的是,必须shutdown了并且所有任务结束了,这个isTerminated才会返回true,并且,shutdown可能是需要一定时间的,意味着调用shutdown后isTerminated不一定马上true。
AbstractExecutorService
这个抽象类实现了ExecutorService,提供如下几个方法的默认实现:submit, invokeAny, invokeAll
这个类定义了一个新方法:newTaskFor,这个方法返回一个RunnableFuture,默认的实现类为FutureTask。使用RunnableFuture的好处在于将任务执行与任务控制封装在同一个类中,submit方法的实现只需要将其传给execute方法执行并且直接返回它即可。
下面看一下doInvokeAny,这个方法是invokeAny的具体实现:
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
if (tasks == null)
throw new NullPointerException();
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
// 保存任务集合对应的futures
ArrayList<Future<T>> futures = new ArrayList<>(ntasks);
// ExecutorCompletionService实现了CompletionService接口,这个接口用于解耦任务提交与结果获取
// 这个接口不负责调度任务的执行,因此一般还封装了一个Executor实例用于执行任务
ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(this);
// 下面正式开始执行任务,由于invokeAny只要一个任务完成就能返回
try {
ExecutionException ee = null;
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Iterator<? extends Callable<T>> it = tasks.iterator();
// 先提交一个任务
futures.add(ecs.submit(it.next()));
--ntasks;
int active = 1;
for (;;) {
// 检查有没有已经完成的任务
Future<T> f = ecs.poll();
// 没有已经完成的任务
if (f == null) {
// 如果还有未提交的任务,则提交
if (ntasks > 0) {
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
}
// 如果没有正在运行的任务,结束循环
else if (active == 0)
break;
// 如果有限等待
else if (timed) {
f = ecs.poll(nanos, NANOSECONDS);
// 超时
if (f == null)
throw new TimeoutException();
// 计算剩余时间
nanos = deadline - System.nanoTime();
}
// 无限等待
else
f = ecs.take();
}
// 有已经完成的任务,尝试get返回结果
// 如果有执行异常和运行时异常则记录下来,继续运行下一个任务
if (f != null) {
--active;
try {
return f.get();
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}
// 运行到这里说明每个f.get都抛异常(否则已经在循环内return了f.get)
// 这里ee == null的判断是为了兜底
if (ee == null)
ee = new ExecutionException();
// ee记录的是最后一个任务抛出的异常,将其抛出给上层
throw ee;
} finally {
// 尝试取消所有任务
cancelAll(futures);
}
}
总结一下invokeAny的细节:
- 使用了ExecutorCompletionService实现获取首个完成的任务
- 只返回没有抛异常,正常返回结果的任务对应的future
- 如果所有任务都没有正常结束(即抛了异常),那么将最后一个任务抛的异常重新抛出
- finally块会尝试取消所有任务,意味着只要有一个任务返回了结果,就会取消其他的任务并且不关心他们返回的结果
- cancelAll之所以是“尝试”取消,指的是如果任务无法检测中断,那么任务实际上还在继续运行,只不过是future的状态会改变,isDone会返回true,这个任务已经完全脱离了我们的控制
最后是invokeAll:
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
try {
// execute所有任务并记录他们的future
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
execute(f);
}
// 遍历等待每个future结束,忽略异常的抛出
for (int i = 0, size = futures.size(); i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
try { f.get(); }
catch (CancellationException | ExecutionException ignore) {}
}
}
// 返回future集合,每个future.isDone都会返回true
return futures;
} catch (Throwable t) {
// 兜底,取消所有任务,保证返回的每个future.isDone都返回true
cancelAll(futures);
throw t;
}
}
有限等待的invokeAll也很简单,主要是使用f.get(timeout, unit)来实现,不展示了。
ExecutorCompletionService
AbstractExecutorService实现invokeAny使用了ExecutorCompletionService这个类,这个类实现了CompletionService接口,该接口是对「任务提交」与「结果获取」的解耦。
- 任务提交:submit方法
- 结果获取:poll / take方法
说白了就是提供了消费者方法poll/take,以任务结束的顺序获取已经结束的任务future。而任务的真正执行还得靠真正的Executor对象,因此可以看到构造方法传入了一个executor。
ExecutorCompletionService按任务结束的顺序收集任务的关键点就在于FutureTask提供的钩子函数done
。我抽取了核心的成员看一下:
public class ExecutorCompletionService<V> implements CompletionService<V> {
private final Executor executor;
// 已结束任务队列
private final BlockingQueue<Future<V>> completionQueue;
// QueueingFuture包装了真正的Future
private static class QueueingFuture<V> extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task,
BlockingQueue<Future<V>> completionQueue) {
// 注意此处,RunnableFuture本身作为一个Runnable,通过FutureTask构造函数将其任务的执行委托给QueueingFuture
super(task, null);
this.task = task;
this.completionQueue = completionQueue;
}
private final Future<V> task;
private final BlockingQueue<Future<V>> completionQueue;
// 重写FutureTask的done方法,当任务结束的一刻FutureTask内部会回调这个方法
// 然后将future添加到已结束任务队列中
protected void done() { completionQueue.add(task); }
}
// 提交任务
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
// 注意这里,用QueueingFuture包装真正的Future,传给execute执行
executor.execute(new QueueingFuture<V>(f, completionQueue));
return f;
}
public Future<V> take() throws InterruptedException {
return completionQueue.take();
}
public Future<V> poll() {
return completionQueue.poll();
}
}
核心就是:用QueueingFuture代理真正的RunnableFuture去执行任务,然后通过重写FutureTask的done方法,当任务完成的一刻将RunnableFuture保存到队列中,消费者就可以直接从队列中按照任务结束顺序获取已完成任务。
父接口和父类都讲完了,下面真正进入ThreadPoolExecutor
线程池配置
由于ThreadPoolExecutor有一大堆的可配置参数,因此先看一下这些参数。
一般情况下用户使用Executors提供的几个工厂方法构造出来的ThreadPoolExecutor就够用了:
- Executors.newCachedThreadPool:无界线程池,基本上来一个任务就创建一个新线程去处理,自动回收空闲线程
- Executors.newFixedThreadPool:固定线程个数的线程池
- Executors.newSingleThreadExecutor:单线程的线程池
当然,看一下ThreadPoolExecutor的构造函数就知道,还有其他很多可以配置的参数:
public ThreadPoolExecutor(
int corePoolSize, // 核心线程数
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 空闲线程最大存活时长
TimeUnit unit, // keepAliveTime的单位
BlockingQueue<Runnable> workQueue, // 待调度任务队列
ThreadFactory threadFactory, // 线程工厂
RejectedExecutionHandler handler // 拒绝入队的回调函数
);
-
核心(core)和最大(maximum)线程数:当提交新任务的时候,如果已创建线程数小于core线程数,那么不管已创建线程中是否有空闲线程,都会创建新的线程去执行任务。否则,如果已创建线程数小于maximum线程数,此时看已创建线程中是否有空闲线程,有的话就用空闲线程执行任务,否则创建新线程去执行。最后,如果已创建线程数已经等于maximum线程数,则不能继续创建新线程。一般这两个参数通过构造函数传入,也可以在运行时设置。如果核心和最大线程数设成一样的话,那么就是一个fixed-size的线程池。
-
线程的提前运行:默认情况下core thread先被创建好,只有新任务提交时才会启动。如果构造线程池时队列不为空,可以考虑通过prestartCoreThread或prestartAllCoreThreads方法提前运行线程。
-
线程的创建:线程池线程的创建默认是通过 Executors.defaultThreadFactory 得到的 DefaultThreadFactory 线程工厂对象来创建的,我们可以自定义线程工厂,这样我们可以进一步配置线程的名字、线程组、优先级等。
-
空闲线程最大存活时间(keep-alive time):默认情况下当已创建线程数超过核心线程数时,将会触发这个机制:如果空闲线程的空闲时间keep-alive time,将该线程销毁。也可以通过方法 allowCoreThreadTimeOut(true) 配置成只要有线程空闲时间大于keep-alive time则销毁,不必一定大于核心线程数。
-
工作队列:线程池维护一个工作队列,里面存储了还未被调度运行的任务。任务什么时候会入队与core和maximum线程数有关:
- 如果忙碌线程数 < core:新任务总是被直接使用空闲线程或创建新线程运行
- 如果忙碌线程数 >= core:新任务总是被入队等待调度运行
- 在情况2中,如果队列满了,并且如果创建新线程会导致线程数超过maximum,则任务被拒绝提交(抛异常),否则创建新线程运行任务
根据不同使用场景,一般有三种常见的队列:
- SynchronousQueue:无缓存队列,通常与maximum=Integer.MAX_VALUE结合使用实现无界线程池,使得线程数超过core时,由于不能入队而直接创建新线程。这种策略的好处在于避免了如下情况:考虑线程A依赖于线程B提供的某个结果,如果A已经运行起来,B由于入队而没有被调度运行,A也由于需要依赖B无法主动停下来,造成类似死锁的局面。
- LinkedBlockingQueue:无界队列,与无缓存队列相反,当任务之间没有依赖的时候才可以用这种策略(比如静态网页服务)。这种策略的好处在于实现了限流的效果,避免突然过高的系统负载,使得最大同时运行的任务个数限制在core线程数。
- ArrayBlockingQueue:有界队列,一般来说队列容量小就要增大线程池,但是这样可能会导致加重CPU负载、线程切换开销导致吞吐量减小;或者是队列容量大但线程池小,这样可以减少CPU负载、线程切换开销等,但也可能由于任务本身IO操作多,导致吞吐量减小。因此需要对队列容量和线程池容量进行trade-off,相比以上两种策略更加难以把控。
-
任务拒绝:任务可能会因为线程池shutdown 或者 线程数达到maximum并且队列已满而被拒绝。此时会调用RejectedExecutionHandler回调函数进行处理,默认情况下是抛出RejectedExecutionException(见ThreadPoolExecutor.AbortPolicy),如构造函数所示,用户可以自定义handler。源码还预定义了几种策略,随便看看就行。
-
钩子函数:钩子函数即模版设计模式的体现,总之就是在框架预先定义好的某个地方,钩子函数会被调用。ThreadPoolExecutor提供三个钩子函数 beforeExecute(Thread, Runnable)、afterExecute(Runnable, Throwable)、terminated(),前两个方法分别在任务开始前和任务结束后被调用,而terminated则在线程池完全终止后被调用。需要注意的是如果任务被包含在任务包装类中(比如FutureTask),那么任务抛出异常不会传入afterExecute,比如FutureTask的实现中,run方法抛出的异常最终会设置到outcome变量中,只有在get的时候才会再次抛出这个异常给用户,如果要在afterExecute检测这样的异常可以这么做:
protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); if (t == null && r instanceof Future<?> && ((Future<?>) r).isDone()) { try { // 手动调用get,检测任务抛出的异常 Object result = ((Future<?>) r).get(); } catch (CancellationException ce) { t = ce; } catch (ExecutionException ee) { t = ee.getCause(); } catch (InterruptedException ie) { // ignore/reset Thread.currentThread().interrupt(); } } // 处理异常 if (t != null) System.out.println(t); }
-
队列的控制:对于队列的任务,有的任务可能已经被用户取消,ThreadPoolExecutor提供了remove和purge方法来删除这些任务以节省内存。另外还有一个getQueue()方法直接拿到队列,但是除了debug外不建议用于生产用途。
-
线程池的GC:当没有正在运行的线程(空闲线程也算运行线程)并且线程池不被引用时,就算没有shutdown,线程池也会被GC。
最后,源码注释还给了一个PausableThreadPoolExecutor作为扩展ThreadPoolExecutor的例子,实现了可暂停的线程池:在线程池暂停的状态下,所有还未开始执行的任务会等待线程池resume才能开始执行。(当然,作为小例子,并不是特别严谨,看看就行)
class PausableThreadPoolExecutor extends ThreadPoolExecutor {
private boolean isPaused;
private ReentrantLock pauseLock = new ReentrantLock();
private Condition unpaused = pauseLock.newCondition();
public PausableThreadPoolExecutor(...) {
super(...);
}
// 重写beforeExecute,阻塞等待isPaused=true
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
pauseLock.lock();
try {
while (isPaused) unpaused.await();
} catch (InterruptedException ie) {
t.interrupt();
} finally {
pauseLock.unlock();
}
}
public void pause() {
pauseLock.lock();
try {
isPaused = true;
} finally {
pauseLock.unlock();
}
}
public void resume() {
pauseLock.lock();
try {
isPaused = false;
unpaused.signalAll();
} finally {
pauseLock.unlock();
}
}
}
代码分析
了解了线程池的各个参数之后,接下来看看如何用Doug Lea是如何实现线程池的。
成员变量
成员变量比较多,下面按照变量的含义进行分块讲解。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int RUNNING = -1 << COUNT_BITS; // 使用中
private static final int SHUTDOWN = 0 << COUNT_BITS; // shutdown之后
private static final int STOP = 1 << COUNT_BITS; // shutdownNow之后,此时队列已清空
private static final int TIDYING = 2 << COUNT_BITS; // 队列和线程池都为空时
private static final int TERMINATED = 3 << COUNT_BITS; // 钩子函数terminated被调用后
/*
runState的值就五种状态的数值上来看,只会递增不会递减,所有可能的状态转换为:
RUNNING -> SHUTDOWN
shutdown调用后
(RUNNING or SHUTDOWN) -> STOP
shutdownNow调用后
SHUTDOWN -> TIDYING
队列和线程池都清空后
STOP -> TIDYING
线程池清空后(在STOP时队列已经为空)
TIDYING -> TERMINATED
terminated函数被调用后
*/
ctl
是用来表示线程池状态的,可以说是线程池的大脑。ctl是一个32位原子变量,存储了两个状态:
- 高3位(runState):表示当前线程池的状态,一共五种状态
- 低29位(workerCount):表示当前线程池中的线程数
// 待调度任务队列
private final BlockingQueue<Runnable> workQueue;
// 保护下面的变量,并且线性化某些操作,比如线性化interruptIdleWorkers避免中断风暴
private final ReentrantLock mainLock = new ReentrantLock();
// 线程池中的线程集合
private final HashSet<Worker> workers = new HashSet<>();
// 用于awaitTermination函数
private final Condition termination = mainLock.newCondition();
// 保存至今线程池中同时存在线程数的最大数量,用于数据统计
private int largestPoolSize;
// 保存至今已结束的任务数量,用于数据统计
private long completedTaskCount;
以上几个变量都比较好理解,其中BlockingQueue本身是thread-safe的,不用mainLock保护。mainLock下面的几个变量都需要在mainLock的范围内访问。最后两个变量不用怎么关注,没有什么实际的作用,只是用来方便用户进行数据统计的(用于debug、打日志啥的)。
/*
* All user control parameters are declared as volatiles so that
* ongoing actions are based on freshest values, but without need
* for locking, since no internal invariants depend on them
* changing synchronously with respect to other actions.
*/
private volatile ThreadFactory threadFactory;
private volatile RejectedExecutionHandler handler;
private volatile long keepAliveTime;
private volatile boolean allowCoreThreadTimeOut;
private volatile int corePoolSize;
private volatile int maximumPoolSize;
这几个变量对应上一小节所述线程池的配置,并且这几个变量只需要volatile保持可见性,不用加锁保护。
注意到源码在这几个变量上面写了一段注释,用来说明为什么不用加锁访问这些变量,我最初看不懂并且纠结了半天:
其中invariant这个词意味着某个东西的属性是必须维持不变的(与一般意义上的常量区分开来),比如二叉搜索树的左子节点值小于本节点这个特性是必须维持不变,否则就不是一颗正确的二叉搜索树。所以这句话我猜大概意思是,线程池具有的某些invariant并不依赖于「这些变量的同步访问(即加锁访问)」,换句话说就是「这些变量不加锁访问也不会破坏线程池的invariant」,比如threadFactory只有两个地方赋值:构造函数和setter,看一下setter:
// 在set之前构造函数已经保证了threadFactory不为null(用户不指定的话,会使用Executors.defaultThreadFactory)
public void setThreadFactory(ThreadFactory threadFactory) {
// 在运行中进行set,在赋值前也会判断null,此举可以看作已经保证了invariant
if (threadFactory == null)
throw new NullPointerException();
// volatile写,保证可见性使得用户指定的threadFactory生效
this.threadFactory = threadFactory;
}
再对比一下需要加锁的那些变量来看,比如workers,由于HashMap不是thread-safe的,如果不加锁访问的话可能会由于多线程写而导致某些worker没有被保存上。再比如largestPoolSize,它的更新依赖于当前值,因此涉及一次读和一次写,也是非原子性的:
// 这是addWorker函数里的代码片段
// 当多线程addWorker的时候,如果没有加锁,可能会造成:
// 1. workers这个HashMap混乱,比如少保存一个worker,破坏了workers记录的是所有线程的这个invariant
// 2. largestPoolSize最终为「次」最大池大小,破坏了largetPoolSize记录的是「最」最大池大小的这个invariant
workers.add(w);
workerAdded = true;
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workers或largetstPoolSize的invariant被破坏了,相当于ThreadPoolExecutor的invariant被破坏了,这就好比二叉搜索树的某节点的值小于其左字节点的值,该节点的invariant被破坏了,进而导致整棵树的invariant都被破坏了。
「破坏invariant」换人话来说就是「出bug了」,因此那段注释其实就是强调了一下不用加锁来同步这些变量的访问,ThreadPoolExecutor也能正确运行。所以invariant这个词初看很抽象,仔细体会后又会觉得没有其他比他更适合的词来形容了。
最后来看看Worker这个类:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
// 绑定的工作线程
final Thread thread;
// 将要运行的任务
Runnable firstTask;
// 该worker已经完成的任务数量
volatile long completedTasks;
// 传入要执行的任务,并且通过threadFactory创建工作线程来执行这个任务
Worker(Runnable firstTask) {
// setState(-1)
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
// Worker作为Runnable运行任务,委托给外部类实现
public void run() {
runWorker(this);
}
// Worker作为一个AQS,提供锁和等待队列机制
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
Worker继承自AQS,AQS的state定义为:
- state=-1:刚初始化Worker,还没启动线程,不可中断(线程都没有怎么中断)
- state=0:线程已启动,未加锁,表示线程空闲,可以被中断
- state=1:线程已启动,已加锁,表示线程忙碌,不可被中断(当然也可能是因为正在进行中断逻辑)
加锁表示线程正在执行任务,防止被中断(因为中断前要加锁,具体看interruptIdleWorkers)。实际上也可以不继承AQS,但是需要自己定义一个volatile变量表示Worker的状态(忙碌/空闲),并加锁访问,但AQS都帮你做好了,state本就是volatile。因此AQS在这里干了两件事:加锁、状态表示。
方法
简单的方法就不解析了
// 将状态更新为max(当前runState, targetState)
private void advanceRunState(int targetState);
// 中断所有工作线程
private void interruptWorkers();
// 中断空闲的工作线程
private void interruptIdleWorkers(boolean onlyOne);
// 清空工作队列,并返回所有元素
private List<Runnable> drainQueue();
// 运行worker,执行任务,返回worker是否被启动
private boolean addWorker(Runnable firstTask, boolean core) {
// retry这段用于判断是否可以运行任务,可以的话workerCount+1
retry:
for (int c = ctl.get();;) {
// 如果以下条件满足一个,则返回false:
// 1. 状态为STOP及以上
// 2. 状态为SHUTDOWN 并且 没有待运行任务
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
for (;;) {
// 如果已有线程数>=最大线程数,则返回false
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
// 增加一个线程数
if (compareAndIncrementWorkerCount(c))
break retry;
// CAS失败,说明线程池状态改变,应该重新走外层循环
c = ctl.get();
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
ThreadPoolExecutor.Worker w = null;
// 创建worker,运行任务
try {
w = new ThreadPoolExecutor.Worker(firstTask);
final Thread t = w.thread;
// 如果新建线程成功
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 在全局加锁下,检查状态,确保worker可以被添加
int c = ctl.get();
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
// 将worker加入线程池,更新最大线程池大小
workers.add(w);
workerAdded = true;
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
}
} finally {
mainLock.unlock();
}
// 如果worker成功添加,那么启动worker
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 如果worker没启动
if (!workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
// 启动worker失败,移除worker,并且尝试终止线程池
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}
看一下tryTerminate:
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 如果:
// 1. RUNNING
// 2. 至少为TIDYING
// 3. 小于STOP且队列不为空
// 则返回
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateLessThan(c, STOP) && ! workQueue.isEmpty()))
return;
// 此时是可以终止的,但如果线程池不为空,则中断一个空闲线程并返回
if (workerCountOf(c) != 0) {
interruptIdleWorkers(ONLY_ONE);
return;
}
// 下面CAS设置TERMINATED
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 设置为TIDYING,执行terminated后设置为TERMINATED
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
// 通知awaitTermination线程池已终止,没有线程正在执行任务了
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
worker绑定的线程启动之后,执行run方法,最终执行runWorker方法:
// 不断从队列中获取任务并执行
final void runWorker(ThreadPoolExecutor.Worker w) {
// 将worker的任务取出
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
// getTask获取任务
while (task != null || (task = getTask()) != null) {
// worker加锁禁止中断,但加锁前可能已经由于shutdownNow被中断,因此下面还需要进一步判断
w.lock();
// 状态小于STOP的话就清除中断,大于等于STOP的话就保留中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
try {
// 执行任务
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
// task.run和afterExecute抛出的异常由线程的UncaughtExceptionHandler处理
throw ex;
}
} finally {
// 执行完毕,解锁允许中断
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 该worker(工作线程)的生命到此结束
processWorkerExit(w, completedAbruptly);
}
}
runWorker通过getTask方法不断获取任务执行,如果返回null则completedAbruptly = false正常结束worker生命周期,看一下getTask这个函数,getTask负责获取任务交给worker执行,如果返回null表示没有任务再给这个worker,它可以正常结束了。
// 返回null的原因如下:
// 1. worker的数量超过了maximumPoolSize
// 2. STOP,就算队列还有任务也不会再执行
// 3. SHUTDOWN并且队列没有任务了
// 4. keep-alive超时
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
// 情况2,3
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 情况1,4,这里的if意思是:发生情况1或4的话,还必须满足队列空或者workerCount>1,如果两个都不满足
// 说明workerCount=1并且队列不空,那么把当前worker干掉的话,队列的任务就没worker来执行了。
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
// 获取任务,并判断是否超时
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
可以看到runWorker中整个while循环都没有catch,说明任务抛出的任何异常都会导致completedAbruptly=true,异常交由线程的UncaughtExceptionHandler处理,worker的收尾工作交由processWorkerExit处理:
// 清理当前worker,并且如果当前worker是因为用户任务而异常结束,那么判断是否需要开一个新的worker来替代它来继续完成线程池的工作
private void processWorkerExit(ThreadPoolExecutor.Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
// 清理该worker
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
// 尝试终止线程池
tryTerminate();
int c = ctl.get();
// 下面判断是否需要开启新的worker
if (runStateLessThan(c, STOP)) {
// 如果worker异常结束,才可能需要新worker继续执行未开始执行的任务
if (!completedAbruptly) {
// 如果core允许超时回收并且队列不空的话,至少保留一个worker
// 否则如果core不允许超时回收并且目前线程数小于core,那么也可以开新的worker
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return;
}
// 开启新的worker
addWorker(null, false);
}
}
值得注意的是,如果worker异常结束,开一个新worker来替代它看起来没什么问题,但转念一想为什么不直接让当前worker继续跑呢,这样不是运行效率会更好吗,不用开启一个新的线程,也不用new一个新的worker。不可以,因为任务是由线程执行,任务抛异常了,如果用户没处理异常的话,需要触发线程的UncaughtExceptionHandler,然后这个线程的生命周期就算是到尽头了,线程池没理由把这个线程的这个异常吃掉,否则,用户肯定会很懵逼:我TM异常呢?
OK,内部的流程都走完一遍了,接下来看看线程池的公有方法逻辑:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 线程数小于core直接开新worker
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 尝试加入队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 加入队列后可能线程池突然关闭,做double-check,并且移除任务成功后拒绝任务
if (!isRunning(recheck) && remove(command))
reject(command);
// 如果没有worker,那么就开新worker
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果加入队列失败,那么就开新worker
else if (!addWorker(command, false))
reject(command);
}
shutdown:
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 将状态改变为SHUTDOWN,中断所有空闲worker
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
shutdownNow:
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 将状态改变为STOP,中断所有worker
advanceRunState(STOP);
interruptWorkers();
// 清空队列
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
// 告知用户这些任务还没运行,线程池关闭了
return tasks;
}
shutdown与shutdownNow的区别在:
- shutdownNow清空队列,shutdown没有清空队列,说明队列中的任务还会被执行完
- shutdownNow中断所有worker,shutdown只中断空闲worker
再看一下isTerminating这个函数:
public boolean isTerminating() {
int c = ctl.get();
return runStateAtLeast(c, SHUTDOWN) && runStateLessThan(c, TERMINATED);
}
分析到这里,runState的几个状态就十分清晰了:
- RUNNING:线程池正在运行
- SHUTDOWN:调用了shutdown的状态,目的是拒绝新任务的提交,已经提交的任务(包括了正在执行的以及队列中的)最终都会被执行完
- STOP:调用了shutdownNow的状态,目的是在SHUTDOWN的基础上,尝试停止一切正在执行的任务以及清空还在队列等待的任务,但是正在执行的任务可能不响应中断,因此shutdownNow之后可能还有在执行的任务
- TIDYING:在STOP的基础上,已经确保所有任务已经执行完了,即线程池已经清空,再执行一个terminated函数就正式终止
- TERMINATED:TIDYING的基础上执行了terminated函数后的状态
注意到TIDYING状态的转换只有tryTerminate可以做到,并且不一定成功,因为就如之前所说正在执行的任务可能不响应中断,每次结束一个worker或者从队列移除任务的时候都得tryTerminate一下。
然后看一下awaitTermination这个函数,结合tryTerminate很容易理解:
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 如果不是TERMINATED
while (runStateLessThan(ctl.get(), TERMINATED)) {
if (nanos <= 0L)
return false;
// 在Condition上等待tryTerminate成功时唤醒(或者超时)
nanos = termination.awaitNanos(nanos);
}
return true;
} finally {
mainLock.unlock();
}
}
总结
边看边写两三天终于写完了...
在看线程池之前我比较好奇“空闲线程”的判断是怎么实现的,万万没想到是借助了AQS。还有“空闲线程”是如何“空闲”等待任务的,想过可能是使用await/notify的方式,其实是BlockingQueue.poll
另外Future这个东西也很妙,对执行器几乎0侵入性,就能实现了精确对任务执行结果进行获取。
还有ExecutorCompletionService这个类,相当于是Executor的一个适配器,通过重写Future的钩子函数done,使得Executor具有了按任务完成顺序获取结果的功能。如果没有这个适配器的话,一般的方式下只能先获取所有任务的future,然后再循环get这些future,但这样就无法做到按完成顺序获取结果了。至于为什么像done, beforeExecute, afterExecute这种钩子函数不设计成回调函数,我就不清楚了。
参考链接
「StackOverflow」What is an invariant?
标签:队列,worker,private,任务,源码,线程,阅读,null,ThreadPoolExecutor From: https://www.cnblogs.com/nosae/p/17994874