概述
在Java中要想实现线程,有四种手段:
- 继承Thread类
- 实现
java.lang.Runnable
接口 - 实现
java.util.concurrent.Callable
泛型接口, - 利用线程池
线程池通过线程复用机制,并对线程进行统一管理,优点:
- 降低系统资源消耗。通过复用已存在的线程,降低线程创建和销毁造成的消耗;
- 提高响应速度。当有任务到达时,无需等待新线程的创建便能立即执行;
- 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗大量系统资源,还会降低系统的稳定性,使用线程池可以进行对线程进行统一的分配、调优和监控。
ThreadPoolExecutor,线程池的核心类,后文简称为TPE。最基础的构造函数:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {}
参数
- corePoolSize:核心池的大小,在创建线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用
prestartAllCoreThreads()
或prestartCoreThread()
方法预创建线程,即在没有任务到来之前就创建corePoolSize个线程或一个线程。默认情况下,在创建线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中; - maximumPoolSize:线程池最大线程数,表示在线程池中最多能创建多少个线程;
- keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用
allowCoreThreadTimeOut(boolean)
方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0; - unit:参数keepAliveTime的时间单位,TimeUnit类中有7种静态属性:DAYS、HOURS、MINUTES、SECONDS、MILLISECONDS、MICROSECONDS、NANOSECONDS;
- workQueue:一个阻塞队列,用来存储等待执行的任务,对线程池的运行过程产生重大影响。前两者使用较少,一般使用后两者。线程池的排队策略与BlockingQueue有关。有以下几种选择:
- ArrayBlockingQueue:基于数组的有界阻塞队列,按FIFO排序任务
- PriorityBlockingQueue:具有优先级的无限阻塞队列,基于最小二叉堆实现
- LinkedBlockingQueue:基于链表结构的阻塞队列,如果不设置初始化容量,其容量为Integer.MAX_VALUE,即为无界队列。因此,如果线程池中线程数达到corePoolSize,且始终没有空闲线程(任务提交的平均速度快于被处理的速度),任务缓存队列可能出现无限制的增长。
- SynchronousQueue:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态。因此,如果线程池中始终没有空闲线程(任务提交的平均速度快于被处理的速度),可能出现无限制的线程增长。
- DelayQueue:没有大小限制的队列,只有当其指定的延迟时间后才能够从队列中获取到该元素,取数据的操作会被阻塞。
- threadFactory:线程工厂,主要用来创建线程;
- handler:拒绝策略;
面试常考题。当提交一个新任务到线程池时,具体的执行流程如下:
- 当提交任务,线程池会根据corePoolSize大小创建若干任务数量线程执行任务
- 当任务的数量超过corePoolSize数量,后续的任务将会进入阻塞队列阻塞排队
- 当阻塞队列也使用完后,则将会继续创建
maximumPoolSize-corePoolSize
个数量的线程(所谓的临时线程)来执行任务。任务处理完成,临时线程等待keepAliveTime之后被自动销毁 - 如果达到maximumPoolSize,阻塞队列还是满的状态,则将根据不同的拒绝策略对应处理
拒绝策略
即构造函数的RejectedExecutionHandler参数,其源码:
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
当要创建的线程数量大于线程池的最大线程数时,新的任务就会被拒绝,就会调用这个接口里的这个方法。
TPE提供四个拒绝策略:
- CallerRunsPolicy
调用者运行策略
功能:当触发拒绝策略时,只要线程池没有关闭,就由提交任务的当前线程处理。
使用场景:一般在不允许失败的、对性能要求不高、并发量较小的场景下使用,因为线程池一般情况下不会关闭,也就是提交的任务一定会被运行,但是由于是调用者线程自己执行的,当多次提交任务时,就会阻塞后续任务执行,性能和效率变慢。
缺点:可能会阻塞主线程?
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
- AbortPolicy
中止策略,构造函数的默认参数,直接抛异常RejectedExecutionException。
功能:当触发拒绝策略时,直接抛出拒绝执行的异常,也就是打断当前执行流程。一定要正确处理抛出的异常。
ExecutorService中的线程池实例队列都是无界的,也就是说把内存撑爆都不会触发拒绝策略。当自己自定义线程池实例时,使用这个策略一定要处理好触发策略时抛的异常,因为他会打断当前的执行流程。 - DiscardPolicy(丢弃策略)
接口方法空实现;
功能:直接静悄悄的丢弃这个任务,不触发任何动作
使用场景:如果你提交的任务无关紧要,你就可以使用它 。会悄无声息的吞噬你的的任务。所以这个策略基本上不用 - DiscardOldestPolicy(弃老策略)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
功能:如果线程池未关闭,就弹出队列头部的元素,然后尝试执行
使用场景:这个策略还是会丢弃任务,丢弃时也是毫无声息,但是特点是丢弃的是老的未执行的任务,而且是待执行优先级较高的任务。
基于这个特性,我能想到的场景就是,发布消息,和修改消息,当消息发布出去后,还未执行,此时更新的消息又来了,这个时候未执行的消息的版本比现在提交的消息版本要低就可以被丢弃了。因为队列中还有可能存在消息版本更低的消息会排队执行,所以在真正处理消息的时候一定要做好消息的版本比较。
属性
TPE类中的字段:
public class ThreadPoolExecutor extends AbstractExecutorService {
// 控制状态,用高3位来表示线程池的运行状态,低29位来表示线程池中工作线程的数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 值为29,表示偏移量
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程池的最大容量,其值的二进制为:00011111111111111111111111111111(29个1)
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 线程池的运行状态,共有5个状态,用高3位来表示
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// 任务缓存队列,用来存放等待执行的任务
private final BlockingQueue<Runnable> workQueue;
// 全局锁,对线程池状态等属性修改时需要使用这个锁
private final ReentrantLock mainLock = new ReentrantLock();
// 线程池中工作线程的集合,访问和修改需要持有全局锁
private final HashSet<Worker> workers = new HashSet<Worker>();
// 终止条件
private final Condition termination = mainLock.newCondition();
// 线程池中曾经出现过的最大线程数
private int largestPoolSize;
// 已完成任务的数量
private long completedTaskCount;
// 线程工厂
private volatile ThreadFactory threadFactory;
// 任务拒绝策略
private volatile RejectedExecutionHandler handler;
// 线程存活时间
private volatile long keepAliveTime;
// 是否允许核心线程超时
private volatile boolean allowCoreThreadTimeOut;
// 核心池大小,若allowCoreThreadTimeOut被设置,核心线程全部空闲超时被回收的情况下会为0
private volatile int corePoolSize;
// 最大池大小,不得超过CAPACITY
private volatile int maximumPoolSize;
// 默认的任务拒绝策略
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");
private final AccessControlContext acc;
}
线程池的运行状态总共有5种:
- RUNNING:高3位为111,接受新任务并处理阻塞队列中的任务
- SHUTDOWN:高3位为000,不接受新任务但会处理阻塞队列中的任务
- STOP:高3位为001,不会接受新任务,也不会处理阻塞队列中的任务,并且中断正在运行的任务
- TIDYING:高3位为010,所有任务都已终止,工作线程数量为0,线程池将转化到TIDYING状态,即将要执行terminated()钩子方法
- TERMINATED:高3位为011,terminated()方法已经执行结束
线程池中并没有使用单独的变量来表示线程池的运行状态,而是使用一个AtomicInteger类型的变量ctl来表示线程池的控制状态,其将线程池运行状态与工作线程的数量打包在一个整型中,用高3位来表示线程池的运行状态,低29位来表示线程池中工作线程的数量,对ctl的操作主要参考以下几个函数:
// 通过与的方式,获取ctl的高3位,即线程池的运行状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 通过与的方式,获取ctl的低29位,即线程池中工作线程的数量
private static int workerCountOf(int c) { return c & CAPACITY; }
// 通过或的方式,将线程池状态和线程池中工作线程的数量打包成ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }
// SHUTDOWN状态的值是0,比它大的均是线程池停止或清理状态,比它小的是运行状态
private static boolean isRunning(int c) { return c < SHUTDOWN;}
线程池状态的所有转换情况:
- RUNNING -> SHUTDOWN:调用shutdown(),可能在finalize()中隐式调用
- (RUNNING or SHUTDOWN) -> STOP:调用shutdownNow()
- SHUTDOWN -> TIDYING:当缓存队列和线程池都为空时
- STOP -> TIDYING:当线程池为空时
- TIDYING -> TERMINATED:当terminated()方法执行结束时
通常情况下,线程池有如下两种状态转换流程:
- RUNNING -> SHUTDOWN -> TIDYING -> TERMINATED
- RUNNING -> STOP -> TIDYING -> TERMINATED
提交任务
线程池框架提供两种方式提交任务,submit()和execute(),前者提交的任务可以返回任务执行的结果,后者不能。submit()方法的实现有以下三种:
public Future<?> submit(Runnable task);
public <T> Future<T> submit(Runnable task, T result);
public <T> Future<T> submit(Callable<T> task);
关闭
TPE提供两个方法用于线程池的关闭:
- shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但不再接受新的任务
- shutdownNow():立即终止线程池,并尝试打断正在执行的任务,清空任务缓存队列,返回尚未执行的任务
shutdown()方法将线程池运行状态设置为SHUTDOWN,此时线程池不会接受新的任务,但会处理阻塞队列中的任务。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
// 获取全局锁,下同
mainLock.lock();
try {
// 检查shutdown权限
checkShutdownAccess();
// 设置线程池运行状态为SHUTDOWN
advanceRunState(SHUTDOWN);
// 中断所有空闲worker
interruptIdleWorkers();
// 用onShutdown()钩子方法
onShutdown();
} finally {
// 释放锁
mainLock.unlock();
}
// 尝试终止线程池
tryTerminate();
}
shutdown()
首先会检查是否具有shutdown的权限,然后设置线程池的运行状态为SHUTDOWN,之后调用interruptIdleWorkers()
方法中断所有空闲的worker,再调用onShutdown()
钩子方法,最后尝试终止线程池。interruptIdleWorkers()
实现如下:
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
// onlyOne标识是否只中断一个线程
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 遍历workers集合
for (Worker w : workers) {
// worker对应的线程
Thread t = w.thread;
// 线程未被中断且成功获得锁
if (!t.isInterrupted() && w.tryLock()) {
try {
// 发出中断请求
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
// 若只中断一个线程,则跳出循环
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
shutdownNow()方法将线程池运行状态设置为STOP,此时线程池不会接受新任务,也不会处理阻塞队列中的任务,并且中断正在运行的任务。
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 设置线程池运行状态为STOP
advanceRunState(STOP);
// 中断所有worker
interruptWorkers();
// 将任务缓存队列中等待执行的任务取出并放到list中
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 尝试终止线程池
tryTerminate();
// 返回任务缓存队列中等待执行的任务列表
return tasks;
}
shutdownNow()与shutdown()相似,不同之处在于,前者设置线程池的运行状态为STOP,之后调用interruptWorkers()
中断所有的worker(并非只是空闲的worker),尝试终止线程池之后,返回任务缓存队列中等待执行的任务列表。interruptWorkers
实现如下:
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
// 调用Worker类的interruptIfStarted()方法中断线程
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
监控
一般需要对线程池进行监控,方便出问题时进行查看。提供如下方法来获取运行状态:
- getCompletedTaskCount:已经执行完成的任务数量
- getLargestPoolSize:线程池里曾经创建过的最大的线程数量。这个主要是用来判断线程是否满过
- getActiveCount:获取正在执行任务的线程数据
- getPoolSize:获取当前线程池中线程数量的大小
- getTaskCount:获取预估的曾调度执行过的任务的总数
另外,线程池也预留3个扩展方法:
- beforeExecute
- afterExecute
- terminated
在runWorker方法里面,在执行任务之前会回调beforeExecute方法,执行任务之后会回调afterExecute方法,而这些方法默认都是空实现,可以继承ThreadPoolExecutor来扩展重写这些方法,来实现想要的功能。
Executors
JDK提供Executors工具类来快速创建线程池:
- newFixedThreadPool:固定线程数量的线程池,核心线程数与最大线程数相等
- newSingleThreadExecutor:单个线程数量的线程池
- newCachedThreadPool:接近无限大线程数量的线程池
- newScheduledThreadPool:带定时调度功能的线程池
虽然JDK提供快速创建线程池的方法,但其实不推荐使用Executors来创建线程池:
- newFixedThreadPool线程池使用LinkedBlockingQueue,队列的容量默认是无限大,实际使用中出现任务过多时会导致内存溢出;
- newCachedThreadPool线程池由于核心线程数无限大,当任务过多时,会导致创建大量的线程,可能机器负载过高,可能会导致服务宕机。
停止线程
非停止线程池。不能使用Thread.stop()
方法,不安全,且被标记为废弃的方法。有3种:
- 使用退出标志,使线程正常退出,也就是当run方法完成后线程终止
- 使用stop方法强行终止,不推荐,stop、suspend、resume一样都是过期作废的方法
- 使用interrupt方法中断线程
调用interrupt方法是在当前线程中打一个停止标志,并不是真的停止线程。
判断线程是否停止状态,Thread类中提供两种方法:
this.interrupted()
:测试当前线程是否已经中断,当前线程是指运行this.interrupted()
方法的线程。线程的中断状态由该方法清除。即,如果连续两次调用该方法,则第二次调用返回false。this.isInterrupted()
:测试线程是否已经中断;
设置
一般而言,关于线程池的几个核心参数的设置,需要根据具体的业务来确定。但是也有一些经验之谈,得看应用是CPU密集型还是IO密集型:
- CPU:可将线程数设置为N(CPU核心数)+1,比CPU核心数多出来的一个线程是为了防止线程偶发的缺页中断,或其他原因导致的任务暂停而带来的影响。一旦任务暂停,CPU就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用CPU的空闲时间
- IO:IO密集型应用于CPU关联不大,可考虑设置核心线程数=CPU核心数量*2
使用建议
多线程最佳实践建议:
- 使用线程池
- 给线程起名字
- 注意线程安全问题
拓展
伪共享
即false sharing,多线程系统(每个处理器有自己的局部缓存)中的一个性能问题。伪共享发生在不同处理器上的线程对变量的修改依赖于相同的缓存行。
第三方实现的拒绝策略
dubbo
@Slf4j
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
private final String threadName;
private final URL url;
private static volatile long lastPrintTime = 0;
private static Semaphore guard = new Semaphore(1);
public AbortPolicyWithReport(String threadName, URL url) {
this.threadName = threadName;
this.url = url;
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
String msg = String.format("Thread pool is EXHAUSTED! Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d), Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!", threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(), e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(), url.getProtocol(), url.getIp(), url.getPort());
log.warn(msg);
dumpJStack();
throw new RejectedExecutionException(msg);
}
}
当Dubbo的工作线程触发线程拒绝后,主要做三个事情,原则就是尽量让使用者清楚触发线程拒绝策略的真实原因:
- 输出一条警告级别的日志,日志内容为线程池的详细设置参数,线程池当前的状态,及当前拒绝任务的一些详细信息。这个日志简直就是日志打印的典范(还有spring),便于快速定位问题所在
- 输出当前线程堆栈详情,当你通过上面的日志信息还不能定位问题时,案发现场的dump线程上下文信息就是你发现问题的救命稻草
- 继续抛出拒绝执行异常,使本次任务失败,这个继承JDK默认拒绝策略的特性
Netty
private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {
NewThreadRunsPolicy() {
super();
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
final Thread t = new Thread(r, "Temporary task executor");
t.start();
} catch (Throwable e) {
throw new RejectedExecutionException("Failed to start a new thread", e);
}
}
}
Netty中的实现很像JDK中的CallerRunsPolicy,舍不得丢弃任务。不同的是,CallerRunsPolicy是直接在调用者线程执行的任务。而Netty是新建一个线程来处理的。
所以,Netty的实现相较于调用者执行策略的使用面就可以扩展到支持高效率高性能的场景。但是Netty的实现里,在创建线程时未做任何的判断约束,也就是说只要系统还有资源就会创建新的线程来处理,直到new不出新的线程,才会抛创建线程失败的异常。
ActiveMQ
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
try {
executor.getQueue().offer(r, 60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");
}
throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");
}
});
ActiveMQ中的策略属于最大努力执行任务型,当触发拒绝策略时,在尝试一分钟的时间重新将任务塞进任务队列,当一分钟超时还没成功时,就抛出异常。
Pinpoint
public class RejectedExecutionHandlerChain implements RejectedExecutionHandler {
private final RejectedExecutionHandler[] handlerChain;
public static RejectedExecutionHandler build(List<RejectedExecutionHandler> chain) {
Objects.requireNonNull(chain, "handlerChain must not be null");
RejectedExecutionHandler[] handlerChain = chain.toArray(new RejectedExecutionHandler[0]);
return new RejectedExecutionHandlerChain(handlerChain);
}
private RejectedExecutionHandlerChain(RejectedExecutionHandler[] handlerChain) {
this.handlerChain = Objects.requireNonNull(handlerChain, "handlerChain must not be null");
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
for (RejectedExecutionHandler rejectedExecutionHandler : handlerChain) {
rejectedExecutionHandler.rejectedExecution(r, executor);
}
}
}
Pinpoint定义一个拒绝策略链,包装一个拒绝策略列表,当触发拒绝策略时,会将策略链中的rejectedExecution依次执行一遍。
可以自己实现这个接口,实现对这些超出数量的任务的处理。