首页 > 其他分享 >ThreadPoolExecutor - 管理线程池的核心类

ThreadPoolExecutor - 管理线程池的核心类

时间:2024-07-07 22:27:18浏览次数:12  
标签:队列 核心 corePoolSize 任务 线程 public ThreadPoolExecutor

下面是使用给定的初始参数创建一个新的 ThreadPoolExecutor (构造方法)。

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

形参:

  • corePoolSize – 要保留在池中的线程数,即使它们处于空闲状态,除非 allowCoreThreadTimeOut 被设置了。
    allowCoreThreadTimeOut 是一个开关,用于控制核心线程是否可以在空闲状态下超时并被终止。默认情况下,allowCoreThreadTimeOut 是 false,这意味着核心线程不会因为空闲而超时被销毁。如果你调用
    setAllowCoreThreadTimeOut(true),那么核心线程也会像非核心线程一样,在空闲一段时间后(由
    keepAliveTime 和 unit 参数指定)被终止。
  • maximumPoolSize – 池中允许的最大线程数
  • keepAliveTime – 当线程数大于核心数时,这是多余的空闲线程在终止之前等待新任务的最长时间。
  • unit – 参数的时间 keepAliveTime 单位
  • workQueue – 用于在任务执行之前保存任务的队列。这个队列将仅保存通过 execute 方法提交的 Runnable 任务和通过
    submit 方法提交的 Runnable 或 Callable 任务。 execute 方法只接受 Runnable 任务。execute 方法没有返回值。适用于不需要返回结果的任务。
    submit 方法可以接受 Runnable 或 Callable 任务。submit 方法返回一个 Future 对象。
    如果提交的是 Runnable 任务,Future.get() 方法返回 null。
    如果提交的是 Callable 任务,Future.get() 方法返回 call 方法的返回值。
    适用于需要返回结果或检查任务状态的任务。
  • threadFactory – 执行器创建新线程时使用的工厂
  • handler – 当线程边界和队列容量已达到上限导致执行被阻塞时使用的处理程序。 handler 是
    RejectedExecutionHandler 接口的实现类对象。

在这里插入图片描述
RejectedExecutionHandler 接口的 rejectedExecution() 方法一共有四种默认实现:
在这里插入图片描述

在这里插入图片描述

  1. AbortPolicy - Abort 意思为中止
  • 类名:java.util.concurrent.ThreadPoolExecutor.AbortPolicy
  • 描述:这种策略在任务不能被执行时,会抛出 RejectedExecutionException 异常。它是默认的拒绝策略。
    使用场景:适用于不能丢弃任务且需要立即知道任务被拒绝的情况。
public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }
    
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}
  1. CallerRunsPolicy
  • 类名:java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy
  • 描述:这种策略在任务不能被执行时,会由提交任务的线程(通常是调用 execute 方法的线程)来执行该任务。这种策略可以有效降低新任务的流量,避免任务丢失。
    使用场景:适用于可以接受任务执行延迟但不希望任务被丢弃的情况。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public CallerRunsPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}
  1. DiscardOldestPolicy
  • 类名:java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy
  • 描述:这种策略在任务不能被执行时,会丢弃队列中最旧的未处理任务,然后尝试重新提交被拒绝的任务。
    使用场景:适用于希望优先处理新任务可以接受丢弃旧任务的情况。
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}
  1. DiscardPolicy
  • 类名:java.util.concurrent.ThreadPoolExecutor.DiscardPolicy
  • 描述:这种策略在任务不能被执行时,会直接丢弃被拒绝的任务,不会有任何异常抛出。
    使用场景:适用于可以接受任务丢弃且不希望处理被拒绝任务的情况。
public static class DiscardPolicy implements RejectedExecutionHandler {
    public DiscardPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // Do nothing
    }
}

总结:

  • AbortPolicy:直接抛出 RejectedExecutionException 异常。
  • CallerRunsPolicy:由提交任务的线程执行被拒绝的任务。
  • DiscardOldestPolicy:丢弃队列中最旧的任务,然后重新提交被拒绝的任务。
  • DiscardPolicy:直接丢弃被拒绝的任务,不抛出异常。

