public class ThreadPoolExecutor extends AbstractExecutorService
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
ctl int32位 高3位状态位,低29位工作线程数量
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
COUNT_BITS=32-3=29
private static final int COUNT_BITS = Integer.SIZE - 3;
CAPACITY=000 111111.....
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
101 000000... 注意这个是原码
实际是补码参与运算
10000....1补码111111......
111 000000... <-是补码结果,下次参与位运算也是此数
private static final int RUNNING = -1 << COUNT_BITS;
000 000000...
private static final int SHUTDOWN = 0 << COUNT_BITS;
001 000000...
private static final int STOP = 1 << COUNT_BITS;
010 000000...
private static final int TIDYING = 2 << COUNT_BITS;
011 000000...
private static final int TERMINATED = 3 << COUNT_BITS;
默认拒绝策略是报错
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
ThreadPoolExecutor构造方法
Worker构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
//注意Worker实现了Runnable
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
Worker(Runnable firstTask) {
//初始设置AQS状态-1
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
//默认Executors.defaultThreadFactory()
//注意this是Worker对象
this.thread = getThreadFactory().newThread(this);
}
//所以Worker.start()执行的是runWorker(worker对象);
public void run() {
runWorker(this);
}
}
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
//创建Worker的执行线程
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
//默认非守护线程
t.setDaemon(false);
//默认优先级为5
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
父类AbstractExecutorService定义了submit入参Callable方法
包装成RunnableFuture执行子类重写execute(Runnable command);
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
//封装成RunnableFuture
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
提交线程任务
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//获取状态码+线程数 int值
int c = ctl.get();
//小于核心线程数
if (workerCountOf(c) < corePoolSize) {
//添加核心线程 true 是否核心线程
if (addWorker(command, true))
return;
//添加失败
c = ctl.get();
}
//run状态并且添加到队列成功
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//二次检查,非run状态便移除队列,并且包含tryTerminate()方法
if (! isRunning(recheck) && remove(command))
//走拒绝策略
reject(command);
//二次检查,run状态或者移除队列失败=>
else if (workerCountOf(recheck) == 0)
//添加非核心线程,空方法任务
addWorker(null, false);
}
//添加非核心线程任务失败,走拒绝策略
else if (!addWorker(command, false))
reject(command);
}
新增Worker工作线程
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//rs为Running状态
//rs如果刚好为SHUTDOWN,提交空任务,并且队列不为空情况,是可以添加Worker
//线程池会把正在执行的任务及队列中等待执行的任务都执行完毕后,再去关闭;
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
//如果给工作线程超过最大或者 (core=true 核心线程数core=false 最大线程数
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//CAS添加wc,跳出retry循环
if (compareAndIncrementWorkerCount(c))
break retry;
//CAS失败,状态变更,继续retry循环
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
////CAS失败,状态没有变更,继续内部循环
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//注意Worker是实现Runnable,w.thread是
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
//Running状态或者SHUTDOWN状态新增空任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//新建线程未启动,如果是alive状态,说明已经被启动了,则抛出异常。
//抛出异常未捕获,
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
//largestPoolSize用于记录线程池中曾经存在的最大的线程数量
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//添加成功,t.start();,设置workerStarted true
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
//失败处理
addWorkerFailed(w);
}
return workerStarted;
}
addWorkerFailed
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
//移除创建worker
workers.remove(w);
//减少wc
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}
final void tryTerminate() {
for (;;) {
int c = ctl.get();
//RUNNING||TIDYING||TERMINATED
//SHUTDOWN && 阻塞队列不为空
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//STOP || SHUTDOWN 阻塞队列为空
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//ctlOf(TIDYING, 0)=>TIDYING头状态+工作线程数组成
//尝试CAS设置TIDYING且wc=0
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
//再设置为TERMINATED且wc=0
ctl.set(ctlOf(TERMINATED, 0));
//termination条件唤醒
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
真正运行任务
final void runWorker(Worker w) {
//Worker线程
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
//worker继承AQS,unlock方法执行release(1)
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//获取任务
while (task != null || (task = getTask()) != null) {
//AQS state由0->1,执行acquire(1),设置当前线程独占
//失败就走AQS等待队列
w.lock();
//线程状态为STOP以及以上 或者 Thread.interrupted()线程中断为true,
//然后复原为false,并且线程状态为STOP以及以上
//二次判断
//wt.isInterrupted()获取为复原false
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
//中断
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
//真正执行运行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
//当前工作线程完成task总数
w.completedTasks++;
//w.unlock();执行,设置独占线程null,设置State=0
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
w.unlock();执行,设置独占线程null,设置State=0
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
获取阻塞队列任务
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//SHUTDOWN状态并且阻塞队列为空 || STOP以上状态
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
//do while循环直到减少一个ctl中工作线程数
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
//allowCoreThreadTimeOut开启,核心线程关闭或超过核心线程,需要超时判断
boolean timed = allowCore || wc > corePoolSize;
//wc大于最大线程数 或者 配置超时且获取超时
//并且 (wc>1 或者 阻塞队列任务为空)
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//配置超时,超时时间内拿去任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
//条件队列等待获取队列任务
workQueue.take();
if (r != null)
return r;
//超时未获取,指定超时
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
如果runWorker(Worker w) 方法第一个try和第二个try之间发生异常
completedAbruptly = true;
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//统计总完成数量
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
//Running ShutDown
if (runStateLessThan(c, STOP)) {
//completedAbruptly=false,第二个try内执行finally结束
if (!completedAbruptly) {
//允许回收核心线程,最小为0,否则最小核心线程
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//如果允许回收核心线程,阻塞队列有任务
if (min == 0 && ! workQueue.isEmpty())
//则工作线程最小为1
min = 1;
//满足条件就return
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//异常状态或者允许回收核心线程,阻塞队列有任务,但没有工作线程
//增加一个非核心线程
addWorker(null, false);
}
}
标签:解析,return,mainLock,int,源码,线程,ctl,null,ThreadPoolExecutor
From: https://www.cnblogs.com/wsyphaha/p/18080939