首页 > 编程语言 >FutureTask源码分析

FutureTask源码分析

时间:2023-04-24 09:01:40浏览次数:48  
标签:分析 执行 源码 线程 当前 FutureTask null 节点

1、Callable 与 FutureTask介绍

1.1、Callable

  创建线程有两种方式,一种是继承Thread类,一种是实现Runnable接口重写run方法。其实Thread也实现了Runable接口。

  在Runable接口中,仅有一个无参无返回结果的run方法。Runable接口详情:

1 @FunctionalInterface
2 public interface Runnable {
3     public abstract void run();
4 }

  Callable接口的功能与Runable接口类似,唯一不同的是Callable接口可以返回线程执行的结果并抛出异常,在Runable接口中,仅有一个无参有结果的call方法,可抛出异常。Callable接口详情:

1 @FunctionalInterface
2 public interface Callable<V> {
3     V call() throws Exception;
4 }

1.2、FutureTask

  Thread构造函数详情: 

  

  在Thread中,没有Callable类型的入参构造函数,只有Runable类型。需要一个实现了Runable接口的对象封装Callable,这个对象是FutureTask。有关FutureTask的详细介绍,在后文会做详细介绍。

2、Callable与FutureTask 的使用

  示例代码如下:

 1 import java.util.concurrent.Callable;
 2 import java.util.concurrent.FutureTask;
 3 import java.util.concurrent.TimeUnit;
 4 public class TestCallable {
 5 
 6     public static void main(String[] args) throws Exception {
 7         // 创建FutureTask
 8         FutureTask task = new FutureTask<>(new Callable<String>() {
 9             @Override
10             public String call() throws Exception {
11                 // sleep  5 秒
12                 TimeUnit.SECONDS.sleep(5);
13                 // 返回线程执行结果
14                 return Thread.currentThread().getName() + " == CumCallable == ";
15             }
16         });
17         // 启动线程
18         new Thread(task).start();
19         // 阻塞等待
20         Object o = task.get();
21         // 主线程执行
22         System.out.println(Thread.currentThread().getName() +  " == main ==");
23         System.out.println(o);
24     }
25 }

3、Callable与FutureTask源码分析

  FutureTask类图关系如下;

  

  FutureTask实现了Runable、Future接口。

1、Future

  Runable接口这里不做介绍了,主要看Future接口详情如下:

 1 // 操作线程任务
 2 public interface Future<V> {
 3 
 4     /**
 5      * 试图取消当前执行的任务
 6      *     如果任务已经执行完成、或已经被取消、或不能被取消,返回false
 7      *     如果任务还未启动,就已经被取消,该任务永远不会被运行         
 8      *     如果任务已经启动,mayInterruptIfRunning参数决定正在执行的线程是否被中断停止该任务
 9      */
10     boolean cancel(boolean mayInterruptIfRunning);
11 
12     /**
13      * 在任务正常执行完成前,判断其是否被取消
14      */
15     boolean isCancelled();
16 
17     /**
18      * 判断任务是否执行完成
19      */
20     boolean isDone();
21 
22     /**
23      * 一直阻塞等待,直到获取执行结果 
24      */
25     V get() throws InterruptedException, ExecutionException;
26 
27     /**
28      * 阻塞等待timeout时间,获取执行结果,阻塞时间timeout已到达,抛出异常
29      */
30     V get(long timeout, TimeUnit unit)
31         throws InterruptedException, ExecutionException, TimeoutException;
32 }

  Future接口提供了操作线程的方法,如取消当前执行的任务、获取执行的结果、判断是否执行完成等。Future对线程的任务做了增强处理。

2、RunnableFuture

  RunnableFuture继承Runable、Future接口,并重写了run方法。FutureTask实现此方法,并在run方法中调用了call方法,获取执行结果。
1 public interface RunnableFuture<V> extends Runnable, Future<V> {
2     // 线程执行未取消,通过此方法设置线程执行结果
3     void run();
4 }

