首页 > 其他分享 >深入理解ThreadPoolExecutor

深入理解ThreadPoolExecutor

时间:2023-09-25 12:07:12浏览次数:30  
标签:策略 队列 corePoolSize 任务 理解 线程 深入 public ThreadPoolExecutor


在上节介绍ThreadPoolExecutor时,大部分参数中都很简单, 只有 workQueue和 handler需要进行详细说明。

队列

参数 workQueue指被提交但未执行的任务队列, 它是一个 BlockingQueue接口的对象,仅用于存放 Runnable对象。 根据队列功能分类, 在ThreadPoolExecutor的构造函数中可使用以下几种 BlockingQueue。

  • 直接提交的队列:该功能由 SynchronousQueue对象提供。 SynchronousQueue是一个特殊的 BlockingQueue。 SynchronousQueue没有容量,每一个插入操作都要等待一个相应的删除操作, 反之, 每一个删除操作都要等待对应的插入操作。 如果使用synchronousQueue, 提交的任务不会被真实的保存, 而总是将新任务提交给线程执行,如果没有空闲的进程, 则尝试创建新的进程, 如果进程数量已经达到最大值, 则执行拒绝策略。因此,使用 SynchronousQueue队列,通常要设置很大的 maximumPoolSize否则很容易执行拒绝策略。
  • 有界的任务队列:有界的任务队列可以使用 ArrayBlockingQueue实现。 当使用有限的 maximumPoolSizes 时,有界队列有助于防止资源耗尽,但是可能较难调整和控制。ArrayBlockingQueue的构造函数必须带一个容量参数,表示该队列的最大容量,如下所示public ArrayBlockingQueue(int capacity);当使用有界的任务队列时, 若有新的任务需要执行, 如果线程池的实际线程数小于corePoolSize,则会优先创建新的线程;若大于corePoolSize,则会将新任务加入等待队列;若等待队列已满,无法加入,则在总线程数不大于 maximumPoolSize的前提下,创建新的进程执行任;若大于 maximumPoolSize,则执行拒绝策略。可见,有界队列仅当在任务队列装满时,才可能将线程数提升到corePoolSize以上,换言之,除非系统非常繁忙, 否则确保核心线程数维持在 corePoolSize。
  • 无界的任务队列:无界任务队列可以通过 LinkedBlockingQueue类实现。与有界队列相比, 除非系统资源耗尽, 否则无界的任务队列不存在任务入队失败的情况 。 当有新的任务到来,系统的线程数小于corePoolSize时,线程池会生成新的线程执行任务,但当系统的线程数达到corePoolSize后,就不会继续增加。若后续仍有新的任务加入,而又没有空闲的线程资源, 则任务直接进入队列等待。 若任务创建和处理的速度差异很大, 无界队列会保持快速増长, 直到耗尽系统内存。
  • 优先任务队列:优先任务队列是带有执行优先级的队列。它通过 PriorityBlockingQueue实现, 可以控制任务的执行先后顺序 。 它是一个特殊的无界队列 。 无论是有界队ArrayBlockingQueue,还是未指定大小的无界队列 LinkedBlockingQueue都是按照先进先出算法处理任务的 。 而 PriorityBlockingQueue则可以根据任务自身的优先级顺序先后执行, 在确保系统性能的同时, 也能有很好的质量保证 (总是确保高优先级的任务先执行)。

现在再看下newFixedThreadPool()方法的实现。它返回了一个 corePoolSize和 maximumPoolSize大小一样的,并且使用了 LinkedBlockingQueue任务队列的线程池。因为对于固定大小的线程池而言,不存在线程数量的动态变化,因此corePoolSize和maximumPoolSize可以相等。同时,它使用无界队列存放无法立即执行的任务, 当任务提交非常频繁的时候, 该队列可能迅速膨胀,从而耗尽系统资源。

而newSingleThreadExecutor()返回的单线程线程池, 是 newFixedThreadPool()方法的一种退化,只是简单的将线程池线程数量设置为1 。

