首页 > 编程语言 >ThreadPoolExecutor源码解析

ThreadPoolExecutor源码解析

时间:2024-03-18 17:15:12浏览次数:24  
标签:解析 return mainLock int 源码 线程 ctl null ThreadPoolExecutor

public class ThreadPoolExecutor extends AbstractExecutorService

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctl int32位 高3位状态位,低29位工作线程数量

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }

COUNT_BITS=32-3=29
private static final int COUNT_BITS = Integer.SIZE - 3;
CAPACITY=000 111111.....
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
101 000000... 注意这个是原码
实际是补码参与运算
10000....1补码111111......
111 000000... <-是补码结果,下次参与位运算也是此数   
private static final int RUNNING    = -1 << COUNT_BITS;
000 000000...
private static final int SHUTDOWN   =  0 << COUNT_BITS;
001 000000...
private static final int STOP       =  1 << COUNT_BITS;
010 000000...
private static final int TIDYING    =  2 << COUNT_BITS;
011 000000...
private static final int TERMINATED =  3 << COUNT_BITS;

默认拒绝策略是报错

private static final RejectedExecutionHandler defaultHandler =
    new AbortPolicy();

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }

ThreadPoolExecutor构造方法

Worker构造方法

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
//注意Worker实现了Runnable
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
{
     Worker(Runnable firstTask) {
         //初始设置AQS状态-1
         setState(-1); // inhibit interrupts until runWorker
         this.firstTask = firstTask;
         //默认Executors.defaultThreadFactory()
         //注意this是Worker对象
         this.thread = getThreadFactory().newThread(this);
     }
    //所以Worker.start()执行的是runWorker(worker对象);
    public void run() {
            runWorker(this);
    }
}

DefaultThreadFactory() {
    SecurityManager s = System.getSecurityManager();
    group = (s != null) ? s.getThreadGroup() :
    Thread.currentThread().getThreadGroup();
    namePrefix = "pool-" +
        poolNumber.getAndIncrement() +
        "-thread-";
}

 public Thread newThread(Runnable r) {
     //创建Worker的执行线程
     Thread t = new Thread(group, r,
                           namePrefix + threadNumber.getAndIncrement(),
                           0);
     if (t.isDaemon())
         //默认非守护线程
         t.setDaemon(false);
     //默认优先级为5
     if (t.getPriority() != Thread.NORM_PRIORITY)
         t.setPriority(Thread.NORM_PRIORITY);
     return t;
 }
}

父类AbstractExecutorService定义了submit入参Callable方法

包装成RunnableFuture执行子类重写execute(Runnable command);

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    //封装成RunnableFuture
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

  protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
}

提交线程任务

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    //获取状态码+线程数 int值
    int c = ctl.get();
    //小于核心线程数
    if (workerCountOf(c) < corePoolSize) {
        //添加核心线程 true 是否核心线程
        if (addWorker(command, true))
            return;
        //添加失败
        c = ctl.get();
    }
    //run状态并且添加到队列成功
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        //二次检查,非run状态便移除队列,并且包含tryTerminate()方法
        if (! isRunning(recheck) && remove(command))
            //走拒绝策略
            reject(command);
        //二次检查,run状态或者移除队列失败=>
        else if (workerCountOf(recheck) == 0)
            //添加非核心线程,空方法任务
            addWorker(null, false);
    }
     //添加非核心线程任务失败,走拒绝策略
    else if (!addWorker(command, false))
        reject(command);
}

