首页 > 其他分享 >剖析线程池实现原理

剖析线程池实现原理

时间:2024-10-17 13:53:21浏览次数:13  
标签:Thread int 剖析 任务 boolean 线程 原理 public

前置推荐阅读:java并发之线程池使用-CSDN博客

自定义实现一个带监控的线程池

首先我们继承ThreadPoolExecutor,实现构造函数以及重写beforeExecute和afterExecute两个函数,具体调用我们会在代码实现层面进行详细的分析。

import java.util.concurrent.*;


public class AsyncThreadPool extends ThreadPoolExecutor {

    /**
     * 任务队列
     */
    private BlockingQueue<Runnable> workerQueue;

    public AsyncThreadPool(int corePoolSize,
                           int maximumPoolSize,
                           long keepAliveTime,
                           TimeUnit unit,
                           BlockingQueue<Runnable> workQueue,
                           ThreadFactory threadFactory,
                           RejectedExecutionHandler handler){
        super(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler);
        this.workerQueue = workQueue;
    }

    /**
     * 在任务执行之后
     *
     *
     *
     * @param r 执行任务
     * @param t 异常信息
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        System.out.println("AsyncThreadPool afterExecute threadName:"+Thread.currentThread().getName()+", afterExecutor queueSize:"+workerQueue.size()+" !!!");
    }

    /**
     * 在任务执行之前
     *
     *
     *
     * @param t 执行线程
     * @param r 异常信息
     */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        System.out.println("AsyncThreadPool beforeExecute threadName:"+Thread.currentThread().getName()+", afterExecutor queueSize:"+workerQueue.size()+" !!!");
    }

}

创建Util并重写ThreadFactory,代码如下:

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class AsyncThreadPoolUtil {

    /**
     * 默认线程数(当前cpu核心数量)
     */
    private static final int DEFAULT_CORE_THREAD_SIZE = Runtime.getRuntime().availableProcessors() * 2 + 1;

    /**
     * 默认工作队列
     */
    private static final LinkedBlockingDeque<Runnable> DEFAULT_WORKER_QUEUE = new LinkedBlockingDeque<>(20);

    private ThreadPoolExecutor threadPoolExecutor;

    public AsyncThreadPoolUtil(String threadName){
        this(DEFAULT_CORE_THREAD_SIZE,DEFAULT_CORE_THREAD_SIZE,DEFAULT_WORKER_QUEUE,threadName);
    }

    public AsyncThreadPoolUtil(int coreThreadSize, int maxThreadSize, BlockingQueue<Runnable> workerQueue,String threadName){
        this(coreThreadSize,maxThreadSize,0L,TimeUnit.SECONDS,workerQueue,threadName);
    }

    public AsyncThreadPoolUtil(int corePoolSize,
                               int maximumPoolSize,
                               long keepAliveTime,
                               TimeUnit unit,
                               BlockingQueue<Runnable> workQueue,
                               String threadName){
        this.threadPoolExecutor = new AsyncThreadPool(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,
                new DefaultThreadFactory(threadName),new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /**
     * The default thread factory
     */
    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory(String threadName) {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                    Thread.currentThread().getThreadGroup();
            namePrefix = threadName +
                    poolNumber.getAndIncrement() +
                    "-thread-";
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                    namePrefix + threadNumber.getAndIncrement(),
                    0);
            if (t.isDaemon()){
                t.setDaemon(false);
            }
            if (t.getPriority() != Thread.NORM_PRIORITY){
                t.setPriority(Thread.NORM_PRIORITY);
            }
            return t;
        }
    }

    /**
     * 执行runnable 任务
     * @param runnable 提交任务
     */
    public void execute(Runnable runnable){
        this.threadPoolExecutor.execute(runnable);
    }

    /**
     * 提交异步任务
     * @param task 异步任务
     * @param <T> T
     * @return Future
     */
    public <T>Future<T> submit(Callable<T> task){
        return this.threadPoolExecutor.submit(task);
    }

}

编写Test进行验证

public class Test {