3、FutureTask

  FutureTask是基于 CAS + state + WaitNode节点链表 实现的。CAS保证多线程场景下的原子性;state线程状态,控制代码的返回及执行流程;WaitNode节点链表记录挂起的线程。

1、属性

1.1、执行任务线程状态

  

1.2、线程执行结果、等待队列头 

  callable:构造函数中传入的Callable对象,用于执行call()方法;

  outcome:正常执行完,outcome记录执行结果;执行出现异常,outcome记录异常;

  runner:当前正在执行的线程;

  waiters:等待队列的头节点,单向链表的头。

  

1.3、WaitNode

  waitNode是一个节点,存储在单向链表中,用于记录等待执行的线程,便于当前线程执行完后,唤醒等待的线程。

2、构造函数

  FutureTask的构造函数入参支持Callable,也支持Runable。构造函数完成对callable、state属性的初始化操作。详情如下:  

  

  入参为Runable的构造函数是如何转换成Callable的呢?Executors#callable() 详情如下:

1 // Runable 转为 Callable,并返回入参 result
2 public static <T> Callable<T> callable(Runnable task, T result) {
3     // 非空判断
4     if (task == null)
5         throw new NullPointerException();
6     // 使用适配器,将Runable转换为Callable
7     return new RunnableAdapter<T>(task, result);
8 }

  RunnableAdapter采用了适配器模式,将Runable转换为Callable。我们来看看是怎么转换的。

  RunableAdapter 与 Runable 组合。RunableAdapter实现了Callable接口,重写了call方法,在该方法中实际执行的是Runable的run方法,并返回传入的result结果。

  

3、run() - 入口方法

1、流程图

 

2、run()源码分析

  FutureTask实现了Runable接口,从写了run()方法,run()作为线程执行的入口方法,先来看看具体做了哪些操作。

  FutureTask#run() 详情如下:

 1 public void run() {
 2     // 当前线程不为新建状态  或者 当前线程cas获取锁资源失败,返回
 3     if (state != NEW ||
 4         !UNSAFE.compareAndSwapObject(this, runnerOffset,
 5                                      null, Thread.currentThread()))
 6         return;
 7     try {
 8         // callable成员变量赋给局部变量
 9         Callable<V> c = callable;
10         // callable不为空,当前线程状态为 新建
11         if (c != null && state == NEW) {
12             // 执行结果
13             V result;
14             // 执行结束标识
15             boolean ran;
16             try {
17                 // 执行实现的call方法,并获取返回结果
18                 result = c.call();
19                 // 执行结束标识设置为true -> 正常执行
20                 ran = true;
21             } catch (Throwable ex) {
22                 // 执行异常,返回null
23                 result = null;
24                 // 执行结束标识设置为true -> 执行出现异常
25                 ran = false;
26                 // 设置异常信息
27                 setException(ex);
28             }
29             // 正常执行结束,存储执行结果
30             if (ran)
31                 set(result);
32         }
33     } finally {
34         // 线程释放锁资源
35         runner = null;
36         // 中断线程的处理
37         int s = state;
38         if (s >= INTERRUPTING)
39             handlePossibleCancellationInterrupt(s);
40     }
41 }

  在run()方法中,实际执行的是callable的call()方法,并用outcome成员变量存储线程执行的结果,若出现异常,outcome则存储异常信息。

1、正常执行结束

  线程正常执行完,将线程的执行结果设置到FutureTask的outCome属性中,FutureTask#set()方法。

 1 // 线程执行结果设置到outcome属性中
 2 protected void set(V v) {
 3     // 修改当前线程状态   NEW(新建) -> COMPLETING(运行中)
 4     if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
 5         // 线程执行结果赋值到成员变量outcome
 6         outcome = v;
 7         // 修改线程状态  COMPLETING(运行中) -> NORMAL(正常执行结束)
 8         UNSAFE.putOrderedInt(this, stateOffset, NORMAL); 
 9         // 唤醒因get()操作挂起的线程
10         finishCompletion();
11     }
12 }

  线程执行执行结束,线程状态经过了 NEW -> COMPLETING -> NORMAL 的变化,将结果赋值给outcome,唤醒因get()操作挂起的线程。

