首页 > 其他分享 >并发-线程池[老的,有时间我重新整理一下]

并发-线程池[老的,有时间我重新整理一下]

时间:2022-11-16 10:38:08浏览次数:47  
标签:并发 队列 创建 阻塞 重新整理 任务 线程 执行


并发-线程池[老的,有时间我重新整理一下]

文章是直接从我本地word笔记粘贴过来的,排版啥的可能有点乱,凑合看吧,有时间我会慢慢整理

为什么要用线程池?

平时开发的时候基本不太用线程,用线程池更多一点

Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来3个好处。

第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。Java创建一个线程出来,虚拟机要去跟操作系统申请的,给这个线程去分配资源,也就说创建线程需要消耗资源的,线程销毁也要消耗资源.

第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。假设一个服务器完成一项任务所需时间为:
T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。

如果:T1 + T3 远大于 T2(就是任务运行时间小于线程创建的时间和线程销毁的时间),则可以采用线程池,以提高服务器性能。线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的。它把T1,T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1,T3的开销了。意思就是直接从线程池里面拿线程,用完再放回线程池里面.

第三:提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。

假设一个服务器一天要处理50000个请求,并且每个请求需要一个单独的线程完成。在线程池中,线程数一般是固定的,所以产生线程总数不会超过线程池中线程的数目,而如果服务器不利用线程池来处理这些请求则线程总数为50000。一般线程池大小是远小于50000。所以利用线程池的服务器程序不会为了创建50000而在处理请求时浪费时间,从而提高效率。

ThreadPoolExecutor 的类关系

Executor是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开来。该接口就定义了一个方法,execute()方法,执行的方法

ExecutorService接口继承了Executor,在其上做了一些shutdown()、submit()的扩展,可以说是真正的线程池接口;(这个接口提供了一些与任务管理相关的方法)

AbstractExecutorService抽象类实现了ExecutorService接口中的大部分方法;(抽象类实现接口,这样的类叫做骨架类)
然后基于AbstractExecutorService这个抽象类在实现具体的实现类(比如ExecutorServiceAdapter , ThreadPoolExecutor , ForkJoinPool 等等)

ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务(开发的时候用这个的多)。

ScheduledExecutorService接口继承了ExecutorService接口,提供了带"周期执行"功能ExecutorService;

ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。ScheduledThreadPoolExecutor比Timer更灵活,功能更强大,取代了Timer。

Timer缺点:

  1. 线程不安全
  2. 多个任务只要有一个任务失败了,整个定时器就会被全部关闭了.

线程池的工作原理

当提交一个任务的时候,线程池创建一个新的线程执行任务(而不是一开始就创建好所有的线程,需要注意的是,必行这一步骤需要获取全局锁).
直到当前线程数量等于corePoolSize,如果等于corePoolSize后,如果还继续提交任务,就会把当前任务保存到阻塞队列(传入的workQueue)里面,等待被执行.
如果把阻塞队列(workQueue)也填满了,线程池会再启动新的线程去处理任务,如果处理完任务之后,还会从workQueue队列里面获取任务,但是,创建线程的数量是有限制的,不能超过maximumPoolSize这个值.
如果任务更多,新启动的线程即将超过maximumPoolSize的值,此时启动拒绝策略(RejectedExecutionHandler).
如果任务没了,线程都空下来了,此时超过corePoolSize的线程数量就没必要空闲着占用系统内存了,会在keepAliveTime时间后销毁,unit是keepAliveTime的时间单位.

线程池的创建各个参数含义

案例:ZJJ_JavaBasic_2020/02/09_11:41:16_qi6f5

![](/i/ll/?i=img_convert/1e048913d3a0ae89b9b04021eea1a202.png#align=left&display=inline&height=427&margin=[object Object]&originHeight=569&originWidth=658&status=done&style=none&width=494)
面试中线程池各个参数含义和如何合理配置线程池的参数也是很重要的,基本是面试必考的

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)

1.corePoolSize线程数量

线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;
如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中(就是方法传入的workQueue中),等待被执行;
如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。

2.maximumPoolSize最大线程池数量

线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize

3.keepAliveTime(线程空闲时的存活时间)

线程空闲时的存活时间,即当线程没有任务执行时,继续存活的时间。默认情况下,该参数只在线程数大于corePoolSize时才有用
TimeUnit
keepAliveTime的时间单位

4.workQueue(存放多余线程的队列)

workQueue
workQueue必须是BlockingQueue阻塞队列。当线程池中的线程数超过它的corePoolSize的时候,线程会进入阻塞队列进行阻塞等待。通过workQueue,线程池实现了阻塞功能

