线程池
核心参数
补充: 还有一个参数 threadFactory(线程工厂):
用于创建新线程的工厂,通常用于给线程设定名称、设置为守护线程等。默认的线程工厂会创建一个普通的、非守护线程。
ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(
3, // 核心线程数, 默认情况下, 当前线程数量不超过核心线程数时, 空闲超时后也不会被回收
5, // 最大线程数
10, // 空闲worker线程多久后销毁
TimeUnit.SECONDS, // 时间单位
new ArrayBlockingQueue<>(10), // 任务队列
new ThreadPoolExecutor.DiscardOldestPolicy()); // 拒绝策略
for (int i = 0; i < 20; i++) {
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
});
}
threadPoolExecutor.shutdown();
try {
if (!threadPoolExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
// 如果超时,尝试立即关闭线程池
threadPoolExecutor.shutdownNow();
// 再次等待所有任务执行完成
if (!threadPoolExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("线程池未能完全关闭");
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
-
Executors.FixedThreadPool: 返回一个ExcutorService, ThreadPoolExcutor 实际上也是一个 ExcutorService
-
核心线程数和最大线程数相等且可配置; 无过期时间
-
适用场景:当需要控制并发线程数量
-
和CachedThreadPool相比, 线程数量上限可控, 防止同时存在的线程数量过多导致资源耗尽;
但任务过多时, 无法动态地增加线程数量;
-
典型应用:适用于需要处理大量时间较长的任务;
-
-
CachedThreadPool:
-
核心线程数是0, 最大线程数Integer.MAX_VALUE, 线程空闲60s被销毁
-
使用的任务队列是
SynchronousQueue
没有存储元素的能力,不保存任何元素。当一个线程试图将元素放入
SynchronousQueue
时,它会阻塞直到另一个线程试图从队列中取出这个元素。 -
适用场景:适用于需要处理大量短生命周期的, 具有突发性的任务;
-
典型应用:?
-
-
SingleThreadExecutor:
- 只有一个线程, 最大线程数量也是1;
- 适用场景:适用于需要保证任务按照顺序执行的场景, SingleThreadExecutor 只会使用单个工作线程来执行任务,保证了任务之间的顺序性。
- 典型应用:适用于需要顺序执行的任务,例如按照时间顺序保存日志, 要求保存顺序和日志发生顺序一致;
篇幅限制下面就只能给大家展示小册部分内容了。整理了一份核心面试笔记包括了:Java面试、Spring、JVM、MyBatis、Redis、MySQL、并发编程、微服务、Linux、Springboot、SpringCloud、MQ、Kafka 面试专题
需要全套面试笔记【点击此处即可】免费获取
-
ScheduledThreadPoolExecutor
- 可以定时执行任务, 原理是用堆实现无界的优先队列, 按执行时间对任务排序; Worker线程循环取任务, 如果队头任务还没到时间, Worker阻塞;
- 有一个专门的线程负责检查队头任务, 如果到期, 唤醒Worker;
- 新添加任务时, 唤醒Worker;
- 核心线程数可配置, 最大线程数 Integer.MAX_VALUE, 超时时间0, 也就是说取不到任务立即被回收;
拒绝策略
-
需要实现
RejectedExecutionHandler
, 重写rejectedExecution方法
-
ThreadPoolExecutor提供了几种现成的
-
默认是AbortPolicy
public static class CallerRunsPolicy implements RejectedExecutionHandler { // 由调用ThreadPoolExecutor.execute()方法的线程自己去执行r.run(); public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } } public static class AbortPolicy implements RejectedExecutionHandler { // 丢弃任务并抛出异常 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } } public static class DiscardPolicy implements RejectedExecutionHandler { // 丢弃任务, 不抛异常 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } } public static class DiscardOldestPolicy implements RejectedExecutionHandler { // 将任务队里中最早入队的任务丢弃, 添加新的任务 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
添加任务的流程
- 并非一开始就创建了所有核心线程, 而是添加任务时一个一个创建的;
- 任务队列排满以后创建非核心线程执行任务, 这会导致后来的任务在任务队列里的任务之前执行;
属性与状态
// AtomicInteger, 使用CAS实现的线程安全的Integer
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; // 29
// 低29位表示工作线程的个数
// 1 << 29 再 - 1, =29位全1, 表示工作线程个数的最大值
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 前三位表示状态, 有符号数
// 正常运行, 可以添加任务
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
源码解读
execute( )
- 注意一个地方, 在并发情况下, 可能出现一个线程中向线程池内添加任务, 而另一个线程执行了shutdown的情况; 这种情况可能导致在shutdown状态下, 线程池中工作线程已经被销毁的情况下, 又往工作队列中添加了新的任务;
- 所以往任务队列中添加任务成功以后, 需要再次检查状态和工作线程个数;
- 如果状态是shutdown, 会尝试移除新添加的任务, 如果移除失败, 并且已经没工作线程, 则添加一个工作线程;
if (command == null)
throw new NullPointerException();
// 获取状态
int c = ctl.get();
// 如果当前核心线程数量 < 最大核心线程数量, 尝试创建核心线程并执行新添加的任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
//(如果状态不是RUNNING或者因为并发导致创建失败), 再次获取状态并向下执行
c = ctl.get();
}
// 如果是Running状态, 向任务队列添加任务
if (isRunning(c) && workQueue.offer(command)) {
// 添加任务后重新再次检查状态, 防止并发状态下线程池状态被修改;
int recheck = ctl.get();
// 如果状态已经不是running, 尝试移除任务
if (!isRunning(recheck) && remove(command))
// 如果移除成功, 调用拒绝策略
reject(command);
// 如果当前是Running状态 或者 移除失败, 并且已经没有worker, 添加一个非核心线程, 去执行任务队列中的任务;
// 防止在shutdown状态下无法处理任务队列中的任务, 进而无法进入tidying状态;
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果不是Running 或者 任务添加失败, 尝试创建非核心线程并执行新添加的任务;
else if (!addWorker(command, false))
// 如果创建非核心线程失败, 执行拒绝策略;
reject(command);
}
addWorker( )
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// rs状态不是Running, 并且不是(shutdown且无worker且有任务), 那么不创建线程, 直接退出
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 内层for循环判断线程数量
for (;;) {
// 先判断总线程数量
// 然后如果当前要创建核心线程, 判断是否超出核心线程数量
// 如果是非核心线程, 判断是否超出最大线程数量
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// CAS方式将工作线程数 + 1, 跳出外层循环, 进入创建工作
if (compareAndIncrementWorkerCount(c))
break retry;
// 如果并发导致失败, 再次判断状态, 如果和之前的状态一致, 继续内层循环判断数量; 如果状态变化, 重新进行外层循环
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}
// 添加worker并启动该线程
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// new worker时创建一个线程, 绑定到thread属性
w = new Worker(firstTask);
final Thread t = w.thread;
// 几乎一定会进入, 健壮性判断
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 防止在添加线程时执行了shutdown或者shutdownNow, 这两个函数里也是有锁mainLock的;
mainLock.lock();
try {
// 拿到最新的线程池状态
int rs = runStateOf(ctl.get());
// 如果线程池状态为Running或者是前面说过的shutdown + 有任务 + 无worker
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 健壮性判断, 无所谓
if (t.isAlive())
throw new IllegalThreadStateException();
// 创建线程, workers是一个set, 保存所有的工作线程
workers.add(w);
int s = workers.size();
// 记录了运行过程中工作线程的最大个数
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 添加工作线程成功, 启动工作线程, 设置标记
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 如果启动工作线程失败, 如果已经添加到workers, 删除, 将工作线程数 - 1;
// 然后尝试修改线程池状态到TYDING状态
// 不重要
if (! workerStarted)
addWorkerFailed(w);
}
// 返回当前线程是否启动成功
return workerStarted;
篇幅限制下面就只能给大家展示小册部分内容了。整理了一份核心面试笔记包括了:Java面试、Spring、JVM、MyBatis、Redis、MySQL、并发编程、微服务、Linux、Springboot、SpringCloud、MQ、Kafka 面试专题
需要全套面试笔记【点击此处即可】免费获取
Worker类
-
继承了AQS, 实现了Runnable, Runnable很好理解, 因为工作线程要执行任务, 这个任务就是run方法;
-
持有一个 Thread对象, Worker 就是放到这个它持有的 Thread对象里运行的;
-
继承AQS是为了在shutdown时根据Worker的不同状态进行不同的处理;
shutdown方法会对每个worker线程尝试执行tryLock方法获取锁; 成功获取到锁则设置中断标记, 获取失败表明当前worker正在工作, 不允许中断
AQS 初始时 state = -1, shutdown无法中断此worker
runWorker方法一开始通过 unlock 设置 state = 0, shutDown可以中断
一旦在runWoker方法中取到任务, 又会通过lock方法改state = 1; 表示不可以中断, state = 1时, shutdown再尝试lock会失败;
所以继承AQS是为了实现一种针对shutdown的锁的机制
那shutdownNow怎么办? shutdownNow有两个位置, 一个是将状态改为STOP, 而runWorker方法内每次执行任务之前都会判断当前状态是否>= STOP, 如果是, 会自己设置interrupt位; 另一个是在shutdownNow方法内, 会直接遍历所有worker, 直接调用interrupt方法
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
// 持有一个线程对象
final Thread thread;
// 当前正在执行的任务
Runnable firstTask;
Worker(Runnable firstTask) {
// AQS属性设置为-1; 标志当前线程刚刚初始化, 还未执行runWorker方法
// 这样的线程不允许被中断
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
runWorker
// runWorker方法是Worker类的方法, 方法传参是this, 即 runWorker(this)
final void runWorker(Worker w) {
// 拿到的是Wordker持有的Thread, 这两个一一对应, 所以可以直接简称worker线程
Thread wt = Thread.currentThread();
// 拿到任务, 两种情况, 一个是task = null这时应该去任务队列拿任务, 一种是线程<核心线程数,直接拿到了任务;
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 这个unlock方法是重写过的, 最终是设置AQS的state置为0, 表示此工作线程允许被中断
boolean completedAbruptly = true;
try {
// 如果task != null, 直接执行, 如果是null, 从任务队列中取任务(仍有可能是null)
// getTask返回null时, 当前线程被回收
while (task != null || (task = getTask()) != null) {
// 被lock, state = 1; 此时不允许被中断
w.lock();
// 如果当前状态 >= STOP(STOP, TYDING, TERMINATE), 后面的判断是为了应对并发状况
if ((runStateAtLeast(ctl.get(), STOP)||(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
// 并且没有设置中断标志
&&!wt.isInterrupted()){
// 设置中断标记
wt.interrupt();
}
// STOP状态判断完毕, 执行任务;
try {
// 空实现, 提供给开发者的扩展, 自定义线程池时可以重写此方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 空实现, 提供给开发者的扩展, 自定义线程池时可以重写此方法
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
// unlock, 标记当前线程可以被中断
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
getTask( )
boolean timedOut = false; // Did the last poll() time out?
// 死循环尝试取任务, 返回null则线程被回收
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果状态 >= STOP 或者 状态为shutdown且任务队列为空, 都可以直接中断当前线程
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 如果当前状态是Running, 正常运行, 那么判断数量
// 标记当前线程是否允许被中断, 一般不允许 线程数 <= 核心线程数时被中断
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果当前线程数 > 核心线程数, 当前线程空闲时间超时, 并且当前线程数 > 1 或者 任务队列为空, 中断当前线程
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// CAS操作将当前线程数--;
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 当前线程允许被中断吗? 允许则通过poll方法设置最大等待时间去取任务;
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
// 不允许则一直阻塞等待拿任务;(当前线程数 <= 核心线程数)
workQueue.take();
if (r != null)
return r;
// 如果poll方法超过等待时间没取到任务, 设置超时标记
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
shutdown or Now
- shutdown和shutdownNow都不再接收新任务;
- shutdown当前正在执行的任务都会执行完毕; shutdownNow 会尝试停止当前正在执行的任务;
- shutdown方法会确保任务队列中所有任务被处理; shutdownNow不会;
- 在runWorker方法中, 如果getTask方法返回null, 则当前线程被回收; 如果当前状态是shutdown, 并且任务队列为空时, return null; 如果当前状态是STOP, 直接return null;
线程池就是随着应用启动一直运行, 很少有需要 shutdown 的场景; 也不要动态去创建;
考点
核心线程数的数量设置为多少
CPU 密集型 = 核心数 + 1
IO 密集型 = n * 核心数; n = RT / CPU 繁忙时间, 配到 50, 100都行;
底层公式: 线程数 = 核心数 * 期望利用率 * ( 1 + IO时间 / CPU时间 );
如果 IO 密集和 CPU 密集的任务共存呢?
设置两个线程池, 一个配 核心数 + 1, 一个配 n * 核心数;
篇幅限制下面就只能给大家展示小册部分内容了。整理了一份核心面试笔记包括了:Java面试、Spring、JVM、MyBatis、Redis、MySQL、并发编程、微服务、Linux、Springboot、SpringCloud、MQ、Kafka 面试专题
需要全套面试笔记【点击此处即可】免费获取
舱壁模式: 不同的业务使用自己的线程池, 这样一个业务故障不会影响其它线程池; 例如 Sentinel 中就有用到
需要动态调整线程池的大小吗? 不要, 就一个静态的, static new 出来就不要变了
获取核心数
Runtime.getRuntime().availableProcessors();
- 1
JDK8 2014年发布的, Docker 那时候还没出;
如果在Docker里面跑 JDK8, 获取核心线程数还是获取的机器本身的核心数, 而非Docker的核心数;
JDK8 的高版本解决了这个问题, 但是收费;
解决: 写死核心线程数, 公司的Docker配置基本不会动的;
任务队列大小设置?
任务队列本质是放在内存的, 不要太大, 过长性能会有影响; 比如 ArrayBlockingQueue, 队头取元素需要 On 来移动;
设置成 LinkedBlockingQueue 合适一些, 因为从队头取元素, LinkedBlockingQueue 更快;
大小建议设置为512 或者 1024;
拓展: 如果有些地方用到 HashMap 和 Set , 他们的大小没有限制, 因为大了并不会导致效率降低;
最大线程数设成多少?
直接和核心线程数设置成一样;
推荐写法
submit 和 excute 的区别
submi 可以提交有返回值的任务, 比如可以直接提交一个 Callable 实现类对象, 也可以提交一个 Runnable;
submit 方法返回一个 Future 对象, 通过这个对象来获取返回值;
原理是构造一个 FutureTask, 里面放我们传入的 Callable 或 Runable 实现类对象;
如果提交的是 Runnable, 需要进行转换, 转换成一个 Callable, 才能保存到 FutureTask, 需要设置一个结果值, 默认是null, 我们也可以在 submit 的时候传入; Runnable 执行完成后我们通过get, 拿到的就是这个设置好的默认结果值;
转换Callable 的时候, 实际上就是做了一个 Callable实现类, 其 call 方法先执行我们传入的 Runnable 的 run 方法, 然后返回我们预先设置的结果值
ThreadPoolExecutor::excute
提交没有返回值的任务 Runnable;
当一个任务阻塞时, Worker会转而执行别的任务吗? – 不会
这种调用方式, 比先调一个拿到结果, 再调另一个, 更快;
虽然执行了8个阻塞2S的任务, 但整个运行时间也就 2S 多点; 如果一个一个调用那要 16S;
List<FutureTask> list;
for(int i = 0; i < 8; i++){
submit(list.nextFutureTask());
}
for(int i = 0; i < 8; i++){
list.nextFutureTask().get();
}
篇幅限制下面就只能给大家展示小册部分内容了。整理了一份核心面试笔记包括了:Java面试、Spring、JVM、MyBatis、Redis、MySQL、并发编程、微服务、Linux、Springboot、SpringCloud、MQ、Kafka 面试专题
需要全套面试笔记【点击此处即可】免费获取
优点
- 降低资源消耗:线程池能够重复利用已创建的线程,避免了频繁创建和销毁线程的开销。这样可以减少系统资源的消耗,提高资源利用率。
- 提高响应速度:线程池中的线程可以提前创建好并且保持在就绪状态,当任务到达时可以立即执行,减少创建线程的等待时间;
- 提高系统稳定性:线程池可以限制线程的数量,可以防止因为线程过多导致系统资源不足而造成系统崩溃的情况
添加任务的流程
运行状态回收 Worker 的依据
-
如果 getTask 返回 null, 则回收 Worker ;
-
在正常运行时, 有一个循环, 循环内,
-
首先, 会有 allowCoreThreadTimeOut 参数和 当前工作线程数量是否大于核心线程数量 来判断当前线程是否允许被回收;
-
如果允许回收, 并且上一次用 poll 取任务是超时的, 并且 ( 当前还有其它工作线程 || 任务队列为空) , 则返回null;
当前还有其它工作线程 || 任务队列为空, 这是为了防止有其它线程在同一时间 修改状态为 shutdown
-
如果上面的判断没有进入, 就会去取任务, 根据是否允许回收采用 带超时时间(就是 keepAliveTime)的 poll (允许回收) 或者 take(不允许回收);
shutdown 和 shutdownNow
-
shutdown 回收的本质, 还是通过 getTask 实现的; getTask 返回 null, 则 worker 的 run 方法退出循环, 自然结束运行;
-
getTask 方法发现当前是 STOP(也就是shutdownNow对应的状态), 直接return null;
-
如果当前是 shutdown, 会判断一下是不是还有任务, 没有任务就 return null;
-
Worker 线程继承了 AQS, 实现了不可重入的锁机制; 当 Woker 线程取到任务时, 会调用自己的 lock 方法加锁, 然后去执行任务;
-
shutdown 方法尝试中断 Worker 线程时, 先调用 tryLock 方法, 获取锁成功以后才能设置 interrupt标志 ; 所以, 正在执行任务的 Worker 不会被shutdown打断;
-
而 shutdownNow 方法, 直接遍历所有worker, 调用 interrupt 方法终止任务执行; 至于正在执行任务的 Worker 到底能不能终止, 就要看我们提交的任务里, 是否设计了基于 interrupt 标志来结束任务执行的代码;
-
总的来说, shutdown 不接受新任务, 正在进行的任务会处理完成, 任务队列中的任务会处理完成; 是一种优雅的关闭;
-
而 shutdownNow, 不接受新任务, 正在进行的任务会尝试终止(正在工作的 worker 设置了 interrupt ), 任务队列中的任务不会处理;
运行原理
- 底层采用阻塞队列保存任务, 添加任务的时候, …
- 工作线程 Worker 继承了 AQS, 实现了 Runnable, 持有一个 Thread;
- 通过将自己放到持有的 Thread 对象中运行;
- Worker 的 run 方法是在循环中不断的调用 getTask 取 Runnable 任务, 然后调用其 run 方法; (如果是submit 提交的 Callable 任务, 被封装成 FutureTask, 还是一个 Runnable)
- 取到任务以后会上锁, 锁自己; 这是为了不被 shutdown 打断;
- 如果 getTask 返回的值为 null, 工作线程的 run 方法的循环就会退出, Worker 线程执行完毕, 也就会被销毁;
篇幅限制下面就只能给大家展示小册部分内容了。整理了一份核心面试笔记包括了:Java面试、Spring、JVM、MyBatis、Redis、MySQL、并发编程、微服务、Linux、Springboot、SpringCloud、MQ、Kafka 面试专题
需要全套面试笔记【点击此处即可】免费获取