首页 > 其他分享 >别再问我Runnable、Callable、Future、FutureTask有什么关联了

别再问我Runnable、Callable、Future、FutureTask有什么关联了

时间:2023-07-07 20:34:53浏览次数:34  
标签:Runnable Callable 任务 Future 线程 FutureTask 方法

Runnable与Callable

众所周知,当我们使用线程来运行Runnable任务时,是不支持获取返回值的,因为Runnable接口的run()方法使用void修饰的,方法不支持返回值。而在很多场景下,我们一方面需要通过线程来异步执行任务,以便提升性能,另一方面还期望能获取到任务的执行结果。尤其是在RPC框架中,异步获取任务返回值,几乎是每一个RPC接口要实现的功能。这个时候,使用Runnable显然就无法满足我们的需求了,因此Callable就出现了。

Callable与Runnable类似,它是一个接口,也只有一个方法:call(),不同的是Callable的call()方法有是有返回值的,返回值的类型是一个泛型,泛型由创建Callable对象时指定。

public interface Callable<V> {
 V call() throws Exception;
}

Runnable对象可以传入到Thread类的构造方法中,通过Thread来运行Runnable任务,而Callable接口则不能直接传入到Thread中来运行,Callable接口通常结合线程池来使用。线程池ThreadPoolExecutor中除了提供execute()方法来提交任务以外,还提供了submit()的三个重载方法来提交任务,这三个方法均有返回值。 ThreadPoolExecutor类继承了抽象类AbstractExecutorService,在AbstractExecutorService中定义了submit()重载的三个方法。具体定义如下。

submit()方法

可以看到,submit()的三个重载方法的返回值均是Future类型的对象,那么Future又是何方神圣呢?

Future与FutureTask

当任务提交到线程池后,我们可能需要获取任务的返回值,或者想要知道任务有没有执行完成,甚至有时候因为特殊情况需要取消任务,那么这个时候应该怎么办呢?

在JUC包下,为我们提供了一个工具类:Future。Future是一个接口,它提供了5个方法,当一个任务通过submit()方法提交到线程池后,线程池会返回一个Future类型的对象,我们可以通过Future对象的这5个方法来获取任务在线程池中的状态。这些方法定义如下。

Future接口中的方法

Future接口有一个具体的实现类:FutureTask。事实上线程池ThreadPoolExecutor的三个submit()重载方法,返回的Future类型的对象,都是FutureTask的实例对象。FutureTask的UML图如下。

FutureTask类的UML图

从UML图上可以看到,FutureTask是直接实现了RunnableFuture接口,而RunnableFuture接口又继承了Runnable和Future接口,因此FutureTask既是Runnable类型,又是Future类型。 当调用submit()方法来向线程池中提交任务时,无论提交的是Runnable类型的任务,还是提交的是Callable类型的任务,最终都是将任务封装成一个FutureTask对象。下面以Future<T> submit(Callable<T> task)方法为例,来看下源码。

public <T> Future<T> submit(Callable<T> task) {
 if (task == null) throw new NullPointerException();
 // 调用newTaskFor()将Callable任务封装成一个FutureTask
 RunnableFuture<T> ftask = newTaskFor(task);
 // 执行任务
 execute(ftask);
 return ftask;
}

// newTaskFor
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
 // 直接new一个FutureTask对象
 return new FutureTask<T>(callable);
}

当submit()方法提交的是Runnable任务时,会调用newTaskFor()方法的另一个重载方法来将任务封装成一个FutureTask对象,所以说最终线程池中执行的都是FutureTask类型的任务。(注意,Runnable类型的对象,最终会通过Executors.callable()方法,将Runnable对象封装为一个Callable类型的对象。Executors.callable()的原理是使用适配器模式,适配器为 RunnableAdapter类)

知道了Future与这些类之间的关系后,下面就来分析下线程池是如何执行一个FutureTask任务的?以及又是如何通过Future.get()方法就能获取到任务的返回值的?

设计原理与数据结构

