首页 > 编程语言 >【深度挖掘Java并发编程底层源码】「底层技术原理体系」带你零基础认识和分析学习相关的异步任务提交机制FutureTask的底层原理

【深度挖掘Java并发编程底层源码】「底层技术原理体系」带你零基础认识和分析学习相关的异步任务提交机制FutureTask的底层原理

时间:2023-08-16 18:33:17浏览次数:51  
标签:null 任务 源码 线程 FutureTask 原理 执行 方法 底层

FutureTask的基本介绍

FutureTask是Java中的一个类,它实现了Future接口和Runnable接口,并且被用作线程执行的任务。FutureTask可以在多线程环境下异步执行一个任务并获取其结果。

FutureTask的特点用法

  1. 异步执行:通过将耗时的任务交给FutureTask,在一个单独的线程中执行,当前线程可以继续执行其他任务,不会被阻塞,从而提高程序的并发性。
  2. 获取结果:FutureTask提供了get()方法,用于在任务完成后获取其执行结果。如果任务尚未完成,get()方法将阻塞当前线程,直到任务完成并返回结果。
  3. 取消任务:可以使用cancel()方法取消任务的执行。如果任务尚未开始执行,则会取消任务的执行;如果任务已经开始执行,则根据传入的参数决定是否中断正在执行的任务。
  4. 线程安全:FutureTask是线程安全的,可以在多个线程中共享使用。多个线程可以同时调用FutureTask的get()方法等待任务结果。

FutureTask常见的应用场景是在并发编程中,将计算密集型的任务交给FutureTask后台执行,并在需要时获取结果。这样可以充分利用系统资源,提高程序的性能和响应能力。

FutureTask的执行流程

【深度挖掘Java并发编程底层源码】「底层技术原理体系」带你零基础认识和分析学习相关的异步任务提交机制FutureTask的底层原理_线程池

任务提交

在之前的线程池分析中,我们介绍了AbstractExecutorService的实现。AbstractExecutorService是Java中的一个抽象类,它实现了ExecutorService接口,并提供了一些通用的逻辑和方法来简化线程池的实现。

AbstractExecutorService的主要特点和用法
  1. 线程池管理:AbstractExecutorService提供了默认的线程池管理逻辑。我们可以通过继承AbstractExecutorService并实现其中的抽象方法来定制自己的线程池。
  2. 任务执行:通过调用submit()或invokeAll()等方法,可以将任务提交给AbstractExecutorService来执行。它会在适当的时候自动创建线程并执行任务。
  3. 异常处理:AbstractExecutorService提供了一些默认的异常处理机制,比如可以捕获任务执行过程中抛出的异常,并进行相应的处理操作。我们也可以在继承AbstractExecutorService时自定义异常处理逻辑。
  4. 生命周期管理:AbstractExecutorService定义了线程池的生命周期方法,如shutdown()和isShutdown()等,用于管理线程池的启动和关闭。
对象模型转换传递流程

【深度挖掘Java并发编程底层源码】「底层技术原理体系」带你零基础认识和分析学习相关的异步任务提交机制FutureTask的底层原理_Java_02

submit方法提交任务

AbstractExecutorService主要用于实现ExecutorService接口中的submit()和invokeAll()等方法。它提供了一些默认的实现,使得我们可以更方便地创建和管理线程池,并执行任务。

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}
RunnableFuture任务模型

通过使用RunnableFuture,我们可以更加灵活地管理线程池,处理任务的提交和执行,并在需要时进行异常处理和线程池的关闭。最终进行构建一个FutureTask对象。

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}
FutureTask

对于通过submit方法提交的任务,无论是Runnable还是Callable接口实现,都会被统一封装为FutureTask对象,并传递给execute方法。

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // 确保 callable 的可见性
}

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;      // 确保可调用的可见性
}