newCachedThreadPool()方法返回 corePoolSize为0, maximumPoolSize无穷大的线程池,这意味着在没有任务时, 该线程池内无线程, 而当任务被提交时, 该线程池会使用空闲的线程执行任务,若无空闲线程,则将任务加入 SynchronousQueue队列,而 SynchronousQueue队列是一种直接提交的队列, 它总会追使线程池增加新的线程执行任务 。 当任务执行完毕后, 由于corePoolSize为 0, 因此空闲线程又会在指定时间内(60秒)被回收。

拒绝策略

ThreadPoolExecutor的最后一个参数handler指定了拒绝策略 。也就是当任务数量超过系统实际承载能力时, 该如何处理呢?这时就要用到拒绝策略了 。拒绝策略可以说是系统超负荷运行时的补救措施, 通常由于压力太大而引起的, 也就是线程池中的线程已经用完了, 无法继续为新任务服务, 同时, 等待队列中也已经排满了, 再也塞不下新任务了 。这时我们就需要有一套机制, 合理地处理这个问题。

JDK内置提供了四种拒绝策略:

  • AbortPolicy策略:该策略会直接抛出运行时RejectedExecutionException异常,阻止系统正常工作。
  • CauerRunsPolicy策略:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。显然这样做不会真的丢弃任务,但是任务提交线程的性能极有可能会急剧下降。
  • DiscardOldestPolicy策略:该策略将丢弃最老的一个请求,也就是即将被执行的一个任务, 并尝试再次提交当前任务。
  • DiscardPolicy策略:该策略将丢弃无法处理的任务,不予任何处理。如果允许任务丢失,这可能是最好的一种方案

以上内置的策略均实现了 RejectedExecutionHandler接口, 若以上策略仍无法满足实际应用需要,完全可以自己扩展 RejectedExecutionHandler接口 ,RejectedExecutionHandler的定义如下:

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

下面列举一个代码简单说明一下

/**
 * 自定义线程池和拒绝策略
 * Created by xmr on 2018/9/5.
 */
public class RejectThreadPoolDemo {
    public static class MyTask implements Runnable {

        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" +
                    Thread.currentThread().getId());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /*自定义线程池和拒绝策略*/
    public static void main(String[] args) throws InterruptedException {
        MyTask task = new MyTask();
        ExecutorService es = new ThreadPoolExecutor(5, 5,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(10),
                Executors.defaultThreadFactory(),//默认创建线程的实现方式,可以省略
                new RejectedExecutionHandler() {
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        System.out.println(r.toString() + " is discard");
                    }
                });
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            es.submit(task);
            Thread.sleep(10);
        }
    }
}

扩展线程池

虽然 JDK已经帮我们实现了这个稳定的高性能线程池 。 但如果我们需要对这个线程池做一些扩展, JDK还是给了我们支持了,它提供了 beforeExecute()、afterExecute()和 terminated()三个接口对线程池进行控制 。

在默认的ThreadPoolExecutor实现中,提供了空的 beforeExecute0和 aferExecute0实现。在实际应用中, 可以对其进行扩展来实现对线程池运行状态的跟踪, 输出一些有用的调试信息, 以帮助系统故障诊断, 这对于多线程程序错误排査是很有帮助的 。 下面演示了对线程池的扩展

/**
 * 扩展线程池
 * Created by xmr on 2018/9/5.
 */
public class ExtThreadPool {
    public static class MyTask implements Runnable {
        public String name;

        public MyTask(String name) {
            this.name = name;
        }

        public void run() {
            System.out.println("正在执行 : Thread ID:" +
                    Thread.currentThread().getId() + ",Task name= " + name);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = new ThreadPoolExecutor(5, 5,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("准备执行: " + ((MyTask) r).name);
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println("执行完成: " + ((MyTask) r).name);
            }

            @Override
            protected void terminated() {
                System.out.println("线程池退出");
            }
        };