下面是四种对于上述的ThreadPoolExecutor的封装,ExecutorService是线程服务对象

  1. 创建固定数量的线程对象newFixedThreadPool()
ExecutorService executorService = Executors.newFixedThreadPool(3);
    /**
	创建一个线程池,该线程池重用在共享的无界队列上运行的固定数量的线程。在任何时候,大多数 nThreads 线程都将是活动的去处理任务。
	如果在所有线程都处于活动状态时提交了其他任务,它们将在队列中等待,直到线程可用。
	如果某个线程在关闭之前由于执行期间的故障而终止,那么如果需要执行后续任务,将会有一个新的线程替代它的位置。
	这段话解释了固定大小线程池的一个重要特性:线程池会维护固定数量的线程,即使其中某个线程由于运行时的异常或错误而终止,线程池也会创建一个新的线程来替换它,从而确保线程池中始终有固定数量的线程来处理任务。
		形参:
		nThreads – 池中的线程数
		返回值:
		新创建的线程池
		抛出:
		IllegalArgumentException –如果 nThreads <= 0
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS, // 毫秒
                                      new LinkedBlockingQueue<Runnable>());
    }

ThreadPoolExecutor(nThreads)
核心线程数:nThreads
最大线程数:nThreads
队列类型:LinkedBlockingQueue(无界队列)FIFO先进先出队列
行为:固定数量的线程处理任务。如果所有线程都在忙碌,新任务会被放入无界队列中等待。
适用场景:适用于需要稳定的线程数量来处理较为均匀的任务负载的场景。

  1. 根据需求动态创建线程newCachedThreadPool, 创建的线程可以重复使用,只是当目前线程不够了他会动态增加线程
ExecutorService executorService = Executors.newCachedThreadPool();
/*
创建一个可以根据需要创建新线程的线程池,但如果之前构造的线程可用,则会重用这些线程。
这种线程池通常会提升那些执行许多短暂异步任务的程序的性能。
调用 execute 方法时,如果有可用的线程,会重用之前构造的线程。如果没有可用的线程,则会创建一个新线程并添加到线程池中。

未使用超过60秒的线程会被终止并从缓存中移除。
因此,一个长时间处于空闲状态的线程池将不会消耗任何资源。
需要注意的是,可以使用 ThreadPoolExecutor 构造函数创建具有类似属性但不同细节(例如超时参数)的线程池。

返回值:
新创建的线程池
*/
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

newCachedThreadPool()
核心线程数:0
最大线程数:Integer.MAX_VALUE
队列类型:SynchronousQueue(不存储任务的队列,每次插入操作都必须等待对应的删除操作,反之亦然)。
每个 put 操作(插入任务)必须等待一个 take 操作(移除任务),相应地,每个 take 操作必须等待一个 put 操作。换句话说,生产者线程和消费者线程必须直接在队列上进行任务的交接。
行为:线程池可根据需求动态创建新线程,闲置超过 60 秒的线程会被终止并移出缓存。
适用场景:适用于大量短期异步任务或任务负载波动较大的场景。

newCachedThreadPool() 的工作原理:
① 任务提交:
当一个任务被提交到 newCachedThreadPool() 时,它会尝试将任务放入 SynchronousQueue。
由于 SynchronousQueue 不存储任务,这意味着必须有一个线程立即接收这个任务。
② 线程创建:
如果当前没有空闲线程能够立即接收任务,newCachedThreadPool() 会创建一个新的线程来处理这个任务。
这种机制确保了任务可以尽快得到处理。
③ 线程回收:
新创建的线程如果在闲置超过 60 秒后没有接收到新的任务,就会被终止并移出线程池。
这有助于释放资源,避免不必要的线程占用。

newCachedThreadPool() 完全依赖非核心线程来处理任务

  1. 单一线程newSingleThreadExecutor
ExecutorService executorService = Executors.newSingleThreadExecutor();
/*
创建一个 Executor,它使用单个工作线程从一个无界队列中获取任务进行操作。
(但是请注意,如果这个单个线程在执行过程中因为故障而在关闭之前终止了,那么如果需要执行后续任务,将会有一个新的线程替代它的位置。)
任务保证按顺序执行,并且在任何给定时间点,不会有超过一个任务在运行。
与功能相似的 newFixedThreadPool(1) 不同,返回的执行器保证不会被重新配置为使用额外的线程。

返回值:
新创建的单线程执行器
*/
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS, // 毫秒
                                    new LinkedBlockingQueue<Runnable>()));
    }