用于保存等待执行的任务的阻塞队列,一般来说,我们应该尽量使用有界队列,因为使用无界队列作为工作队列会对线程池带来如下影响。
1)当线程池中的线程数达到corePoolSize后,新任务将在无界队列中等待,因此线程池中的线程数不会超过corePoolSize。
2)由于1,使用无界队列时maximumPoolSize将是一个无效参数。
3)由于1和2,使用无界队列时keepAliveTime将是一个无效参数。
4)更重要的,使用无界queue可能会耗尽系统资源,有界队列则有助于防止资源耗尽,同时即使使用有界队列,也要尽量控制队列的大小在一个合适的范围。
所以我们一般会使用,ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、PriorityBlockingQueue。

5.threadFactory(创建线程的工厂)

zjj_parent_1a6872e1-985d-2c81-e9f4-dfd474903c86

百度了半天我感觉是用处不大.

创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名,当然还可以更加自由的对线程做更多的设置,比如设置所有的线程为守护线程。

Executors静态工厂里默认的threadFactory,线程的命名规则是“pool-数字-thread-数字”。

6.RejectedExecutionHandler(饱和策略)

![](/i/ll/?i=img_convert/0334a037d2e5197b101902c6cec43c53.png#align=left&display=inline&height=152&margin=[object Object]&originHeight=203&originWidth=681&status=done&style=none&width=511)
当提交的任务数大于corePoolSize时,会优先放到队列缓冲区,只有填满了缓冲区后,才会判断当前运行的任务是否大于maxPoolSize,小于时会新建线程处理。大于时就触发了拒绝策略,总结就是:当前提交任务数大于(maxPoolSize + queueCapacity)时就会触发线程池的拒绝策略了。

线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种实现类策略:

(1)AbortPolicy:直接抛出异常,默认策略;
(2)CallerRunsPolicy:用调用者所在的线程来执行任务(谁提交的任务由谁来执行,假如是主线程提交的任务由主线程来执行);
(3)DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
(4)DiscardPolicy:直接丢弃任务;

当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务,或者把当前任务放到数据库里面去,在未来的某个时刻拿出来重新执行。

扩展线程池

zjj_parent_e4200ed8-139d-dbd3-c1f2-641ad53c0225

能扩展线程池的功能吗?比如在任务执行的前后做一点我们自己的业务工作?

实际上,JDK 的线程池已经为我们预留的接口,在线程池核心方法中,有2 个方法是空的,就是给我们预留的。还有一个线程池退出时会调用的方法。
可以看到,每个任务执行前后都会调用 beforeExecute和 afterExecute方法。相当于执行了一个切面。而在线程池关闭了调用 shutdown 方法后则会调用 terminated 方法。

使用场景:

写日志, 比如每提交一个任务就记录到日志里面
或者统计啥的

它的实现方式就是类似Spring的AOP的实现

提交任务

execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。

submit()方法用于提交需要返回值的任务,并且希望在未来的某个时刻拿到返回值。线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。

关闭线程池

可以通过调用线程池的shutdown或shutdownNow方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。但是它们存在一定的区别:

shutdownNow 做法比较野蛮一点,首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表.

而shutdown只是将线程池的状态设置成SHUTDOWN状态,它不会去尝试中断正在执行任务的线程, 它只是会中断所有没有正在执行任务的线程。

shutdown终止空闲的线程 shutdownNow 不管你有没有在工作,都会调用线程的interrupt方法.

一般都是用shutdown来执行关闭线程,如果任务不要求一定要执行完调用shutdownNow方法也行.

只要调用了这两个关闭方法中的任意一个,isShutdown方法就会返回true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。

合理地配置线程池

要想合理地配置线程池,就必须首先分析任务特性

要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析。

任务的性质在计算机里面分为三种:CPU密集型任务 IO密集型任务 混合型任务。

CPU密集型

CPU是运算单元,如果你的任务是复杂的大量的计算,只是和内存打交道,不断的从内存中获取数据来做复杂的计算,比如字符串的正则匹配,大数的计算,加密解密,这些都是CPU密集型

CPU密集型意味着CPU很忙,比如说八核逻辑cpu ,如果我启动了10个线程来做任务,CPU很忙,就算你启动了10个线程,对于计算机来讲,它能同时处理的线程是8个线程(一核一个线程),启动了10个线程对于CPU来讲只能同时运行8个线程,

CPU密集型建议配置线程池线程数和CPU核心数一样.最多最多就是线程数加个1 ,比如八核CPU配置线程数为9

| //获取当前机器有多个CPU可以使用(jdk能看到的CPU,就是逻辑核心)

Runtime.getRuntime().availableProcessors();

CPU密集型任务应配置尽可能小的线程,如配置Ncpu+1个线程的线程池。由于IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如2*Ncpu。

IO密集型任务

比如说你和磁盘和网络有打交道的, 比如说读写文件(IO操作),数据库(网络)
比较难以配置了,如果想要配置最佳性能,需要监控操作系统的用户态和核心态的时间,把系统态降到越低越好,但是一般来讲这个是运维的活儿,而且需要持续的观察

如果做一个通用型的任务,或者是暂时不知道,一般配置服务器CPU核心数乘以2 的一个经验取值.如果你对任务类型进行时时刻刻监控,发现CPU还是很闲(即使线程池线程数量配置了CPU核心数乘以2),这时候你就可以考虑配置CPU核心数乘以3 或者是CPU核心数量乘以4都没有什么问题.

混合型任务

又有CPU密集,又有IO密集的任务,如果CPU计算时间和IO密集型计算时间相差不大的话,尽量拆分两个线程池来执行,CPU密集的用线程池线程核心数加1 ,IO密集型的就尽量给线程池多分配一下.

如果CPU密集,又有IO密集的任务差距太多的话
比如CPU密集型1秒钟, IO密集型需要10分钟,这样拆分就没什么意义了,就没必要拆分了.

配置队列容量

除了配置线程池线程数量,配置队列也很重要,禁止在线程池里面使用无界队列,如果当时我们设置成无界队列,那么线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。

建议使用有界队列。有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点儿,比如几千。
假设,我们现在有一个Web系统,里面使用了线程池来处理业务, 在某些情况下,系统里后台任务线程池的队列和线程池全满了,不断抛出抛弃任务的异常,通过排查发现是数据库出现了问题,导致执行SQL变得非常缓慢,因为后台任务线程池里的任务全是需要向数据库查询和插入数据的,所以导致线程池里的工作线程全部阻塞,任务积压在线程池里,会造成OOM异常。
OOM异常一产生基本当面服务应用就停止了.如果拆分了分布式,就是当前模块儿停止了,很有可能就会造成架构方面的雪崩,同样的你的分布式系统的高可用性会大大降低,所以一定要用有界队列,增强系统的稳定性和预警能力.
有界队列就不会产生OOM异常了,第二个一旦队列满了就去执行拒绝策略,对于线上系统来讲,拒绝策略可以往线上的监控系统发通知,或者往表里面写一个数据,提示给程序员说 当前的任务紧张 ,程序员就会发现当前系统有问题了.

一般来讲有界队列的容量控制在几千差不多, 如果任务特别小的话,容量就控制在1万左右差不多,但是一定要控制住,防止OOM异常.

预定义线程池(都不建议使用)

建议了解,更多的是建议根据任务的性质来规定线程的数量(使用ThreadPoolExecutor)

1.FixedThreadPool

不要用这个,任务队列容量太大了,很有可能会把内存撑爆产生OOM异常.

创建使用固定线程数的FixedThreadPool的API。适用于为了满足资源管理的需求,而需要限制当前线程数量的应用场景,它适用于负载比较重的服务器。FixedThreadPool的corePoolSize和maximumPoolSize都被设置为创建FixedThreadPool时指定的参数nThreads。
当线程池中的线程数大于corePoolSize时,keepAliveTime为多余的空闲线程等待新任务的
最长时间,超过这个时间后多余的线程将被终止。这里把keepAliveTime设置为0L,意味着多余的空闲线程会被立即终止。
FixedThreadPool使用有界队列LinkedBlockingQueue作为线程池的工作队列(队列的容量为Integer.MAX_VALUE)。

| **public static **ExecutorService newFixedThreadPool(**int **nThreads) {

**return new **ThreadPoolExecutor(nThreads, nThreads,

0L, TimeUnit.**_MILLISECONDS_**,

**new **LinkedBlockingQueue<Runnable>());

}

2.SingleThreadExecutor

不要用这个,任务队列容量太大了,很有可能会把内存撑爆产生OOM异常.

创建使用单个线程的SingleThread-Executor的API,保证任务的串行执行(因为线程池里面只有一个线程);并且在任意时间点,不会有多个线程是活动的应用场景。

corePoolSize和maximumPoolSize被设置为1。其他参数与FixedThreadPool相同。SingleThreadExecutor使用有界队列LinkedBlockingQueue作为线程池的工作队列(队列的容量为Integer.MAX_VALUE)。

| **public static **ExecutorService newSingleThreadExecutor() {

**return new **FinalizableDelegatedExecutorService

(**new **ThreadPoolExecutor(1, 1,

0L, TimeUnit.**_MILLISECONDS_**,

**new **LinkedBlockingQueue<Runnable>()));

}

3.CachedThreadPool

不建议使用这个线程池,maximumPoolSize被设置了最大的线程,因为线程是系统昂贵的资源,你创建很多个线程都是系统昂贵的资源.

只要有一个任务来就会马上创建一个任务,corePoolSize被设置为0,即corePool为空;maximumPoolSize被设置为Integer.MAX_VALUE。这里把keepAliveTime设置为60L,意味着CachedThreadPool中的空闲线程等待新任务的最长时间为60秒,空闲线程超过60秒后将会被终止。

CachedThreadPool使用没有容量的(不存储元素的阻塞队列)SynchronousQueue作为线程池的工作队列,也就是说来一个任务后面必须要人接着才能往下走,来一个任务就会创建线程,来一个任务就会创建线程.

但CachedThreadPool的maximumPool是无界的。这意味着,如果主线程提交任务的速度高于maximumPool中线程处理任务的速度时,CachedThreadPool会不断创建新线程。极端情况下,CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源。

大小无界的线程池,适用于执行很多的短期异步任务的小程序,很快就执行完了,或者是负载较轻的服务器。
FixedThreadPool和SingleThreadExecutor使用有界队列LinkedBlockingQueue作为线程池的工作队列。

| **public static **ExecutorService newCachedThreadPool() {

**return new **ThreadPoolExecutor(0, Integer.**_MAX_VALUE_**,

60L, TimeUnit.**_SECONDS_**,

**new **SynchronousQueue<Runnable>());

}

4.WorkStealingPool

利用所有运行的处理器数目来创建一个工作窃取的线程池,使用forkjoin实现

5.ScheduledThreadPoolExecutor

执行定时任务的线程池

zjj_parent_2019/09/19_15:49:49_2immyifmub7gw35ge457l9rrmko8yz

使用工厂类Executors来创建。Executors可以创建2种类型的ScheduledThreadPoolExecutor,如下。

1.•ScheduledThreadPoolExecutor。包含若干个线程的ScheduledThreadPoolExecutor (就是多个线程执行并行执行任务)。
ScheduledThreadPoolExecutor适用于需要多个后台线程执行周期任务,同时为了满足资源管理的需求而需要限制后台线程的数量的应用场景。

2.•SingleThreadScheduledExecutor。只包含一个线程的ScheduledThreadPoolExecutor (单个线程串行执行任务)。
SingleThreadScheduledExecutor适用于需要单个后台线程执行周期任务,同时需要保证顺序地执行各个任务的应用场景。

定时任务种类

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
//向定时任务线程池提交一个延时Runnable任务(仅执行一次)

public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit);
//向定时任务线程池提交一个延时的Callable任务(仅执行一次)

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
//向定时任务线程池提交一个固定延时间隔执行的任务

