线程池核心类 ThreadPoolExecutor
,通过池化思想来维护线程的创建与消费
使用线程池的好处
提高任务执行的响应速度,降低资源消耗。任务执行时,直接立即使用线程池提供的线程运行,避免了临时创建线程的CPU/内存开销,达到快速响应的效果。
提高线程的可管理性。线程总数可预知,避免用户主动创建无限多线程导致死机风险,还可以进行线程统一的分配、调优和监控。
避免对资源的过度使用。在超出预期的请求任务情况,响应策略可控。
要想使用线程池,自然是要理解其接口的。一般我们使用ExecotorService
进行线程池的调用
整体的接口如下:
常用接口:
submit
(Runnable task): 提交一个无需返回结果的任务。
submit
(Callable
invokeAny
(Collection<? extends Callable
invokeAll
(Collection<? extends Callable
shutdown()
: 关闭线程池。
awaitTermination
(long timeout, TimeUnit unit): 等待关闭结果,最长不超过timeout时间。
线程池生命周期
线程池流程图
线程池本质上对应生产者消费者模型:
- 生产者:调用
execute()
方法提交任务的线程 - 消费者:线程池中的
Worker
,不断循环获取阻塞队列中的任务 - 中间层:阻塞队列,用于存放任务,将生产者和消费者解耦,生产者(线程)只管生产,消费者(
Worker
线程)只管消费
生命周期
线程池运行的状态,并不是用户显式设置的,而是伴随着线程池的运行,由内部来维护。线程池内部使用一个变量维护两个值:运行状态(runState)和线程数量 (workerCount)。在具体实现中,线程池将运行状态(runState)、线程数量 (workerCount)两个关键参数的维护放在了一起,如下代码所示:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
ctl
这个AtomicInteger类型,是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它同时包含两部分的信息:线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),高3位保存runState,低29位保存workerCount,两个变量之间互不干扰。用一个变量去存储两个值,可避免在做相关决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源。通过阅读线程池源代码也可以发现,经常出现要同时判断线程池运行状态和线程数量的情况。线程池也提供了若干方法去供用户获得线程池当前的运行状态、线程个数。这里都使用的是位运算的方式,相比于基本运算,速度也会快很多。
关于内部封装的获取生命周期状态、获取线程池线程数量的计算方法如以下代码所示:
private static int runStateOf(int c) { return c & ~CAPACITY; } //计算当前运行状态
private static int workerCountOf(int c) { return c & CAPACITY; } //计算当前线程数量
private static int ctlOf(int rs, int wc) { return rs | wc; } //通过状态和线程数生成ctl
线程池状态
RUNNING
:接受新任务并处理任务队列的任务SHUTDOWN
:不接受新任务,但处理任务队列的任务STOP
:不接受新任务,不处理任务队列的任务,并中断正在进行的任务TIDYING
:所有任务都已终止,workerCount
为 0,线程转换为TIDYING
状态,将运行terminated()
钩子方法TERMINATED
:terminate()
方法执行完成
状态转换
RUNNING -> SHUTDOWN
调用 shutdown() 方法,finalize() 方法被调用时也会调用 shutdown() 方法
(RUNNING or SHUTDOWN) -> STOP
调用 shutdownNow() 方法
SHUTDOWN -> TIDYING
任务队列和线程列表都为空
STOP -> TIDYING
线程列表都为空
TIDYING -> TERMINATED
terminated() 执行完成
源码分析
构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime); // 根据时间和单位,算出 ns的时间
this.threadFactory = threadFactory;
this.handler = handler;
}
构造方法参数
int corePoolSize
:核心线程池大小int maximumPoolSize
:最大线程池大小,必须大于等于核心线程池long keepAliveTime
:非核心线程池的最大存活时间;但如果允许CoreThreadTimeOut,那么线程数量即使没超过corePoolSize(这些都是核心线程),空闲线程超时后也会被回收。TimeUnit unit
:存活时间的时间单位BlockingQueue<Runnable> workQueue
:任务阻塞队列ThreadFactory threadFactory
:线程工厂,传入Runnable
返回一个新Thread
对象,有默认的Executors.defaultThreadFactory
RejectedExecutionHandler
handler:拒绝策略处理器AbortPolicy
: 直接抛出异常CallerRunsPolicy
: 让调用execute
方法的线程执行任务DiscardOldestPolicy
: 丢弃最先入队的任务DiscardPolicy
: 丢弃当前任务
execute() 提交任务过程
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 分支 1: 当前线程数小于 corePoolSize
if (workerCountOf(c) < corePoolSize) {
// 新增核心线程
if (addWorker(command, true))
return;
c = ctl.get();
}
// 分支 2: 将任务提交到任务队列(此时线程数大于等于 corePoolSize)
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 重新校验线程池(如果线程池正在停止,那么移除任务)
// 移除成功,执行拒绝策略
if (!isRunning(recheck) && remove(command))
// 执行拒绝策略(由于线程池正在停止)
reject(command);
// 线程数为 0(此时任务队列中有任务,需要创建一个非核心线程池)
// 例如我们将 corePoolSize 设置成 0,就会进入此逻辑
else if (workerCountOf(recheck) == 0)
// 新增非核心线程
addWorker(null, false);
}
// 分支 3: 新增非核心线程(此时任务队列已满)
// 新增失败,执行拒绝策略
else if (!addWorker(command, false))
// 执行拒绝策略(由于线程数大于等于 maximumPoolSize)
reject(command);
}
// submit 调用的还是 execute
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
// 将 task 包装成 FutureTask
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
通过上面这一小段代码,我们就已经完整地看到了。通过一个 ctl 变量进行全局状态控制,从而保证了线程安全性。整个框架并没有使用锁,但是却是线程安全的。
整段代码刚好完整描述了线程池的执行流程:
1、判断核心线程池是否已满,如果不是,则创建线程执行任务;
2、如果核心线程池满了,判断队列是否满了,如果队列没满,将任务放在队列中;
3、如果队列满了,则判断线程池是否已满,如果没满,创建线程执行任务;
4、如果线程池也满了,则按照拒绝策略对任务进行处理;
添加新的worker
一个worker,即是一个工作线程。
private boolean addWorker(Runnable firstTask, boolean core) {
retry://该循环在检测线程池状态的前提下,和线程数量限制的前提下,尝试增加线程数量
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果状态已经不是running了
if (rs >= SHUTDOWN &&
// 三者都成立,括号内才返回true,括号外才返回false,函数才不会直接return
// 三者只要有一个不成立,那么addWorker将直接返回false
! (rs == SHUTDOWN && //当前是SHUTDOWN状态
firstTask == null && //传入参数是null(非null说明是新task,但已经SHUTDOWN所以不能再添加)
! workQueue.isEmpty())) //队列非空
//即有三种情况会造成addWorker直接false,不去新起线程了;还有一种特殊情况,addWorker会继续执行。
//1. 如果当前是STOP以及以后的状态(肯定不需要新起线程了,因为task队列为空了)
//2. 如果当前是SHUTDOWN,且firstTask参数不为null(非RUNNING状态线程池都不可以接受新task的)
//3. 如果当前是SHUTDOWN,且firstTask参数为null,但队列空(既然队列空,那么也不需要新起线程)
//1. 如果当前是SHUTDOWN,且firstTask参数为null,且队列非空(特殊情况,需要新起线程把队列剩余task执行完)
return false;
//此时说明,需要新起线程(状态为RUNNING或SHUTDOWN)
for (;;) {
int wc = workerCountOf(c);
//如果线程数量超过最大值
if (wc >= CAPACITY ||
//如果线程数量超过特定值,根据core参数决定是哪个特定值
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))//CAS尝试+1一次
//如果CAS成功,说明这个外循环任务完成,退出大循环
break retry;
//CAS修改失败可能是:线程数量被并发修改了,或者线程池状态都变了
c = ctl.get();
if (runStateOf(c) != rs) //再次检查当前线程池状态,和当初保存的线程池状态
continue retry; //如果改变,那么continue外循环,即重新检查线程池状态(毕竟线程池状态也在这个int里)
// 如果只是 线程数量被并发修改了,那么接下来会继续内循环,再次CAS增加线程数量
}
}
//此时,crl的线程数量已经成功加一,且线程池状态保持不变(相较于函数刚进入时)
boolean workerStarted = false;//新线程是否已经开始运行
boolean workerAdded = false;//新线程是否已经加入set
Worker w = null;
try {
w = new Worker(firstTask);//构造器中利用线程工厂得到新线程对象
final Thread t = w.thread;//获得这个新线程
if (t != null) {//防止工厂没有创建出新线程
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();//加锁不是为了ctl,而是为了workers.add(w)
try {
// 需要再次检查状态
int rs = runStateOf(ctl.get());
//如果线程还是运行状态
if (rs < SHUTDOWN ||
//如果线程是SHUTDOWN,且参数firstTask为null
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // 新线程肯定是还没有开始运行的
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {//加入集合成功,才启动新线程
t.start();//这里启动了新线程
workerStarted = true;
}
}
} finally {
//只要线程工厂创建线程成功,那么workerStarted为false只能是因为线程池状态发生变化,且现在一定不是running状态了
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
创建 Worker 失败的原因:
1、在添加时线程池被停止了
2、添加核心线程池时,超过核心线程池数量
3、添加非核心线程池时,超过最大线程池数量
4、Worker 对象初始化未完成时就被使用,即 thread 对象还未完全创建
5、当前线程正在被运行(防止出现重复运行,抛出异常)
6、线程创建过多,导致 OOM
想要新起线程,就必须将ctl的线程数量部分加1。但这还有个大前提,那就是线程状态必须符合下面两种情况,才需要新起线程。
1.状态为RUNNING。
2.状态为SHUTDOWN,且task队列非空,这种情况需要新起线程来执行剩余task。当然,firstTask参数必须null,因为此时不允许处理新task了。并且如果firstTask参数为null,那么新起线程将会从task队列中取出一个来执行,这就达到了 新起线程来执行剩余task 的目的。
ctl的线程数量部分成功加1后,就需要创建新线程,并启动线程。
利用线程工厂创建Worker,Worker里包含了新线程。
检测线程池状态没有发生变化后(符合上面两种情况),将Worker加入集合,启动新线程。
检测线程池状态发生变化后,那新线程不能被启动,则需要把已创建的Worker从集合中移除,并且把ctl的线程数量部分再减1(其实就是addWorkerFailed)。
addWorkerFailed回滚Worker
此函数把已创建的Worker从集合中移除(如果存在的话),并且把ctl
的线程数量部分再减1。不过前面也说过,调用addWorkerFailed
的前提很可能是线程池状态发生了变化,所以这里需要tryTerminate
。
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)//如果集合存在,移除它
workers.remove(w);
decrementWorkerCount();//循环CAS,直到成功减一
tryTerminate();//帮忙terminate
} finally {
mainLock.unlock();
}
}
tryTerminate 帮助Terminate
此函数是用来帮助线程池终止的,可以帮助的情况为:
- 当前SHUTDOWN,且线程数量和任务队列大小都为0。
- 当前STOP(隐含了任务队列大小为0),且线程数量为0。
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||//当前还是Running,那么根本不应该tryTerminate
runStateAtLeast(c, TIDYING) ||//已经是TIDYING,那么不需要当前线程去帮忙了
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))//虽然SHUTDOWN,但任务队列非空
return;
//1. 状态为SHUTDOWN,任务队列为空,继续尝试
//2. 状态为STOP
//如果线程数量都不是0,那么肯定不能tryTerminate,直接返回
if (workerCountOf(c) != 0) {
//中断一个空闲线程,中断状态会自己传播
interruptIdleWorkers(ONLY_ONE);
return;
}
//说明线程数量确实为0。现在已经满足可以帮助的条件
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//CAS更新为TIDYING状态
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();//空实现
} finally {
//谁变成了TIDYING,谁就负责变成TERMINATED(这解释了前面遇到TIDYING的情况)
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();//已经获得了锁,现在可以唤醒条件队列的线程了
}
return;
}
} finally {
mainLock.unlock();
}
// ctl.compareAndSet(c, ctlOf(TIDYING, 0))如果失败走这里,肯定是因为别的线程
// 把状态变成了TIDYING状态。下一次循环会直接退出
}
}
interruptIdleWorkers
此函数尝试中断空闲的Worker线程。Worker线程在执行task的前提是持有自己的Worker锁,相反,空闲的线程是没有持有自己的Worker锁的,所以当前线程执行w.tryLock()是能返回true的。参数onlyOne为true时,只中断一个空闲的Worker线程。
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
//没有被中断过的线程,并且这个线程并没有在运行task(运行task时会持有worker锁)
//注意,w.tryLock()一定得放到右边,不然可能获得锁后不释放锁
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)//只中断一个空闲线程即可
break;
}
} finally {
mainLock.unlock();
}
}
我们执行这句t.interrupt()
,就中断了Worker线程。但Worker线程是怎么做到“只中断一个,就传播这个中断状态”的,我们还得看Worker的实现。
Worker 的工作机制
从上面的实现中,我们可以看到,主要是对 Worker
的添加和 workQueue
的添加,具体的执行则完全由Worker
进行托管
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); //执行的任务
}
当工作线程启动时,会优先执行Worker
的 run()方法
对AQS的实现
可以看到Worker继承了AQS,目的是为了使用AQS的同步队列。继承了就需要实现AQS的抽象方法。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
final Thread thread;
Runnable firstTask;
volatile long completedTasks;//执行过几个task的计数器
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
// state为-1或1,都是属于锁被持有
protected boolean isHeldExclusively() {
return getState() != 0;
}
// 尝试获得锁的实现。是不可重入的独占锁。
// 这意味着当前线程已持有的情况,再调用AQS的acquire就会死锁。反之,这种情况只能调用AQS的tryAcquire
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {//只能从0到1
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
//释放锁的实现。
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }//阻塞,直到获得锁
public boolean tryLock() { return tryAcquire(1); }//一次尝试地获得锁
public void unlock() { release(1); }//释放锁,并唤醒同步队列上的第一个等待线程
public boolean isLocked() { return isHeldExclusively(); }
// 如果工作线程已经执行过它人生中第一个task了(那么state就肯定不是-1了)
// 那么就中断它
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
仅仅为了使用AQS的独占锁部分。利用独占锁的互斥,可以工作线程在从未开始时(state为-1)和正在执行task期间(state为1)不会被中断。
对tryAcquire的实现是不可重入的,原因之后再讲。总之,为了避免因不可重入而无限阻塞,只要避免当前线程持有锁的情况再去acquire(1),就不会出现无限阻塞。
相反,tryAcquire总会是安全的。
runWorker方法
工作线程会不停的从当前任务和workQueue
获取待执行的任务
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();//获得当前线程对象
Runnable task = w.firstTask;//获得第一个task(这里可能为null)
w.firstTask = null;//释放引用
w.unlock(); // 此时state为0,Worker锁此时可以被抢。且此时工作线程可以被中断了
boolean completedAbruptly = true;//是否发生异常
try {
//1. 如果第一个task不为null,开始执行循环
//2. 如果第一个task为null,但从task队列里获得的task不为null,也开始循环
//3. 如果task队列获得到的也是null,那么结束循环
while (task != null || (task = getTask()) != null) {
w.lock();//执行task前,先获得锁
// 第一个表达式 (runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
// 无论线程池状态是什么都会消耗掉当前线程的中断状态(如果当前线程确实被中断了),
// 并且只有线程池状态是STOP的情况下,且当前线程被中断了,才会返回true。
// 第二个表达式 !wt.isInterrupted()
// 因为第一个表达式永远会消耗掉中断状态,所以第二个表达式肯定为true
// 总之,重点在第一个表达式。
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
// 1. 如果线程池状态为STOP,且当前线程被中断,马上恢复中断状态
// 2. 如果线程池状态为其他,且当前线程被中断,仅仅消耗掉这个中断状态,不进入分支
wt.interrupt();//恢复中断状态
try {
//空实现的方法,如果抛出异常,completedAbruptly将为true
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();//执行task
} catch (RuntimeException x) {
thrown = x; throw x;//这里抛出异常,completedAbruptly也将为true
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//空实现方法
afterExecute(task, thrown);//task.run()抛出异常的话,至少afterExecute可以执行
}
} finally {//无论上面在beforeExecute或task.run()中抛出异常与否,最终都会执行这里
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;//上面循环是正常结束而没有抛出异常,这个变量才为false
} finally {
//无论循环正常结束(getTask返回null,completedAbruptly为false),
//还是循环中抛出了异常(completedAbruptly为true),都会执行这句。
//代表当前Worker马上要被回收了
processWorkerExit(w, completedAbruptly);
}
}
1、进入循环前,将state设置为0,使得Worker锁可以被抢。
2、循环是工作线程的主要逻辑,每次循环通过条件里的getTask()获得task来执行。当然,getTask()必然是可以阻塞等待直到从队列取得元素的。
- 执行task前,必须先消耗中断状态(如果线程已被中断),因为中断状态不清理会导致getTask()里无法阻塞。并且只有在线程池状态为STOP时(task队列已空)且线程已被中断,才恢复线程的中断状态(这看起来可以用来保证,在当前循环执行task后下一次循环getTask()会抛出中断异常,但实际上getTask()发现STOP状态会直接返回null;当然还有一种可能,就是task.run()会检测中断状态抛出中断异常)。
- 执行task前,先执行beforeExecute。如果抛出异常,会导致task.run()不执行。
- 执行task时(task.run()),可能抛出异常。
- 无论执行task时是否抛出异常,都会执行afterExecute。
- 每次循环结束前,无论前面有没有抛出异常,都会清空一些变量,并释放Worker锁,因为这次拿到的task已经执行完毕。
3、从循环结束有两个原因:1. task.run()返回null了,循环正常结束(completedAbruptly为false)。 2. 在执行task时抛出了异常,也会结束循环(completedAbruptly为true)。无论哪种情况,当前Worker线程都会马上回收了。
简单的说,runWorker做的就是每次循环中从队列中取出一个task来执行,如果队列为空,那么阻塞等待直到队列非空取到task。这就是每个工作线程的工作内容。
getTask方法
getTask
会循环获取workQueue中的等待任务
private Runnable getTask() {
boolean timedOut = false; //保存超时poll操作是否操作,
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 这里条件是一个组合关系:
// 1. rs >= SHUTDOWN 和 workQueue.isEmpty() 说明没有task可以获得了
// 2. rs >= SHUTDOWN 和 rs >= STOP(隐式的说明 task队列为空)
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();//循环CAS,直到线程数量减一
return null;//返回null代表取task失败
}
int wc = workerCountOf(c);
// 这个变量控制,出队操作是否需要为超时版本。
// 1. 如果allowCoreThreadTimeOut为true,那么所有线程都会使用超时版本的出队操作
// 2. 如果wc > corePoolSize,那么超过部分的线程应该使用超时版本的出队操作
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 第一个表达式((wc > maximumPoolSize || (timed && timedOut))
// 如果wc大于了最大线程数(因为动态调小了maximumPoolSize)或超时操作已经超时,
// 这说明当前worker很可能需要抛弃。
// 第二个表达式(wc > 1 || workQueue.isEmpty())
// 如果线程数至少为2(除了当前worker线程外还有其他线程),
// 或者线程数为1(只有当前worker线程)但队列为空:
// 这说明不用担心,队列剩余task没有线程去取,即确定了当前worker需要抛弃
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))//如果CAS尝试减一成功
return null;//直接返回null
continue;//如果CAS失败,可能有多个线程同时修改了ctl的线程数。
//而这个分支是否还会进入需要下一个循环再判断
}
try {
Runnable r = timed ?//根据变量来判断,使用超时poll,还是take
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ://限时阻塞
workQueue.take();//无限阻塞
if (r != null)//成功获得了task
return r;
//只有限时阻塞才可能返回null,且肯定是超时操作超时了
timedOut = true;
} catch (InterruptedException retry) {
//无论上面是哪种操作,在阻塞期间被中断的话,当前worker线程会抛出中断异常
timedOut = false;//不算超时,所以这里设为false
//继续下一次循环,下一次可能会返回null
}
}
}
1、在获取队列元素之前,必须要判断线程池状态和其他一些情况。视情况而定,可能会直接返回null,代表获取失败或者说不应该由当前线程来获取。
2、如果线程池状态为SHUTDOWN且队列为空,那么没有task可以让当前线程获得,返回null。
3、如果线程池状态为STOP(隐含队列为空),那么没有task可以让当前线程获得,返回null。
4、如果不是上面的情况,那么只是说明有task可以取。但还得继续判断情况,如果同时满足以下两个情况则则不可以去取task(返回null):
- 如果当前线程数量已经超过maximumPoolSize(这是因为动态调小了maximumPoolSize),或者虽然没有超过但上一次的超时poll动作已经超时了(做超时操作的前提是allowCoreThreadTimeOut || wc > corePoolSize,既然超时了,当前线程也就不用去取task了)。满足的话,说明当前线程应该是需要放弃取task的,但还得满足下一个情况。
- 因为即将放弃取task,所以得防止“队列里有task但是没有工作线程在工作”的情况。在队列非空时,除了当前线程还必须有别的线程,毕竟当前线程马上要放弃了。
满足了上面两种情况,则当前线程要放弃取task了,但在结束getTask之前要把ctl的线程数量减一。
但CAS修改ctl的线程数量可能失败,失败后再次走上面的流程。完全有可能,失败后再走上面流程就不会放弃了,比如当前线程数量为corePoolSize+1(考虑allowCoreThreadTimeOut为false),有两个工作线程都超时了,第一个线程放弃并修改线程数量成功,第二个线程也想放弃但修改ctl失败下一次循环发现wc > corePoolSize不成立,也就不放弃了,继续去做取task动作。
上面这些判断都通过了,说明当前线程确实需要取得一个task。
根据timed变量做 限时阻塞的出队动作、或无限阻塞的出队动作。如果成功出队一个task,则返回它。
总之,getTask返回非null值代表当前worker线程应该继续工作;返回null值代表当前条件下获取task失败,当前条件是考虑了线程池状态和当前线程状态(是否超过核心线程数,是否限时阻塞已经超时)。
processWorkerExit
当前线程已经从runWorker
中的循环中退出,可能是因为getTask
返回null,也可能是执行task时抛出了异常。总之,当前worker线程已经马上要被回收了,所以来调用processWorkerExit
。
//传入Worker已经马上要回收
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // 如果runWorker中的循环不是正常结束的,则需要此函数来减小线程数
decrementWorkerCount(); // 否则是runWorker里的getTask做了这件事(getTask返回了null)
//此时,ctl的线程数量是不包括传入Worker了
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//传入Worker的完成任务数量 加到 线程池的总和上去
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();//帮助Terminate
int c = ctl.get();
if (runStateLessThan(c, STOP)) {//如果状态为RUNNING或SHUTDOWN,才可能需要补充线程
//但补充前还需要判断一下
if (!completedAbruptly) {//如果传入Worker是正常结束的
//min表示线程池的最小线程数
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())//如果为0且队列非空,那么需要再加1
min = 1;
if (workerCountOf(c) >= min)//如果当前线程数量满足要求
return; // 那么不需要补充线程,直接返回
}
// 1. 如果传入Worker是因为抛出异常而结束,那么肯定补充
// 2. 如果传入Worker是正常结束,那么视需要而补充
addWorker(null, false);//补充一个线程
}
}
前面做了一些收尾工作,比如ctl减一,workers移除。当前worker即将要被回收,但可能需要在结束前补充一个worker。
- 如果当前worker是因为抛出异常从而结束自己生命的,那么肯定补充(!completedAbruptly为false)。
- 如果当前worker是因为getTask返回null从而结束自己生命的,那么在当前线程数量不够最小线程数时,才会去补充。
这也解释了interruptIdleWorkers(ONLY_ONE)为什么会传播中断状态。因为一个空闲的工作线程被中断后,会去执行processWorkerExit里的tryTerminate,而tryTerminate里又会去调用interruptIdleWorkers(ONLY_ONE)唤醒另一个空闲线程。
关闭线程池
private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))//此函数会保持之前的线程数量
break;
}
}
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();//检查权限
advanceRunState(SHUTDOWN);//改变线程池状态
interruptIdleWorkers();//中断所有空闲线程
onShutdown(); // 钩子方法,空实现
} finally {
mainLock.unlock();
}
tryTerminate();//尝试Terminate
}
1、线程池状态变成SHUTDOWN后,就无法再提交新task给线程池了。
2、interruptIdleWorkers可以中断那些阻塞在workQueue.超时poll或workQueue.take上的线程,它们被中断后,可能会继续取出队列中的task来执行,更可能结束掉自己的生命。
3、有可能此时已经满足了Terminate的条件,所以必须尝试一下tryTerminate。
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);//改变线程池状态
interruptWorkers();//中断所有线程
tasks = drainQueue();//清空队列
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
1、线程池状态变成STOP后,同样无法再提交新task给线程池了。
2、interruptWorkers中断不仅中断空闲线程,正在工作的线程也会中断,注意这无视了工作线程已经持有的worker锁。但如果工作线程的执行task不关心中断的话,那么也没有意义。
3、tasks = drainQueue()清空队列。
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
//Worker内部类方法
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
awaitTermination
此函数可以让当前线程一直阻塞直到线程池所有任务都结束、或者超时。
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)//如果发现剩余时间少于0,说明超时
return false;
nanos = termination.awaitNanos(nanos);//限时阻塞,阻塞在这个termination条件队列上
}
} finally {
mainLock.unlock();
}
}
总之,就是利用了AQS的条件队列,让等待Termination的线程都阻塞在条件队列上。当然,在tryTerminate
中,会执行termination.signalAll()
来唤醒条件队列上的所有线程。
总结
1、理解ThreadPoolExecutor的关键在于理解execute提交task的执行逻辑:
- addWorker(command, true)。在线程数没有达到corePoolSize时,新起一个线程来执行task。核心线程数量内的线程不会被回收。
- workQueue.offer(command)。在线程数达到corePoolSize后,则不起线程,而是先入队task。
- addWorker(command, false)。如果线程数达到corePoolSize且队列已满,则新起一个线程来执行task。但这个线程可能会因为空闲地太久而被回收。
2、阻塞队列的选择也会影响到execute的逻辑。
- 有界队列,使得execute的策略可以正常运行。
- 无界队列,使得maximumPoolSize失去作用。
- 直接提交队列,使得队列失去缓存的作用。
3、线程池关闭后,肯定无法提交新task了。
- 如果执行的是shutdown,每个工作线程完成当前task后还会去执行队列中的剩余task。
- 如果执行的是shutdownNow,队列中剩余task会被清空,每个工作线程完成当前task后,线程池就会结束使命。
4、Worker是工作线程的实现,它继承了AQS实现了独占锁部分,目的是为了让工作线程在未开始执行task或正在执行task时,不会被interruptIdleWorkers中断。
5、工作线程执行task期间如果抛出了异常,一定会补充新的工作线程。
Q&A
1、独占锁的作用
我们知道在runWorker
中,如果worker线程还没有开始,或者正在执行task,state一定是非0的,这就使得别的线程无法获得worker锁。这样别的线程在调用interruptIdleWorkers
时,是无法中断正在执行task的worker线程的。
而独占锁被设计为不可重入的原因是为了防止自己中断自己。
比如生产者传入的task是这样实现的:
class MyTask implements Runnable {
@Override
public void run() {
threadPoolExecutor.setCorePoolSize(10);
}
}
而setCorePoolSize
里又有可能调用到interruptIdleWorkers
,所以不可重入就防止了自己中断自己。
2、如何设置动态线程池
通过动态调整线程池参数来实现即可
参考博客:https://blog.csdn.net/anlian523/article/details/108249789
标签:task,mainLock,队列,Worker,线程,null,ThreadPoolExecutor From: https://www.cnblogs.com/luojw/p/18176690