想要弄明白Future接口的几个方法的原理,那么就必须先搞明白FutureTask的数据接口,以及其设计的原理。

既然想要在线程池外部通过其他线程获取到池中任务的状态,而线程池中的任务都是FutureTask类型,那么在FutureTask这个对象中,肯定存在和任务状态有关的变量。

在FutureTask中定义了十分重要的属性。如下表所示。

FutureTask字段说明

state的取值有7种。每种取值的含义如下代码注释。

private volatile int state;
// 任务的初始状态,当新建一个FutureTask任务时,state值默认为NEW
private static final int NEW = 0;
// 任务处于完成中,什么是完成中呢?有两种情况
// 1. 任务正常被线程执行完成了,但是还没有将返回值赋值给outcome属性
// 2. 任务在执行过程中出现了异常,被捕获了,然后处理异常了,在将异常对象赋值给outcome属性之前
private static final int COMPLETING = 1;
// 任务正常被执行完成,并将任务的返回值赋值给outcome属性之后,会处于NORMAL状态
private static final int NORMAL = 2;
// 任务出了异常,并将异常对象赋值给outcome属性之后
private static final int EXCEPTIONAL = 3;
// 调用cancle(false),任务被取消了
private static final int CANCELLED = 4;
// 调用cancle(true),任务取消,但是在线程中断之前
private static final int INTERRUPTING = 5;
// 调用cancle(true),任务取消,但是在线程中断之后
private static final int INTERRUPTED = 6;

虽然任务的状态有7中取值,但大致可以将其分为三类:初始状态、中间状态、最终状态。这些状态的变化关系,如下图所示。

任务状态

  • 当一个任务被提交到线程池后,它的初始状态为NEW;当任务被正常执行完成后,会先将任务的状态设置为COMPLETING;然后将任务的返回值(即Callable的call()方法的返回值)赋值给FutureTask的outcome属性,当赋值完成后,再将任务的状态设置为NORMAL。这是一个任务正常执行的流程,也就是对应图中①所示的线路。
  • 当任务被提交到线程池后,线程在执行任务中出现了异常,那么会现将任务的状态由NEW设置为COMPLETING;然后将异常对象赋值给outcome属性,当赋值完成后,再将任务状态设置为EXCEPTIONAL。这是任务出现异常的情况,也就是对应图中②所示的线路。
  • 当任务被提交到线程池后,如果调用Future对象的cancle()方法,当cancle()传入的传入的参数为false时,会直接将任务的状态由NEW设置为CANCELLED,也就是对应图中③所对应的路线。
  • 当cancle()方法传入的参数为true时,会先将任务状态设置为INTERRUPTING;然后调用执行当前任务的线程的interrupt()方法,最后再设置任务状态为INTERRUPTED,也就是图中④所对应的线路。

源码分析

当调用submit()方法提交任务到线程池后,会先调用newTaskFor()方法将任务封装成一个FutureTask对象,然后调用execute()方法来执行任务。在execute()方法中会先启动Worker线程,当线程启动后,会调用线程的runWorker()方法。在runWorker()方法中最终会调用到task.run()方法,也就是FutureTask的run()方法。关于这一步详细的源码分析可以参考这篇文章:线程池ThreadPoolExecutor的实现原理

下面只分析下FutureTask.run()方法。在run()方法中,最终会调用callable属性的call()方法。当任务正常执行完后,会调用FutureTask的set()方法来更新任务的状态以及保存任务的返回值,最后唤醒获取任务结果的处于等待中的线程。如果出现异常,将会调用setException()方法来更新任务状态,保存异常,唤醒等待中的线程。下面是run()方法的源码,我对源码进行了删减,只保留了核心逻辑。

public void run() {
 ......
 try {
 Callable<V> c = callable;
 if (c != null && state == NEW) {
 V result;
 boolean ran;
 try {
 // 执行任务
 result = c.call();
 ran = true;
 } catch (Throwable ex) {
 // 出异常时将state置为EXCEPTIONAL
 setException(ex);
 }
 if (ran)
 // 设置任务状态为COMPLETING,然后保存返回值,最后再设置为NORMAL
 set(result);
 }
 } finally {
 // 其他处理
 ......
 }
}

