首页 > 其他分享 >ThreadPoolExecutor线程池参数设置技巧

ThreadPoolExecutor线程池参数设置技巧

时间:2023-02-01 17:35:28浏览次数:66  
标签:队列 池中 corePoolSize 任务 线程 执行 参数设置 ThreadPoolExecutor

一、ThreadPoolExecutor的重要参数

1、corePoolSize:核心线程数

* 核心线程会一直存活,及时没有任务需要执行

* 当线程数小于核心线程数时,即使有线程空闲,线程池也会优先创建新线程处理

* 设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭

2、maxPoolSize:最大线程数

* 当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务

* 当线程数=maxPoolSize,且任务队列已满时,线程池会拒绝处理任务而抛出异常

3、 keepAliveTime:线程空闲时间

* 当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize

* 如果allowCoreThreadTimeout=true,则会直到线程数量=0

4、allowCoreThreadTimeout:允许核心线程超时

5、queueCapacity:任务队列容量(阻塞队列)

* 当核心线程数达到最大时,新任务会放在队列中排队等待执行

(1)LinkedBlockingQueue

LinkedBlockingQueue是一个无界缓存等待队列。当前执行的线程数量达到corePoolSize的数量时,剩余的元素会在阻塞队列里等待。(所以在使用此阻塞队列时maximumPoolSizes就相当于无效了),每个线程完全独立于其他线程。生产者和消费者使用独立的锁来控制数据的同步,即在高并发的情况下可以并行操作队列中的数据。

这个队列需要注意的是,虽然通常称其为一个无界队列,但是可以人为指定队列大小,而且由于其用于记录队列大小的参数是int类型字段,所以通常意义上的无界其实就是队列长度为 Integer.MAX_VALUE,且在不指定队列大小的情况下也会默认队列大小为 Integer.MAX_VALUE。

(2)SynchronousQueue

SynchronousQueue没有容量,是无缓冲等待队列,是一个不存储元素的阻塞队列,会直接将任务交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素。拥有公平(FIFO)和非公平(LIFO)策略,使用SynchronousQueue阻塞队列一般要求maximumPoolSizes为无界(Integer.MAX_VALUE),避免线程拒绝执行操作。

(3)ArrayBlockingQueue

ArrayBlockingQueue是一个有界缓存等待队列,可以指定缓存队列的大小,当正在执行的线程数等于corePoolSize时,多余的元素缓存在ArrayBlockingQueue队列中等待有空闲的线程时继续执行,当ArrayBlockingQueue已满时,加入ArrayBlockingQueue失败,会开启新的线程去执行,当线程数已经达到最大的maximumPoolSizes时,再有新的元素尝试加入ArrayBlockingQueue时会报错。

(4)DelayedWorkQueue

DelayedWorkQueue 的特点是内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序,内部采用的是“堆”的数据结构。之所以线程池 ScheduledThreadPool 和 SingleThreadScheduledExecutor 选择 DelayedWorkQueue,是因为它们本身正是基于时间执行任务的,而延迟队列正好可以把任务按时间进行排序,方便任务的执行。

6、ThreadFactory threadFactory 线程工厂

ThreadFactory 实际上是一个线程工厂,它的作用是生产线程以便执行任务。我们可以选择使用默认的线程工厂,创建的线程都会在同一个线程组,并拥有一样的优先级,且都不是守护线程,我们也可以选择自己定制线程工厂,以方便给线程自定义命名,不同的线程池内的线程通常会根据具体业务来定制不同的线程名。

* Executors.defaultThreadFactory()

7、rejectedExecutionHandler:任务拒绝处理器

* 两种情况会拒绝处理任务:

- 当线程数已经达到maxPoolSize,切队列已满,会拒绝新任务

- 当线程池被调用shutdown()后,会等待线程池里的任务执行完毕,再shutdown。如果在调用shutdown()和线程池真正shutdown之间提交任务,会拒绝新任务

* 线程池会调用rejectedExecutionHandler来处理这个任务。如果没有设置默认是AbortPolicy,会抛出异常

* ThreadPoolExecutor类有几个内部实现类来处理这类情况:

- AbortPolicy 丢弃任务,抛运行时异常

- CallerRunsPolicy 执行任务

- DiscardPolicy 忽视,什么都不会发生

- DiscardOldestPolicy 从队列中踢出最先进入队列(最后一个执行)的任务

* 实现RejectedExecutionHandler接口,可自定义处理器

二、ThreadPoolExecutor执行顺序

1、添加执行任务

    submit() 该方法返回一个Future对象,可执行带返回值的线程;或者执行想随时可以取消的线程。Future对象的get()方法获取返回值。Future对象的cancel(true/false)取消任务,未开始或已完成返回false,参数表示是否中断执行中的线程

    execute() 没有返回值。

2、线程池任务提交过程

一个线程提交到线程池的处理流程如下图

 

 

  •     如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
  •     如果此时线程池中的数量等于corePoolSize,但是缓冲队列workQueue未满,那么任务被放入缓冲队列。
  •     如果此时线程池中的数量大于等于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。
  •     如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。
  •     当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。

总结即:处理任务判断的优先级为 核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。

