首页 > 编程语言 >java线程池任务执行过程 | java线程池原理探究 | 线程池源码

java线程池任务执行过程 | java线程池原理探究 | 线程池源码

时间:2024-08-21 14:27:14浏览次数:11  
标签:task java int 队列 源码 线程 添加 public

目录

一、线程池的使用

二、线程池的创建

2.1 构造方法及参数

2.2 拒绝策略

2.2.1 AbortPolicy(直接抛出异常)

2.2.2 CallerRunsPolicy(将任务交给调用者处理)

2.2.3 DiscardOldestPolicy(弹出队列中等待最久的任务)

2.2.4 DiscardPolicy(无操作)

2.2.5 自定义拒绝策略(实现RejectedExecutionHandler接口,编写自己的拒绝逻辑)

2.3 成员变量

三、线程池执行流程

3.1 添加核心线程(当前线程数小于核心线程数)

3.2 添加进阻塞队列(线程数大于核心线程数,且队列未满)

3.3 添加非核心线程(当前线程数大于核心线程数且队列已满)

3.4 执行流程图

3.5 添加线程方法(addWorker)

3.5 添加线程流程图


一、线程池的使用

先来看一段代码,展示的是线程池最基本的使用方式:

public static void main(String[] args) {
    new T3().concurrentOperate();
}
// 创建线程池
public ThreadPoolExecutor executor = new ThreadPoolExecutor(
        10,
        20,
        300,
        TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(100),
        Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.AbortPolicy()
);
// 使用多线程并行处理任务
public void concurrentOperate() {
    for (int i = 0; i < 15; i++) {
        executor.execute(() -> {
            System.out.println(Thread.currentThread().getName() + "正在执行任务");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                System.out.println("线程被中断");
            }
            System.out.println(Thread.currentThread().getName() + "任务执行完毕");
        });
    }
}

作为程序员,我们不应止步于表面的使用。为了在生产环境中写出更高效的多线程方法并提高并发量,我们就需要深入探究线程池的内部调度机制。接下来,我们将从源代码的角度出发,探究线程池内部调度的奥秘。

二、线程池的创建

JDK17为例,不同版本JDK源码略有不同

2.1 构造方法及参数

创建线程池,即调用了ThreadPoolExecutor的构造函数

/**
 * @param corePoolSize 核心线程数
 * @param maximumPoolSize 最大线程数
 * @param keepAliveTime 非核心线程最大空闲时间
 * @param unit 空闲时间单位
 * @param workQueue 工作队列
 * @param threadFactory 线程工厂
 * @param handler 拒绝策略
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
  1. corePoolSize (核心线程数):线程池的基本大小,即线程池中始终维持的最小线程数。即使这些线程处于空闲状态,也不会被销毁。
  2. maximumPoolSize (最大线程数):线程池允许的最大线程数。当任务到来时,如果当前线程数少于最大线程数,线程池会尝试创建新的线程来处理任务。
  3. keepAliveTime (空闲线程存活时间):当线程数超过corePoolSize时,多余的空闲线程的存活时间。一旦这些线程在指定时间内没有任务执行,它们就会被销毁。
  4. unit (时间单位):keepAliveTime参数的时间单位,如秒(SECONDS)、毫秒(MILLISECONDS)等。
  5. workQueue (工作队列):阻塞队列,用于存放等待执行的任务。当线程池中的线程数达到corePoolSize时,新到达的任务会被放入此队列中等待执行。
  6. threadFactory (线程工厂):用于创建新线程的工厂。可以通过这个工厂自定义线程的名称、优先级等属性。
  7. handler (拒绝策略):当线程池无法接受更多任务时的处理策略。当线程池中的工作队列已满并且线程数已经达到maximumPoolSize已满时,线程池会根据这个策略来处理新到达的任务。

2.2 拒绝策略

2.2.1 AbortPolicy(直接抛出异常)

public static class AbortPolicy implements RejectedExecutionHandler {
    /**
     * Creates an {@code AbortPolicy}.
     */
    public AbortPolicy() { }
    /**
     * Always throws RejectedExecutionException.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     * @throws RejectedExecutionException always
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
    }
}

2.2.2 CallerRunsPolicy(将任务交给调用者处理)

public static class CallerRunsPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code CallerRunsPolicy}.
     */
    public CallerRunsPolicy() { }
    /**
     * Executes task r in the caller's thread, unless the executor
     * has been shut down, in which case the task is discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

2.2.3 DiscardOldestPolicy(弹出队列中等待最久的任务)

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardOldestPolicy} for the given executor.
     */
    public DiscardOldestPolicy() { }
    /**
     * Obtains and ignores the next task that the executor
     * would otherwise execute, if one is immediately available,
     * and then retries execution of task r, unless the executor
     * is shut down, in which case task r is instead discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}

2.2.4 DiscardPolicy(无操作)

public static class DiscardPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardPolicy}.
     */
    public DiscardPolicy() { }
    /**
     * Does nothing, which has the effect of discarding task r.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

2.2.5 自定义拒绝策略(实现RejectedExecutionHandler接口,编写自己的拒绝逻辑)

public static class MyRejectedPolicy implements RejectedExecutionHandler {
    public MyRejectedPolicy() { }
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("自定义拒绝策略");
    }
}

2.3 成员变量

// 32位二进制数,高三位表示线程状态,低29位表示当前线程池工作线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// @Native public static final int SIZE = 32; 所以COUNT_BITS = 29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 00011111 11111111 11111111 11111111
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
/*
 运行状态(111)11100000 00000000 00000000 00000000
 线程池的默认初始状态。在线程池创建后,它会处于RUNNING状态,
 此时线程池可以接受新任务并创建新线程来执行这些任务(只要不超过maximumPoolSize)
 */