注意,AbstractExecutorService是一个抽象类,不能直接实例化。我们可以继承AbstractExecutorService类并实现其中的抽象方法,或者使用Java提供的具体实现类如ThreadPoolExecutor来创建并使用线程池

RunnableAdapter适配器模型

RunnableAdapter的作用是将一个Runnable对象包装成一个Callable对象。在适配器的call()方法中,会调用任务的run()方法来执行任务,并返回预先指定的结果。

public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
 }

通过使用RunnableAdapter,我们可以将Runnable任务在提交给ExecutorService时,以Callable的方式进行处理和执行。

static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    
    public T call() {
        task.run();
        return result;
    }
}

该适配器的设计使得我们可以更加方便地将Runnable任务与Callable任务一同处理,充分利用ExecutorService的统一接口,并且无需对Runnable任务进行额外的修改。

任务状态

FutureTask的状态定义,说明了FutureTask对象可能处于的不同状态。初始状态为NEW,后续可能会演化为COMPLETING、NORMAL、EXCEPTIONAL、CANCELLED、INTERRUPTING、INTERRUPTED等状态。

private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;

【深度挖掘Java并发编程底层源码】「底层技术原理体系」带你零基础认识和分析学习相关的异步任务提交机制FutureTask的底层原理_线程池_03

这些状态描述了FutureTask在执行过程中的不同情况,包括正常完成、遇到异常、被取消、被中断等。了解FutureTask的状态有助于我们理解和处理FutureTask对象在多线程环境下的行为。

任务执行

首先,方法会检查FutureTask的状态是否为NEW,并尝试使用compareAndSwapObject方法将执行线程设置为当前线程。然后,方法将执行并处理相关任务,并根据任务是否成功完成设置相应的结果。最后,清除执行线程(runner)并重新获取FutureTask的状态,根据状态处理可能的取消操作。

public void run() {
    if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) {
        return;
    }
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran) {
                set(result);
            }
        }
    } finally {
        // 让runner非空直到state被稳定设置,以防止并发调用run()
        runner = null;
        // 在将runner设置为空之后必须重新读取state以防止泄漏中断
        int s = state;
        if (s >= INTERRUPTING) {
            handlePossibleCancellationInterrupt(s);
        }
    }
}

【深度挖掘Java并发编程底层源码】「底层技术原理体系」带你零基础认识和分析学习相关的异步任务提交机制FutureTask的底层原理_Java_04

设置线程任务数据结果

对于Callable对象,FutureTask会直接执行它的call方法。如果call方法执行成功,FutureTask会调用set方法来设置执行结果;如果遇到异常,FutureTask会调用setException方法来设置异常。

protected void set(V v) {
    // 首先使用CAS(比较并交换)将状态设置为中间状态COMPLETING
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        // 将状态设置为正常状态
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 最终状态
        finishCompletion();
    }
}

protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        // 将状态设置为异常状态
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // 最终状态
        finishCompletion();
    }
}

下面是对于Callable直接执行其call方法的处理过程。

【深度挖掘Java并发编程底层源码】「底层技术原理体系」带你零基础认识和分析学习相关的异步任务提交机制FutureTask的底层原理_线程池_05

  1. 通过CAS操作将FutureTask的状态设置为中间状态COMPLETING。
  2. 根据执行的结果(成功或异常),将结果或异常存储在outcome变量中,并将状态设置为相应的最终状态(正常状态或异常状态)。
  3. 完成执行并处理相关的完成操作。

获取线程任务数据结果

下面的这两个方法都是用来对全局变量outcome进行赋值的。而当我们通过get方法在另一个线程中获取结果时,将会执行以下过程:

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

如果任务尚未完成,则会等待任务完成:

private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    // 使用for循环阻塞当前线程
    for (;;) {
        // 响应中断
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        int s = state;
        // 任务已经完成或者已经抛出异常,直接返回
        if (s > COMPLETING) {
            // WaitNode已经创建,当前可以设置为null了
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // 不能超时
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);
    }
}