private static class FinalizableDelegatedExecutorService
            extends DelegatedExecutorService {
        FinalizableDelegatedExecutorService(ExecutorService executor) {
            super(executor);
        }
        @SuppressWarnings("deprecation")
        protected void finalize() {
            super.shutdown();
        }
    }
private static class DelegatedExecutorService
            implements ExecutorService {
        private final ExecutorService e;
        DelegatedExecutorService(ExecutorService executor) { e = executor; }

newSingleThreadExecutor()
核心线程数:1
最大线程数:1
队列类型:LinkedBlockingQueue(无界队列)
行为:始终只有一个线程执行任务,任务按提交顺序执行。如果线程因故障终止,会创建一个新线程来替代它。
适用场景:适用于需要按顺序执行任务的场景,并且在同一时间只需要一个线程执行任务。

newFixedThreadPool(1):虽然初始时只有一个线程,但它是使用 ThreadPoolExecutor 实现的,理论上可以通过调用 setCorePoolSize 和 setMaximumPoolSize 方法来重新配置线程池的线程数量。
也就是说,虽然默认情况下只有一个线程,但你可以在运行时增加线程池中的线程数量。
newSingleThreadExecutor():这个执行器保证始终只有一个线程,无法通过重新配置来增加线程数量。
它的实现确保了线程池的单线程性质是不可更改的。

  1. 定时调度线程 newScheduledThreadPool, 线程有3个,但是线程在什么时候执行我们可以去定义他
ExecutorService executorService = Executors.newScheduledThreadPool(3);
/*
	创建一个线程池,该线程池可以计划命令在给定延迟后运行,或定期执行。
	形参:
	corePoolSize – 池中要保留的线程数,即使它们处于空闲状态
	返回值:
	新创建的定时线程池
	抛出:
	IllegalArgumentException –如果 corePoolSize < 0
*/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService {
/*
	使用给定的核心池大小创建一个新 ScheduledThreadPoolExecutor 池。
	形参:
	corePoolSize – 要保留在池中的线程数,即使它们处于空闲状态,除非 allowCoreThreadTimeOut 已设置
	抛出:
	IllegalArgumentException –如果 corePoolSize < 0
*/
public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,  // DEFAULT_KEEPALIVE_MILLIS = 10L
              new DelayedWorkQueue());  
        // super父类对象是ThreadPoolExecutor
        // 相当于:
        // public ThreadPoolExecutor(int corePoolSize,
        //                      int maximumPoolSize,
       //                       long keepAliveTime,
        //                      TimeUnit unit,
        //                      BlockingQueue<Runnable> workQueue) {
       // this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
       //      Executors.defaultThreadFactory(), defaultHandler);
    }
    }
 
 }

newScheduledThreadPool(int corePoolSize) 方法用于创建一个定时线程池。
ScheduledThreadPoolExecutor 类继承自 ThreadPoolExecutor,并添加了对定时任务的支持。
核心线程数由 corePoolSize 决定,最大线程数为 Integer.MAX_VALUE
非核心线程的默认存活时间是 10 毫秒。
无界队列:DelayedWorkQueue 是一个无界队列,适用于存储定时任务和周期性任务。即使队列是无界的,它主要是一个优先级队列(基于任务的预定执行时间),并且不会因为任务太多而导致线程池拒绝任务。
超出线程池和等待队列容量的任务会被拒绝,并抛出异常。

newScheduledThreadPool(corePoolSize)
核心线程数:corePoolSize
最大线程数:Integer.MAX_VALUE
队列类型:DelayedWorkQueue(无界优先级队列,基于任务的预定执行时间)
行为:支持定时任务和周期性任务的线程池,核心线程数由 corePoolSize 决定,超出线程池和等待队列容量的任务会被拒绝并抛出异常。线程池可根据需求动态创建新的非核心线程,闲置超过 10 毫秒的线程会被终止并移出缓存,虽然话是这么说,但是由于 DelayedWorkQueue 是一个无界队列,通常情况下不会出现任务队列满的情况,因此非核心线程在 ScheduledThreadPoolExecutor 中被使用的机会较少,就主要还是依赖corePoolSize指定的核心线程来运行任务。因为核心线程先处理不完,然后把任务放到等待队列,等待队列也满了才开非核心线程,我是这样理解的。
适用场景:适用于需要在特定时间点或周期性执行任务的场景。