假设说设置了3秒,不管上一个任务执行了多久,只有执行完了我才开始计时,等待3秒过后我才开始下一个任务的执行.

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
//向定时任务线程池提交一个固定时间间隔执行的任务

假设设置为6000ms执行一次,固定时间间隔执行的任务,从理论上说第二次任务在6000 ms后执行,第三次在 6000*2 ms后执行

比如说第一个任务是 一启动就开始执行任务 , 第二个任务是在6秒后执行,第三个任务是在12秒后执行

但是第一个任务执行了8秒,第二个马上就开始执行了(因为超过6秒了,但是只是执行了2秒,加上第一个任务就是10秒),第三个任务就固定等待了12秒,刚好是第三个任务正式开始执行的时间,也就说在提交定时任务的过程中,如果任务的处理时长超过了任务的设置间隔时长, 第二个任务不会抢先开始,会等待上一个任务执行完成了才开始下次的任务,下次任务会马上执行.如果任务处理时长小于定义的间隔时长,那么会一直等待间隔时长到了以后才去再次执行.

固定时间间隔执行的任务,虽然抛出了异常,但被捕捉了,next周期继续运行,如果不捕捉的,任务不会继续执行

注意也没有任何的异常信息,如果我们在实现的时候不捕获任何异常,schedule会把异常吞噬掉,所以在使用schedule的时候一定要注意使用try catch把所有的run方法代码都包裹起来,否则的话,出现了定时任务不跑,你都不知道发生了什么情况.你都看不到任何异常.