下面是我们通过get方法在另一个线程中获取结果的过程。

【深度挖掘Java并发编程底层源码】「底层技术原理体系」带你零基础认识和分析学习相关的异步任务提交机制FutureTask的底层原理_Java_06

  1. 首先,通过调用get方法获取FutureTask的状态值。
  2. 如果状态值小于或等于COMPLETING,则调用awaitDone方法来等待任务的完成。
  3. 在awaitDone方法中,根据是否设置了超时时间计算出等待的截止时间,并使用for循环来阻塞当前线程。
  4. 在循环中,对中断请求进行响应,并检查任务的状态。
  5. 如果设置了超时时间,则根据剩余的等待时间使用LockSupport.parkNanos进行阻塞。
  6. 如果没有设置超时时间,则使用LockSupport.park进行阻塞。
  7. 循环会一直执行,直到任务完成或超时,才会返回任务的状态值。
  8. 最后,通过调用report方法来返回相应的结果。总之,该过程描述了在另一个线程中通过get方法等待任务完成的过程。它通过阻塞当前线程并监测任务状态来实现等待,直到任务完成后才返回结果。

上报线程任务数据结果

如果任务已完成或者等待任务直到完成后,将会调用report方法来返回结果, report 方法,发现状态为异常的话将包装成 ExecutionException((Throwable)x); 这个异常就是我们在使用 get 的时候需要捕获的异常。

private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V) x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable) x);
}

如果状态s等于NORMAL,表示任务正常完成,将会返回实际的结果。如果状态s大于等于CANCELLED,则抛出CancellationException异常。否则,抛出ExecutionException异常。通过这样的设计,无论是任务正常完成还是抛出异常,都能够在线程池中执行的任务中被感知到。

任务取消

通过调用cancel方法可以取消FutureTask的执行,并在任务完成后通过finishCompletion方法来唤醒等待的线程。被唤醒的线程在awaitDone方法中继续循环,检查任务的状态以获取结果。

public boolean cancel(boolean mayInterruptIfRunning) {
    // 如果任务不是刚创建或者是刚创建但是更改为指定状态失败则返回 false
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}
  1. FutureTask类中有一个cancel方法,接受一个布尔类型参数mayInterruptIfRunning,用于表示在执行中的任务是否可以中断。
  2. cancel方法首先检查任务的状态,如果状态不是NEW或者尝试将状态更改为INTERRUPTING或CANCELLED失败,则返回false。
  3. 如果允许中断正在运行的任务(mayInterruptIfRunning为true),则调用Thread的interrupt方法来中断任务的线程,并将任务的状态设置为INTERRUPTED。
  4. 然后,调用finishCompletion方法来完成任务的完成操作。
  5. 在finishCompletion方法中,首先尝试将waiters设置为null,以便其他线程无法再加入等待队列。然后循环遍历等待队列,唤醒等待的线程,并将等待队列中节点的thread属性设置为null。
  6. 在唤醒线程后,被唤醒的线程会在awaitDone方法中继续循环,检查任务的状态。
  7. 如果任务的状态大于COMPLETING,则表示任务已完成或抛出异常,直接返回状态。

总结说明

  • FutureTask是一个用于异步执行任务并获取结果的Java类。它提供了获取结果、取消任务等功能,帮助我们更好地处理多线程编程中的任务执行和结果获取。
  • AbstractExecutorService是Java中用于简化线程池实现的一个抽象类。通过继承并实现其中的方法,我们可以更方便地管理线程池的创建、任务的提交和执行。它提供了默认的异常处理和生命周期管理,并可以根据需要进行定制和扩展。

标签:null,任务,源码,线程,FutureTask,原理,执行,方法,底层
From: https://blog.51cto.com/alex4dream/7112189