2、异常的处理

  线程执行过程中异常的处理,FutureTask#setException() 详情如下:

 1 // 设置异常信息
 2 protected void setException(Throwable t) {
 3     // cas操作,修改当前执行的线程状态  NEW(新建) -> COMPLETING(运行中)
 4     if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
 5         // 线程执行结果设置为异常
 6         outcome = t;
 7         // 修改当前执行的线程状态  COMPLETING(运行中) -> EXCEPTIONAL(异常)
 8         UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); 
 9         // 唤醒因get()操作挂起的线程
10         finishCompletion();
11     }
12 }

  线程执行执行结束,线程状态经过了 NEW -> COMPLETING -> EXCEPTIONAL 的变化,将异常赋值给outcome,唤醒因get()操作挂起线程。

3、唤醒等待队列中的挂起线程

  FutureTask#finishCompletion() 详情如下:

 1 // 移除并唤醒所有等待执行的线程
 2 private void finishCompletion() {
 3     // q执行waiters链表的头节点
 4     for (WaitNode q; (q = waiters) != null;) {
 5         // cas操作,将waiters设置为null,担心外部线程使用 cancel 取消当前任务触发  finishCompletion()
 6         if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
 7              // 自旋
 8              for (;;) {
 9                 // 获取当前Node封装的 thread
10                 Thread t = q.thread;
11                 // 当前线程不为  null
12                 if (t != null) {
13                     // 将当前等待节点的线程设置为null
14                     q.thread = null;
15                     // 唤醒当前节点因 get 操作阻塞的线程
16                     LockSupport.unpark(t);
17                 }
18                 // 获取下一个WaitNode
19                 WaitNode next = q.next;
20                 // 等待节点链表中没有待唤醒的线程,结束循环
21                 if (next == null)
22                     break;
23                     
24                 // 将下一个待唤醒的等待节点从  WaitNode链表中移除,便于GC
25                 q.next = null;
26                 q = next;
27             }
28             // 等待节点链表中没有待唤醒的线程,结束循环
29             break;
30         }
31     }
32     
33     // JDK提供修改当前线程执行状态的拓展方法,默认不实现
34     done();
35     
36     // 执行完毕,callable
37     callable = null;
38 }
39 
40 // JDK提供修改当前线程执行状态的拓展方法,默认不实现
41 protected void done() { }

4、处于中断过程中的线程处理

  FutureTask实现了Future接口,Future提供了线程取消的方法cancel。若当前线程被取消,FutureTask执行cancel方法过程中,线程状态有一段时间是 INTERRUPTING (中断处理中) 的状态,在线程执行结束之前,FutureTask在 handlePossibleCancellationInterrupt 方法中 INTERRUPTING (中断处理中)状态 的线程做了特殊处理。

  FutureTask#handlePossibleCancellationInterrupt() 详情如下:

1 private void handlePossibleCancellationInterrupt(int s) {
2     // 线程状态 INTERRUPTING (中断处理中) 
3     if (s == INTERRUPTING)
4         while (state == INTERRUPTING)
5             // 让出CPU,等待线程状态变为  INTERRUPTED  (已中断)
6             Thread.yield();
7 }

4、get() - 获取线程执行结果

1、流程图

2、get()源码分析

  获取线程执行结果,FutureTask#get() 详情如下:

1 public V get() throws InterruptedException, ExecutionException {
2     // 当前线程执行状态
3     int s = state;
4     // 当前线程处于  NEW(新建)、COMPLETING (运行中) ,阻塞挂起外部的get()获取结果的线程
5     if (s <= COMPLETING)
6         s = awaitDone(false, 0L);
7     // 线程被唤醒后,返回执行结果
8     return report(s);
9 }

  当前线程状态为 NEW或COMPLETING ,挂起线程;等待线程正常执行结束、执行异常、线程取消调用finishCompletion()方法唤醒这些挂起的线程,再通过report方法,返回执行结果。

1、挂起新建、运行中的线程

  挂起线程 FutureTask#awaitDone() 详情如下:

 1 private int awaitDone(boolean timed, long nanos) throws InterruptedException {
 2     // 是否带超时的阻塞,0不带超时
 3     final long deadline = timed ? System.nanoTime() + nanos : 0L;
 4     // 引用当前线程 封装成waitNode对象
 5     WaitNode q = null;
 6     // 当前线程 waitNode对象是否  已经添加进等待链表中,默认未添加
 7     boolean queued = false;
 8     // 自旋
 9     for (;;) {
10         // 4.1、线程唤醒,说明线程是使用中断的方式唤醒,若interrupted() 返回为true 之后会将中断标记重置为false
11         if (Thread.interrupted()) {
12             // 当前线程node出队
13             removeWaiter(q);
14             // get方法抛出  中断异常
15             throw new InterruptedException();
16         }
17         
18         // 4.2、当前线程被其他线程 使用unpark()方式
19         
20         // 获取当前线程执行状态
21         int s = state;
22         // 当前线程执行完成,将当前任务线程设置为null,并返回任务状态
23         // 状态是终态: NORMAL 、 EXCEPTIONAL 、 CANCELLED
24         if (s > COMPLETING) {
25             // 当前线程已创建WaitNode对象,将其中的thread设置为null,helpGC
26             if (q != null)
27                 q.thread = null;
28             return s;
29         }
30         
31         // 当前线程处于  运行中,让出CPU,进行下一次的抢占
32         else if (s == COMPLETING) 
33             Thread.yield();
34             
35         // 1、第一次自旋,是新线程,当前等待节点为null,为当前线程创建WaitNode对象
36         else if (q == null)
37             // 创建等待节点
38             q = new WaitNode();
39             
40         // 2、第二次自旋,线程已创建WaitNode 对象, WaitNode未入队
41         else if (!queued)
42             // 当前线程node节点指向 原队列的头节点  waiters:一直指向队列的头
43             // cas设置waiters引用指向当前线程Node节点,若成功,queued为true;若失败,可能其他线程先一步入队了
44             queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
45                                                  q.next = waiters, q);
46         
47         // 3、第三次自旋,挂起线程操作        
48         // 如果设置了超时时间,get操作的线程会被parkNanos,并过了超时时间的话,从 waiters 链表中删除当前 wait
49         
50         else if (timed) {
51             // 获取剩余阻塞时间
52             nanos = deadline - System.nanoTime();
53             // 剩余阻塞时间不足,将WaitNode从等待链表中移除,返回响应状态
54             if (nanos <= 0L) {
55                 removeWaiter(q);
56                 return state;
57             }
58             // 没有过超时时间,线程进入 TIMED_WAITING 状态
59             LockSupport.parkNanos(this, nanos);
60         }
61         // 未设置超时时间,get操作的线程会被park,进入 WAITING 状态
62         // 除非有其他线程唤醒 或 将当前线程中断
63         else
64             LockSupport.park(this);
65     }
66 }

    awaitDone使用了自旋,根据每次自旋变量值的不同,走不同的分支,执行流程图如下:

  