private static final int RUNNING    = -1 << COUNT_BITS;
/*
 停止接受新任务(000)00000000 00000000 00000000 00000000
 当调用ExecutorService.shutdown()方法时,线程池会进入SHUTDOWN状态。
 在此状态下,线程池停止接受新任务,但是会继续执行已经提交的任务
 */
private static final int SHUTDOWN   =  0 << COUNT_BITS;
/*
 停止状态(001)00100000 00000000 00000000 00000000
 当调用ExecutorService.shutdownNow()方法时,线程池会进入STOP状态。
 在此状态下,线程池不仅不再接受新任务,还会尝试取消正在执行的任务,并清空任务队列
 */
private static final int STOP       =  1 << COUNT_BITS;
/*
 整理状态(010)01000000 00000000 00000000 00000000
 当线程池中的所有任务都已完成执行,并且所有工作线程都已经退出(即线程池中没有活动线程)时,
 线程池会进入TIDYING状态。在此状态下,线程池会执行终结器(terminator)线程,
 该线程负责执行terminated()钩子方法。
 */
private static final int TIDYING    =  2 << COUNT_BITS;
/*
 终止状态(011)01100000 00000000 00000000 00000000
 线程池的最终状态。当terminated()方法被执行完毕后,线程池会进入TERMINATED状态。
 在此状态下,线程池完全停止,并且不再有任何活动。
 */
private static final int TERMINATED =  3 << COUNT_BITS;

五种状态间转换关系:

三、线程池执行流程

回到之前的例子,进入execute方法

public void concurrentOperate() {
    for (int i = 0; i < 15; i++) {
        executor.execute(() -> {
            System.out.println(Thread.currentThread().getName() + "正在执行任务");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                System.out.println("线程被中断");
            }
            System.out.println(Thread.currentThread().getName() + "任务执行完毕");
        });
    }
}
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

3.1 添加核心线程(当前线程数小于核心线程数)

// 传入任务为空,抛出空指针异常
if (command == null)
    throw new NullPointerException();

// 获取线程池状态值,高3位为状态,低29位为工作线程数
int c = ctl.get();
// 获取工作线程数,并与核心线程数比较
if (workerCountOf(c) < corePoolSize) {
    // 当工作线程数未达到核心线程数上限时,执行添加核心线程逻辑
    if (addWorker(command, true))
        return;
    // 若添加失败,则更新状态值
    c = ctl.get();
}

// 取出参数c的低29位,即工作线程数
private static int workerCountOf(int c)  { return c & COUNT_MASK; }

3.2 添加进阻塞队列(线程数大于核心线程数,且队列未满)

// 状态为RUNNING时,向队列添加任务
if (isRunning(c) && workQueue.offer(command)) {
    // 成功添加任务进队列时进入,队满时添加失败,上述判断返回false
    // 更新状态值
    int recheck = ctl.get();
    // 再次判断线程池状态是不是RUNNING
    // 若不是,则从队列中移除刚刚的的任务
    if (!isRunning(recheck) && remove(command))
        // 执行拒绝策略
        reject(command);
    // 如果添加任务进队列后,工作线程数为0,则另起非核心线程执行任务
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
}

// 线程池状态为RUNNING时,返回true
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}

