/**
* Checks if a new worker can be added with respect to current
* pool state and the given bound (either core or maximum). If so,
* the worker count is adjusted accordingly, and, if possible, a
* new worker is created and started, running firstTask as its
* first task. This method returns false if the pool is stopped or
* eligible to shut down. It also returns false if the thread
* factory fails to create a thread when asked. If the thread
* creation fails, either due to the thread factory returning
* null, or due to an exception (typically OutOfMemoryError in
* Thread.start()), we roll back cleanly.
*
* @param firstTask the task the new thread should run first (or
* null if none). Workers are created with an initial first task
* (in method execute()) to bypass queuing when there are fewer
* than corePoolSize threads (in which case we always start one),
* or when the queue is full (in which case we must bypass queue).
* Initially idle threads are usually created via
* prestartCoreThread or to replace other dying workers.
*
* @param core if true use corePoolSize as bound, else
* maximumPoolSize. (A boolean indicator is used here rather than a
* value to ensure reads of fresh values after checking other pool
* state).
* @return true if successful
*/
// firstTask:
// 可以为null。表示启动worker之后,worker自动到queue中获取任务
// 如果不为null,则worker会优先执行firstTask
// core:
// true 采用corePoolSize限制
// false 采用maximumPoolSize限制
private boolean addWorker(Runnable firstTask, boolean core) {
// 自旋
retry:
for (;;) {
// 获取当前ctl值,保存到c中
int c = ctl.get();
// 取到线程池的runState
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 条件1:rs >= SHUTDOWN
// true --> 说明当前线程池状态不是running
// 条件2:前置条件 --> 当前线程池状态不是running
// rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()
// --> 表示当前线程池的状态是SHUTDOWN && 提交的任务是null(addWorker这个方法可能不是execute调用的) && 当前任务队列不是空
// --> 排除掉这种情况:当前线程池的状态是SHUTDOWN,但是队列里面还有任务尚未完成。这个时候是允许添加worker,但是不允许再次提交task
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
// 走到这的情况:
// 1. 线程池状态rs > SHUTDOWN
// 2. rs == SHUTDOWN
return false;
// 上面这些代码就是判断当前线程池状态是否允许添加线程
// 内部自旋
for (;;) {
// 获取当前线程池中的线程数量
int wc = workerCountOf(c);
// 条件1:wc >= CAPACITY --> 永远不成立,因为CAPACITY是一个5亿多的数字
// 条件2:wc >= (core ? corePoolSize : maximumPoolSize)
// core == true --> 判断当前线程数量是否 >= corePoolSize
// core == false -> 判断当前线程数量是否 >= maximumPoolSize
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
// 当前线程数太多了,已经不能添加线程了
return false;
// 使用CAS使workerCount+1
if (compareAndIncrementWorkerCount(c))
// 行至此处,说明workerCount已经成功+1。相当于申请到一个信号量
break retry;
// 行至此处,说明workerCount+1失败。说明可能有其它线程已经修改过ctl的值
// 情况1.其它线程执行execute()申请过令牌了
// 情况2.外部线程可能调用过shutdown()或者shutdownNow()导致线程池状态发生变化了(ctl高3位表示状态,状态改变后,ctl也会失败)
c = ctl.get(); // Re-read ctl
// 判断线程池状态是否发生过变化。如果外部线程调用过shutdown()或者shutdownNow(),线程池状态会变化
if (runStateOf(c) != rs)
// 状态发生变化后,直接跳到外层循环
// 外层循环负责判断当前线程池状态是否允许创建线程
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 以上:申请一个创建worker的信号量
// 总结:外层循环负责判断线程池状态能否创建线程;内层循环负责判断线程数量能否创建线程。
// 线程状态、线程数量都用ctl表示
// CAS增加ctl的值,如果有并发(修改了线程池状态,或者线程数量),则重试
// 以下:添加一个worker
// 表示创建的worker是否已经启动
boolean workerStarted = false;
// 表示创建的worker是否已经添加到线程池中
boolean workerAdded = false;
// 表示创建的worker的一个引用
Worker w = null;
try {
// 创建worker
// worker中会创建一个线程,线程的target就是worker
// Worker#thread.run() --> Worker.run() --> ThreadPoolExecutor.runWorker() --> task.run()
// worker的state为初始化中
// firstTask设置为Worker的firstTask
w = new Worker(firstTask);
// 将worker的线程赋值给t
final Thread t = w.thread;
// 为了防止ThreadFactory有bug,因为ThreadFactory是一个接口,任何人都可以实现它
if (t != null) {
// 将全局锁的引用保存到mainLock中
final ReentrantLock mainLock = this.mainLock;
// 持有全局锁 --> 可能会阻塞
// 操作线程池内部的相关操作,都必须持锁
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
// 获取了当前线程池的状态
int rs = runStateOf(ctl.get());
// 条件1.rs < SHUTDOWN
// true --> 最正常的情况,线程池当前状态是RUNNING
// 条件2.rs == SHUTDOWN && firstTask == null
// true --> 前置条件:线程池状态不是running状态。虽然线程池SHUTDOWN了,但是firstTask为null
// 其实这个判断的就是SHUTDOWN状态下的特殊情况,只不过这里不再判断队列是否为空
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// t.isAlive() --> 线程start后,值会为true
// 还是防止如果有人自定义ThreadFactory实现的话,ThreadFactory创建的线程在返回给外部之前,线程start了
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 将创建的worker加入到线程池中
// - 线程池是一个HashSet
workers.add(w);
// 获取最新的线程池线程数量
int s = workers.size();
// 维护largestPoolSize
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
// 线程池中的线程不分核心与否
// 只是创建的时候,会根据core参数做不同的数量校验
} finally {
// 释放了线程池全局锁
mainLock.unlock();
}
// 条件成立:说明添加worker成功
// 条件失败:说明在lock之前,线程池状态发生了改变
if (workerAdded) {
// 启动线程:
// worker.thread.start() ---> worker#thread.run() --> thread.target.run()
// target是worker
// --> ThreadPoolExecutor.runWorker() --> worker.firstTask.run()
// --> firstTask.callable.cal() ==> 执行了程序员自己写的逻辑
t.start();
workerStarted = true;
}
}
} finally {
// 条件成立:说明添加失败 --> 需要做清理工作
if (! workerStarted)
// 做一些失败的清理工作:
// - 释放信号量
// - 将当前worker清理出worker集合
addWorkerFailed(w);
}
// woker是否启动
return workerStarted;
// 返回值信息:
// true --> 创建worker && 启动worker成功
// false -> 创建失败
// case1. 线程池状态 > SHUTDOWN (STOP/TIDYING/TERMINATION)
// case2. rs == SHUTDOWN && 队列中已经没有任务了
// rs == SHUTDOWN && 队列中有任务 && firstTask != null
// case3. 线程池已经达到数量限制(corePoolSize or maximumPoolSize)
// case4. threadFactory创建的线程是null,或者创建的线程有问题
}
/**
* Rolls back the worker thread creation.
* - removes worker from workers, if present
* - decrements worker count
* - rechecks for termination, in case the existence of this
* worker was holding up termination
*/
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
// 持有线程池全局锁。因为操作的是线程池相关的东西
mainLock.lock();
try {
// 条件成立:需要将worker在workers中清理出去
if (w != null)
workers.remove(w);
// 将线程池计数器-1(恢复) --> 相当于归还令牌
decrementWorkerCount();
tryTerminate();
} finally {
// 释放线程池全局锁
mainLock.unlock();
}
}
标签:rs,--,worker,源码,线程,SHUTDOWN,firstTask,addWorker,ThreadPoolExecutor
From: https://www.cnblogs.com/daheww/p/16723406.html