首页 > 其他分享 >ThreadPoolExecutor使用和思考(上)-线程池大小设置与BlockingQueue的三种实现区别

ThreadPoolExecutor使用和思考(上)-线程池大小设置与BlockingQueue的三种实现区别

时间:2022-10-06 12:40:35浏览次数:83  
标签:队列 queue 任务 线程 new BlockingQueue ThreadPoolExecutor


前记:

 

  1. jdk官方文档(javadoc)是学习的最好,最权威的参考。
  2. corePoolSize和maximumPoolSize,BlockingQueue选型(SynchronousQueue,LinkedBlockingQueue,​​ArrayBlockingQueue​);中篇中主要聊聊与keepAliveTime这个参数相关的话题;下片中介绍一下一些比较少用的该类的API,及他的近亲:ScheduledThreadPoolExecutor。
  3. 如果理解错误,请直接指出。

 

 

查看JDK帮助文档,可以发现该类比较简单,继承自AbstractExecutorService,而AbstractExecutorService实现了ExecutorService接口。

 

ThreadPoolExecutor的完整构造方法的签名是:

 

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler​

 

先记着,后面慢慢解释。

 

===============================神奇分割线==================================

 

从Executors这个类入手。因为他的几个构造工厂构造方法名字取得令人很容易了解有什么特点。但是其实Executors类的底层实现便是ThreadPoolExecutor!

 

ThreadPoolExecutor是Executors类的底层实现。

 

在JDK帮助文档中,有如此一段话:

强烈建议程序员使用较为方便的 ​​Executors​​ 工厂方法 ​​Executors.newCachedThreadPool()​​(无界线程池,可以进行自动线程回收)、​​Executors.newFixedThreadPool(int)​​(固定大小线程池)和​​Executors.newSingleThreadExecutor()​​(单个后台线程),它们均为大多数使用场景预定义了设置。”

 

可以推断出ThreadPoolExecutor与Executors类必然关系密切。

 

===============================神奇分割线==================================

 

 

OK,那就来看看源码吧,从newFixedThreadPool开始。

 

ExecutorService newFixedThreadPool(int nThreads):固定大小线程池。

 

corePoolSize和maximumPoolSize的大小是一样的(实际上,后面会介绍,如果使用无界queue的话maximumPoolSize参数是没有意义的),keepAliveTime和unit的设值表名什么?-就是该实现不想keep alive!最后的BlockingQueue选择了LinkedBlockingQueue,该queue有一个特点,他是无界的。

 


Java代码    ​

ThreadPoolExecutor使用和思考(上)-线程池大小设置与BlockingQueue的三种实现区别_文档



  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2. return new ThreadPoolExecutor(nThreads, nThreads,
  3. 0L, TimeUnit.MILLISECONDS,
  4. new LinkedBlockingQueue<Runnable>());
  5. }


 

ExecutorService newSingleThreadExecutor():单线程。

 

很像,只不过fixedThreadPool中的入参直接退化为1

 

 


Java代码    ​

ThreadPoolExecutor使用和思考(上)-线程池大小设置与BlockingQueue的三种实现区别_文档



  1. public static ExecutorService newSingleThreadExecutor() {
  2. return new FinalizableDelegatedExecutorService
  3. new ThreadPoolExecutor(1, 1,
  4. 0L, TimeUnit.MILLISECONDS,
  5. new LinkedBlockingQueue<Runnable>()));
  6. }

 

 

ExecutorService newCachedThreadPool():无界线程池,可以进行自动线程回收。

 

maximumPoolSize为big big。其次BlockingQueue的选择上使用SynchronousQueue。可能对于该BlockingQueue有些陌生,简单说:该QUEUE中,每个插入操作必须等待另一个

线程的对应移除操作。比如,我先添加一个元素,接下来如果继续想尝试添加则会阻塞,直到另一个线程取走一个元素,反之亦然。(想到什么?就是缓冲区为1的生产者消费者模式^_^)

corePoolSize和maximumPoolSize的大小不同。

 

 


Java代码    ​

