/**
* Main worker run loop. Repeatedly gets tasks from queue and
* executes them, while coping with a number of issues:
*
* 1. We may start out with an initial task, in which case we
* don't need to get the first one. Otherwise, as long as pool is
* running, we get tasks from getTask. If it returns null then the
* worker exits due to changed pool state or configuration
* parameters. Other exits result from exception throws in
* external code, in which case completedAbruptly holds, which
* usually leads processWorkerExit to replace this thread.
*
* 2. Before running any task, the lock is acquired to prevent
* other pool interrupts while the task is executing, and then we
* ensure that unless pool is stopping, this thread does not have
* its interrupt set.
*
* 3. Each task run is preceded by a call to beforeExecute, which
* might throw an exception, in which case we cause thread to die
* (breaking loop with completedAbruptly true) without processing
* the task.
*
* 4. Assuming beforeExecute completes normally, we run the task,
* gathering any of its thrown exceptions to send to afterExecute.
* We separately handle RuntimeException, Error (both of which the
* specs guarantee that we trap) and arbitrary Throwables.
* Because we cannot rethrow Throwables within Runnable.run, we
* wrap them within Errors on the way out (to the thread's
* UncaughtExceptionHandler). Any thrown exception also
* conservatively causes thread to die.
*
* 5. After task.run completes, we call afterExecute, which may
* also throw an exception, which will also cause thread to
* die. According to JLS Sec 14.20, this exception is the one that
* will be in effect even if task.run throws.
*
* The net effect of the exception mechanics is that afterExecute
* and the thread's UncaughtExceptionHandler have as accurate
* information as we can provide about any problems encountered by
* user code.
*
* @param w the worker
*/
final void runWorker(Worker w) {
// wt == worker.thread
Thread wt = Thread.currentThread();
// 将初始执行task赋值给task
Runnable task = w.firstTask;
// 清空当前w.firstTask引用 --> 马上就会执行这个task
w.firstTask = null;
// 这里调用unlock的作用:
// Worker的构造方法:
// (1) Worker(Runnable firstTask) {
// (2) setState(-1); // inhibit interrupts until runWorker
// (3) this.firstTask = firstTask;
// (4) this.thread = getThreadFactory().newThread(this);
// (5) }
// 由第2行代码可知,初始化Worker时,会将Worker的state设置为-1
// unlock会将worker的state设置为0
// 总结:初始化worker state和exclusiveThread
w.unlock(); // allow interrupts
// 是否是突然退出
// true --> 发生异常了,当前线程是突然退出的,需要做一些后续的处理
// false -> 正常退出
boolean completedAbruptly = true;
try {
// 条件1.task != null
// true --> firstTask不为null -> 直接在循环体中执行该task
// 条件2.(task = getTask()) != null
// true --> 说明当前线程在queue中获取任务成功
// getTask()方法可能会阻塞线程
// false -> 当前线程需要执行结束逻辑
while (task != null || (task = getTask()) != null) {
// worker加锁,设置独占锁为自己
// 设置独占锁的原因:
// shutdown时,会判断当前worker状态。根据独占锁是否空闲来判断当前worker是否正在工作
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 条件1.runStateAtLeast(ctl.get(), STOP) && !wt.isInterrupted()
// runStateAtLeast(ctl.get(), STOP) --> 说明线程池目前处于STOP/TIDYING/TERMINATION状态,此时线程一定要给它一个中断信号
// !wt.isInterrupted() --> 说明线程池当前不是中断状态
// true --> 说明当前线程状态>=STOP,且当前线程是未设置中断状态的 --> 此时需要进入到if里面,为线程设置一个中断信号
// 如果线程池状态不正常(STOP/TIDYING/TERMINATED),并且当前线程没有中断,则给当前线程一个中断信号
// - 如果线程池 is stopping,则需要确保线程有中断信号
// 条件2.Thread.interrupted() && runStateAtLeast(ctl.get(), STOP) && !wt.isInterrupted()
// 前提条件:当前线程池的状态不是stopping --> runStateAtLeast(ctl.get(), STOP))) == false
// 如果当前线程被中断了(并清除了中断标记),并线程池状态不正常(STOP/TIDYING/TERMINATED)
// !wt.isInterrupted() --> 一定会成立,因为如果线程之前没有中断的话,则Thread.interrupted()会为false,不会往后继续判断
// 如果线程之前中断了的话,则Thread.interrupted()已经清除了中断标记,所以!wt.isInterrupted()一定为true
// interrupt() 给线程设置一个中断标志,线程仍会继续运行。
// interrupted() 测试当前线程是否被中断(检查中断标志),并清除中断状态
// isInterrupted() 测试此线程是否被中断
// 总结:
// - 条件1的作用:
// - 如果线程池正在停止,则对当前线程进行中断操作
// - 条件2的作用:
// 1 如果线程之前没有设置中断标记,则Thread.interrupted()会返回false
// 2 如果线程之前设置了中断标记,则清空中断标记
// 2.1 如果当前线程池正在停止的话,则对当前线程进行中断操作
// 2.2 如果当前线程池没有正在停止的话,则此处会返回false
// 所以总结就是线程池正在停止的话,就对当前线程进行中断操作。如果线程池不是正在中止的话,就清空中断标记
//
// 即:
// 1 强制刷新当前线程的中断标记位为false。因为可能上一次执行task时,业务代码里将当前线程的中断标记位设置为true,且没有处理
// 所以这里一定要强制刷新一下,不会再影响到后面的task
// 2 如果线程池正在停止,则对当前线程进行中断操作
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() /*清除了中断标记*/ &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 钩子方法,留给子类实现的
beforeExecute(wt, task);
// 异常情况 --> 如果thrown不为空,则表示task运行过程中出现异常了
Throwable thrown = null;
try {
// task可能是FutureTask,也可能是Runnable
// - 如果是通过submit提交的runnable/callable,则会被封装成FutureTask
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 钩子方法,留给子类实现的
afterExecute(task, thrown);
}
} finally {
// 将局部变量task置为空
task = null;
// 更新worker完成线程的数量
w.completedTasks++;
// worker处理完一个任务后,会释放掉独占锁
// 1. 正常情况,会再次回到getTask(),重新获取任务 --> while (task != null || (task = getTask()) != null)
// 2. task.run()执行过程中,抛出异常了 --> 直接跳到processWorkerExit(w, completedAbruptly) --> completedAbruptly = true
w.unlock();
}
}
// 什么情况下会执行到这里?
// 1. getTask()方法返回null,说明当前线程应该执行退出逻辑了
// 2.
// 为false --> 正常退出
completedAbruptly = false;
} finally {
// 正常退出:completedAbruptly == false
// 异常退出:completedAbruptly == true
processWorkerExit(w, completedAbruptly);
}
}
标签:task,runWorker,thread,中断,--,源码,线程,null,ThreadPoolExecutor
From: https://www.cnblogs.com/daheww/p/16728485.html