/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
// command可以是普通的runnable也可以是futureTask
public void execute(Runnable command) {
// 非空判断
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
// 获取ctl最新值
// ctl:高3位表示线程池状态;低位表示线程池中线程数量
int c = ctl.get();
// workerCountOf获取出ctl表示的线程数量
// 条件成立:表示当前线程数小于核心线程数 --> 直接创建一个新的worker
if (workerCountOf(c) < corePoolSize) {
// addWorker --> 创建线程的过程,会创建worker对象,并且将command作为firstTask
// core --> true:采用corePoolSize限制;false:采用maximumPoolSize限制
if (addWorker(command, true))
// 创建成功后,直接返回
// addWorker会启动新创建的worker,执行firstTask
return;
// 执行到这条语句,说明addWorker失败了。可能的情况:
// 1. 存在并发,execute有多个线程同时调用。不止一个线程workerCountOf(c) < corePoolSize条件成立,并且向线程池中创建了worker。
// 这个时候,线程池中的核心线程数已经达到最大值
// 2. 当前线程池状态发生了改变:当线程池状态不是RUNNING时,addWorker会失败
// 特殊情况:SHUTDOWN状态下,也有可能创建成功。前提firstTask==null,并且当前queue不为空
c = ctl.get();
}
// 执行到这里,有几种情况?
// 1. 当前线程数达到corePoolSize
// 2. addWorker失败
// 条件成立:说明当前线程池是running状态,则尝试将task放入workQueue中
if (isRunning(c) && workQueue.offer(command)) {
// 执行到这里说明offer提交任务成功了
// 再次获取ctl,保存到recheck中
int recheck = ctl.get();
// 如果非running条件成立,则或执行remove
// 条件1. ! isRunning(recheck)
// --> true 说明,提交任务到队列后,线程池状态被外部线程修改了。比如:shutdown()或者shutdownNow()
// 这种情况,需要把刚提交的任务删除掉
// 条件2. remove(command)
// --> true 提交之后,线程池中的线程还未消费(处理)
// --> false 提交之后,在shutdown() shutdownNow()之前,就被线程池中的线程处理了
if (! isRunning(recheck) && remove(command))
// 提交之后,线程池状态为非running 且 任务出队成功 --> 拒绝策略
reject(command);
// 有几种情况会到这里?
// 1.当前线程池是running状态 --> 这个概率最大
// 2.当前线程池是非running状态,但是remove task失败
// 避免一种情况:当前线程池是running状态,但是线程池中的存活线程数量是0
// 如果是0的话,会很尴尬 --> 任务没有线程去做了 --> 担保机制:保证线程池在running状态下,最起码有一个线程在工作
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 执行到这里,有几种情况?
// 1.offer失败
// 2.当前线程池是非running状态
// 1.offer失败,说明:当前queue满了。
// 如果当前线程数尚未达到maximumPoolSize --> 创建新的worker直接执行command
// 如果当前线程数达到maximumPoolSize --> 返回false
// 2.线程池状态为非running状态。因为command!=null,所以addWorker一定返回false
else if (!addWorker(command, false))
reject(command);
}
标签:execute,task,--,running,源码,command,线程,addWorker,ThreadPoolExecutor
From: https://www.cnblogs.com/daheww/p/16721144.html