        for (int i = 0; i < 5; i++) {
            MyTask task = new MyTask("TASK-" + i);
            es.execute(task);
            Thread.sleep(10);
        }
        es.shutdown();
    }
}

 


标签:策略,队列,corePoolSize,任务,理解,线程,深入,public,ThreadPoolExecutor
From: https://blog.51cto.com/u_6947107/7594178

相关文章

  • MySQL实战实战系列 04 深入浅出索引(下)
    在上一篇文章中,我和你介绍了InnoDB索引的数据结构模型,今天我们再继续聊聊跟MySQL索引有关的概念。 在开始这篇文章之前,我们先来看一下这个问题: 在下面这个表T中,如果我执行select*fromTwherekbetween3and5,需要执行几次树的搜索操作,会扫描多少行? 下面是这......
  • 理解并掌握C#的Channel:从使用案例到源码解读(一)
    引言在C#的并发编程中,Channel是一种非常强大的数据结构,用于在生产者和消费者之间进行通信。本文将首先通过一个实际的使用案例,介绍如何在C#中使用Channel,然后深入到Channel的源码中,解析其内部的实现机制。使用案例一:文件遍历和过滤在我们的使用案例中,我们需要遍历一个文件夹及......
  • 宏观上理解blazor中的表单验证
    概述表单验证的最终效果大家都懂,这里不阐述了,主要从宏观角度说说blazor中表单验证框架涉及到的类,以及它们是如何协作的,看完这个,再看官方文档也许能更轻松点。blazor中的验证框架分为两部分:基础验证框架和基于数据注释Atrrbute的验证器,当然也提供了很多扩展点。注意我们通常使......
  • JavaWeb中对于 request对象和response对象的理解
    1.request对象和response对象的原理1.request和response对象是由服务器创建的。我们来使用它们2.request对象是来获取请求消息,response对象是来设置响应消息2.request对象继承体系结构:ServletRequest--接口|继承HttpServletRequest--接口|实现org.apache.catali......
  • 深入探讨Spring Security的单点注销
    前言在现代Web应用程序中,单点登录(SSO)是一种常见的身份验证机制。它允许用户使用一组凭据(例如用户名和密码)登录到多个应用程序中,而无需在每个应用程序中都进行身份验证。然而,当用户注销时,他们可能希望注销所有应用程序,而不仅仅是当前应用程序。这就是单点注销的作用。在本文中,我们......
  • 深入探讨Spring Boot中的Redis缓存
    介绍Redis是一种高性能的内存数据库,常用于缓存和消息队列等场景。在SpringBoot中,我们可以通过集成Redis来实现缓存功能。本文将深入探讨SpringBoot中的Redis缓存。集成Redis在SpringBoot中,我们可以通过添加以下依赖来集成Redis:<dependency><groupId>org.springframewor......
  • Go指针探秘:深入理解内存与安全性
    Go指针为程序员提供了对内存的深入管理能力,同时确保了代码的安全性。本文深入探讨了Go指针的基础概念、操作、深层理解及其特性与限制。通过深入了解其设计哲学和应用,我们可以更好地利用Go的强大功能。关注公众号【TechLeadCloud】,分享互联网架构、云服务技术的全维度知识。作......
  • 深入了解Java中的StringJoiner类
    在Java编程中,字符串的拼接是一个常见的操作。Java提供了多种方法来实现字符串拼接,其中之一就是StringJoiner类。本文将详细介绍StringJoiner的用法和功能。StringJoiner简介StringJoiner是Java8引入的一个用于拼接字符串的工具类。它允许我们以指定的分隔符将一组字符串连接成一个......
  • ThreadPoolExecutor线程池
    ......
  • spring事务控制的原理解析2
    上一篇:[spring事务控制的原理解析1](https://www.cnblogs.com/chengxuxiaoyuan/p/16581334.html)上一篇中总结了在spring中事务控制的基本原理,这一篇来记录下在spring源码中是如何实现的。一、TransactionInterceptorspring中的事务控制是通过aop实现的,提到aop肯定会有一个......