首页 > 编程语言 >CompletableFuture源码学习

CompletableFuture源码学习

时间:2023-10-27 17:34:40浏览次数:55  
标签:学习 源码 CompletableFuture AsyncSupply Supplier null final

了解到CompletableFuture的基础用法之后,我们不禁好奇,以前的Future模式不支持如此好用的异步编程,CompletableFuture是如何做到的呢?这就需要我们去阅读源码了,通过源码我们才能了解到其设计思想和实现方式,我们分析下supplyAsync 和 thenApplyAsync 这两个,并且是提供线程池的接口,因为如果不提供自定义线程池,就会用默认的,如下:

private static final boolean USE_COMMON_POOL =
        (ForkJoinPool.getCommonPoolParallelism() > 1);

private static final Executor ASYNC_POOL = USE_COMMON_POOL ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
        
static final class ThreadPerTaskExecutor implements Executor {
        public void execute(Runnable r) { new Thread(r).start(); }
}

上面有一个判断USE_COMMON_POOL,其中用到了ForkJoinPool.getCommonPoolParallelism(),这个是ForkJoin中通用池的并行级别,默认是Runtime.getRuntime().availableProcessors() - 1,所以你的电脑有四核心,那么ForkJoinPool.getCommonPoolParallelism()的值就是3,如果只有一个核心,该值是0,所以USE_COMMON_POOL为true,那么你的电脑至少三个核心。

如果USE_COMMON_POOL为false,那么就会用ThreadPerTaskExecutor,由上面代码可知,这是个单线程的执行器。

在CompletableFuture源码中,有两个成员属性比较重要(volatile保证多线程之间值的可见性),如下所示:

volatile Object result;       // Either the result or boxed AltResult
volatile Completion stack;    // Top of Treiber stack of dependent actions

其中result为当前阶段的计算结果,注意看上面的注释,还有可能值为AltResult,该类仅有一个成员变量Throwable ex,该类的作用官方描述为:

An AltResult is used to box null as a result, as well as to hold exceptions.

即用AltResult代替null和持有计算过程中发生的异常,源码如下:

static final class AltResult { // See above
    final Throwable ex;        // null only for NIL
    AltResult(Throwable x) { this.ex = x; }
}

stack

CompletableFuture内部使用了Treiber stack,Treiber stack算法是属于无锁并发栈,内部使用CAS(compare-and-swap)来实现无锁并发算法。详情请看:Treiber stack设计

supplyAsync源码

测试代码如下:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
  return Thread.currentThread().getName();
}, Executors.newFixedThreadPool(5));

System.out.println(future.join());

我们分析如下supplyAsync实现:

CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

该方法会直接调asyncSupplyStage方法,首先先去检查传入的执行器是否为ForkJoinPool.commonPool(),如果是会直接用ForkJoinPool.commonPool(),代码如下所示:

static <U> CompletableFuture<U> asyncSupplyStage(Executor e, Supplier<U> f) {
    if (f == null) throw new NullPointerException(); // 空指针判断
    CompletableFuture<U> d = new CompletableFuture<U>(); // 新建CompletableFuture
    e.execute(new AsyncSupply<U>(d, f)); // 新建AsyncSupply(ForkJoinTask),丢到传入的线程池执行
    return d; // 立即返回
}

asyncSupplyStage方法里首先做空指针判断,接着新建一个新的CompletableFuture, 然后新建一个AsyncSupply,将刚才新建的CompletableFuture和传入的Supplier传给AsyncSupply,接着直接将AsyncSupply丢到传入的线程池中进行执行,最后立即返回,不会等待执行结束。

AsyncSupply源码如下:

/**
 * 该类继承自ForkJoinTask,为ForkJoin计算任务,且实现了Runnable, AsynchronousCompletionTask,
 * 代表可以直接丢到线程池里面运行
 */
static final class AsyncSupply<T> extends ForkJoinTask<Void>
    implements Runnable, AsynchronousCompletionTask {
    CompletableFuture<T> dep; Supplier<? extends T> fn;
    AsyncSupply(CompletableFuture<T> dep, Supplier<? extends T> fn) {
        this.dep = dep; this.fn = fn;
    }

    public final Void getRawResult() { return null; }
    public final void setRawResult(Void v) {}
    public final boolean exec() { run(); return false; }

    public void run() {
        CompletableFuture<T> d; Supplier<? extends T> f;
        if ((d = dep) != null && (f = fn) != null) { // 判断d, f 是否为空
            dep = null; fn = null; // 这里置空,防止多次执行
            if (d.result == null) { // 如果当前任务还没执行完,result没有值
                try {
                    d.completeValue(f.get()); // 等待任务结束,并利用CAS设置结果,可能设置失败
                } catch (Throwable ex) {
                    d.completeThrowable(ex); // 出现异常
                }
            }
            d.postComplete(); // 这个后面再说,目前没有用
        }
    }
}