public boolean remove(Runnable task) {
    boolean removed = workQueue.remove(task);
    tryTerminate(); // In case SHUTDOWN and now empty
    return removed;
}

3.3 添加非核心线程(当前线程数大于核心线程数且队列已满)

当执行到3.2的if判断步骤时,线程池状态不为RUNNING或者添加任务进队列失败(队列已满)时,新增非核心线程执行该任务;如果新增失败(已达到最大线程数maximumPoolSize),执行拒绝策略。

else if (!addWorker(command, false))
    reject(command);

3.4 执行流程图

文字描述:

  1. 首先判断当前传入的任务是否为空,若为空,则抛出空指针异常;
  2. 获取线程状态量,判断工作线程数是否小于核心线程数。若小于核心线程数,则启动一个核心线程处理任务,方法结束;
  3. 若核心线程数此时已达到上限,或者(步骤2)添加核心线程失败,进入阻塞队列部分;
  4. 判断当前线程池状态是否为RUNNING运行状态;
  5. 若是,则尝试将当前任务添加进阻塞队列中;
  6. 若阻塞队列已满,则任务添加失败,此时尝试启动一个非核心线程执行任务;
  7. 若工作线程已经达到最大线程数,非核心线程启动失败,执行拒绝策略;
  8. 若阻塞队列未满,则(步骤5)添加成功后,再次判断线程池状态是否为RUNNING;
  9. 若不是,则从队列中移除刚刚添加的任务,并执行拒绝策略;
  10. 若是,则进一步判断工作线程数是否为0,若为0,添加一个非核心线程执行该任务。

3.5 添加线程方法(addWorker)

这部分将分析上述添加线程这一操作的源代码:addWorker方法,先展示整个源码:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (int c = ctl.get();;) {
        // Check if queue empty only if necessary.
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP)
                || firstTask != null
                || workQueue.isEmpty()))
            return false;
        for (;;) {
            if (workerCountOf(c)
                >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateAtLeast(c, SHUTDOWN))
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int c = ctl.get();
                if (isRunning(c) ||
                    (runStateLessThan(c, STOP) && firstTask == null)) {
                    if (t.getState() != Thread.State.NEW)
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    workerAdded = true;
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

第一部分:

// 循环标记
retry:
// 获取线程池状态量
for (int c = ctl.get();;) {
    /*
     条件一:如果线程池状态为SHUTDOWN、STOP、TIDYING、TERMINATED
     条件二:线程池状态为STOP、TIDYING、TERMINATED三者之一 或 当前任务不为空 或 队列为空
     条件一和条件二都满足的情况下,任务添加失败
     */
    if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP)
            || firstTask != null
            || workQueue.isEmpty()))
        return false;
    for (;;) {
        /*
         如果当前线程池的工作线程数已经达到了corePoolSize或
         maximumPoolSize(取决于此时添加的是核心线程还是非核心线程),
         任务添加失败
         */
        if (workerCountOf(c)
                >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
            return false;
        // 尝试将工作线程数加1,若加1成功,跳出外层循环(retry:标记下面的for循环)
        if (compareAndIncrementWorkerCount(c))
            break retry;
        // 作线程数加1失败后,再次读取线程池状态量
        c = ctl.get();  // Re-read ctl
        // 如果线程池状态不是RUNNING,则再次尝试整个逻辑
        if (runStateAtLeast(c, SHUTDOWN))
            continue retry;
        // else CAS failed due to workerCount change; retry inner loop
    }
}

第二部分:

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
    w = new Worker(firstTask);
    // 拿到Worker的线程对象
    final Thread t = w.thread;
    if (t != null) {
        final ReentrantLock mainLock = this.mainLock;
        // 加锁
        mainLock.lock();
        try {
            // 获取线程池状态量
            int c = ctl.get();
            // 如果线程池状态是运行中
            if (isRunning(c) ||
                    /*
                     或者 线程池状态小于停止状态(RUNNING、SHUTDOWN,
                     但走到这段逻辑只能是SHUTDOWN)并且新任务为空
                     */
                    (runStateLessThan(c, STOP) && firstTask == null)) {
                // 判断Worker的线程状态是否为NEW,若不是,则抛出线程状态异常
                if (t.getState() != Thread.State.NEW)
                    throw new IllegalThreadStateException();
                // private final HashSet<Worker> workers = new HashSet<>();
                // 添加进workers中
                workers.add(w);
                // 添加标记置为true
                workerAdded = true;
                // 若workers队列大小超出largestPoolSize,替换
                int s = workers.size();
                if (s > largestPoolSize)
                    largestPoolSize = s;
            }
        } finally {
            mainLock.unlock();
        }
        // 如果worker添加成功,则启动线程,并将启动标记置为true
        if (workerAdded) {
            t.start();
            workerStarted = true;
        }
    }
} finally {
    // 若启动失败,则将worker从workers中移除
    if (! workerStarted)
        addWorkerFailed(w);
}
// 返回线程启动结果
return workerStarted;