定时任务超时问题

scheduleAtFixedRate中,若任务处理时长超出设置的定时频率时长,本次任务执行完才开始下次任务,下次任务已经处于超时状态,会马上开始执行。
若任务处理时长小于定时频率时长,任务执行完后,定时器等待,下次任务会在定时器等待频率时长后执行。
如下例子:
设置定时任务每60s执行一次,那么从理论上应该第一次任务在第0s开始,第二次任务在第60s开始,第三次任务在120s开始,但实际运行时第一次任务时长80s,第二次任务时长30s,第三次任务时长50s,则实际运行结果为:
第一次任务第0s开始,第80s结束;
第二次任务第80s开始,第110s结束(上次任务已超时,本次不会再等待60s,会马上开始);
第三次任务第120s开始,第170s结束.
第四次任务第180s开始…

CompletionService

zjj_parent_2019/09/19_17:06:58_q4gdjf8vp6ks8anrckk9uuxu06wium

参考:
​javascript:void(0)​​

CompletionService实际上可以看做是Executor和BlockingQueue的结合体。CompletionService在接收到要执行的任务时,通过类似BlockingQueue的put和take获得任务执行的结果。

“如果向Executor提交了一组计算任务,并且希望在计算完成后获得结果,那么可以保留与每个任务关联的Future,然后反复使用get方法,同时将参数timeout指定为0,从而通过轮询来判断任务是否完成。这种方法虽然可行,但却有些繁琐。幸运的是,还有一种更好的方法:完成服务CompletionService。”

