首页 > 编程语言 >【源码笔记】ThreadPoolExecutor#runWorker

【源码笔记】ThreadPoolExecutor#runWorker

时间:2022-09-25 19:00:14浏览次数:66  
标签:task runWorker thread 中断 -- 源码 线程 null ThreadPoolExecutor

/**
 * Main worker run loop.  Repeatedly gets tasks from queue and
 * executes them, while coping with a number of issues:
 *
 * 1. We may start out with an initial task, in which case we
 * don't need to get the first one. Otherwise, as long as pool is
 * running, we get tasks from getTask. If it returns null then the
 * worker exits due to changed pool state or configuration
 * parameters.  Other exits result from exception throws in
 * external code, in which case completedAbruptly holds, which
 * usually leads processWorkerExit to replace this thread.
 *
 * 2. Before running any task, the lock is acquired to prevent
 * other pool interrupts while the task is executing, and then we
 * ensure that unless pool is stopping, this thread does not have
 * its interrupt set.
 *
 * 3. Each task run is preceded by a call to beforeExecute, which
 * might throw an exception, in which case we cause thread to die
 * (breaking loop with completedAbruptly true) without processing
 * the task.
 *
 * 4. Assuming beforeExecute completes normally, we run the task,
 * gathering any of its thrown exceptions to send to afterExecute.
 * We separately handle RuntimeException, Error (both of which the
 * specs guarantee that we trap) and arbitrary Throwables.
 * Because we cannot rethrow Throwables within Runnable.run, we
 * wrap them within Errors on the way out (to the thread's
 * UncaughtExceptionHandler).  Any thrown exception also
 * conservatively causes thread to die.
 *
 * 5. After task.run completes, we call afterExecute, which may
 * also throw an exception, which will also cause thread to
 * die. According to JLS Sec 14.20, this exception is the one that
 * will be in effect even if task.run throws.
 *
 * The net effect of the exception mechanics is that afterExecute
 * and the thread's UncaughtExceptionHandler have as accurate
 * information as we can provide about any problems encountered by
 * user code.
 *
 * @param w the worker
 */
final void runWorker(Worker w) {
    // wt == worker.thread
    Thread wt = Thread.currentThread();
    // 将初始执行task赋值给task
    Runnable task = w.firstTask;
    // 清空当前w.firstTask引用 --> 马上就会执行这个task
    w.firstTask = null;

    // 这里调用unlock的作用:
    //   Worker的构造方法:
    //   (1)     Worker(Runnable firstTask) {
    //   (2)         setState(-1); // inhibit interrupts until runWorker
    //   (3)         this.firstTask = firstTask;
    //   (4)         this.thread = getThreadFactory().newThread(this);
    //   (5)     }
    //   由第2行代码可知,初始化Worker时,会将Worker的state设置为-1
    //   unlock会将worker的state设置为0
    //   总结:初始化worker state和exclusiveThread
    w.unlock(); // allow interrupts

    // 是否是突然退出
    //   true --> 发生异常了,当前线程是突然退出的,需要做一些后续的处理
    //   false -> 正常退出
    boolean completedAbruptly = true;
    try {
        // 条件1.task != null
        //   true --> firstTask不为null -> 直接在循环体中执行该task
        // 条件2.(task = getTask()) != null
        //   true --> 说明当前线程在queue中获取任务成功
        //            getTask()方法可能会阻塞线程
        //   false -> 当前线程需要执行结束逻辑
        while (task != null || (task = getTask()) != null) {
            // worker加锁,设置独占锁为自己
            // 设置独占锁的原因:
            //   shutdown时,会判断当前worker状态。根据独占锁是否空闲来判断当前worker是否正在工作
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt

            // 条件1.runStateAtLeast(ctl.get(), STOP) && !wt.isInterrupted()
            //   runStateAtLeast(ctl.get(), STOP) --> 说明线程池目前处于STOP/TIDYING/TERMINATION状态,此时线程一定要给它一个中断信号
            //   !wt.isInterrupted() --> 说明线程池当前不是中断状态
            //   true --> 说明当前线程状态>=STOP,且当前线程是未设置中断状态的 --> 此时需要进入到if里面,为线程设置一个中断信号
            //  如果线程池状态不正常(STOP/TIDYING/TERMINATED),并且当前线程没有中断,则给当前线程一个中断信号
            //  - 如果线程池 is stopping,则需要确保线程有中断信号

            // 条件2.Thread.interrupted() && runStateAtLeast(ctl.get(), STOP) && !wt.isInterrupted()
            //   前提条件:当前线程池的状态不是stopping --> runStateAtLeast(ctl.get(), STOP))) == false
            //   如果当前线程被中断了(并清除了中断标记),并线程池状态不正常(STOP/TIDYING/TERMINATED)
            //   !wt.isInterrupted() --> 一定会成立,因为如果线程之前没有中断的话,则Thread.interrupted()会为false,不会往后继续判断
            //                           如果线程之前中断了的话,则Thread.interrupted()已经清除了中断标记,所以!wt.isInterrupted()一定为true

            // interrupt() 给线程设置一个中断标志,线程仍会继续运行。
            // interrupted() 测试当前线程是否被中断(检查中断标志),并清除中断状态
            // isInterrupted() 测试此线程是否被中断

            // 总结:
            // - 条件1的作用:
            //   - 如果线程池正在停止,则对当前线程进行中断操作
            // - 条件2的作用:
            //   1 如果线程之前没有设置中断标记,则Thread.interrupted()会返回false
            //   2 如果线程之前设置了中断标记,则清空中断标记
            //     2.1 如果当前线程池正在停止的话,则对当前线程进行中断操作
            //     2.2 如果当前线程池没有正在停止的话,则此处会返回false
            // 所以总结就是线程池正在停止的话,就对当前线程进行中断操作。如果线程池不是正在中止的话,就清空中断标记
            //
            // 即:
            //   1 强制刷新当前线程的中断标记位为false。因为可能上一次执行task时,业务代码里将当前线程的中断标记位设置为true,且没有处理
            // 所以这里一定要强制刷新一下,不会再影响到后面的task
            //   2 如果线程池正在停止,则对当前线程进行中断操作
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() /*清除了中断标记*/ &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();

            try {
                // 钩子方法,留给子类实现的
                beforeExecute(wt, task);

                // 异常情况 --> 如果thrown不为空,则表示task运行过程中出现异常了
                Throwable thrown = null;
                try {
                    // task可能是FutureTask,也可能是Runnable
                    // - 如果是通过submit提交的runnable/callable,则会被封装成FutureTask
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 钩子方法,留给子类实现的
                    afterExecute(task, thrown);
                }
            } finally {
                // 将局部变量task置为空
                task = null;
                // 更新worker完成线程的数量
                w.completedTasks++;
                // worker处理完一个任务后,会释放掉独占锁
                // 1. 正常情况,会再次回到getTask(),重新获取任务 --> while (task != null || (task = getTask()) != null)
                // 2. task.run()执行过程中,抛出异常了 --> 直接跳到processWorkerExit(w, completedAbruptly) --> completedAbruptly = true
                w.unlock();
            }
        }

        // 什么情况下会执行到这里?
        // 1. getTask()方法返回null,说明当前线程应该执行退出逻辑了
        // 2.

        // 为false --> 正常退出
        completedAbruptly = false;
    } finally {
        // 正常退出:completedAbruptly == false
        // 异常退出:completedAbruptly == true

        processWorkerExit(w, completedAbruptly);
    }
}

标签:task,runWorker,thread,中断,--,源码,线程,null,ThreadPoolExecutor
From: https://www.cnblogs.com/daheww/p/16728485.html

相关文章