标签:队列,核心,corePoolSize,任务,线程,public,ThreadPoolExecutor
From: https://blog.csdn.net/qq_45732909/article/details/140242056

相关文章

  • Linux系统之 — 线程
    Linux系统之—线程线程介绍线程使用死锁(Deadlock)竞态条件(RaceCondition)线程使用示例服务器端代码示例服务器端示例拆解1.引入头文件和宏定义2.定义全局变量3.定义线程函数4.主函数5.错误处理和资源释放客户端代码示例客户端示例拆解1.引入必要的头文件2.定......
  • ComfyUI进阶篇:ComfyUI核心节点(三)
    ComfyUI核心节点(三)前言:学习ComfyUI是一场持久战。当你掌握了ComfyUI的安装和运行之后,会发现大量五花八门的节点。面对各种各样的工作流和复杂的节点种类,可能会让人感到不知所措。在这篇文章中,我们将用通俗易懂的语言对ComfyUI的核心节点进行系统梳理,并详细解释每个参数。希望大家......
  • Spring框架:核心概念与Spring Boot微服务开发指南
    引言        Spring框架是一个开源的Java平台,它提供了全面的基础设施支持,用于开发Java应用程序。Spring的核心概念包括依赖注入(DI)、面向切面编程(AOP)和事务管理。随着微服务架构的兴起,SpringBoot作为Spring框架的扩展,提供了一种快速开发独立微服务的方式。本文将详细......
  • C++的线程管理
    C++的线程管理线程类(Thread)线程构造器约定构造器初始化构造器复制构造器移动构造器多线程atomiccondition_variable应用实列futurepromise应用实列future应用实列线程类(Thread)执行线程是一个指令序列,它可以在多线程环境中,与其他此类指令序列同时执行,同时共享......
  • 线程进程2--线程安全-死锁
    5.Thread类中常用的一些方法staticvoidsleep:使当前线程阻塞多少毫秒--线程休眠yield:当前线程让出cpu参与下次竞争--使用yield线程出现交换执行的频率变高了join加入当前线程上(插入的线程执行完毕后,当前的线程才会执行)setDaemon()设置线程为守护线程(当所有的线程执行完......
  • Qt之多线程编程(QThread)
    文章目录前言Qt多线程的基本使用如何移动线程常用的一些函数示例代码总结前言在现代计算机系统中,多线程编程已经成为一种常见的编程模式,它可以有效地利用多核处理器的计算能力,提高程序的执行效率。Qt作为一种跨平台的应用程序开发框架,提供了QThread类来支持多线程编......
  • 多线程
    多线程概念并发:两个事件在同一时间段进行并行:两个事件在同一时刻进行程序:代码(指令和数据),静态进程Process:执行程序的一次过程,动态,系统分配资源的最小单位线程Thread:负责执行当前进程中程序的运行,一个进程中至少有一个线程,CPU调度和执行的单位多线程:真正的多线程是多个CPU多......
  • 014文章解读与程序——中国电机工程学报,电力自动化设备EI\CSCD\北大核心《主动配 电
    ......
  • 快速排序的思路及核心函数
    Quick_Sort###思想选取数组段内的任意一个值$x$(可以是左边界值`a[r]`,右边界值`a[l]`,中间值`a[(l+r+1)/2]`)。进行排序,在数组段内,将比$x$小的数都放在$x$的左边,比$x$大的数都放在$x$的右边(双指针)。不断递归处理数组段的左右两段,使得任意一个数的左边都比它小,右边......
  • 多线程二-同步锁
    关于线程安全问题的简述多个线程做同一件事的时候原子性:Syncronized,AtomicXXX,Lock可见性:Syncronized,volatile有序性:Syncronized,volatile原子性问题代码演示了两个线程分别调用incr()方法来对i进行累加,预期结果应该是20000,但是实际结果却是小于等于20000的值,这就是线......