首页 > 编程语言 >FutureTask源码阅读

FutureTask源码阅读

时间:2024-01-22 14:46:11浏览次数:40  
标签:状态 private 任务 源码 线程 阅读 FutureTask null

目录

本人的源码阅读主要聚焦于类的使用场景,一般只在java层面进行分析,没有深入到一些native方法的实现。并且由于知识储备不完整,很可能出现疏漏甚至是谬误,欢迎指出共同学习

本文基于corretto-17.0.9源码,参考本文时请打开相应的源码对照,否则你会不知道我在说什么

简介

FutureTask是Future的一个实现,提供了可取消的异步任务执行。FutureTask具体实现了RunnableTask这个接口:

public interface RunnableFuture<V> extends Runnable, Future<V> {
  void run();
}

作为Future,为什么实现了Runnable而不是Callable呢,因为实现Runnable接口可以方便地将这个任务交给Executor执行,然后任务内部封装了一个Callable对象,并且在run方法中最终会调用Callable,并将结果保存在内部,当调用get的时候获取结果。

例子

Callable<String> task = ...; // 实际执行的任务
FutureTask<String> future = new FutureTask<>(task);
executor.execute(future); // 执行任务
future.get(); // 获取结果

代码分析

成员变量

FutureTask的核心是状态机,即内部将Waiter Thread抽象出各种状态,底层通过Thread和LockSupport检测线程状态并改变线程的实际状态(阻塞、中断等),而状态机的状态转换是通过CAS进行状态转换的。先来看看FutureTask为任务定义的几种状态

public class FutureTask<V> implements RunnableFuture<V> {
  // 表示任务的状态。状态初始化为NEW,所有可能的状态转换如下:
  // NEW -> COMPLETING -> NORMAL
  // NEW -> COMPLETING -> EXCEPTIONAL
  // NEW -> CANCELLED
  // NEW -> INTERRUPTING -> INTERRUPTED
  private volatile int state;
  
