首页 > 编程语言 >ThreadPoolExecutor源码学习

ThreadPoolExecutor源码学习

时间:2023-12-27 22:03:05浏览次数:36  
标签:task false int 学习 源码 线程 ctl null ThreadPoolExecutor

Java构建线程的方式

  1. 集成Thread
  2. 实现Runnable
  3. 实现CallAble
  4. 线程池方式
    1. Java提供了Executors创建(不推荐,不方便进行控制)
    2. 推荐手动创建线程池ThreadPoolExecutor。

ThreadPoolExecutor参数

  1. int corePoolSize 核心线程数
  2. int maximumPoolSize 最大线程数,最大减核心是非核心线程。
  3. long keepAliveTime 最大空闲时间
  4. TimeUnit unit 时间单位
  5. BlockingQueue workQueue 阻塞队列
  6. ThreadFactory threadFactory, 线程工厂
  7. RejectedExecutionHandler handler 拒绝策略

线程池的执行流程

image

拒绝策略在线程池无法接受新任务时会被执行,例如当线程池已满并且任务队列也已满时。

线程池属性

// ctl的高3位代表线程池状态,低29位代表线程池线程个数。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 即29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 2的29次方减1
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;


// 运行状态保存在高三位中
// -1的补码(绝对值取反加1)高三位为111,表示正常接受任务
private static final int RUNNING    = -1 << COUNT_BITS;
// 000 代表线程池不接受新任务,但仍会处理阻塞队列和正在进行的任务
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 001 不接收新任务,也不会处理阻塞队列中的任务,中断正在执行的任务。
private static final int STOP       =  1 << COUNT_BITS;
// 010 代表线程池即将关闭到达TERMINATED状态
private static final int TIDYING    =  2 << COUNT_BITS;
// 011 线程池最终状态,已关闭。
private static final int TERMINATED =  3 << COUNT_BITS;


// 得到线程池状态
private static int runStateOf(int c)     { return c & ~COUNT_MASK; }
// 获得线程池中线程数量
private static int workerCountOf(int c)  { return c & COUNT_MASK; }

线程池状态

image

execute 执行过程

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. 如果当前运行的线程少于核心线程数(corePoolSize),
     * 则尝试使用给定的任务启动一个新线程。通过调用addWorker原子性
     * 地检查runState和workerCount,防止在不应该添加线程时出现误报,
     * 从而返回false。
     *
     * 2. 如果任务可以成功排队,那么我们仍然需要再次检查是否应该添加线程
     * (因为自上次检查以来现有线程已经死亡),或者线程池是否在
     * 进入此方法后关闭。因此,我们重新检查状态,
     * 在stopped状态必要时则回滚排队,如果线程为空则启动一个新线程。 
     *
     * 3. 如果无法排队任务, 那么我们尝试添加一个新线程。
     *  如果失败, 我们会知道线程池已关闭或饱和,因此执行reject。
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        // 创建核心线程
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 线程池如果正在运行,将任务放到阻塞队列,在执行signal信号后运行
    if (isRunning(c) && workQueue.offer(command)) { // 此为创建核心线程失败时
        int recheck = ctl.get();
        // 不在运行时将任务移除掉并执行拒绝策略
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // 并发条件下此时任务可能为空
        else if (workerCountOf(recheck) == 0) 
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
    }

addWorker()

不断尝试获取线程池的控制状态并检查是否可以添加新的工作线程。

private boolean addWorker(Runnable firstTask, boolean core) {
    // 尝试获取线程池的控制状态并检查是否可以添加新的工作线程。
    retry:
    for (int c = ctl.get();;) {
        // Check if queue empty only if necessary.
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty()))
            // 如果线程池处于SHUTDOWN状态并且满足以下条件之一:已经处于STOP状态、有待执行的任务或工作队列为空,则返回false
            return false;
        for (;;) {
            if (workerCountOf(c)
                >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                // 如果工作线程数量达到核心线程数或最大线程数的限制,则返回false
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry; // 返回到retry处
            c = ctl.get();  // Re-read ctl
            if (runStateAtLeast(c, SHUTDOWN))
                // 如果CAS操作失败是由于workerCount更改,则继续内部循环
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    // 尝试创建新的工作线程,并在必要时将其添加到线程池中。
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int c = ctl.get();
                if (isRunning(c) ||
                    (runStateLessThan(c, STOP) && firstTask == null)) {
                    // 如果线程池正在运行或者处于非STOP状态且没有待执行的任务,
                    // 则检查线程状态是否为NEW,
                    // 否则抛出IllegalThreadStateException异常
                    if (t.getState() != Thread.State.NEW)
                        throw new IllegalThreadStateException();
                    // 将工作线程添加到集合中
                    workers.add(w);
                    workerAdded = true;
                    int s = workers.size();
                    if (s > largestPoolSize)
                        // 更新最大线程池大小
                        largestPoolSize = s;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 启动工作线程
                t.start();
                // 标记线程为已启动
                workerStarted = true;
            }
        }
    } finally {
        if (!workerStarted)
            addWorkerFailed(w);
    }
    // 返回工作线程的状态
    return workerStarted;
}