2、移除链表中的WaitNode

  将当前Node节点从链表中移除,FutureTask#removeWaiter() 详情如下:
 1 // 试图将超时 或 已中断的等待节点 从链表中移除,同时将被删除节点的上一节点指向被删除节点的下一节点,避免gc无法回收
 2 private void removeWaiter(WaitNode node) {
 3     // 要移除的WaitNode不为null
 4     if (node != null) {
 5         // 要移除的WaitNode中线程设置为null
 6         node.thread = null;
 7         retry:
 8         // 自旋
 9         for (;;) {
10             // pred :前驱节点
11             // q:初始为头节点,代表当前正在遍历的节点
12             // s:后继节点
13             for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
14                 // 当前遍历的节点的后继节点赋值给 s
15                 s = q.next;
16                 // 1、当前遍历的节点线程不为null,将当前遍历节点赋值给 它的前驱节点pred,开始下一次自旋
17                 if (q.thread != null)
18                     pred = q;
19                     
20                 // 当前遍历的节点不为头节点,并且当前遍历节点的线程为null,将当前遍历节点的前驱节点指向它的后继节点
21                 else if (pred != null) {
22                     pred.next = s;
23                     // 当前遍历线程节点的前驱节点线程也为null,说明它的前驱节点也要被删除,重新开始自旋
24                     if (pred.thread == null) 
25                         continue retry;
26                 }
27 
28                 // q.thread = null,  pred == null  ==> 头节点
29                 // 当前遍历节点为头节点,cas操作,将队列头节点waiters的引用指向当前节点的后继节点,
30                 // 即当前节点的下一节点作为头节点
31                 else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
32                                                       q, s))
33                     continue retry;
34             }
35             break;
36         }
37     }
38 }

  移除等待队列中执行完成、被取消的WaitNode节点,若WaitNode是头节点,将队列头节点的引用指向当前头节点的后继节点;若不为队列头节点,将当前WaitNode的前驱节点指向它的后继节点,流程图如下 :

  

   为了便于理解removeWaiters的流程,下面给出等待队列的大体变化过程:

  

3、返回执行结果

  根据线程状态返回执行结果,FutureTask#report() 详情如下:
 1 private V report(int s) throws ExecutionException {
 2     // 获取线程状态
 3     Object x = outcome;
 4     // 正常执行结束
 5     if (s == NORMAL)
 6         // 返回执行结果
 7         return (V)x;
 8     // 已取消 | (已中断) ,抛CancellationException异常
 9     if (s >= CANCELLED)
10         throw new CancellationException();
11     // 异常,抛ExecutionException异常
12     throw new ExecutionException((Throwable)x);
13 }

  通过当前线程的状态,判断是抛出异常,还是返回线程执行结果。

5、cancel() - 获取线程执行结果

5.1、流程图

  