  // 刚创建FutureTask到任务执行中这段时间内都处于NEW状态
  private static final int NEW          = 0;
  // 任务结束(包括正常结束和异常)之前的过渡状态
  private static final int COMPLETING   = 1;
  // 任务正常结束后的状态
  private static final int NORMAL       = 2;
  // 任务抛出异常后的状态
  private static final int EXCEPTIONAL  = 3;
  // 任务被取消后的状态
  private static final int CANCELLED    = 4;
  // 状态中断前的过渡状态
  private static final int INTERRUPTING = 5;
  // 任务被中断后的状态
  private static final int INTERRUPTED  = 6;

一定要注意,这里的状态是任务的状态,而不是worker thread的状态,也不是waiter thread的状态

分析代码逻辑前,先明确一下这些状态的具体含义。

首先两个过渡状态COMPLETING、INTERRUPTING怎么理解呢?拿COMPLETING举例子:

// 任务运行结束后保存返回结果
protected void set(V v) {
  // 设置为COMPLETING,防止其它线程设置为其他状态
  if (STATE.compareAndSet(this, NEW, COMPLETING)) {
    // 保存结果
    outcome = v;
    // 设置为NORMAL
    STATE.setRelease(this, NORMAL);
    finishCompletion();
  }
}

这下知道了,“保存结果”属于临界区操作,并且从逻辑上来说只能在保存结果后,才能去设NORMAL,因此就只能先设置一个过渡状态COMPLETING(完成中),防止其他线程同时进入临界区。

至此,我们只需要关注NEW、NORMAL、EXCEPTIONAL、CANCELLED、INTERRUPTED这几个状态,因为COMPLETING只是用来保护NORMAL/EXCEPTIONAL的,同理INTERRUPTING保护INTERRUPTED,没有实际的作用。

其次,NORMAL和EXCEPTIONAL也可以归为一类,两者都表示任务结束,前者对应任务正常返回,后者对应任务抛出异常。

综上,我们最终只需要关注NEW、NORMAL/EXCEPTIONAL(视为一种状态)、CANCELLED、INTERRUPTED四种状态,其中NORMAL/EXCEPTIONAL、CANCELLED、INTERRUPTED这几个状态都属于最终状态(terminal),也就是状态一旦变成他们其中的任何一种,都不会再变成其他状态。

最后简单看看其他成员变量

// 实际需要运行的任务,通过构造函数传入
private Callable<V> callable;
// 保存执行结果或执行抛出的异常
private Object outcome;
// Worker Thread
private volatile Thread runner;
// 保存所有的Waiter Thread,相当于阻塞队列(实际上是Treiber stack)
private volatile WaitNode waiters;

最后一个waiters实际上是阻塞栈(称为Treiber stack,一种lock-free的栈),即FILO。之所以不用队列,我认为是因为这些阻塞者之间并没有竞争锁的关系,一旦任务完成或取消,他们一下子全部被释放,因此没必要将他们公平地释放,不差那几纳秒。其次,相比队列得要两个指针分别指向头和尾,栈只需要一个栈顶指针就能方便地操作,主打一个简单。waiters并不是重点,知道是用来干什么的就可以。

方法

构造函数就不看了,比较简单。

根据先有鸡后有蛋的原则,先分析任务的执行(run),再分析结果的获取(get)。先看看被Woker Thread真正用于执行任务的run方法:

public void run() {
  // 如果state不是NEW了,或者已经存在runner,说明任务正在/已经 被 完成/取消,直接返回
  if (state != NEW ||
    !RUNNER.compareAndSet(this, null, Thread.currentThread()))
    return;
  try {
    Callable<V> c = callable;
    if (c != null && state == NEW) {
      V result;
      boolean ran;
      try {
        // 执行任务
        result = c.call();
        ran = true;
      } catch (Throwable ex) {
        // 任务自身抛出异常,setException设置结果为异常
        result = null;
        ran = false;
        setException(ex);
      }
      // 任务正常返回,保存结果
      if (ran)
        set(result);
    }
  } finally {
    // 至此任务已经完成,做收尾工作
    runner = null;
    int s = state;
    if (s >= INTERRUPTING)
      handlePossibleCancellationInterrupt(s);
  }
}

流程主要就是,状态非NEW或者runner不为空的话就不用运行任务,因为它可能已经在运行或者已经运行结束。然后运行任务后通过set或者setException设置结果。最后还有一个非常细节的handlePossibleCancellationInterrupt方法,不过得放在文末说,因为还涉及到其他的方法,目前就先当他没有任何用途。

接下来再看get方法:

public V get() throws InterruptedException, ExecutionException {
  int s = state;
  // 如果任务还没有变成终态,则进行阻塞等待
  if (s <= COMPLETING)
    s = awaitDone(false, 0L);
  // 阻塞结束,返回结果
  return report(s);
}

get比较简单,看一下awaitDone是怎么阻塞等待的:

private int awaitDone(boolean timed, long nanos) throws InterruptedException {
  long startTime = 0L;    // Special value 0L means not yet parked
  FutureTask.WaitNode q = null;
  boolean queued = false;
  for ( ; ; ) {
    int s = state;
    // 分支1:如果任务已经是终态,则直接返回
    if (s > COMPLETING) {
      if (q != null)
        q.thread = null;
      return s;
    }
    // 分支2:如果任务正处于过渡态,则自旋等待任务进入终态
    else if (s == COMPLETING)
      Thread.yield();
    // 分支3:如果线程被中断,那么将其从等待栈删除,并抛中断异常
    else if (Thread.interrupted()) {
      removeWaiter(q);
      throw new InterruptedException();
    }
    // 分支4:如果没有被阻塞的其他线程
    else if (q == null) {
      // 如果不需要等待,直接返回
      if (timed && nanos <= 0L)
        return s;
      // 创建节点准备入栈
      q = new FutureTask.WaitNode();
    }
    // 分支5:如果还没入栈,将其入栈
    else if (!queued)
      queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
    // 分支6:如果为有限等待,计算等待时长并且parkNanos
    else if (timed) {
      final long parkNanos;
      // 计算应该等待的时长
      if (startTime == 0L) {
        startTime = System.nanoTime();
        if (startTime == 0L)
          startTime = 1L;
        parkNanos = nanos;
      } else {
        long elapsed = System.nanoTime() - startTime;
        // 如果已经超时,则交给上层负责抛出超时异常
        if (elapsed >= nanos) {
          removeWaiter(q);
          return state;
        }
        parkNanos = nanos - elapsed;
      }
      // 上面nanoTime函数可能耗时长,此时任务状态可能已经变成了终态,进行二次检查
      if (state < COMPLETING)
        LockSupport.parkNanos(this, parkNanos);
    }
    // 分支7:如果为无限等待,直接park
    else
      LockSupport.park(this);
  }
}

awaitDone使用了一个for( ; ; )无限循环,里面有多个分支处理不同的情况,这是想干嘛。看过Doug Lea大神写过的其他一些代码就知道,比如AQS中的acuqire、enqueue函数等,for( ; ; ) { if else }这种写法其实也构成了一个FSM(有限状态机),只不过没有像FutureTask那样显式地抽象成几个状态常量,而是根据当前某些变量的值选择进入对应的分支,然后进入下一个状态或终态...注意要与FutureTask任务的状态区分开来,这里只是将awaitDone这块代码逻辑用状态机的方式理解而已。

7个分支分别对应7个状态,有的状态是过渡态(分支1,3,分支4中不需要等待的情况),有的是终态,具体的逻辑写在注释里。

看下removeWaiter如何实现:

private void removeWaiter(FutureTask.WaitNode node) {
  if (node != null) {
    node.thread = null;
    retry:
    for (;;) {
      // pred为q的前驱,q为被移除节点,s为q的后继
      for (FutureTask.WaitNode pred = null, q = waiters, s; q != null; q = s) {
        s = q.next;
        if (q.thread != null)
          pred = q;
        // 让pred.next指向s,即移除q
        else if (pred != null) {
          pred.next = s;
          if (pred.thread == null)
            continue retry;
        }
        else if (!WAITERS.compareAndSet(this, q, s))
          continue retry;
      }
      break;
    }
  }
}

removeWaiter其实不只移除node,还移除其他所有无效节点(thread==null的节点,因为不再与线程绑定,节点已经没用了)。注意最外层循环,这个循环是用来当发生race的时候重新遍历栈的,尽最大的努力去移除所有的无效节点。之所以是“尽最大努力”,是因为它只检测本次遍历的前驱节点或栈顶,如果是中间某个已经遍历过的节点成为了无效节点,只能下一次removeWaiter的时候才能检测出来。

removeWaiter主要还是针对单个节点的移除,如果任务已经结束,还需要一次性移除所有节点,也就是finishCompletion函数完成的功能:

private void finishCompletion() {
  for (FutureTask.WaitNode q; (q = waiters) != null;) {
		// 直接通过null out the waiters删除栈,保证只有一个线程在释放栈上的节点
    if (WAITERS.weakCompareAndSet(this, q, null)) {
      for (;;) {
        Thread t = q.thread;
        // 唤醒线程
        if (t != null) {
          q.thread = null;
          LockSupport.unpark(t);
        }
        // 遍历下一个节点
        FutureTask.WaitNode next = q.next;
        if (next == null)
          break;
        q.next = null;
        q = next;
      }
      break;
    }
  }

  done();

  callable = null;        // to reduce footprint
}

最外层的循环是为了避免有节点waiter意外入队,进行二次检查。

最后再回头看看之前介绍run方法时遗留的handlePossibleCancellationInterrupt

private void handlePossibleCancellationInterrupt(int s) {
    // It is possible for our interrupter to stall before getting a
    // chance to interrupt us.  Let's spin-wait patiently.
  if (s == INTERRUPTING)
    while (state == INTERRUPTING)
      Thread.yield();
}

看起来他好像啥也没做,但其实是为了解决一个隐蔽的bug:在cancel(true)的内部,会尝试对runner.interrupt:

Thread t = runner;
if (t != null) // 第一行
  t.interrupt(); // 第二行

如果在第一行执行完后,发生线程调度,调度到工作线程,然后run方法刚好跑完,然后该线程在线程池的安排下去执行了其他的任务,此时线程调度回执行cancel(true)的线程,它继续运行第二行,诶,它本来要中断的是之前的那个任务,但interrupt是针对线程而不是针对任务进行中断的,此时就错误地中断了第二个任务,造成bug。

因此handlePossibleCancellationInterrupt的目的是让中断的发生在run还没退出前发生,这样就不会错误地中断该工作线程执行的下一个任务。

参考链接

「简书」【细谈Java并发】谈谈FutureTask

标签:状态,private,任务,源码,线程,阅读,FutureTask,null
From: https://www.cnblogs.com/nosae/p/17979988

相关文章

