What
主要用于整理线程中容易忘记的点以及不太好理解的内容
shutdown vs shutdownNow
两者都是用于关闭线程池,但是也有着很大区别
shutdown方法行为
- 会使得线程池的状态变成
SHUTDOWN
,线程池不再接收新来的任务。 - 中断空闲的线程(从阻塞队列拿不到任务被阻塞),正在执行任务的线程不会被中断。
- 不会移除阻塞队列中等待的任务。
- 会执行完所有的任务后才会变成最终态
TERMINATED
(因为不会移除队列中的任务)。
shutdownNow方法行为
- 会使得线程池的状态变成
STOP
,线程池不再接收新来的任务。 - 中断所有运行的线程,无论是空闲还是正在执行任务的线程都会被中断。
- 移除阻塞队列中等待的任务。
- 执行完当前正在执行的任务就会变成最终态
TERMINATED
(因为会移除队列中的任务)。
shutdown方法中断线程时如何区分是空闲的线程还是正在执行的任务的线程?
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
// 中断空闲线程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 关键,线程没有被中断并且获取锁成功就中断当前线程, 而正在执行任务的线程会持有锁, 这里获取锁会失败, 便不会被中断
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
/**
* 此方法线程执行的核心逻辑, 线程启动后, 调用run方法, 而run方法调用该方法
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 轮训任务,若拿不到任务则退出循环,线程结束
while (task != null || (task = getTask()) != null) {
// 拿到任务执行, 加锁, 因此上面的中断方法获取不到锁, 就不会中断运行任务的线程了
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
线程何时退出
线程退出逻辑主要在runWorker和getTask方法中。
- 线程池状态是SHUTDOWN以及阻塞队列为空时(
shutdown
方法含义, 因为还要执行队列中剩余的任务)。 - 线程池状态是STOP时(
shutdownNow
方法含义,不需要判断队列是否为空,并且shutdownNow
方法会移除队列中剩余的任务)。 - 从阻塞队列中获取任务时超过指定时间并且线程池中的数量大于核心线程数量时(核心线程并不是通过标记来实现,只是简单通过比较数量)。
- 线程执行任务发生异常时会直接退出循环,不会再从阻塞队列中获取任务执行。
前面3点均体现在getTask方法中,后面1点体现在runWorker方法中。
/**
* 线程执行的核心逻辑, 循环获取任务执行
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 很容器分析出退出循环的条件是拿不到任务,或者执行任务发生异常
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 对应于第4点, 执行任务时发生异常
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 对应于第1点和第2点
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 对应于第3点
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果队列不为空,至少保证一个线程要再去拿任务
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 这里可以看出,获取任务时被中断, 只要不满足线程退出的条件, 继续会循环获取任务, 而不是直接结束
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
runWorker
方法中退出时的finally代码块会执行processWorkerExit
方法,用于收尾工作。其中还有两点巧妙的逻辑
- 会执行
tryTerminate
方法,终止线程池(shutdown
和shutdownNow
方法也会调用此方法,但是由于可能当时还有正在执行的线程,终止不掉,因此需要线程退出时再调用该方法)。 - 如果线程执行任务异常结束,会新建一个新的线程补上,这可以避免线程池里的线程执行任务时全部异常结束掉,而没有新线程来执行队列中的剩余任务。
中断线程的目的?
shutdown
和shutdownNow
都会去中断线程,主要有以下两个目的。
- 唤醒从阻塞队列中获取任务时导致等待的空闲线程,再一次去执行
getTask
方法中for循环的逻辑,从而退出。 - 用户代码也很有可能通过线程中断来退出run方法,事实上通过线程中断来退出线程也是比较推荐的一种方法,这样便可以终止正在执行任务的线程,不过这取决于用户代码实现。
invokeAll
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit) throws InterruptedException;
此方法用于批量提交任务,并等待任务全部执行完毕或者超时才会返回。
- 返回来的Future的isDone方法必然是true,future肯定是完成状态,正常结束、异常结束、被取消都有可能。
- 此方法除了会抛中断异常,还会抛
RejectedExecutionException
,毕竟内部调用的execute
方法。 - 当超时发生时,会逐个调用每一个Future对象的
cancel
方法,并中断执行任务的线程。此时已经完成的任务不受任何影响,因为任务已经是完成状态了,cancel
方法会直接返回;正在执行的任务状态被置为取消,但是任务该怎么跑还是怎么跑;未执行的任务状态被置为取消,轮到该任务执行时,也会直接结束,因为已经是取消状态了,不会执行后续逻辑。任务为取消状态时执行get
方法会抛CancellationException
异常,而不是返回null。
Executors.newCachedThreadPool()
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
达到的效果是按需创建线程,首先核心线程数量为0,也就超过60s后,还取不到任务,那么线程会退出。
其次SynchronousQueue
是一个容量为0的队列,只有put和take成对出现时才不会被阻塞。线程池判断队列是否满了的方法是调用阻塞队列的offer方法,该方法不会阻塞,队列满返回false,否则返回true。对于SynchronousQueue
来说,如果没有别的线程从队列中拿元素,则会返回false(容量为0),所以会创建一个新的线程来执行任务。当然了,因为指定线程的空闲存活时间是60s,也就意味着线程执行完任务后会从队列中拿任务,最多等待60s,在这60s之内,往队列里塞新的任务,此时offer会返回true,就不会创建一个新的线程了,由这个空闲的线程来执行。