3.5 添加线程流程图

标签:task,java,int,队列,源码,线程,添加,public
From: https://blog.csdn.net/m0_62467665/article/details/141191691

相关文章

  • Java毕业设计作品(87):基于thymeleaf前后端分离 校园学习资料共享平台系统设计与实现
      博主介绍:黄菊华老师《Vue.js入门与商城开发实战》《微信小程序商城开发》图书作者,CSDN博客专家,在线教育专家,CSDN钻石讲师;专注大学生毕业设计教育和辅导。所有项目都配有从入门到精通的基础知识视频课程,学习后应对毕业设计答辩。项目配有对应开发文档、开题报告、任务书......
  • 源码解析之为何要用ConcurrentHashMap
    为什么要用ConcurrentHashMap?ConcurrentHashMap是JUC包下的一个线程安全的HashMap类,我们都知道多线程的场景下要用ConcurrentHashMap来代替HashMap使用,有没有想过为什么不能用HashMap,为什么能用ConcurrentHashMap呢?下面我通过走源码的方式,带大家看一看其中的一些细节!HashMapmap......
  • Java中的司机抢单实现:并发问题与解决方案
    文章目录司机抢单的基础实现乐观锁解决并发问题总结在共享经济的浪潮中,像滴滴打车这样的服务已经成为我们生活中不可或缺的一部分。对于司机和平台来说,抢单是一个关键环节,如何在保证系统高效运行的同时,确保抢单过程的公平与准确,是一个值得深入探讨的问题。在这篇博......
  • 基于SSM的小区物业管理系统2【附源码+文档】
    ......
  • 如何深入学习Java:从基础到高级的系统指南
    Java作为一种面向对象、跨平台、稳健且广泛应用的编程语言,已经成为软件开发领域中的中流砥柱。从企业级应用到Android开发,从大数据处理到Web应用,Java无处不在。对于学习Java的人来说,掌握这门语言不仅需要扎实的基础,还需要逐步深入,理解其核心概念、掌握先进技术,并能够灵活运用于......
  • 函数方法_java
    1.方法概述1.1方法的概念方法(method)是程序中最小的执行单元注意:方法必须先创建才可以使用,该过程成为方法定义方法创建后并不是直接可以运行的,需要手动使用后,才执行,该过程成为方法调用2.方法的定义和调用2.1无参数方法定义和调用定义格式:publicstaticvoid方......
  • java学习记录第八周
    在Java中,字符串是通过`String`类来表示的,`String`类是不可变的,这意味着一旦一个字符串被创建,它的值就不能被改变。字符串的创建字符串可以通过以下两种方式创建:使用字符串字面量:Stringstr1="Hello";使用new关键字:Stringstr2=newString("Hello");使用字符串字面量......
  • DevEco Studio 调试三方库源码
    有相关的官方文档:https://developer.huawei.com/consumer/cn/doc/harmonyos-faqs-V5/faqs-app-debugging-26-V5实操:将编译好的三方库文件和符号文件整理好在工程中添加对库文件的使用,一般是将库文件放到libs/arm64-v8a下点击顶栏的选项,Run->EditConfigurations,进入到R......
  • 基于Springboot的宿舍管理系统(有报告)。Javaee项目,springboot项目。
    演示视频:基于Springboot的宿舍管理系统(有报告)。Javaee项目,springboot项目。资源下载:基于Springboot的宿舍管理系统(有报告)。Javaee项目,springboot项目。项目介绍:采用M(model)V(view)C(controller)三层体系结构,通过Spring+SpringBoot+Mybatis+Vue+Maven来实现。MyS......
  • 基于Springboot的疫情物资捐赠和分配系统(有报告)。Javaee项目,springboot项目。
    演示视频:基于Springboot的疫情物资捐赠和分配系统(有报告)。Javaee项目,springboot项目。资源下载:基于Springboot的疫情物资捐赠和分配系统(有报告)。Javaee项目,springboot项目。项目介绍:采用M(model)V(view)C(controller)三层体系结构,通过Spring+SpringBoot+Mybatis+V......