5.2、源码分析

 1 public boolean cancel(boolean mayInterruptIfRunning) {
 2     // 当前正执行的线程状态不为 NEW(新建) 状态 ,并且线程状态变更失败 ,返回false
 3     if (!(state == NEW &&
 4           UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
 5               mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
 6         return false;
 7     try {   
 8         // 允许中断正在运行线程
 9         if (mayInterruptIfRunning) {
10             try {
11                 // 获取当前正在执行的线程
12                 Thread t = runner;
13                 // 执行interrupt()中断方法
14                 if (t != null)
15                     t.interrupt();
16             } finally {
17                 // 修改线程状态为已中断
18                 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
19             }
20         }
21     } finally {
22         // 唤醒因get()而挂起线程
23         finishCompletion();
24     }
25     // 返回取消中断线程结果
26     return true;
27 }

  

标签:分析,执行,源码,线程,当前,FutureTask,null,节点
From: https://www.cnblogs.com/RunningSnails/p/17348324.html

相关文章

  • Map - TreeSet & TreeMap 源码解析
    Java7-TreeSet&TreeMap总体介绍前者仅仅是对后者做了一层包装,也就是说TreeSet里面有一个TreeMap(适配器模式)。因此本文将重点分析TreeMap。JavaTreeMap实现了SortedMap接口,也就是说会按照key的大小顺序对Map中的元素进行排序,key大小的评判可以通过其本身的自然顺序(natu......
  • Map - LinkedHashSet&Map源码解析
    上篇文章讲了HashMap。HashMap是一种非常常见、非常有用的集合,但在多线程情况下使用不当会有线程安全问题。大多数情况下,只要不涉及线程安全问题,Map基本都可以使用HashMap,不过HashMap有一个问题,就是迭代HashMap的顺序并不是HashMap放置的顺序,也就是无序。HashMap的这一缺点往往会......
  • 【深入浅出Spring原理及实战】「源码调试分析」深入源码探索Spring底层框架的的refres
    学习Spring源码的建议阅读Spring官方文档,了解Spring框架的基本概念和使用方法。下载Spring源码,可以从官网或者GitHub上获取。阅读Spring源码的入口类,了解Spring框架的启动过程和核心组件的加载顺序。阅读Spring源码中的注释和文档,了解每个类和方法的作用和用法。调试Spring源码,可以......
  • vue2源码-十三、nextTick在哪里使用?原理是什么?
    nextTick在哪里使用?原理是什么?nextTick内部采用了异步任务进行包装(多个nextTick调用会被合并成一次,内部会合并回调)最后在异步任务中批处理。主要应用场景就是异步更新(默认调度的时候就会添加一个·nextTick任务)用户为了获取最终的渲染结果需要在内部任务执行之后再执行用户逻......
  • 比赛分析(完整版)
    一、数据分析该数据集采用COCO格式给出标注信息,即将所有训练图像的标注都存放到一个json文件中,数据以字典嵌套的形式存放。对数据集进行分析:检测数据集涉及3个场景,分别是“火灾检测”、“工业仪表检测”和“安全帽检测”,共7114张图像。这些图像按8:2的比例划分为训练集、验证集......
  • 基于 Amazon SageMaker 构建细粒度情感分析应用
    背景介绍细粒度情感分析(Aspect-BasedSentimentAnalysis,ABSA)由于其广阔的业务价值而吸引越来越多的从业者投身其中,通过分析客户评论数据中的情感偏好往往有利于企业探寻客户关注点,挖掘客户需求,加速产品迭代,提高营销效率,完善售后服务等。毫不夸张地说,发掘出客户的声音(voiceofcu......
  • 【开源项目】无锡~超经典智慧城市智慧无锡 CIM/BIM数字孪生可视化项目——开源工程及
    智慧无锡免费提供工程和源码,为城市管理和发展提供更智能化的解决方案。项目介绍智慧无锡项目利用数字孪生技术,将无锡市的地理信息、公共数据和实时监测数据进行整合,以数字化形式呈现城市的各种信息和场景。在工程中,利用AI处理地形影像,在溪梁区使用高精度的max模型,其他区域使用AI生......
  • 手游(明日方舟)营收与社区动态评论关系分析
    importpandasaspdimportnumpyasnpimportmatplotlibasmpfrompandas.core.algorithmsimportSelectN,diffimportseabornassefrommatplotlibimportpyplotaspltimportwordcloudimportjiebaimportloggingfromPILimportImage##设置中文plt.rcPa......
  • 直播预告 | 字节跳动云原生大数据分析引擎 ByConity 与 ClickHouse 有何差异?
    ByContiy是字节跳动开源的一款云原生的大数据分析引擎,擅长交互式查询和即席查询,具有支持多表关联复杂查询、集群扩容无感、离线批数据和实时数据流统一汇总等特点。ByConity从1月份发布开源beta版本之后,陆续收到社区询问ByConity和ClickHouse差异的反馈:“ByConity有没......
  • web前端三大主流框架对比分析
    web前端开发框架是在前端工程师中经常会用到的内容,可以大大减少项目中的bug,节约开发成本,加快项目周期。在使用web前端开发框架之前,需要先了解web前端三三大主流框架有哪些。目前web前端三大框架Angular、React、Vue,这三种框架各有优势,下面将对web前端三大主流框架对比、分析......