1、Callable 与 FutureTask介绍
1.1、Callable
创建线程有两种方式,一种是继承Thread类,一种是实现Runnable接口重写run方法。其实Thread也实现了Runable接口。
在Runable接口中,仅有一个无参无返回结果的run方法。Runable接口详情:
1 @FunctionalInterface 2 public interface Runnable { 3 public abstract void run(); 4 }
Callable接口的功能与Runable接口类似,唯一不同的是Callable接口可以返回线程执行的结果并抛出异常,在Runable接口中,仅有一个无参有结果的call方法,可抛出异常。Callable接口详情:
1 @FunctionalInterface 2 public interface Callable<V> { 3 V call() throws Exception; 4 }
1.2、FutureTask
Thread构造函数详情:
在Thread中,没有Callable类型的入参构造函数,只有Runable类型。需要一个实现了Runable接口的对象封装Callable,这个对象是FutureTask。有关FutureTask的详细介绍,在后文会做详细介绍。
2、Callable与FutureTask 的使用
示例代码如下:
1 import java.util.concurrent.Callable; 2 import java.util.concurrent.FutureTask; 3 import java.util.concurrent.TimeUnit; 4 public class TestCallable { 5 6 public static void main(String[] args) throws Exception { 7 // 创建FutureTask 8 FutureTask task = new FutureTask<>(new Callable<String>() { 9 @Override 10 public String call() throws Exception { 11 // sleep 5 秒 12 TimeUnit.SECONDS.sleep(5); 13 // 返回线程执行结果 14 return Thread.currentThread().getName() + " == CumCallable == "; 15 } 16 }); 17 // 启动线程 18 new Thread(task).start(); 19 // 阻塞等待 20 Object o = task.get(); 21 // 主线程执行 22 System.out.println(Thread.currentThread().getName() + " == main =="); 23 System.out.println(o); 24 } 25 }
3、Callable与FutureTask源码分析
FutureTask类图关系如下;
FutureTask实现了Runable、Future接口。
1、Future
Runable接口这里不做介绍了,主要看Future接口详情如下:
1 // 操作线程任务 2 public interface Future<V> { 3 4 /** 5 * 试图取消当前执行的任务 6 * 如果任务已经执行完成、或已经被取消、或不能被取消,返回false 7 * 如果任务还未启动,就已经被取消,该任务永远不会被运行 8 * 如果任务已经启动,mayInterruptIfRunning参数决定正在执行的线程是否被中断停止该任务 9 */ 10 boolean cancel(boolean mayInterruptIfRunning); 11 12 /** 13 * 在任务正常执行完成前,判断其是否被取消 14 */ 15 boolean isCancelled(); 16 17 /** 18 * 判断任务是否执行完成 19 */ 20 boolean isDone(); 21 22 /** 23 * 一直阻塞等待,直到获取执行结果 24 */ 25 V get() throws InterruptedException, ExecutionException; 26 27 /** 28 * 阻塞等待timeout时间,获取执行结果,阻塞时间timeout已到达,抛出异常 29 */ 30 V get(long timeout, TimeUnit unit) 31 throws InterruptedException, ExecutionException, TimeoutException; 32 }
Future接口提供了操作线程的方法,如取消当前执行的任务、获取执行的结果、判断是否执行完成等。Future对线程的任务做了增强处理。
2、RunnableFuture
RunnableFuture继承Runable、Future接口,并重写了run方法。FutureTask实现此方法,并在run方法中调用了call方法,获取执行结果。1 public interface RunnableFuture<V> extends Runnable, Future<V> { 2 // 线程执行未取消,通过此方法设置线程执行结果 3 void run(); 4 }
3、FutureTask
FutureTask是基于 CAS + state + WaitNode节点链表 实现的。CAS保证多线程场景下的原子性;state线程状态,控制代码的返回及执行流程;WaitNode节点链表记录挂起的线程。
1、属性
1.1、执行任务线程状态
1.2、线程执行结果、等待队列头
callable:构造函数中传入的Callable对象,用于执行call()方法;
outcome:正常执行完,outcome记录执行结果;执行出现异常,outcome记录异常;
runner:当前正在执行的线程;
waiters:等待队列的头节点,单向链表的头。
1.3、WaitNode
waitNode是一个节点,存储在单向链表中,用于记录等待执行的线程,便于当前线程执行完后,唤醒等待的线程。
2、构造函数
FutureTask的构造函数入参支持Callable,也支持Runable。构造函数完成对callable、state属性的初始化操作。详情如下:
入参为Runable的构造函数是如何转换成Callable的呢?Executors#callable() 详情如下:
1 // Runable 转为 Callable,并返回入参 result 2 public static <T> Callable<T> callable(Runnable task, T result) { 3 // 非空判断 4 if (task == null) 5 throw new NullPointerException(); 6 // 使用适配器,将Runable转换为Callable 7 return new RunnableAdapter<T>(task, result); 8 }
RunnableAdapter采用了适配器模式,将Runable转换为Callable。我们来看看是怎么转换的。
RunableAdapter 与 Runable 组合。RunableAdapter实现了Callable接口,重写了call方法,在该方法中实际执行的是Runable的run方法,并返回传入的result结果。
3、run() - 入口方法
1、流程图
2、run()源码分析
FutureTask实现了Runable接口,从写了run()方法,run()作为线程执行的入口方法,先来看看具体做了哪些操作。
FutureTask#run() 详情如下:
1 public void run() { 2 // 当前线程不为新建状态 或者 当前线程cas获取锁资源失败,返回 3 if (state != NEW || 4 !UNSAFE.compareAndSwapObject(this, runnerOffset, 5 null, Thread.currentThread())) 6 return; 7 try { 8 // callable成员变量赋给局部变量 9 Callable<V> c = callable; 10 // callable不为空,当前线程状态为 新建 11 if (c != null && state == NEW) { 12 // 执行结果 13 V result; 14 // 执行结束标识 15 boolean ran; 16 try { 17 // 执行实现的call方法,并获取返回结果 18 result = c.call(); 19 // 执行结束标识设置为true -> 正常执行 20 ran = true; 21 } catch (Throwable ex) { 22 // 执行异常,返回null 23 result = null; 24 // 执行结束标识设置为true -> 执行出现异常 25 ran = false; 26 // 设置异常信息 27 setException(ex); 28 } 29 // 正常执行结束,存储执行结果 30 if (ran) 31 set(result); 32 } 33 } finally { 34 // 线程释放锁资源 35 runner = null; 36 // 中断线程的处理 37 int s = state; 38 if (s >= INTERRUPTING) 39 handlePossibleCancellationInterrupt(s); 40 } 41 }
在run()方法中,实际执行的是callable的call()方法,并用outcome成员变量存储线程执行的结果,若出现异常,outcome则存储异常信息。
1、正常执行结束
线程正常执行完,将线程的执行结果设置到FutureTask的outCome属性中,FutureTask#set()方法。
1 // 线程执行结果设置到outcome属性中 2 protected void set(V v) { 3 // 修改当前线程状态 NEW(新建) -> COMPLETING(运行中) 4 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { 5 // 线程执行结果赋值到成员变量outcome 6 outcome = v; 7 // 修改线程状态 COMPLETING(运行中) -> NORMAL(正常执行结束) 8 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); 9 // 唤醒因get()操作挂起的线程 10 finishCompletion(); 11 } 12 }
线程执行执行结束,线程状态经过了 NEW -> COMPLETING -> NORMAL 的变化,将结果赋值给outcome,唤醒因get()操作挂起的线程。
2、异常的处理
线程执行过程中异常的处理,FutureTask#setException() 详情如下:
1 // 设置异常信息 2 protected void setException(Throwable t) { 3 // cas操作,修改当前执行的线程状态 NEW(新建) -> COMPLETING(运行中) 4 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { 5 // 线程执行结果设置为异常 6 outcome = t; 7 // 修改当前执行的线程状态 COMPLETING(运行中) -> EXCEPTIONAL(异常) 8 UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); 9 // 唤醒因get()操作挂起的线程 10 finishCompletion(); 11 } 12 }
线程执行执行结束,线程状态经过了 NEW -> COMPLETING -> EXCEPTIONAL 的变化,将异常赋值给outcome,唤醒因get()操作挂起线程。
3、唤醒等待队列中的挂起线程
FutureTask#finishCompletion() 详情如下:
1 // 移除并唤醒所有等待执行的线程 2 private void finishCompletion() { 3 // q执行waiters链表的头节点 4 for (WaitNode q; (q = waiters) != null;) { 5 // cas操作,将waiters设置为null,担心外部线程使用 cancel 取消当前任务触发 finishCompletion() 6 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { 7 // 自旋 8 for (;;) { 9 // 获取当前Node封装的 thread 10 Thread t = q.thread; 11 // 当前线程不为 null 12 if (t != null) { 13 // 将当前等待节点的线程设置为null 14 q.thread = null; 15 // 唤醒当前节点因 get 操作阻塞的线程 16 LockSupport.unpark(t); 17 } 18 // 获取下一个WaitNode 19 WaitNode next = q.next; 20 // 等待节点链表中没有待唤醒的线程,结束循环 21 if (next == null) 22 break; 23 24 // 将下一个待唤醒的等待节点从 WaitNode链表中移除,便于GC 25 q.next = null; 26 q = next; 27 } 28 // 等待节点链表中没有待唤醒的线程,结束循环 29 break; 30 } 31 } 32 33 // JDK提供修改当前线程执行状态的拓展方法,默认不实现 34 done(); 35 36 // 执行完毕,callable 37 callable = null; 38 } 39 40 // JDK提供修改当前线程执行状态的拓展方法,默认不实现 41 protected void done() { }
4、处于中断过程中的线程处理
FutureTask实现了Future接口,Future提供了线程取消的方法cancel。若当前线程被取消,FutureTask执行cancel方法过程中,线程状态有一段时间是 INTERRUPTING (中断处理中) 的状态,在线程执行结束之前,FutureTask在 handlePossibleCancellationInterrupt 方法中 INTERRUPTING (中断处理中)状态 的线程做了特殊处理。
FutureTask#handlePossibleCancellationInterrupt() 详情如下:
1 private void handlePossibleCancellationInterrupt(int s) { 2 // 线程状态 INTERRUPTING (中断处理中) 3 if (s == INTERRUPTING) 4 while (state == INTERRUPTING) 5 // 让出CPU,等待线程状态变为 INTERRUPTED (已中断) 6 Thread.yield(); 7 }
4、get() - 获取线程执行结果
1、流程图
2、get()源码分析
获取线程执行结果,FutureTask#get() 详情如下:
1 public V get() throws InterruptedException, ExecutionException { 2 // 当前线程执行状态 3 int s = state; 4 // 当前线程处于 NEW(新建)、COMPLETING (运行中) ,阻塞挂起外部的get()获取结果的线程 5 if (s <= COMPLETING) 6 s = awaitDone(false, 0L); 7 // 线程被唤醒后,返回执行结果 8 return report(s); 9 }
当前线程状态为 NEW或COMPLETING ,挂起线程;等待线程正常执行结束、执行异常、线程取消调用finishCompletion()方法唤醒这些挂起的线程,再通过report方法,返回执行结果。
1、挂起新建、运行中的线程
挂起线程 FutureTask#awaitDone() 详情如下:
1 private int awaitDone(boolean timed, long nanos) throws InterruptedException { 2 // 是否带超时的阻塞,0不带超时 3 final long deadline = timed ? System.nanoTime() + nanos : 0L; 4 // 引用当前线程 封装成waitNode对象 5 WaitNode q = null; 6 // 当前线程 waitNode对象是否 已经添加进等待链表中,默认未添加 7 boolean queued = false; 8 // 自旋 9 for (;;) { 10 // 4.1、线程唤醒,说明线程是使用中断的方式唤醒,若interrupted() 返回为true 之后会将中断标记重置为false 11 if (Thread.interrupted()) { 12 // 当前线程node出队 13 removeWaiter(q); 14 // get方法抛出 中断异常 15 throw new InterruptedException(); 16 } 17 18 // 4.2、当前线程被其他线程 使用unpark()方式 19 20 // 获取当前线程执行状态 21 int s = state; 22 // 当前线程执行完成,将当前任务线程设置为null,并返回任务状态 23 // 状态是终态: NORMAL 、 EXCEPTIONAL 、 CANCELLED 24 if (s > COMPLETING) { 25 // 当前线程已创建WaitNode对象,将其中的thread设置为null,helpGC 26 if (q != null) 27 q.thread = null; 28 return s; 29 } 30 31 // 当前线程处于 运行中,让出CPU,进行下一次的抢占 32 else if (s == COMPLETING) 33 Thread.yield(); 34 35 // 1、第一次自旋,是新线程,当前等待节点为null,为当前线程创建WaitNode对象 36 else if (q == null) 37 // 创建等待节点 38 q = new WaitNode(); 39 40 // 2、第二次自旋,线程已创建WaitNode 对象, WaitNode未入队 41 else if (!queued) 42 // 当前线程node节点指向 原队列的头节点 waiters:一直指向队列的头 43 // cas设置waiters引用指向当前线程Node节点,若成功,queued为true;若失败,可能其他线程先一步入队了 44 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, 45 q.next = waiters, q); 46 47 // 3、第三次自旋,挂起线程操作 48 // 如果设置了超时时间,get操作的线程会被parkNanos,并过了超时时间的话,从 waiters 链表中删除当前 wait 49 50 else if (timed) { 51 // 获取剩余阻塞时间 52 nanos = deadline - System.nanoTime(); 53 // 剩余阻塞时间不足,将WaitNode从等待链表中移除,返回响应状态 54 if (nanos <= 0L) { 55 removeWaiter(q); 56 return state; 57 } 58 // 没有过超时时间,线程进入 TIMED_WAITING 状态 59 LockSupport.parkNanos(this, nanos); 60 } 61 // 未设置超时时间,get操作的线程会被park,进入 WAITING 状态 62 // 除非有其他线程唤醒 或 将当前线程中断 63 else 64 LockSupport.park(this); 65 } 66 }
awaitDone使用了自旋,根据每次自旋变量值的不同,走不同的分支,执行流程图如下:
2、移除链表中的WaitNode
将当前Node节点从链表中移除,FutureTask#removeWaiter() 详情如下:1 // 试图将超时 或 已中断的等待节点 从链表中移除,同时将被删除节点的上一节点指向被删除节点的下一节点,避免gc无法回收 2 private void removeWaiter(WaitNode node) { 3 // 要移除的WaitNode不为null 4 if (node != null) { 5 // 要移除的WaitNode中线程设置为null 6 node.thread = null; 7 retry: 8 // 自旋 9 for (;;) { 10 // pred :前驱节点 11 // q:初始为头节点,代表当前正在遍历的节点 12 // s:后继节点 13 for (WaitNode pred = null, q = waiters, s; q != null; q = s) { 14 // 当前遍历的节点的后继节点赋值给 s 15 s = q.next; 16 // 1、当前遍历的节点线程不为null,将当前遍历节点赋值给 它的前驱节点pred,开始下一次自旋 17 if (q.thread != null) 18 pred = q; 19 20 // 当前遍历的节点不为头节点,并且当前遍历节点的线程为null,将当前遍历节点的前驱节点指向它的后继节点 21 else if (pred != null) { 22 pred.next = s; 23 // 当前遍历线程节点的前驱节点线程也为null,说明它的前驱节点也要被删除,重新开始自旋 24 if (pred.thread == null) 25 continue retry; 26 } 27 28 // q.thread = null, pred == null ==> 头节点 29 // 当前遍历节点为头节点,cas操作,将队列头节点waiters的引用指向当前节点的后继节点, 30 // 即当前节点的下一节点作为头节点 31 else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, 32 q, s)) 33 continue retry; 34 } 35 break; 36 } 37 } 38 }
移除等待队列中执行完成、被取消的WaitNode节点,若WaitNode是头节点,将队列头节点的引用指向当前头节点的后继节点;若不为队列头节点,将当前WaitNode的前驱节点指向它的后继节点,流程图如下 :
为了便于理解removeWaiters的流程,下面给出等待队列的大体变化过程:
3、返回执行结果
根据线程状态返回执行结果,FutureTask#report() 详情如下:1 private V report(int s) throws ExecutionException { 2 // 获取线程状态 3 Object x = outcome; 4 // 正常执行结束 5 if (s == NORMAL) 6 // 返回执行结果 7 return (V)x; 8 // 已取消 | (已中断) ,抛CancellationException异常 9 if (s >= CANCELLED) 10 throw new CancellationException(); 11 // 异常,抛ExecutionException异常 12 throw new ExecutionException((Throwable)x); 13 }
通过当前线程的状态,判断是抛出异常,还是返回线程执行结果。
5、cancel() - 获取线程执行结果
5.1、流程图
5.2、源码分析
1 public boolean cancel(boolean mayInterruptIfRunning) { 2 // 当前正执行的线程状态不为 NEW(新建) 状态 ,并且线程状态变更失败 ,返回false 3 if (!(state == NEW && 4 UNSAFE.compareAndSwapInt(this, stateOffset, NEW, 5 mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) 6 return false; 7 try { 8 // 允许中断正在运行线程 9 if (mayInterruptIfRunning) { 10 try { 11 // 获取当前正在执行的线程 12 Thread t = runner; 13 // 执行interrupt()中断方法 14 if (t != null) 15 t.interrupt(); 16 } finally { 17 // 修改线程状态为已中断 18 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); 19 } 20 } 21 } finally { 22 // 唤醒因get()而挂起线程 23 finishCompletion(); 24 } 25 // 返回取消中断线程结果 26 return true; 27 }
标签:分析,执行,源码,线程,当前,FutureTask,null,节点 From: https://www.cnblogs.com/RunningSnails/p/17348324.html