新增Worker工作线程

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        //rs为Running状态
        //rs如果刚好为SHUTDOWN,提交空任务,并且队列不为空情况,是可以添加Worker
        //线程池会把正在执行的任务及队列中等待执行的任务都执行完毕后,再去关闭;
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            //如果给工作线程超过最大或者 (core=true 核心线程数core=false 最大线程数
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //CAS添加wc,跳出retry循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            //CAS失败,状态变更,继续retry循环
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            ////CAS失败,状态没有变更,继续内部循环
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //注意Worker是实现Runnable,w.thread是
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
				//Running状态或者SHUTDOWN状态新增空任务
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    //新建线程未启动,如果是alive状态,说明已经被启动了,则抛出异常。
                    //抛出异常未捕获,
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    //largestPoolSize用于记录线程池中曾经存在的最大的线程数量
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            //添加成功,t.start();,设置workerStarted true
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            //失败处理
            addWorkerFailed(w);
    }
    return workerStarted;
}

addWorkerFailed

private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            //移除创建worker
            workers.remove(w);
        //减少wc
        decrementWorkerCount();
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}
final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        //RUNNING||TIDYING||TERMINATED
        //SHUTDOWN && 阻塞队列不为空
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        //STOP || SHUTDOWN 阻塞队列为空
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //ctlOf(TIDYING, 0)=>TIDYING头状态+工作线程数组成
            //尝试CAS设置TIDYING且wc=0
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
					//再设置为TERMINATED且wc=0
                    ctl.set(ctlOf(TERMINATED, 0));
                    //termination条件唤醒
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

真正运行任务