线程的运行 runWorker

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 循环获取任务,task不为空执行task,否则从阻塞队列中获取任务。
        while (task != null || (task = getTask()) != null) {
            w.lock();
            
            // 检查线程池是否正在停止或线程是否被中断
            if ((runStateAtLeast(ctl.get(), STOP) ||  // 即将关闭或已关闭,即TIDYING或者TERMINATED
                 (Thread.interrupted() &&             // 测试是否被中断
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())                  // 获取中断状态
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                try {
                    // 执行任务
                    task.run();
                    afterExecute(task, null);
                } catch (Throwable ex) {
                    afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                task = null;
                // 增加已完成任务的数量。
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

标签:task,false,int,学习,源码,线程,ctl,null,ThreadPoolExecutor
From: https://www.cnblogs.com/shames/p/17931526.html

相关文章

  • 程序员学习网站推荐:路线向导(roadmap.sh)
    网站地址:https://roadmap.sh/在外网的技术论坛上看到这个网站,上面给出多种编程语言的学习路线,也就是给出不同编程语言的从易到难的组成内容(语言特性),通过这个网站可以辅助学习编程语言。比如学习python语言:这个网站并不能给出你不同编程语言的具体学习内容,但是它会给出......
  • k8s学习
    容器化技术优点:1.自我修复2.弹性伸缩3.服务发现4.负载均衡5.版本回退6.存储编排k8s构成:1.控制节点:集群的控制平面,负责集群的决策1>ApiServer2>Schedule3>ControllerManager4>Etcd 2.工作节点......
  • # ReentrantLock源码阅读
    ReentrantLock源码阅读目录ReentrantLock源码阅读简介例子代码分析Sync.tryLockSync.initialTryLockSync.lockSync.tryReleaseNonFairSync.initialTryLockNonFairSync.tryAcquireFairSync.initialTryLockFairSync.tryAcquire参考链接本人的源码阅读主要聚焦于类的使用场景,一般只......
  • MarkDown学习
    MarkDown学习标题格式一级标题:+空格+标题名称二级标题:+空格+二级标题以此类推最高六级标题(6个#)例如:二级标题三级标题四级标题字体粗体:字两边各2个*Hello,World!斜体:字两边各1个*Hello,World!斜体加粗:字两边各3个*Hello,World!删除线:字两边各2个~Hello,World!......
  • 学期(2023-2024-1) 学号(20232425)《网络空间安全导论》第5周学习总结
    学期(2023-2024-1)学号(20232425)《网络空间安全导论》第5周学习总结教材学习内容总结本周我学习了《网络空间安全导论》的第5章,其主要讲述了在学习过程中,我总结了如下要点,以思维导图的方式呈现:教材学习中的问题和解决过程问题1:监督学习在那种情况下更适用?问题1解决方案:通......
  • 精华一 学习笔记
    Lesson2【结论证明】任意一个无向图,都可以通过最少添加\(\left\lceil\dfrac{p}{2}\right\rceil\)条边使得图变成边双联通分量。证明可参考此博客。其实就是构造一个方案,用叶子两两连边,注意选的根需要度数不为1。【例题】EdgeinMST:无向图,对于每条边,判断“一定在/可......
  • 学期(2023-2024-1) 学号(20232425)《网络空间安全导论》第6周学习总结
    学期(2023-2024-1)学号(20232425)《网络空间安全导论》第6周学习总结教材学习内容总结本周我学习了《网络空间安全导论》的第6章,其主要讲述了在学习过程中,我总结了如下要点,以思维导图的方式呈现:教材学习中的问题和解决过程问题1:区块链技术意义是什么?问题1解决方案:通过研读......
  • 网络学习笔记(3):局域网
    局域网局域网的概念局域网是一种为单一机构所拥有的专用计算机网络,其通信被限制在中等规模的地理范围,如一栋办公楼、一座工厂或一所学校,具有较高的数据速率和较低的误码率,能够有效实现多种设备之间互联、信息交换和资源共享。无线局域网无线局域网WLAN,是一种以无线通信为传输......
  • 【C语言数据结构】对Lua Table源码的一次劣质学习
    /*new_key*/KLcBoolKLcmCreateMapKeyValue(KLCMAP_PTRpTag,KLCTVALUE_PTRpKv){ KLcBoolkbRet =KL_FALSE; KLcBoolkbIsKvLegal =KL_FALSE; DWORDdwInsertPos =0; DWORDdwFreePos =0; DWORDdwCollisionPos =0; KLCTVALUE_PTRptMainNo......
  • openGauss学习笔记-175 openGauss 数据库运维-备份与恢复-导入数据-管理并发写入操作
    openGauss学习笔记-175openGauss数据库运维-备份与恢复-导入数据-管理并发写入操作示例本章节以表test为例,分别介绍相同表的INSERT和DELETE并发,相同表的并发INSERT,相同表的并发UPDATE,以及数据导入和查询的并发的执行详情。CREATETABLEtest(idint,namechar(50),addressva......