AsyncSupply内部类里面,有一个run方法,由于我们将其丢到了线程池中运行,所以就会执行run方法。在这个方法里面,会执行我们传给supplyAsync的计算任务,并将结果通过CAS写到


title: CompletableFuture源码学习
categories: [Java, JUC]
tags: [java, JUC, 异步编程]
author: Mingshan
date: 2019-11-27

标签:学习,源码,CompletableFuture,AsyncSupply,Supplier,null,final
From: https://www.cnblogs.com/mingshan/p/17792830.html

相关文章

  • cmake学习
    基础的一个cmake文件:cmake_minimum_required(VERSION3.25)project(app)set(CMAKE_CXX_STANDARD20)set(EXECUTABLE_OUTPUT_PATHbin/)set(SRC_LISTsrc/main.cppsrc/util.cpp)add_executable(app${SRC_LIST})target_include_directories(appPUBLICinclude/)......
  • 「学习笔记」网络流
    「学习笔记」网络流点击查看目录目录「学习笔记」网络流知识点一些基础定义最大流Ford-Fulkerson算法(增广路算法)Edmonds-Karp算法Dinic算法最小割费用流EK费用流ZKW费用流例题[SCOI2007]蜥蜴[SDOI2015]星际战争士兵占领[HNOI2007]紧急疏散EVACUATE[SDOI2009]晨跑[SDO......
  • docker 数据卷-学习
    容器数据卷容器数据存储路径同步在宿主机文件目录做数据持久化保存(目录挂载、映射)不进行这一步,会导致删除容器后,数据直接丢失。容器间数据卷也可以进行共享数据卷的使用,类似于Linux下对目录或文件进行mount1、宿主机目录映射容器内部目录-v宿主机目录:容器内目录(......
  • 第六周学习笔记
    Linux进程管理多任务处理在编程中,多任务处理是指同时执行多个任务或进程的能力。这种能力可以通过并发编程来实现,其中任务可以是同时执行的线程、进程或协程。进程的概念进程:进程是对映像的执行在操作系统内核中,每个进程用一个独特的数据结构表示,叫作进程控制块(PCB)或任务控制......
  • 第七周学习笔记
    并发编程并行计算导论顺序算法与并行算法顺序算法:begin  step_1  step_2  ……  step_nend//nextstep并行算法:cobegin  task_1  task_2  ……  task_ncoend//nextstep并行性与并发性在单CPU系统中,一次只能执行一个任务。不同的任务只能......
  • 按摩学习笔记
    特点按摩,以经络和解剖作为理论基础。具有:经济简便,随时随地,简单有效的特点。能够起到:放松肌肉,舒筋活血,振奋精神,消除病痛等作用。亲身体会以上特点,是我个人经过亲身体会,比较认同的部分。最深刻的体会有:1.个人因为工作经常久坐,导致翻译搜索复制......
  • 【论文解读】RLAIF基于人工智能反馈的强化学习
    【论文解读】RLAIF基于人工智能反馈的强化学习一、简要介绍人类反馈强化学习(RLHF)可以有效地将大型语言模型(LLM)与人类偏好对齐,但收集高质量的人类偏好标签是一个关键瓶颈。论文进行了一场RLHF与来自人工智能反馈的RL的比较(RLAIF)-一种由现成的LLM代替人类标记偏好的......
  • 群签名学习笔记
    群签名算法模型如图所示,群签名包括3个参与主体,群管理、群成员以及验证者。群管理负责群证书的创建管理、群成员管理以及在验证方发起挑战时对成员进行验证追踪群成员加入群后可获得群管理颁发的群证书,并使用证书对某信息进行签名。验证方可验证群签名的合法性。     在群签名......
  • tornado——关于tornado的异步操作学习
    关于tornado的异步操作学习yieldhttp_client.fetch和yieldtornado.gen.Task(http_client.fetch的区别实际上,yieldhttp_client.fetch和yieldtornado.gen.Task(http_client.fetch)是等价的,它们在功能上是相同的。tornado.gen.Task是Tornado4.0版本之前的写法,而yieldh......
  • 易语言银行余额虚拟生成器制作,提供源码思路,仅供学习
    今天这边带来的是一个图片生成器,是用易语言进行开发的,整个代码我算了一下不超过10行,然后就需要一个图片框组件和三个编辑框,三个标签,一个按钮就能实现,真的非常非常简单,大家可以照猫画虎哈,这也仅仅只是为大家做的一个演示示范。软件截图:程序集源码分享:.版本2.程序集窗口程......