ThreadPoolExecutor使用和思考(上)-线程池大小设置与BlockingQueue的三种实现区别_文档



  1. public static ExecutorService newCachedThreadPool() {
  2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  3. 60L, TimeUnit.SECONDS,
  4. new SynchronousQueue<Runnable>());
  5. }

 

===============================神奇分割线==================================

 

到此如果有很多疑问,那是必然了(除非你也很了解了)

 

​BlockingQueue​​​<​​Runnable​​> workQueue这个入参开始说起。在JDK中,其实已经说得很清楚了,一共有三种类型的queue。以下为引用:(我会稍微修改一下,并用红色突出显示)

 

 


​BlockingQueue​

 都可用于传输和保持提交的任务。可以使用此队列与池大小进行交互:

  • 如果当前运行的线程小于corePoolSize,则任务根本不会存放,添加到queue中,而是直接抄家伙(thread)开始运行)
  • 首选将请求加入队列,而不添加新的线程
  • 则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。

先不着急举例子,因为首先需要知道queue上的三种类型。



排队有三种通用策略:


  1. 直接提交。工作队列的默认选项是

​SynchronousQueue​

  1. ,它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
  2. 无界队列。使用无界队列(例如,不具有预定义容量的

​LinkedBlockingQueue​

  1. )将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
  2. 有界队列。当使用有限的 maximumPoolSizes 时,有界队列(如

​ArrayBlockingQueue​

  1. )有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。

 

===============================神奇分割线==================================

 

到这里,该了解的理论已经够多了,可以调节的就是corePoolSize和maximumPoolSizes 这对参数还有就是BlockingQueue的选择。

 

例子一:使用直接提交策略,也即SynchronousQueue。

 

由于该Queue本身的特性,在某次添加元素后必须等待其他线程取走后才能继续添加。在这里不是核心线程便是新创建的线程,但是我们试想一样下,下面的场景。

 

我们使用一下参数构造ThreadPoolExecutor:

 

 



Java代码    ​

ThreadPoolExecutor使用和思考(上)-线程池大小设置与BlockingQueue的三种实现区别_文档



  1. new ThreadPoolExecutor(
  2. 2, 3, 30, TimeUnit.SECONDS,
  3. new <span style="white-space: normal;">SynchronousQueue</span><Runnable>(),
  4. new RecorderThreadFactory("CookieRecorderPool"),
  5. new ThreadPoolExecutor.CallerRunsPolicy());



 当核心线程已经有2个正在运行.

 

  1. 首选将请求加入队列,而不添加新的线程。”,所以A被添加到queue中。
  2. 又来了一个任务(B),且核心2个线程还没有忙完,OK,接下来首先尝试1中描述,但是由于使用的SynchronousQueue,所以一定无法加入进去。
  3. 则创建新的线程,除非创建此线程超出maximumPoolSize,在这种情况下,任务将被拒绝。”,所以必然会新建一个线程来运行这个任务。
  4. 暂时还可以,但是如果这三个任务都还没完成,连续来了两个任务,第一个添加入queue中,后一个呢?queue中无法插入,而线程数达到了maximumPoolSize,所以只好执行异常策略了。

此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。





什么意思?如果你的任务A1,A2有内部关联,A1需要先运行,那么先提交A1,再提交A2,当使用SynchronousQueue我们可以保证,A1必定先被执行,在A1么有被执行前,A2不可能添加入queue中





例子二:使用无界队列策略,即LinkedBlockingQueue





这个就拿 newFixedThreadPool来说,根据前文提到的规则:



 写道



如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。



 那么当任务继续增加,会发生什么呢?



 写道

 


如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程。



 OK,此时任务变加入队列之中了,那什么时候才会添加新线程呢?

 



 写道



如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。



换句说,永远也不会触发产生新的线程!corePoolSize大小的线程数会一直运行,忙完当前的,就从队列中拿任务开始运行。所以要防止任务疯长,比如任务运行的实行比较长,而添加任务的速度远远超过处理任务的时间,而且还不断增加,如果任务内存大一些,不一会儿就爆了,呵呵。

 

可以仔细想想哈。

 

例子三:有界队列,使用ArrayBlockingQueue。

 

这个是最为复杂的使用,所以JDK不推荐使用也有些道理。与上面的相比,最大的特点便是可以防止资源耗尽的情况发生。

 

