Java构建线程的方式
- 集成Thread
- 实现Runnable
- 实现CallAble
- 线程池方式
- Java提供了Executors创建(不推荐,不方便进行控制)
- 推荐手动创建线程池ThreadPoolExecutor。
ThreadPoolExecutor参数
- int corePoolSize 核心线程数
- int maximumPoolSize 最大线程数,最大减核心是非核心线程。
- long keepAliveTime 最大空闲时间
- TimeUnit unit 时间单位
- BlockingQueue
workQueue 阻塞队列 - ThreadFactory threadFactory, 线程工厂
- RejectedExecutionHandler handler 拒绝策略
线程池的执行流程
拒绝策略在线程池无法接受新任务时会被执行,例如当线程池已满并且任务队列也已满时。
线程池属性
// ctl的高3位代表线程池状态,低29位代表线程池线程个数。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 即29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 2的29次方减1
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
// 运行状态保存在高三位中
// -1的补码(绝对值取反加1)高三位为111,表示正常接受任务
private static final int RUNNING = -1 << COUNT_BITS;
// 000 代表线程池不接受新任务,但仍会处理阻塞队列和正在进行的任务
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 001 不接收新任务,也不会处理阻塞队列中的任务,中断正在执行的任务。
private static final int STOP = 1 << COUNT_BITS;
// 010 代表线程池即将关闭到达TERMINATED状态
private static final int TIDYING = 2 << COUNT_BITS;
// 011 线程池最终状态,已关闭。
private static final int TERMINATED = 3 << COUNT_BITS;
// 得到线程池状态
private static int runStateOf(int c) { return c & ~COUNT_MASK; }
// 获得线程池中线程数量
private static int workerCountOf(int c) { return c & COUNT_MASK; }
线程池状态
execute 执行过程
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. 如果当前运行的线程少于核心线程数(corePoolSize),
* 则尝试使用给定的任务启动一个新线程。通过调用addWorker原子性
* 地检查runState和workerCount,防止在不应该添加线程时出现误报,
* 从而返回false。
*
* 2. 如果任务可以成功排队,那么我们仍然需要再次检查是否应该添加线程
* (因为自上次检查以来现有线程已经死亡),或者线程池是否在
* 进入此方法后关闭。因此,我们重新检查状态,
* 在stopped状态必要时则回滚排队,如果线程为空则启动一个新线程。
*
* 3. 如果无法排队任务, 那么我们尝试添加一个新线程。
* 如果失败, 我们会知道线程池已关闭或饱和,因此执行reject。
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
// 创建核心线程
if (addWorker(command, true))
return;
c = ctl.get();
}
// 线程池如果正在运行,将任务放到阻塞队列,在执行signal信号后运行
if (isRunning(c) && workQueue.offer(command)) { // 此为创建核心线程失败时
int recheck = ctl.get();
// 不在运行时将任务移除掉并执行拒绝策略
if (!isRunning(recheck) && remove(command))
reject(command);
// 并发条件下此时任务可能为空
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
addWorker()
不断尝试获取线程池的控制状态并检查是否可以添加新的工作线程。
private boolean addWorker(Runnable firstTask, boolean core) {
// 尝试获取线程池的控制状态并检查是否可以添加新的工作线程。
retry:
for (int c = ctl.get();;) {
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty()))
// 如果线程池处于SHUTDOWN状态并且满足以下条件之一:已经处于STOP状态、有待执行的任务或工作队列为空,则返回false
return false;
for (;;) {
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
// 如果工作线程数量达到核心线程数或最大线程数的限制,则返回false
return false;
if (compareAndIncrementWorkerCount(c))
break retry; // 返回到retry处
c = ctl.get(); // Re-read ctl
if (runStateAtLeast(c, SHUTDOWN))
// 如果CAS操作失败是由于workerCount更改,则继续内部循环
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
// 尝试创建新的工作线程,并在必要时将其添加到线程池中。
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
// 如果线程池正在运行或者处于非STOP状态且没有待执行的任务,
// 则检查线程状态是否为NEW,
// 否则抛出IllegalThreadStateException异常
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
// 将工作线程添加到集合中
workers.add(w);
workerAdded = true;
int s = workers.size();
if (s > largestPoolSize)
// 更新最大线程池大小
largestPoolSize = s;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 启动工作线程
t.start();
// 标记线程为已启动
workerStarted = true;
}
}
} finally {
if (!workerStarted)
addWorkerFailed(w);
}
// 返回工作线程的状态
return workerStarted;
}
线程的运行 runWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 循环获取任务,task不为空执行task,否则从阻塞队列中获取任务。
while (task != null || (task = getTask()) != null) {
w.lock();
// 检查线程池是否正在停止或线程是否被中断
if ((runStateAtLeast(ctl.get(), STOP) || // 即将关闭或已关闭,即TIDYING或者TERMINATED
(Thread.interrupted() && // 测试是否被中断
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted()) // 获取中断状态
wt.interrupt();
try {
beforeExecute(wt, task);
try {
// 执行任务
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
// 增加已完成任务的数量。
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
标签:task,false,int,学习,源码,线程,ctl,null,ThreadPoolExecutor
From: https://www.cnblogs.com/shames/p/17931526.html