CompletionService实现了生产者提交任务和消费者获取结果的解耦,生产者和消费者都不用关心任务的完成顺序,由CompletionService来保证,消费者一定是按照任务完成的先后顺序来获取执行结果。

CompletionService的一个实现是ExecutorCompletionService,ExecutorCompletionService把具体的计算任务交给Executor完成。

在实现上,ExecutorCompletionService在构造函数中会创建一个BlockingQueue(使用的基于链表的LinkedBlockingQueue),该BlockingQueue的作用是保存Executor执行的结果。
当提交一个任务到ExecutorCompletionService时,首先将任务包装成QueueingFuture,它是FutureTask的一个子类,然后改写FutureTask的done方法,之后把Executor执行的计算结果放入BlockingQueue中。

与ExecutorService最主要的区别在于submit的task不一定是按照加入时的顺序完成的。CompletionService对ExecutorService进行了包装,内部维护一个保存Future对象的BlockingQueue。只有当这个Future对象状态是结束的时候,才会加入到这个Queue中,take()方法其实就是Producer-Consumer中的Consumer。它会从Queue中取出Future对象,如果Queue是空的,就会阻塞在那里,直到有完成的Future对象加入到Queue中。所以,先完成的必定先被取出。这样就减少了不必要的等待时间。

使用方法一,自己创建一个集合来保存Future存根并循环调用其返回结果的时候,主线程并不能保证首先获得的是最先完成任务的线程返回值。它只是按加入线程池的顺序返回。因为take方法是阻塞方法,后面的任务完成了,前面的任务却没有完成,主程序就那样等待在那儿,只到前面的完成了,它才知道原来后面的也完成了。

使用方法二,使用CompletionService来维护处理线程池的返回结果时,主线程总是能够拿到最先完成的任务的返回值,而不管它们加入线程池的顺序。

线程池状态

首先线程池是有状态的,这些状态标识这线程池内部的一些运行情况,线程池的开启到关闭的过程就是线程池状态的一个流转的过程。
线程池共有五种状态:

![](/i/ll/?i=img_convert/be154b85bf46526523439e034a363d12.png#align=left&display=inline&height=136&margin=[object Object]&originHeight=227&originWidth=813&status=done&style=none&width=488)

状态

含义

RUNNING

运行状态,该状态下线程池可以接受新的任务,也可以处理阻塞队列中的任务

执行 shutdown 方法可进入 SHUTDOWN 状态
执行 shutdownNow 方法可进入 STOP 状态 |
| SHUTDOWN | 待关闭状态,不再接受新的任务,继续处理阻塞队列中的任务
当阻塞队列中的任务为空,并且工作线程数为0时,进入 TIDYING 状态 |
| STOP | 停止状态,不接收新任务,也不处理阻塞队列中的任务,并且会尝试结束执行中的任务
当工作线程数为0时,进入 TIDYING 状态 |
| TIDYING | 整理状态,此时任务都已经执行完毕,并且也没有工作线程
|
| TERMINATED | 终止状态,此时线程池完全终止了,并完成了所有资源的释放 |

(一)重要属性

一个线程池的核心参数有很多,每个参数都有着特殊的作用,各个参数聚合在一起后将完成整个线程池的完整工作。

1、线程状态和工作线程数量
首先线程池是有状态的,不同状态下线程池的行为是不一样的,5种状态已经在上面说过了。
另外线程池肯定是需要线程去执行具体的任务的,所以在线程池中就封装了一个内部类 Worker 作为工作线程,每个 Worker 中都维持着一个 Thread。
线程池的重点之一就是控制线程资源合理高效的使用,所以必须控制工作线程的个数,所以需要保存当前线程池中工作线程的个数。
看到这里,你是否觉得需要用两个变量来保存线程池的状态和线程池中工作线程的个数呢?但是在 ThreadPoolExecutor 中只用了一个 AtomicInteger 型的变量就保存了这两个属性的值,那就是 ctl。
![](/i/ll/?i=img_convert/f98c90dd4fb3c3497541527c0d62b76e.png#align=left&display=inline&height=159&margin=[object Object]&originHeight=265&originWidth=706&status=done&style=none&width=424)

ctl 的高3位用来表示线程池的状态(runState),低29位用来表示工作线程的个数(workerCnt),为什么要用3位来表示线程池的状态呢,原因是线程池一共有5种状态,而2位只能表示出4种情况,所以至少需要3位才能表示得了5种状态。

2、核心线程数和最大线程数
现在有了标志工作线程的个数的变量了,那到底该有多少个线程才合适呢?线程多了浪费线程资源,少了又不能发挥线程池的性能。
为了解决这个问题,线程池设计了两个变量来协作,分别是:

核心线程数:corePoolSize 用来表示线程池中的核心线程的数量,也可以称为可闲置的线程数量

最大线程数:maximumPoolSize 用来表示线程池中最多能够创建的线程数量

现在我们有一个疑问,既然已经有了标识工作线程的个数的变量了,为什么还要有核心线程数、最大线程数呢?
其实你这样想就能够理解了,创建线程是有代价的,不能每次要执行一个任务时就创建一个线程,但是也不能在任务非常多的时候,只有少量的线程在执行,这样任务是来不及处理的,而是应该创建合适的足够多的线程来及时的处理任务。随着任务数量的变化,当任务数明显很小时,原本创建的多余的线程就没有必要再存活着了,因为这时使用少量的线程就能够处理的过来了,所以说真正工作的线程的数量,是随着任务的变化而变化的。
那核心线程数和最大线程数与工作线程个数的关系是什么呢?

![](/i/ll/?i=img_convert/c59aaddffb3ccee515e79b2420610840.png#align=left&display=inline&height=203&margin=[object Object]&originHeight=338&originWidth=709&status=done&style=none&width=425)

工作线程的个数可能从0到最大线程数之间变化,当执行一段时间之后可能维持在 corePoolSize,但也不是绝对的,取决于核心线程是否允许被超时回收。

3、创建线程的工厂
既然是线程池,那自然少不了线程,线程该如何来创建呢?这个任务就交给了线程工厂 ThreadFactory 来完成。

4、缓存任务的阻塞队列

上面我们说了核心线程数和最大线程数,并且也介绍了工作线程的个数是在0和最大线程数之间变化的。但是不可能一下子就创建了所有线程,把线程池装满,而是有一个过程,这个过程是这样的:
当线程池接收到一个任务时,如果工作线程数没有达到corePoolSize,那么就会新建一个线程,并绑定该任务,直到工作线程的数量达到 corePoolSize 前都不会重用之前的线程。
当工作线程数达到 corePoolSize 了,这时又接收到新任务时,会将任务存放在一个阻塞队列中等待核心线程去执行。为什么不直接创建更多的线程来执行新任务呢,原因是核心线程中很可能已经有线程执行完自己的任务了,或者有其他线程马上就能处理完当前的任务,并且接下来就能投入到新的任务中去,所以阻塞队列是一种缓冲的机制,给核心线程一个机会让他们充分发挥自己的能力。另外一个值得考虑的原因是,创建线程毕竟是比较昂贵的,不可能一有任务要执行就去创建一个新的线程。
所以我们需要为线程池配备一个阻塞队列,用来临时缓存任务,这些任务将等待工作线程来执行。

![](/i/ll/?i=img_convert/d12a332cb8b0719197adb7247120f2ee.png#align=left&display=inline&height=59&margin=[object Object]&originHeight=98&originWidth=765&status=done&style=none&width=459)

5、非核心线程存活时间

上面我们说了当工作线程数达到 corePoolSize 时,线程池会将新接收到的任务存放在阻塞队列中,而阻塞队列又两种情况:一种是有界的队列,一种是无界的队列。
如果是无界队列,那么当核心线程都在忙的时候,所有新提交的任务都会被存放在该无界队列中,这时最大线程数将变得没有意义,因为阻塞队列不会存在被装满的情况。
如果是有界队列,那么当阻塞队列中装满了等待执行的任务,这时再有新任务提交时,线程池就需要创建新的“临时”线程来处理,相当于增派人手来处理任务。
但是创建的“临时”线程是有存活时间的,不可能让他们一直都存活着,当阻塞队列中的任务被执行完毕,并且又没有那么多新任务被提交时,“临时”线程就需要被回收销毁,在被回收销毁之前等待的这段时间,就是非核心线程的存活时间,也就是 keepAliveTime 属性。
那么什么是“非核心线程”呢?是不是先创建的线程就是核心线程,后创建的就是非核心线程呢?
其实核心线程跟创建的先后没有关系,而是跟工作线程的个数有关,如果当前工作线程的个数大于核心线程数,那么所有的线程都可能是“非核心线程”,都有被回收的可能。
一个线程执行完了一个任务后,会去阻塞队列里面取新的任务,在取到任务之前它就是一个闲置的线程。
取任务的方法有两种,一种是通过 take() 方法一直阻塞直到取出任务,另一种是通过 poll(keepAliveTime,timeUnit) 方法在一定时间内取出任务或者超时,如果超时这个线程就会被回收,请注意核心线程一般不会被回收。
那么怎么保证核心线程不会被回收呢?还是跟工作线程的个数有关,每一个线程在取任务的时候,线程池会比较当前的工作线程个数与核心线程数:

如果工作线程数小于当前的核心线程数,则使用第一种方法取任务,也就是没有超时回收,这时所有的工作线程都是“核心线程”,他们不会被回收;

如果大于核心线程数,则使用第二种方法取任务,一旦超时就回收,所以并没有绝对的核心线程,只要这个线程没有在存活时间内取到任务去执行就会被回收。

所以每个线程想要保住自己“核心线程”的身份,必须充分努力,尽可能快的获取到任务去执行,这样才能逃避被回收的命运。
核心线程一般不会被回收,但是也不是绝对的,如果我们设置了允许核心线程超时被回收的话,那么就没有核心线程这种说法了,所有的线程都会通过 poll(keepAliveTime, timeUnit) 来获取任务,一旦超时获取不到任务,就会被回收,一般很少会这样来使用,除非该线程池需要处理的任务非常少,并且频率也不高,不需要将核心线程一直维持着。

6、拒绝策略
虽然我们有了阻塞队列来对任务进行缓存,这从一定程度上为线程池的执行提供了缓冲期,但是如果是有界的阻塞队列,那就存在队列满的情况,也存在工作线程的数据已经达到最大线程数的时候。如果这时候再有新的任务提交时,显然线程池已经心有余而力不足了,因为既没有空余的队列空间来存放该任务,也无法创建新的线程来执行该任务了,所以这时我们就需要有一种拒绝策略,即 handler。
拒绝策略是一个 RejectedExecutionHandler 类型的变量,用户可以自行指定拒绝的策略,如果不指定的话,线程池将使用默认的拒绝策略:抛出异常。
在线程池中还为我们提供了很多其他可以选择的拒绝策略:

  1. 直接丢弃该任务
  2. 使用调用者线程执行该任务
  3. 丢弃任务队列中的最老的一个任务,然后提交该任务

(二)工作流程

了解了线程池中所有的重要属性之后,现在我们需要来了解下线程池的工作流程了。

![](/i/ll/?i=img_convert/a1b5b3048b1486ef3954d5e1829759ed.png#align=left&display=inline&height=446&margin=[object Object]&originHeight=743&originWidth=812&status=done&style=none&width=487)

上图是一张线程池工作的精简图,实际的过程比这个要复杂的多,不过这些应该能够完全覆盖到线程池的整个工作流程了。
整个过程可以拆分成以下几个部分:

1、提交任务
当向线程池提交一个新的任务时,线程池有三种处理情况,分别是:创建一个工作线程来执行该任务、将任务加入阻塞队列、拒绝该任务。
提交任务的过程也可以拆分成以下几个部分:

  1. 当工作线程数小于核心线程数时,直接创建新的核心工作线程
  2. 当工作线程数不小于核心线程数时,就需要尝试将任务添加到阻塞队列中去
  3. 如果能够加入成功,说明队列还没有满,那么需要做以下的二次验证来保证添加进去的任务能够成功被执行
  4. 验证当前线程池的运行状态,如果是非RUNNING状态,则需要将任务从阻塞队列中移除,然后拒绝该任务
  5. 验证当前线程池中的工作线程的个数,如果为0,则需要主动添加一个空工作线程来执行刚刚添加到阻塞队列中的任务
  6. 如果加入失败,则说明队列已经满了,那么这时就需要创建新的“临时”工作线程来执行任务
  7. 如果创建成功,则直接执行该任务
  8. 如果创建失败,则说明工作线程数已经等于最大线程数了,则只能拒绝该任务了

整个过程可以用下面这张图来表示:

![](/i/ll/?i=img_convert/cbc652ebeb3e62d05ef5d47e8c9bd1dd.png#align=left&display=inline&height=482&margin=[object Object]&originHeight=804&originWidth=720&status=done&style=none&width=432)

2、创建工作线程

创建工作线程需要做一系列的判断,需要确保当前线程池可以创建新的线程之后,才能创建。
首先,当线程池的状态是 SHUTDOWN 或者 STOP 时,则不能创建新的线程。
另外,当线程工厂创建线程失败时,也不能创建新的线程。
还有就是当前工作线程的数量与核心线程数、最大线程数进行比较,如果前者大于后者的话,也不允许创建。
除此之外,会尝试通过 CAS 来自增工作线程的个数,如果自增成功了,则会创建新的工作线程,即 Worker 对象。
然后加锁进行二次验证是否能够创建工作线程,最后如果创建成功,则会启动该工作线程。
3、启动工作线程
当工作线程创建成功后,也就是 Worker 对象已经创建好了,这时就需要启动该工作线程,让线程开始干活了,Worker 对象中关联着一个 Thread,所以要启动工作线程的话,只要通过 worker.thread.start() 来启动该线程即可。
启动完了之后,就会执行 Worker 对象的 run 方法,因为 Worker 实现了 Runnable 接口,所以本质上 Worker 也是一个线程。
通过线程 start 开启之后就会调用到 Runnable 的 run 方法,在 worker 对象的 run 方法中,调用了 runWorker(this) 方法,也就是把当前对象传递给了 runWorker 方法,让他来执行。
4、获取任务并执行
在 runWorker 方法被调用之后,就是执行具体的任务了,首先需要拿到一个可以执行的任务,而 Worker 对象中默认绑定了一个任务,如果该任务不为空的话,那么就是直接执行。
执行完了之后,就会去阻塞队列中获取任务来执行,而获取任务的过程,需要考虑当前工作线程的个数。

  1. 如果工作线程数大于核心线程数,那么就需要通过 poll 来获取,因为这时需要对闲置的线程进行回收;
  2. 如果工作线程数小于等于核心线程数,那么就可以通过 take 来获取了,因此这时所有的线程都是核心线程,不需要进行回收,前提是没有设置 allowCoreThreadTimeOut

线上问题解决

(一)如果线上机器突然宕机,线程池中阻塞队列中的任务怎么办

如果说你要提交一个任务到线程池里面去,在提交之前,先在数据库里面插入这个任务信息,更新它的状态为未提交, 已提交, 已完成 信息.提交成功之后,更新它的状态是已提交状态.

如果宕机了,系统重启之后,后台开启一个线程去扫描数据库里面未提交和已提交状态任务,然后给任务读取出来,重新提交到线程池里面,继续进行执行.


标签:并发,队列,创建,阻塞,重新整理,任务,线程,执行
From: https://blog.51cto.com/u_14861909/5854939

相关文章

  • MySQL InnooDB引擎之并发事务问题以及隔离级别的作用和区别
    最近在复习MySQL事务,但网上很多博客和资料可以说讲的不是模棱两可就是只有文字描述不够形象易懂,下面通过我的学习来详细讲一讲事务并发都会引起哪些问题?以及隔离级别是什么......
  • 关于FastAPI异步并发的技术背景和细节
    FastAPI的路径操作函数,可以使用asyncdef定义:fromtypingimportUnionfromfastapiimportFastAPIapp=FastAPI()@app.get("/")asyncdefread_root():re......
  • 040_并发下集合类不安全
    目录并发下ArrayList不安全解决方案一:Vector(不推荐使用)解决方案二:Collections.synchronizedList()解决方案三:CopyOnWriteArrayList(推荐使用)并发下HashSet不安全解决方......
  • python 线程池 ThreadPoolExecutor
    从Python3.2开始,标准库为我们提供了concurrent.futures模块,它提供了ThreadPoolExecutor(线程池)和ProcessPoolExecutor(进程池)两个类。相比threading等模块,该模......
  • CS149ass2线程池+计算图
    CS149ass2线程池+计算图任务描述实现线程池,要求同时实现一个spin和sleep的线程池。要求实现一个计算图,任务按照顺序完成我的收获这个assign不算难,主要目的就是......
  • 接口性能指标-QPS-TPS-并发量
    转载:https://www.cnblogs.com/liuqingzheng/p/16207660.html1QPSQueriesPerSecond,每秒查询率,一台服务器每秒能够响应的查询次数。是对一个特定的查询服务器在规定......
  • 并发上传md5值不匹配解决方法
    因为同步分片上传对于大文件非常耗时,如果并发上传定会出现乱序,而某些厂商的云盘没有对分片并发上传做相应处理,导致上传后的文件与原文件md5值不匹配。以下给出我对此问题......
  • 异步编排 Spring(线程池)
    目录异步编排CompletableFuture的详解代码测试配置类的引入Demo1Demo2CompletableFuture的async后缀函数与不带async的函数的区别ThreadPoolTaskExecutor和ThreadPoolEx......
  • 异步和多线程有什么区别
    一、异步和多线程有什么区别?其实,异步是目的,而多线程是实现这个目的的方法。 多线程和异步操作两者都可以达到避免调用线程阻塞的目的,从而提高软件的可响应性。甚至......
  • 网络并发1
    今日内容总结软件开发架构规定了程序的请求逻辑、功能模块1.C/S架构 Client:客户端 Server:服务端"""我们使用计算机下载下来的一个个app本质是各大互联网公司的客......