    public static void main(String[] args) {
        AsyncThreadPoolUtil pool = new AsyncThreadPoolUtil("demo-test-");
        for (int i=0;i<200;i++){
            pool.execute(()->{
                try{
                    TimeUnit.MILLISECONDS.sleep(500);
                }catch (Exception e){}
            });
        }
    }

}

输出信息见截图,​​​​​由此我们可以在任务执行前以及执行后进行任务的监控,同时可以队列情况。

源码分析

ThreadPoolExecutor类图:

我们从AsyncThreadPool 代码中调用super函数开始看起,该函数中传入:

1.核心线程数:默认情况下不会回收,可通过allowCoreThreadTimeOut函数设置回收,或者设置为0。若无需求,不建议进行核心线程回收。

2.最大线程数:该参数必须大于等于核心线程数,非核心线程数在队列中没有要继续执行任务时会进行回收。

3.非核心线程存活时间

4.非核心线程存活时间单位

5.任务存储队列:当无空闲线城时提交的任务会进入到队列进行等待执行。

6.创建线程工厂:用于创建初始化线程

7.拒绝策略:当无空闲线程且任务队列已满则执行决绝策略。

看完了构造函数创建之后,我们来看任务的提交。在Test中,我们通过pool.execute()函数来提交一个任务到线程池执行,在该函数我们看到线程池中的线程是在提交任务后才进行的初始化。

1. workerCountOf(c)统计当核心线程数量是否已经全部初始化了,如果没有,那么则直接通过addWorker()创建线程执行任务。

2.如果当前核心线程已经全部初始化了,那么则将任务快速添加到队列中,同时校验如果当前线程池已经关闭,那么则移除任务同时执行拒绝策略。如果当前线程池存活线程是0,那么添加工作线程进行任务执行。

3.如果在第2步中添加到任务队列时队列已满,则直接尝试创建非核心线程执行,如果非核心线程也无法创建,那么执行决绝策略。