final void runWorker(Worker w) {
    //Worker线程
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    //worker继承AQS,unlock方法执行release(1)
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        //获取任务
        while (task != null || (task = getTask()) != null) {
            //AQS state由0->1,执行acquire(1),设置当前线程独占
            //失败就走AQS等待队列
            w.lock();
     
            //线程状态为STOP以及以上 或者 Thread.interrupted()线程中断为true,
            //然后复原为false,并且线程状态为STOP以及以上
            //二次判断
            //wt.isInterrupted()获取为复原false
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                //中断
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //真正执行运行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                //当前工作线程完成task总数
                w.completedTasks++;
                //w.unlock();执行,设置独占线程null,设置State=0
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

w.unlock();执行,设置独占线程null,设置State=0

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

protected boolean tryRelease(int unused) {
    setExclusiveOwnerThread(null);
    setState(0);
    return true;
}

获取阻塞队列任务

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        //SHUTDOWN状态并且阻塞队列为空 || STOP以上状态
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            //do while循环直到减少一个ctl中工作线程数
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        //allowCoreThreadTimeOut开启,核心线程关闭或超过核心线程,需要超时判断
        boolean timed = allowCore || wc > corePoolSize;
		//wc大于最大线程数 或者 配置超时且获取超时
        //并且 (wc>1 或者 阻塞队列任务为空)
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            //配置超时,超时时间内拿去任务
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            //条件队列等待获取队列任务
                workQueue.take();
            if (r != null)
                return r;
            //超时未获取,指定超时
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

如果runWorker(Worker w) 方法第一个try和第二个try之间发生异常

completedAbruptly = true;

 private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //统计总完成数量
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
     	//Running ShutDown
        if (runStateLessThan(c, STOP)) {
            //completedAbruptly=false,第二个try内执行finally结束
            if (!completedAbruptly) {
                //允许回收核心线程,最小为0,否则最小核心线程
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                //如果允许回收核心线程,阻塞队列有任务
                if (min == 0 && ! workQueue.isEmpty())
                    //则工作线程最小为1
                    min = 1;
                //满足条件就return
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            //异常状态或者允许回收核心线程,阻塞队列有任务,但没有工作线程
            //增加一个非核心线程
            addWorker(null, false);
        }
    }

标签:解析,return,mainLock,int,源码,线程,ctl,null,ThreadPoolExecutor
From: https://www.cnblogs.com/wsyphaha/p/18080939

相关文章

  • ScheduledThreadPoolExecutor 定时任务
    目录ScheduledThreadPoolExecutor一、概述二、常用方法1、schedule方法2、scheduleAtFixedRate方法3.scheduleWithFixedDelay方法三、ScheduledExecutorService的创建方式ScheduledThreadPoolExecutor一、概述java中ScheduledExecutorService接口是基于线程池设计的定时任......
  • 鸿鹄电子招投标系统源码实现与立项流程:基于Spring Boot、Mybatis、Redis和Layui的企业
    随着企业的快速发展,招采管理逐渐成为企业运营中的重要环节。为了满足公司对内部招采管理提升的要求,建立一个公平、公开、公正的采购环境至关重要。在这个背景下,我们开发了一款电子招标采购软件,以最大限度地控制采购成本,提高招投标工作的公开性和透明性,并确保符合国家电子招投标......
  • 数据库中的字符类型:char、varchar、nchar、nvarchar 全解析
    数据库中的字符类型选择对性能和存储效率有着重要的影响。char、varchar、nchar和nvarchar这四种字符类型各自有不同的特点和适用场景,同时也会影响数据库的碎片和页分裂情况。char类型char类型用于存储定长的字符串。它会为每个值分配固定数量的空间,即使实际内容没有填满这个空......
  • SpringBoot项目轻松集成Sentinel:熔断限流实战及核心代码解析
    一、引言Sentinel是阿里巴巴开源的一款轻量级流量控制组件,提供丰富的微服务流量控制能力,包括流量控制、熔断降级、系统负载保护等。本文将带你一步步实现在SpringBoot项目中集成Sentinel,实现服务的熔断限流,并给出关键代码示例及注意事项。二、集成Sentinel步骤添加依赖在......
  • 深入解析C#中的第三方库NPOI:Excel和Word文件处理的利器
    一、引言在.NET开发中,操作Office文档(特别是Excel和Word)是一项常见的需求。然而,在服务器端或无MicrosoftOffice环境的场景下,直接使用OfficeInterop可能会面临挑战。为了解决这个问题,开源库NPOI应运而生,它提供了无需安装Office即可创建、读取和修改Excel(.xls,.xlsx)和Word(......
  • 【即插即用】RefConv-重聚焦卷积模块(附源码)
    论文地址: http://arxiv.org/pdf/2310.10563.pdf源码地址:GitHub-Aiolus-X/RefConv概述:作者提出了一种可重参数化的重新聚焦卷积(RefConv),作为常规卷积层的即插即用替代品,能够在不引入额外推理成本的情况下显著提高基于CNN的模型性能。RefConv利用预训练参数编码的表示作为先......
  • 【即插即用】ELA注意力机制(附源码)
    原文地址:[2403.01123]ELA:EfficientLocalAttentionforDeepConvolutionalNeuralNetworks(arxiv.org)与SE、CA注意力机制的区别:ELA通过在空间维度采用带状池化来提取水平和垂直方向的特征向量,维持细长的核形状以捕捉远距离的依赖关系,同时避免不相关区域对标签预测的......
  • 基于SpringBoot的“乐校园二手书交易管理系统”的设计与实现(源码+数据库+文档+PPT)
    基于SpringBoot的“乐校园二手书交易管理系统”的设计与实现(源码+数据库+文档+PPT)开发语言:Java数据库:MySQL技术:SpringBoot工具:IDEA/Ecilpse、Navicat、Maven系统展示系统首页界面图用户注册界面图二手图书界面图留言反馈界面图个人中心界面图管理员......
  • 基于SpringBoot的“书籍学习平台”的设计与实现(源码+数据库+文档+PPT)
    基于SpringBoot的“书籍学习平台”的设计与实现(源码+数据库+文档+PPT)开发语言:Java数据库:MySQL技术:SpringBoot工具:IDEA/Ecilpse、Navicat、Maven系统展示平台首页界面图用户注册界面图付费专区界面图个人中心界面图后台登录界面图管理员功能界面图......
  • 基于微信小程序的高校跑腿小程序,附源码
    博主介绍:✌程序员徐师兄、7年大厂程序员经历。全网粉丝15w+、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java、Python技术领域和毕业项目实战✌......