Future接口和实现Future接口的FutureTask类,代表异步计算的结果。FutureTask除了实现Future接口外,还实现了Runnable接口。因此,FutureTask可以交给 Executor执行,也可以由调用线程直接执行(FutureTask.run())。
类图
核心属性
/**
*
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
// 任务状态
private volatile int state;
// 新增
private static final int NEW = 0;
// 完成
private static final int COMPLETING = 1;
// 正常结束
private static final int NORMAL = 2;
// 发生异常结束
private static final int EXCEPTIONAL = 3;
// 取消接收
private static final int CANCELLED = 4;
// 发起中断
private static final int INTERRUPTING = 5;
// 线程已经中断
private static final int INTERRUPTED = 6;
// 异步任务
private Callable<V> callable;
// 异步任务的返回值
private Object outcome; // non-volatile, protected by state reads/writes
可能发生的任务状态转换:
- NEW -> COMPLETING -> NORMAL
- NEW -> COMPLETING -> EXCEPTIONAL
- NEW -> CANCELLED
- NEW -> INTERRUPTING -> INTERRUPTED
NORMAL、EXCEPTIONAL、CANCELLED和INTERRUPTED都是最终状态,表示任务已结束。
构造函数
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
通过构造函数我们可以发现,FutureTask
可以接受一个Callable
或是Runnable
。如果是Runnable
需要我们传一个返回值进去。
run()
public void run() {
// 判断状态
if (state != NEW ||
// CAS 设置执行任务的线程(相当于加锁)
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
// 再次判断任务状态
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 执行任务
result = c.call();
// 正常结束,设置标记为true
ran = true;
} catch (Throwable ex) {
result = null;
// 异常结束,设置标记为false
ran = false;
// 异常处理,唤醒get方法阻塞线程
setException(ex);
}
if (ran)
// 保存结果,唤醒get方法阻塞线程
set(result);
}
} finally {
// 相当于解锁
runner = null;
// 重新检查状态,判断是否需要响应中断
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
- 先判断任务状态,并加锁,防止任务被重复执行
- 再次检查一下任务状态,因为在第一步检查完了的时候,任务状态有可能已经发生了变化
- 执行任务
- 如果异常则在catch里面保存异常,唤醒
get()
方法阻塞线程 - 根据执行标记位,保存结果,唤醒
get()
方法阻塞线程 - 设置任务状态标记位
- 解锁,并判断是否响应中断
cancel()
public boolean cancel(boolean mayInterruptIfRunning) {
// 判断任务状态,更新任务状态(相当于加锁)
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
// 中断线程
t.interrupt();
} finally { // final state
// 设置认为状态是已中断
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
// 唤醒被get方法阻塞的线程
finishCompletion();
}
return true;
}
尝试取消执行此任务。
- 当FutureTask处于未启动状态时,执行
FutureTask.cancel(...)
方法将导致此任务永远不会被执 行; - 当FutureTask处于已启动状态时,执行
FutureTask.cancel(true)
方法将以中断执行此任务线程 的方式来试图停止任务; - 当FutureTask处于已启动状态时,执行
FutureTask.cancel(false)
方法将不会对正在执行此任务的线程产生影响(让正在执行的任务运行完成); - 当FutureTask处于已完 成状态时,执行
FutureTask.cancel(…)
方法将返回false。
get()
public V get() throws InterruptedException, ExecutionException {
int s = state;
// 如果任务还未完成,那执行等待
if (s <= COMPLETING)
s = awaitDone(false, 0L);
// 返回结果
return report(s);
}
get方法比较简单,直接调用了两个方法。一个是执行等待awaitDone
,一个是返回结果report
。
- 当FutureTask处于未启动或已启动状态时,执行FutureTask.get()方法将导致调用线程阻塞;
- 当FutureTask处于已完成状态时,执行FutureTask.get()方法将导致调用线程立即返回结果或抛 出异常。
awaitDone()
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// 计算超时时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
// 响应中断
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
// 判断任务是否结束,如果是直接返回任务当前状态
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
// 判断任务是否是完成状态,如果是让出CPU执行权,等待任务最终结束
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// 超时模式
else if (timed) {
// 判断是否超时
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
// 阻塞一定时间
LockSupport.parkNanos(this, nanos);
}
else
// 一直阻塞
LockSupport.park(this);
}
}
- 计算出超时时间
- 响应中断
- 判断任务是否结束,如果是直接返回任务状态
- 判断任务是否完成,如果是调用
yield
方法让出CPU执行权,等待任务最终结束 - 阻塞线程
- 循环第2步
report()
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
封装结果:如果正常结束就返回任务执行结果;如果是取消任务就抛出CancellationException
异常;如果是异常结束就抛出任务执行遇到的异常。
get方法和cancel方法的执行示意图
总结
FutureTask
的等待和唤醒使用的是LockSupport.park
和LockSupport.unpark(t)
方法。
参考
《java并发编程的艺术》
源码
https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases
spring-boot-student-concurrent 工程
layering-cache
为监控而生的多级缓存框架 layering-cache这是我开源的一个多级缓存框架的实现,如果有兴趣可以看一下