接下来我们重点分析下 addWorker(Runnable firstTask, boolean core)的函数。

  • Runnable firstTask:要执行的第一个任务,如果为null,则表示新线程将从工作队列中获取任务。
  • boolean core:指示是否为核心线程,true表示是核心线程,false表示非核心线程。
  1. 循环尝试获取线程池状态runStateOf(ctl.get())):

    • 如果线程池状态大于或等于SHUTDOWN(即线程池正在关闭或已关闭),并且不是在关闭状态下添加新任务到非空队列,那么返回false,无法添加新工作线程。
  2. 检查工作线程数量

    • 获取当前线程池的工作线程数量(workerCountOf(c))。
    • 如果线程数量已经达到最大容量(CAPACITY),或者对于核心线程来说达到了corePoolSize,对于非核心线程来说达到了maximumPoolSize,则返回false,无法添加新工作线程。
    • 如果当前线程数量小于上述限制,并且成功通过compareAndIncrementWorkerCount(c)方法增加工作线程计数,则跳出循环。
  3. 创建新工作线程

    • 尝试创建新的Worker对象,它是一个继承了Thread的类,用于执行任务。
    • 如果新线程t不为空,并且线程池状态允许新线程启动(即runStateOf(ctl.get())小于SHUTDOWN或者在关闭状态下且firstTasknull),则将新工作线程添加到线程池的workers集合中,并标记为已添加(workerAdded = true)。
  4. 启动新工作线程

    • 如果工作线程成功添加,调用t.start()启动新线程,并将workerStarted标记为true
  5. 处理线程启动失败的情况

    • 如果新线程没有成功启动,调用addWorkerFailed(w)方法来处理失败情况,这可能包括移除工作线程计数和执行其他清理工作。
  6. 返回结果

    • 返回workerStarted,表示新工作线程是否成功启动。
    /**
     * Checks if a new worker can be added with respect to current
     * pool state and the given bound (either core or maximum). If so,
     * the worker count is adjusted accordingly, and, if possible, a
     * new worker is created and started, running firstTask as its
     * first task. This method returns false if the pool is stopped or
     * eligible to shut down. It also returns false if the thread
     * factory fails to create a thread when asked.  If the thread
     * creation fails, either due to the thread factory returning
     * null, or due to an exception (typically OutOfMemoryError in
     * Thread.start()), we roll back cleanly.
     *
     * @param firstTask the task the new thread should run first (or
     * null if none). Workers are created with an initial first task
     * (in method execute()) to bypass queuing when there are fewer
     * than corePoolSize threads (in which case we always start one),
     * or when the queue is full (in which case we must bypass queue).
     * Initially idle threads are usually created via
     * prestartCoreThread or to replace other dying workers.
     *
     * @param core if true use corePoolSize as bound, else
     * maximumPoolSize. (A boolean indicator is used here rather than a
     * value to ensure reads of fresh values after checking other pool
     * state).
     * @return true if successful
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

我们接着分析创建执行任务Worker(),它继承自AbstractQueuedSynchronizer并实现了Runnable接口。Worker类主要负责维护线程的中断状态和一些次要的记账工作,同时它也实现了任务的运行。

  • private static final long serialVersionUID:序列化ID,用于序列化机制。
  • final Thread thread:当前Worker线程运行的Thread对象。如果线程工厂创建线程失败,则为null
  • Runnable firstTask:当前Worker线程需要执行的第一个任务。如果没有初始任务,则为null
  • volatile long completedTasks:此线程完成的任务数量。

构造函数Worker(Runnable firstTask)

  • 调用setState(-1)初始化锁状态为-1,表示在runWorker方法执行之前禁止中断。
  • 初始化firstTask为传入的第一个任务。
  • 通过线程工厂创建新线程,并将其赋值给thread

运行方法:

  • public void run():将控制权委托给外部的runWorker方法,开始工作线程的主运行循环。

锁方法:

Worker类继承自AbstractQueuedSynchronizer,提供了锁的获取和释放方法。这些方法用于保护任务执行,防止在等待任务时被中断。

  • protected boolean isHeldExclusively():判断当前线程是否独占锁。
  • protected boolean tryAcquire(int unused):尝试获取锁。
  • protected boolean tryRelease(int unused):尝试释放锁。
  • public void lock():获取锁。
  • public boolean tryLock():尝试获取锁,如果锁被占用则立即返回false
  • public void unlock():释放锁。
  • public boolean isLocked():判断锁是否被占用。

中断方法:

  • void interruptIfStarted():如果线程已经开始运行并且尚未中断,则尝试中断该线程。这个方法用于在工作线程等待新任务时,如果线程池正在关闭,则中断工作线程。

Worker类是ThreadPoolExecutor线程池中每个工作线程的抽象表示。它负责维护线程的运行状态、锁状态和任务执行状态。通过继承AbstractQueuedSynchronizerWorker类提供了一个简单的互斥锁,以确保在执行任务时不会被中断。此外,Worker类还提供了中断控制,以确保在适当的时候中断

工作线程,特别是在线程池关闭时。

    /**
     * Class Worker mainly maintains interrupt control state for
     * threads running tasks, along with other minor bookkeeping.
     * This class opportunistically extends AbstractQueuedSynchronizer
     * to simplify acquiring and releasing a lock surrounding each
     * task execution.  This protects against interrupts that are
     * intended to wake up a worker thread waiting for a task from
     * instead interrupting a task being run.  We implement a simple
     * non-reentrant mutual exclusion lock rather than use
     * ReentrantLock because we do not want worker tasks to be able to
     * reacquire the lock when they invoke pool control methods like
     * setCorePoolSize.  Additionally, to suppress interrupts until
     * the thread actually starts running tasks, we initialize lock
     * state to a negative value, and clear it upon start (in
     * runWorker).
     */
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

线程池的工作原理图解: 

以下是对本篇文章的的总结:

主要功能和特点:

  1. 线程池管理ThreadPoolExecutor允许你控制线程的创建和销毁,以及任务的执行和管理。
  2. 参数化配置:提供了多个参数来调整线程池的行为,包括核心线程数、最大线程数、线程存活时间、工作队列等。
  3. 任务执行:可以执行任何实现了Runnable接口的任务。
  4. 线程复用:通过重用线程来执行新任务,减少了线程创建和销毁的开销。
  5. 拒绝策略:当任务太多,无法被线程池及时处理时,可以定义拒绝策略来处理新提交的任务。