举例来说,请看如下构造方法:

 



Java代码    ​

ThreadPoolExecutor使用和思考(上)-线程池大小设置与BlockingQueue的三种实现区别_文档



  1. new ThreadPoolExecutor(
  2. 2, 4, 30, TimeUnit.SECONDS,
  3. new ArrayBlockingQueue<Runnable>(2),
  4. new RecorderThreadFactory("CookieRecorderPool"),
  5. new ThreadPoolExecutor.CallerRunsPolicy());



假设,所有的任务都永远无法执行完。

 

对于首先来的A,B来说直接运行,接下来,如果来了C,D,他们会被放到queu中,如果接下来再来E,F,则增加线程运行E,F。但是如果再来任务,队列无法再接受了,线程数也到达最大的限制了,所以就会使用拒绝策略来处理。

 

总结:

  1. ThreadPoolExecutor的使用还是很有技巧的。
  2. 使用无界queue可能会耗尽系统资源。
  3. 使用有界queue可能不能很好的满足性能,需要调节线程数和queue大小
  4. 线程数自然也有开销,所以需要根据不同应用进行调节。

通常来说对于静态任务可以归为:



  1. 数量大,但是执行时间很短
  2. 数量小,但是执行时间较长
  3. 数量又大执行时间又长
  4. 除了以上特点外,任务间还有些内在关系

看完这篇问文章后,希望能够可以选择合适的类型了

标签:队列,queue,任务,线程,new,BlockingQueue,ThreadPoolExecutor
From: https://blog.51cto.com/jdsjlzx/5733462

相关文章

  • 多线程创建
    创建多线程的方式一:继承Thread类 创建多线程的方式二:实现Runnable接口 创建多线程的方式三:实现Callable接口  ......
  • 初始多线程
    初始多线程一、基本概念1.1应用程序以Windows为例,一个拓展名为.exe的文件就是一个应用程序,应用程序是能够双击运行的。1.2进程应用程序运行起来就创建了一个进......
  • 并发学习记录17:tomcat线程池
    tomcat在哪里用到了线程池LimitLatch用来限流,可以控制最大连接个数acceptor负责接收新的socket连接poller负责监听socketchannel是否有可读的IO事件一旦有可读的IO......
  • JUC必要掌握(Synchronized,Lock,可重入锁ReentrantLock,可重入锁,读写锁,自旋锁,线程间通信,集
    本文已参与「新人创作礼」活动,一起开启掘金创作之路1.锁(Synchronized和lock)1.1Synchronized(1)Synchronized是Java内置的关键字,是Java内置的锁机制。(2)Synchronized的作......
  • 喜提JDK的BUG一枚!多线程的情况下请谨慎使用这个类的stream遍历。
    你好呀,我是歪歪。前段时间在RocketMQ的ISSUE里面冲浪的时候,看到一个pr,虽说是在RocketMQ的地盘上发现的,但是这个玩意吧,其实和RocketMQ没有任何关系。纯纯的就是......
  • 记一次 .NET 某工控数据采集平台 线程数 爆高分析
    一:背景1.讲故事前几天有位朋友在B站加到我,说他的程序出现了​​线程数​​爆高的问题,让我帮忙看一下怎么回事,截图如下:说来也奇怪,这些天碰到了好几起关于线程数无缘无故......
  • 记一次 .NET 某新能源系统 线程疯涨 分析
    一:背景1.讲故事前段时间收到一个朋友的求助,说他的程序线程数疯涨,寻求如何解决。等我分析完之后,我觉得这个问题很有代表性,所以拿出来和大家分享下,还是上老工具WinDbg。二:W......
  • 线程
    线程继承Thread类packagecom.teatea.demo01;//创建线程方程一:继承Threa类,重写run方法,调用start开启线程publicclassTestThread1extendsThread{@Override......
  • 并发学习记录16:任务调度线程池
    在任务调度池功能加入之前,可以使用java.util.Timer来实现定时功能,Timer的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能......
  • Linux多线程服务端编程 pdf
    高清文字版下载链接:https://pan.baidu.com/s/1Ar0sbiycp70BdNysXfkg2w点击这里获取提取码 ......