  • Linux下源码安装
    Linux下源码安装很多开源库都没有说明怎么安装,这里记录一下一般方法。步骤以wldgb为例:克隆下源码后,发现README中没有说怎么安装,观察文件:一般来说,autogen.sh是用来生成configure的,然后configure是用来生成makefile的。如果不确定,可以看一下这些文件中的内容,就知道大概是怎......
  • 【快速阅读三】使用泊松融合实现单幅图的无缝拼贴及消除两幅图片直接的拼接缝隙。
    泊松融合还可以创建一些很有意思的图片,比如一张图片任意规格平铺,使用泊松融合后,平铺的边界处过渡的很自然,另外,对于两张图片,由于局部亮度等等的影响,导致拼接在一起时不自然,也可以使用泊松融合予以解决。在【快速阅读二】从OpenCv的代码中扣取泊松融合算......
  • 【快速阅读二】从OpenCv的代码中扣取泊松融合算子(Poisson Image Editing)并稍作优化
    泊松融合是一种非常不错多图融合算法,在OpenCv的相关版本中也包含了该算子模块,作者尝试着从OpenCv的大仓库中扣取出该算子的全部代码,并分享了一些在扣取代码中的心得和收获。泊松融合我自己写的第一版程序大概是2016年在某个小房间里折腾出来的,当时是用......
  • zookeeper源码(06)ZooKeeperServer及子类
    ZooKeeperServer实现了单机版zookeeper服务端功能,子类实现了更加丰富的分布式集群功能:ZooKeeperServer|--QuorumZooKeeperServer|--LeaderZooKeeperServer|--LearnerZooKeeperServer|--FollowerZooKeeperServer|--ObserverZooKeeperServer|-......
  • 基于pytest搭建接口自动化测试框架,提供源码
     基于pytest搭建接口自动化测试框架 框架整体介绍和方法教程第三代框架使用教程,该框架比第二代这个完善了很多https://blog.csdn.net/aaaaaaaaanjjj/article/details/129597973新框架(第二代比这个功能多了很多,用例使用yaml编写)pytest+yaml设计接口自动化框架过程记录......
  • 华企盾DSC:外发文件设置编辑权限 阅读次数 阅后即焚 文件过期
    互联网时代,信息流通迅速,一份关键的内部文件一旦外泄,可能毁掉公司数月、甚至数年的努力。企业多次碰壁后终于发现,仅仅依靠员工层层审批、体系内控制,已难以防止数据泄密这一严重问题。更为糟糕的是,一旦文件发送出去,系统往往不能有效地控制未授权阅读的发生。痛定思痛,企业用户渴望有......
  • [转帖]MySQL多版本并发控制机制(MVCC)-源码浅析
    https://zhuanlan.zhihu.com/p/144682180 MySQL多版本并发控制机制(MVCC)-源码浅析前言作为一个数据库爱好者,自己动手写过简单的SQL解析器以及存储引擎,但感觉还是不够过瘾。<<事务处理-概念与技术>>诚然讲的非常透彻,但只能提纲挈领,不能让你玩转某个真正的数据库。感谢c......
  • 《Deep Long-Tailed Learning: A Survey》阅读笔记
    论文标题《DeepLong-TailedLearning:ASurvey》深度长尾学习:调查作者YifanZhang、BingyiKang、BryanHooi、ShuichengYan(IEEEFellow)和JiashiFeng来自新加坡国立大学计算机学院、字节跳动AILab和SEAAILab初读摘要长尾类别不平衡(long-tailedclassimbala......
  • Feign源码解析7:nacos loadbalancer不支持静态ip的负载均衡
    背景在feign中,一般是通过eureka、nacos等获取服务实例,但有时候调用一些服务时,人家给的是ip或域名,我们这时候还能用Feign这一套吗?可以的。有两种方式,一种是直接指定url:这种是服务端自己会保证高可用、负载均衡那些。但也可能对方给了多个url(一般不会这样,但是在app场景下,为了......
  • Rocketmq学习3——消息发送原理源码浅析
    一丶概述RocketMQ消息发送的原理流程可以分为以下几个步骤:1.创建生产者在发送消息前,客户端首先需要创建一个消息生产者(Producer)实例,并设置必要的配置参数,如NameServer地址、生产组名称、消息发送失败的重试次数等。2.启动生产者创建生产者后,需要调用启动方法来初始化生产......