关键组件:

  • 核心线程数:即使它们是空闲的,也会保持一定数量的线程。
  • 最大线程数:线程池中允许的最大线程数量。
  • 工作队列:用于存放待执行任务的队列。
  • 线程工厂:用于创建新线程。
  • 拒绝执行处理器:当任务太多,无法被线程池及时处理时,定义了如何处理新提交的任务。

方法概览:

  • execute(Runnable command):提交一个任务给线程池执行。
  • shutdown():平滑地关闭线程池,不再接受新任务,但会处理完已提交的任务。
  • shutdownNow():尝试立即停止所有正在执行的任务,并返回等待执行的任务列表。
  • isShutdown()isTerminating()isTerminated():检查线程池的状态。
  • awaitTermination(long timeout, TimeUnit unit):等待线程池终止。
  • setCorePoolSize(int corePoolSize)setMaximumPoolSize(int maximumPoolSize):动态调整线程池的大小。
  • getQueue():获取当前的任务队列。

拒绝策略:

  • AbortPolicy:默认策略,当任务不能被接受时抛出异常。
  • CallerRunsPolicy:用调用者线程来运行任务。
  • DiscardPolicy:直接丢弃任务。
  • DiscardOldestPolicy:丢弃队列中最旧的任务,并尝试再次提交新任务。

扩展性:

ThreadPoolExecutor提供了多个钩子方法,如beforeExecute(Thread t, Runnable r)afterExecute(Runnable r, Throwable t),允许在任务执行前后进行自定义操作。

这个类是Java并发包中的核心组件,为多线程编程提供了强大的工具,使得任务的并发执行更加高效和易于管理。

标签:Thread,int,剖析,任务,boolean,线程,原理,public
From: https://blog.csdn.net/qq_36070104/article/details/142991659

相关文章

  • 为何选择 C++:深入剖析其优势与适用场景
    在众多编程语言的璀璨星空中,C++以其独特的魅力和强大的功能闪耀着独特的光芒。对于许多编程学习者和开发者来说,选择C++作为学习或应用的语言,往往有着诸多深刻的原因。一、强大的性能与效率C++是一种高效的编程语言,它直接操作硬件资源,能够生成高度优化的代码。这使得它在......
  • 【Linux】<互斥量>解决<抢票问题>——【多线程竞争问题】
    前言大家好吖,欢迎来到YY滴Linux系列,热烈欢迎!本章主要内容面向接触过C++的老铁主要内容含:欢迎订阅YY滴C++专栏!更多干货持续更新!以下是传送门!YY的《C++》专栏YY的《C++11》专栏YY的《Linux》专栏YY的《数据结构》专栏YY的《C语言基础》专栏YY的《初学者易错点》......
  • Charger IC原理讲解与个人理解
    ChargerIC原理讲解与个人理解ChargerICChargerIC的主要功能包括:ChargerIC的共同点:ChargerIC典型应用图OVPOVP的工作原理:OVP的实现方法:ChargerICChargerIC(充电集成电路)是一种专门用于管理电池充电的集成电路。它的主要功能是控制电池的充电过程,以确保安全、......
  • SpringBoot原理
    1.配置优先级SpringBoot项目当中支持的三类配置文件:application.propertiesapplication.ymlapplication.yaml在SpringBoot项目当中,要想配置一个属性,可以通过这三种方式当中的任意一种来配置都可以,那么如果项目中同时存在这三种配置文件,且都配置了同一个属性,如:Tomcat端......
  • 中国海洋大学24秋《软件工程原理与实践》 实验4:MobileNet & ShuffleNet
    代码练习1.下载IndianPines数据集!wgethttp://www.ehu.eus/ccwintco/uploads/6/67/Indian_pines_corrected.mat!wgethttp://www.ehu.eus/ccwintco/uploads/c/c4/Indian_pines_gt.matIndianPines是一个标准的高光谱数据集,广泛用于分类任务的研究。2.导入......