注意:

  •     当workQueue使用的是无界限队列时,maximumPoolSize参数就变的无意义了,比如new LinkedBlockingQueue(),或者new ArrayBlockingQueue(Integer.MAX_VALUE);
  •     使用SynchronousQueue队列时由于该队列没有容量的特性,所以不会对任务进行排队,如果线程池中没有空闲线程,会立即创建一个新线程来接收这个任务。maximumPoolSize要设置大一点。
  •     核心线程和最大线程数量相等时keepAliveTime无作用.

三、常用队列介绍

ArrayBlockingQueue: 这是一个由数组实现的容量固定的有界阻塞队列.

SynchronousQueue: 没有容量,不能缓存数据;每个put必须等待一个take; offer()的时候如果没有另一个线程在poll()或者take()的话返回false。

LinkedBlockingQueue: 这是一个由单链表实现的默认无界的阻塞队列。LinkedBlockingQueue提供了一个可选有界的构造函数,而在未指明容量时,容量默认为Integer.MAX_VALUE。

四、如何设置参数

1、默认值

* corePoolSize=1

* queueCapacity=Integer.MAX_VALUE

* maxPoolSize=Integer.MAX_VALUE

* keepAliveTime=60s

* allowCoreThreadTimeout=false

* rejectedExecutionHandler=AbortPolicy()

2、如何来设置

* 需要根据几个值来决定

- tasks :每秒的任务数,假设为500~1000

- taskcost:每个任务花费时间,假设为0.1s

- responsetime:系统允许容忍的最大响应时间,假设为1s

* 做几个计算

- corePoolSize = 每秒需要多少个线程处理?

* threadcount = tasks/(1/taskcost) =tasks*taskcout =  (500~1000)*0.1 = 50~100 个线程。corePoolSize设置应该大于50

* 根据8020原则,如果80%的每秒任务数小于800,那么corePoolSize设置为80即可

- queueCapacity = (coreSizePool/taskcost)*responsetime

* 计算可得 queueCapacity = 80/0.1*1 = 80。意思是队列里的线程可以等待1s,超过了的需要新开线程来执行

* 切记不能设置为Integer.MAX_VALUE,这样队列会很大,线程数只会保持在corePoolSize大小,当任务陡增时,不能新开线程来执行,响应时间会随之陡增。

- maxPoolSize = (max(tasks)- queueCapacity)/(1/taskcost)

* 计算可得 maxPoolSize = (1000-80)/10 = 92

* (最大任务数-队列容量)/每个线程每秒处理能力 = 最大线程数

- rejectedExecutionHandler:根据具体情况来决定,任务不重要可丢弃,任务重要则要利用一些缓冲机制来处理

- keepAliveTime和allowCoreThreadTimeout采用默认通常能满足

3、 以上都是理想值,实际情况下要根据机器性能来决定。如果在未达到最大线程数的情况机器cpu load已经满了,则需要通过升级硬件(呵呵)和优化代码,降低taskcost来处理。

标签:队列,池中,corePoolSize,任务,线程,执行,参数设置,ThreadPoolExecutor
From: https://www.cnblogs.com/linjiqin/p/17083542.html

相关文章

  • 解决多线程中线程安全问题方式三
    importjava.util.concurrent.locks.ReentrantLock;/***解决多线程中线程安全问题方式三:调用Lock接口1.因为Lock是接口,无法直接使用,所以使用Lock的实现类ReentrantLo......
  • Java线程池中的execute和submit
    一、概述execute和submit都是线程池中执行任务的方法。execute是Executor接口中的方法publicinterfaceExecutor{voidexecute(Runnablecommand);}submit是......
  • Java多线程:Future和FutureTask
    一、FutureFuture是一个接口,所有方法如下:上源码:packagejava.util.concurrent;publicinterfaceFuture<V>{booleancancel(booleanmayInterruptIfRunning);......
  • 8.线程管理
    一.可靠性:线程的未捕获异常与监控 如果线程的run方法抛出未被捕获的异常,那么随着run方法的退出,相应的线程也提前终止。对于线程的这种异常终止,可以通过Thread内部定义的U......
  • 多线程知识点
    1.理论产生死锁的四个条件互斥条件:一个资源同时只能被一个线程占用请求与保持条件:一个进程因请求资源而阻塞时,对已获得资源不释放不剥夺条件:一个进程已获得的资源,在不......
  • 使用Disruptor完成多线程下并发、等待、先后等操作
    Java完成多线程间的等待功能:场景1:一个线程等待其他多个线程都完成后,再进行下一步操作(如裁判员计分功能,需要等待所有运动员都跑完后,才去统计分数。裁判员和每个运动员都是一......
  • CompletableFuture使用IOC容器中自定义线程池
    CompletableFuture使用IOC容器中自定义线程池创建自定义线程池,并交给ioc容器管理@ConfigurationpublicclassThreadPoolConfig{@BeanpublicExecutora......
  • SimpleDateFromat 是线程安全的吗
    1,在SimpleDateFormat转换日期是通过Calendar对象来操作的,SimpleDateFormat继承DateFormat类,DateFormat类中维护一个Calendar对象,通过DateFormat类中的注释可知:此处Calenda......
  • netcore之异步并不是多线程!
    1、遇到await,线程的变化遇到await会把当前线程返回且返回值就是await后面的Task,再从线程池随机取一个线程往下执行代码。我们使用封装好的异步方法模拟写入大量字符串的......
  • i++在多线程下的原子性问题
     staticinti=0;@TestvoidiTest()throwsInterruptedException{Threadt1=newThread(()->{for(intj=0;j<50000;......