Java线程池解析
在Java中有两种方式创建线程池,一种是直接使用Executors工具类创建预先定义好的线程池。一共有以下四种线程池
-
newCachedThreadPool:可缓存的无边界的线程池,最大线程数Integer.MAX_VALUE
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
-
newFixedThreadPool:定长的线程池,超出则等待,线程不可扩容
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
-
newScheduledThreadPool:定长的线程池,支持定时及周期性任务的执行。最大线程数Integer.MAX_VALUE。
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue()); }
-
newSingleThreadExecutor:单线程的线程池,保证所有任务FIFO执行。
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
可以看到这四个线程池中涉及到了三个阻塞队列,根据阻塞队列的不同,线程池也提供了不一样的功能
-
SynchronousQueue:这是一个无缓冲的队列,意思是不会存储元素,每个插入操作必须等待移除操作完成,它没有容量的概念,因此被称作“传递性”队列。当线程池中不允许任务等待,需要立刻创建新线程执行时使用。newCachedThreadPool使用了这个队列,所以在newCachedThreadPool中,如果线程没有空闲的,会立刻创建新线程来执行新任务。
-
LinkedBlockingQueue:基于链表的阻塞队列,按照FIFO的原则对元素进行排序,内部使用锁机制来保证多线程的安全性,可以设置容量大小。
-
DelayedWorkQueue:支持延时获取元素的阻塞队列,是一个基于堆的阻塞队列,插入删除的效率都是logN,可以理解为优先级队列,可以为每个元素设置到期时间,只有延迟时间到了,才能取走。适用于那些需要延迟处理的任务,如缓存失效、任务调度等。
讲完了内置的预定义的四个线程池,但是一般开发中不会使用这四个线程池,因为内置的不可变参数可能导致程序产生问题,例如线程数无限增加,一般来说我们希望定义一个任何操作都是可控的线程池,所以一般使用ThreadPoolExecutor进行线程池的创建。
使用ThreadPoolExecutor创建线程池
使用ThreadPoolExecutor创建线程池需要指定7个参数
-
核心线程数corePoolSize
-
最大线程数maximumPoolSize
-
线程生存时间keepAliveTime
-
线程生存时间的单位TimeUnit
-
阻塞队列BlockingQueue
-
线程工厂ThreadFactory
-
拒绝策略RejectedExecutionHandler
前面四个参数顾名思义,这里主要讲一下后面三个参数
阻塞队列
Java常用的阻塞队列一般有八个
ArrayBlockingQueue:基于数组的阻塞队列,需要初始化大小,因此是有界的。
LinkedBlockingQueue:基于链表的阻塞队列,最大长度MAX_VALUE,因此可以理解为无解的,可以指定大小。(常用)内部使用锁保证多线程安全性。
PriorityBlockingQueue:优先级队列,可以自定义比较器
DelayQueue:延迟队列,也可以理解为优先级队列
DelayedWorkQueue:实现和DelayQueue一样,不过是将优先级队列和DelayQueue的实现过程迁移到本身方法体中,从而可以在该过程当中灵活的加入定时任务特有的方法调用。可以理解为DelayQueue增强版。
SynchronouseQueue:上面说的“传递性”队列
LinkedTransferQueue:实现了一个重要功能:如果有等待的空闲线程,put操作直接将任务传递给线程,不会入队,一定程度上实现了插队的功能。可以理解为 LinkedBolckingQueue
和 SynchronousQueue
和合体。
LinkedBlockingDeque:顾名思义,是一个双向链表的队列,支持FIFIO和FILO操作,可以在队头和队尾同时操作。
线程工厂
线程工程是一个接口,内容很简单:收一个Runnable对象,并将其封装到Thread对象中,进行执行。
public interface ThreadFactory {
/**
* Constructs a new {@code Thread}. Implementations may also initialize
* priority, name, daemon status, {@code ThreadGroup}, etc.
*
* @param r a runnable to be executed by new thread instance
* @return constructed thread, or {@code null} if the request to
* create a thread is rejected
*/
Thread newThread(Runnable r);
}
因此我们可以自定义一个线程工厂,实现这个接口就行
public class MyFactory implements ThreadFactory{
@Override
public Thread newThread(Runnable r) {
return new Thread(r);
}
}
实际使用过程中可以添加自定义功能:例如线程名称、线程计数、线程总数控制等。
拒绝策略
拒绝策略是指,如果最大线程数到了并且等待队列满了之后,新到的任务应该怎么处理,JUC中定义了四种拒绝策略,如下:
-
AbortPolicy
:拒绝并抛出异常 -
CallerRunsPolicy
:拒绝并且调用者运行策略,谁给我的谁运行 -
DiscardOldestPolicy
:抛弃最早未处理任务 -
DiscardPolicy
:直接丢弃任务,啥也不管
讲完了线程池的概述,我们来讲一下线程池的运行逻辑
运行逻辑
运行逻辑是指一个新的任务提交,应该怎么做?
-
是否达到核心线程数?
-
没有达到则创建一个新的线程执行任务
-
注意:如果没有达到核心线程数,但是有空闲线程,是创建新的还是复用旧的?答案是创建新的线程。
-
-
是否队列已满?
- 没有满就加入工作队列中
-
是否达到最大线程数?
-
没有达到最大线程数,就会创建新线程执行任务
-
达到了就进入拒绝策略
-
也就是说,判断顺序是:是否大于核心线程数---队列是否已满---是否大于最大线程数
关于线程池的一些问题?
-
如果队列中的任务失效了怎么处理?
-
使用Future.cancel是可以取消一个任务的,原理是向正在运行任务的线程发送中断指令,即Interrupt方法。
-
如果任务没有开始执行,会仍然存在队列中,直到被拉取,拉取时发现Interrupt后不会执行。
-
如果任务正在运行切支持响应中断,会抛出
InterruptedException
并提前中止。
-
-
如果线程执行过程中异常停止了怎么办?线程池怎么处理异常?
首先提交任务可以使用execute方法或者submit方法
-
execute方法不会抛出异常给调用者,异常会被吞掉,但是可以重新uncaughtException方法进行异常主动处理。并且线程池内部会打印异常信息,这里指的被吞掉是指调用者无法感知。
-
submit方法会返回一个Future对象,抛出的异常会被封装在内,使用get获取返回值的时候会抛出异常。出现异常之后线程池也不会打印异常信息。
其次一个线程出现异常后,这个线程会被销毁,创建一个新的干净的线程放到线程池中。销毁线程是为了防止旧的线程中有脏数据影响新任务执行。
-
-
如何优雅的关闭一个线程池?
首先调用shutdown方法发送关闭指令,启动一个有序的关闭过程,已经提交的任务会继续执行,但是不会接收新的任务。
其次等待一定的可接受的时间threadPool.awaitTermination(为了给正在运行的任务时间),超时之后调用shutdownNow方法,强行退出,退出原理是给线程打上中断,但是中断不意味着能终止。
然后再等待一定的时间,还没有结束就做失败处理。。。
-
线程池是怎么知道线程完成了任务空闲了?
这个问题其实可以转变一下思路,改成
线程是怎么实现复用的?空闲就是线程复用呗
线程在线程池内部其实封装成了一个worker对象
Worker extends AbstractQueuedSynchronizer
可以看到继承了AQS,所以Worker自身是具有锁的特性的。在创建 Worker 对象的时候,会把线程和任务一起封装到 Worker 内部,然后调用 runWorker 方法来让线程执行任务
实现线程复用的逻辑就在runWorker方法中,由于使用了while循环,当第一个任务执行完成后,会不断通过getTask获取任务,只要能获取到任务,就会调用run方法执行任务。
并且getTask内部也是一个死循环,除非被一些原因退出(生存时间到、线程池关闭等)
如果获取不到getTask退出,就会执行finally中的processWorkerExit方法,将线程退出。
因为 Worker 继承了 AQS,每次在执行任务之前都会调用 Worker 的 lock 方法,执行完任务之后,会调用 unlock 方法,这样做的目的就可以通过 Woker 的加锁状态判断出当前线程是否正在执行任务。
如果想知道线程是否空闲,只需要调用Worker的tryLock方法,加锁成功就是空闲。
final void runWorker(Worker w) { // 获取当前工作线程 Thread wt = Thread.currentThread(); // 从 Worker 中取出第一个任务 Runnable task = w.firstTask; w.firstTask = null; // 解锁 Worker(允许中断) w.unlock(); boolean completedAbruptly = true; try { // 当有任务需要执行或者能够从任务队列中获取到任务时,工作线程就会持续运行 while (task != null || (task = getTask()) != null) { // 锁定 Worker,确保在执行任务期间不会被其他线程干扰 w.lock(); // 如果线程池正在停止,并确保线程已经中断 // 如果线程没有中断并且线程池已经达到停止状态,中断线程 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 在执行任务之前,可以插入一些自定义的操作 beforeExecute(wt, task); Throwable thrown = null; try { // 实际执行任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 执行任务后,可以插入一些自定义的操作 afterExecute(task, thrown); } } finally { // 清空任务,并更新完成任务的计数 task = null; w.completedTasks++; // 解锁 Worker w.unlock(); } } completedAbruptly = false; } finally { // 工作线程退出的后续处理 processWorkerExit(w, completedAbruptly); } }
-
如何设置线程池的核心线程数和最大线程数?
根据任务类型,如果是IO密集型,设置为两倍的CPU核心数,如果是CPU密集型,设置为CPU核心数加1。核心线程数一般设置为CPU核心数+1,最大线程池大小为CPU核心数x2。因为开发过程中机器很难是干净的,一般需要根据压力测试确定核心线程数和最大线程数。
-
线程池怎么优化?
首先线程池有什么优化思路?
-
合理配置线程池的大小以及队列长度,过小的核心线程数会导致资源无法合理利用任务积压,过大又会导致资源耗尽和不必要的开销,队列过短会导致无法充分消化任务,过长会导致任务积压,执行时间长。
-
合理优化任务分配策略,对于不同优先级的任务,可以按照优先级调度
-
合理拆分任务,对于大任务可以拆分多个小任务分配给不同的线程处理
-
动态调整线程池参数,线程池提供了部分Setter方法可以设置线程池的参数
-
使用线程池监控报警策略,获取线程池状态指标,判断异常后报警,分析异常,动态修改参数
-
-
如何自己实现一个线程池?
-
定义一个存放所有线程的集合以及任务队列,并使用参数保存相应的大小
-
编写初始化方法,初始化线程和队列。
-
编写execute方法,提交任务时如果线程池未满就开启线程,满了就提交到队列,队列满了就新建额外的线程
-
编写分配策略
-
额外的一个线程负责分配,实现复杂
-
线程一直运行,一直向阻塞队列拿任务,为此我们需要自定义Thread类
-
-
编写shutdown方法,停止接收新任务,启动有序关闭
-
编写shutdownNow方法,发送中断指令,尝试强行关闭
-
-
什么是线程池?为什么使用线程池?
线程池是一个并发框架,几乎所有并发执行的任务都可以使用线程池,使用线程池有以下几个好处
-
线程复用,降低资源消耗,防止线程创建和销毁带来的消耗
-
提高响应速度,线程池可以立刻执行任务,不需要创建线程的时间
-
提高线程的可管理性,防止线程数过多带来的系统问题OOM
-
-
什么是线程池的预热机制?
在创建线程池后,我们可以调用prestartAllCoreThreads方法来创建并启动所有核心线程,以准备处理即将到来的任务。
-
线程池的优点和缺点是什么?
优点就是上文说的好处,缺点如下:
-
线程池参数多,逻辑复杂,不合理使用可能引发资源浪费和内存泄漏
-
不合理的参数配置可能导致无限制的创建线程或者无限制的扩大队列
-
-
线程池中的线程是怎么执行任务的?
线程池将线程和任务一起封装在了Worker类中,初始化时分配第一个任务并执行,执行过程中会启动runWorker方法,方法中会不断循环执行获取任务方法getTask拿任务执行。
在获取任务方法getTask中也有循环操作,一直没拿到但是没有超过存活时间就会一直尝试获取任务,超时之后就会进入销毁程序。
-
线程池中的任务可以返回执行结果吗?
可以使用submit提交任务并使用Future.get获取返回值
-
线程池的内存泄漏问题?
内存泄漏是指没有使用的内存没有得到释放
线程池的内存泄漏问题主要指的是搭配ThreadLocal使用时造成的内存泄漏。ThreadLocal是一个线程私有数据空间。意思是ThreadLocal中填充的变量属于当前线程,对其它线程隔离。
核心机制是每一个Thread线程内部都有一个Map,里面存储了本地对象key和线程变量副本value,不同的线程互不干扰
所以当线程池中一个线程使用ThreadLocal后没有清除,执行新任务并不会使用原有任务的变量,但是却一直保持引用,就会导致内存泄漏。
所以在线程池中使用ThreadLocal要确保任务完成后清除ThreadLocal
-
在多线程的环境下,怎么保证线程池中的任务按照特定的顺序执行?
-
使用单线程线程池
-
使用同步控制
- 使用
Semaphore
或CountDownLatch
:在任务执行前进行同步控制,确保前一个任务完成后再允许下一个任务开始执行。 - 任务间传递状态:通过任务间的返回值或共享变量来传递任务执行的状态或结果,下一个任务在依赖的任务完成后才开始执行。
- 使用
CyclicBarrier
或Phaser
:当一组任务中的每一个都必须等待其他任务完成时,可以使用这些工具来同步任务。
- 使用
-
链式调用,在一个任务完成后直接启动下一个任务
-
-
ThreadPoolExecutor使用到了哪些锁?为什么要使用锁?
这个问题可以转换为:线程池的哪些行为需要用到锁?
-
线程池的worker队列是一个HashSet,本身线程不安全,所以添加线程进队列时需要加锁,线程池锁是可重入锁ReentrantLock,mainLock锁变量
-
其次最大线程数是一个共享变量,在操作共享变量的时候都需要加锁,也是ReentrantLock,mainLock锁变量
其次Worker继承了AQS,所以Worker本身就是锁,这就引出了一个问题?Worker为什么需要继承AQS,有什么需要使用AQS的场景?
-
设计者对worker的期望,在空闲时可以响应中断,在执行任务时不可被中断,除非任务内部有处理中断的逻辑。这里指的不可中断应该指的是线程池不可以主动操作一个工作中的线程,例如关闭多余的线程时需要获取锁,不能关闭一个工作中的线程。(什么时候可以被主动管理,什么时候不能被主动管理)。
-
其次Worker是一个不可重入的锁,为什么要设计为不可重入,因为当线程数超过最大线程数的时候,需要减少线程数量,是通过tryLock判断的,如果是可重入的,可能会打断正在工作的检查。因为tryLock是一个cas操作,只有0-1两个状态,所以不能设计成可重入
-
-
线程池运行过程中怎么监控?
首先线程池提供了get方法可以获取状态信息
-
getPoolSize()
:返回线程池中的线程数。这包括空闲线程和正在执行任务的线程。如果线程池已关闭,此方法将返回零。 -
getActiveCount()
:返回当前正在执行任务的线程数。这个值只是一个估计值,但它可以帮助你得到线程池的运行状态。 -
getLargestPoolSize()
:返回线程池曾经同时存在的最大线程数。这可以帮助你了解线程池的最大负载情况。 -
getTaskCount()
:返回线程池已经处理的任务总数。这包括已经完成的任务和还在队列中等待的任务。 -
getCompletedTaskCount()
:返回线程池已经完成的任务数量。这可以帮助你了解线程池的工作效率。
其次可以使用JMX动态改变参数
Java Management Extensions (JMX) 是Java平台提供的一种标准的网络管理和监控工具。通过JMX,我们可以远程监控和管理Java应用程序,包括线程池的运行状态。
-
通过JMX MBeanServer,我们可以获取线程池的所有运行参数,包括上面提到的线程数、活跃线程数、任务队列大小等。
-
另外,JMX还可以用来调整线程池的参数,比如增加或减少线程池的大小,改变任务队列的大小等。
最后可以使用VisualVM工具进行可视化监控
VisualVM是Java提供的一种可视化工具,可以用来监控和管理Java应用程序的运行状态,包括线程池的运行情况。
-
VisualVM可以显示线程池的实时运行数据,比如线程数、活跃线程数、任务队列大小等。
-
VisualVM还可以显示线程池的历史运行数据,这可以帮助我们了解线程池的运行趋势和性能瓶颈。
-