对于set()和setException()方法,比较简单,就是通过CAS来更新任务的状态,然后将任务的返回值赋值给outcome属性,最后调用finishCompletion()方法唤醒waiters这个属性构成的等待队列中的线程。(关于CAS相关的原理和知识,可以参考这两篇文章:初识CAS的实现原理 和 Unsafe类的源码解读以及使用场景)

接下来结合具体的源码来分析下Future.get()方法的执行过程。当调用Future.get()方法时,会调用FutureTask的get()方法。在get()方法,首先判断任务有没有完成,如果已经完成了,就直接返回结果,如果没有完成,则进行等待。

public V get() throws InterruptedException, ExecutionException {
 int s = state;
 // 如果状态处于NEW或者COMPLETING状态,表示任务还没有执行完成,需要等待
 if (s <= COMPLETING)
 // awaitDone()进行等待
 s = awaitDone(false, 0L);
 // 返回结果
 return report(s);
}

通过调用report(s)方法返回结果,在report()方法中,会先判断任务是不是处于NORMAL状态,即任务是否是被正常执行完成,只有正常执行完成了,才会返回结果,否则抛出对应的异常。

private V report(int s) throws ExecutionException {
 Object x = outcome;
 // 只有任务正常结束时,才会返回
 if (s == NORMAL)
 return (V)x;
 if (s >= CANCELLED)
 throw new CancellationException();
 throw new ExecutionException((Throwable)x);
}

当任务处于NEW或者COMPLETING状态时,表示任务正处于执行中或者任务的返回值还没有被赋值给outcome属性,所以这个时候,还不能返回结果,因此需要进入等待状态,即调用awaitDone()方法。在awaitDone()方法中,有一个无限for循环,先判断任务是否是处于COMPLETING状态。如果处于COMPLETING状态,就让当前线程先放弃CPU的调度权(为什么要放弃CPU的调度权呢?因为从COMPLETING变为NORMAL状态,或者其他状态,是一段很短的过程,让当前线程先放弃CPU的调度权,以便让其他线程得到CPU资源,而CPU的时间片也是一段很短的时间,当下次线程在获取到CPU资源的时候,此时任务的状态大概率会变为NORMAL或者其他最终状态,由于代码是处于for循环中的,所以会进入下一次循环)。如果当前任务不是处于COMPLETING状态,就会让线程进行park等待,具体是park超时等待呢,还是非超时等待呢?由awaitDone()方法传入的参数决定。(当调用park()方法后,这些线程又是在什么时候被唤醒的呢?当任务的状态变为最终状态后,会调用finishCompletion()方法,唤醒这些处于等待中的线程。)

下面是awaitDone()方法的部分源码,我对源码进行了删减,只保留的主要逻辑。

private int awaitDone(boolean timed, long nanos)
 throws InterruptedException {
 ......
 for (;;) {
 ......
 // 任务处于COMPLETING中,就让当前线程先暂时放弃CPU的执行权
 else if (s == COMPLETING) // cannot time out yet
 Thread.yield();
 ...... 
 else if (timed) {
 	// 超时时间计算
 nanos = deadline - System.nanoTime();
 if (nanos <= 0L) {
 removeWaiter(q);
 return state;
 }
 // 等待一段时间
 LockSupport.parkNanos(this, nanos);
 }
 else
 	// 等待
 LockSupport.park(this);
 }
}

总结

  • 本文主要介绍了Runnable接口和Callable接口的区别,前者没有返回值,能被Thread直接执行;后者有返回值,不能被Thread直接执行需要通过线程池来执行。
  • 接着介绍了Future接口的5个方法,以及它的实现类FutureTask的几个重要属性以及数据结构。无论是Runnable还是Callable对象,当提交到线程池后,均是被封装成一个FutureTask对象后执行。对于Future的使用场景,在Netty和Dubbo均有大量的应用。
  • 最后结合源码详细介绍了FutureTask的get()方法和run()方法的实现原理。