相关文章

  • 深入解析Spring Boot自动配置原理
    大家好,我是你的技术达人小助手,在今天的技术博客中,我将带你深入解析SpringBoot自动配置的原理,揭示其神秘面纱,让你能更好地理解和利用这一强大的功能。准备好了吗?让我们一起开始这次关于SpringBoot自动配置的探索之旅吧!背景知识SpringBoot作为一款优秀的Java开发框架,以其方便的配......
  • 解密Spring Framework的核心原理与魔法
    嗨,亲爱的读者朋友们!今天,我将带你解密SpringFramework的核心原理与魔法,帮助你深入理解这个强大的Java开发框架。作为一个技术达人,我将为你揭开SpringFramework的神秘面纱,让你在开发中游刃有余。SpringFramework简介SpringFramework是一个全面的、模块化的Java开发框架,被广泛用于......
  • 赏帮赚系统源码平台APP开发
      近年来,赏金悬赏任务平台已经成为了互联网创业的热门领域之一。赏金悬赏平台通过将赏金分发给赏金猎人,让赏金猎人通过自己的技能和能力来获取赏金,从而实现双赢的局面。赏帮赚系统源码平台APP作为一款专业的赏金悬赏平台,其功能强大,使用简单,深受用户喜爱。  一、悬赏发布功......
  • 赏帮赚app任务源码
      人们的生活方式和消费观念也发生了翻天覆地的变化。越来越多的人选择通过互联网进行购物、娱乐、学习等消费行为。在这个过程中,赏帮赚APP作为一款新型的任务平台应运而生,成为了许多人赚取额外收入的选择之一。本文将对赏帮赚APP的任务源码功能进行分析与探讨,以了解其背后的技......
  • 交换机工作原理
    什么是交换机?公司刚成立,只有三个人,老板,IT工程师,销售,需要建立局域网,只需要买一个小型交换机,如腾达交换机,不需要配置,连接好网线,给3台电脑配置在同一网段即可例如:电脑1ip:192.168.1.1 电脑2ip:192.168.1.2 电脑3ip:192.168.1.3  子网掩码使用255.255.255.0 此处网关可......
  • 直播平台源码实现播放视频的方法
    我们在直播平台中上传视频的时候往往会不知道自己上传的是什么格式的视频,在APP软件播放的时候有时可能会播放不出来,这时候我们就会专门按照该视频的格式进行播放,顺利的将视频展示出来,下面就给大家讲解下直播平台源码实现m3u8、flv、mp4视频播放的方法。一、直播平台源码中播放m3u......
  • 0基础微信小程序搭建教程之禾匠商城源码搭建教程
    2022年版禾匠商城V4搭建教程(重新更新一份禾匠商城V4独立版搭建教程,因为之前的版本搭建跟现在有点不一样,现在一键安装比之前简单多了,废话不多现在开始!)准备工作:1、服务器一个,要好2核4G,安装系统CentOS7.5和宝塔面板。2、Nginx1.20,插件:PHP72、数据库5.6、Redis6.2.6,其它......
  • 深入理解 Flutter 图片加载原理
    前言随着Flutter稳定版本逐步迭代更新,京东APP内部的Flutter业务也日益增多,Flutter开发为我们提供了高效的开发环境、优秀的跨平台适配、丰富的功能组件及动画、接近原生的交互体验,但随之也带来了一些OOM问题,通过线上监控信息和Observatory工具结合分析我们发现问题的原因是由于Fl......
  • 直播系统源码开发中圆角效果的实现
    现在直播平台越来越多,登录后发现每个直播平台的页面效果都非常的吸引人,界面都非常的美观,很多按钮都采用了圆角的设计方法,显得更加的柔和,我们就来看看直播系统源码怎么实现吧!在直播系统源码开发中,可以通过创建一个自定义的DrawableXML文件来实现给Button设置圆角的效果。以下是创......
  • 手把手教你使用LabVIEW TensorRT实现图像分类实战(含源码)
    ‍‍......