标签:Runnable,Callable,任务,Future,线程,FutureTask,方法
From: https://www.cnblogs.com/fcjedorfjoeij/p/17535989.html

相关文章

  • Java线程池详解:Future的使用和实现
    提交到线程池中执行的异步任务都会返回一个任务的Future,所以这里先介绍一下Future的使用和实现。异步任务通常会被提交到线程池中去执行,但任务并非提交到线程池后就不管不顾了,在某些时刻我们希望能够取消任务,同时也希望在任务执行完成后获取到任务的执行结果。Java提供了Futur......
  • std::future、std::promise、std::packaged_task、std::async
    std::promisestd::promise:用于获取线程中变量的结果,如下:#include<iostream>#include<thread>#include<future>voidprintMessage(std::promise<std::string>&&prms,std::stringmessage){std::this_thread::sleep_for(std::chrono::sec......
  • CompletableFuture 类
    CompletableFuture异步编排在Java8中,新增加了一个包含50个方法左右的类:CompletableFuture,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法Completab......
  • 【Netty】「萌新入门」(四)异步编程模型:利用 Future 和 Promise 提高性能与响应能力
    前言本篇博文是《从0到1学习Netty》中入门系列的第四篇博文,主要内容是介绍Netty中Future与Promise的使用,通过使用异步的方式提高程序的性能和响应速度,往期系列文章请访问博主的Netty专栏,博文中的所有代码全部收集在博主的GitHub仓库中;为什么要使用异步?使用异步编程模式......
  • 【Netty】「萌新入门」(三)ChannelFuture 与 CloseFuture
    前言本篇博文是《从0到1学习Netty》中入门系列的第三篇博文,主要内容是介绍Netty中ChannelFuture与CloseFuture的使用,解决连接问题与关闭问题,往期系列文章请访问博主的Netty专栏,博文中的所有代码全部收集在博主的GitHub仓库中;连接问题与ChannelFuture在Netty中,所有的......
  • C++之future
    背景在C++多线程编程中,同步线程间的操作和结果通常是一个关键问题。C++11引入了std::future这一同步原语,用于表示异步操作的结果。本文将介绍C++中std::future的使用方法、优势以及与其他同步方法的对比。使用std::futurestd::future表示一个异步操作的结果,可以用于获取操作的......
  • CompletableFuture之批量上传
    前言最近接到一个需求,批量上传图片到服务器及实时更新上传进度。当处理大量文件上传任务时,效率是一个关键因素。传统的串行方式会导致任务耗时较长,而使用并发处理可以极大地提高上传效率。想到很久之前用CompletableFuture优化过一些多统计的业务场景,效果都还不错,因此在这里也使......
  • std::future 如何保存多个对象进行同步等待
    std::future是一个C++11引入的标准库类,可用于异步获取计算结果。通常情况下,std::future可以通过get()函数来等待异步操作完成,并获取其结果。如果需要等待多个异步操作完成并获取它们各自的结果,可以使用std::future的姊妹类std::shared_future来实现。std::shared_futu......
  • 抽象基类Callable、Hashable
    抽象基类Callable、Hashable的主要作用是为内置函数isinstance提供支持,以一种安全的方式判断对象能不能调用或散列。 若想检查是否能调用,可以使用内置的callable()函数;但是没有类似的hashable()函数,因此测试对象是否可散列,最好使用isinstance(my_obj,Hashable)。 ......
  • CompletableFuture使用详解
    一、介绍简单的任务,用Future获取结果还好,但我们并行提交的多个异步任务,往往并不是独立的,很多时候业务逻辑处理存在串行[依赖]、并行、聚合的关系。如果要我们手动用Fueture实现,是非常麻烦的。CompletableFuture是Future接口的扩展和增强。CompletableFuture实现了Future接口,并......