首页 > 其他分享 >深入理解 CompletableFuture 的底层原理

深入理解 CompletableFuture 的底层原理

时间:2024-07-11 10:29:26浏览次数:23  
标签:异步 java util CompletableFuture import 原理 public 底层

引言

在现代 Java 编程中,异步编程变得越来越重要。为了实现高效和非阻塞的代码,Java 8 引入了 CompletableFuture,一个用于构建异步应用程序的强大工具。本文将详细探讨 CompletableFuture 的底层原理,展示其工作机制,并通过代码示例说明如何在实际应用中使用它。

异步编程的背景

异步编程是指在程序运行过程中,不等待某个操作完成,而是继续执行其他操作,待异步操作完成后再处理其结果。这样可以提高程序的效率,特别是在 I/O 操作和网络请求等耗时操作中。

在 Java 8 之前,实现异步编程主要依赖于 Future 接口。然而,Future 存在一些局限性,例如无法手动完成、不能链式调用等。为了解决这些问题,Java 8 引入了 CompletableFuture

什么是 CompletableFuture

CompletableFuture 是 Java 8 中新增的类,实现了 FutureCompletionStage 接口,提供了强大的异步编程能力。CompletableFuture 允许以非阻塞的方式执行任务,并且可以通过链式调用来组合多个异步操作。

CompletableFuture 的特点

  1. 手动完成:可以手动设置 CompletableFuture 的结果或异常。
  2. 链式调用:支持多个 CompletableFuture 的链式调用,形成复杂的异步任务流。
  3. 组合操作:提供了丰富的方法来组合多个异步任务,例如 thenCombinethenAcceptBoth 等。
  4. 异常处理:提供了灵活的异常处理机制,可以在任务链中处理异常。

CompletableFuture 的底层原理

工作机制

CompletableFuture 的核心是基于 ForkJoinPool 实现的。ForkJoinPool 是一种特殊的线程池,适用于并行计算任务。它采用了工作窃取算法,能够有效利用多核 CPU 的性能。

当我们提交一个任务给 CompletableFuture 时,它会将任务提交到默认的 ForkJoinPool.commonPool() 中执行。我们也可以指定自定义的线程池来执行任务。

状态管理

CompletableFuture 具有以下几种状态:

  • 未完成(Pending):任务尚未完成。
  • 完成(Completed):任务已经成功完成,并返回结果。
  • 异常(Exceptionally Completed):任务在执行过程中抛出了异常。

这些状态通过内部的 volatile 变量来管理,并使用 CAS(Compare-And-Swap) 操作保证线程安全。

任务调度

CompletableFuture 的任务调度机制基于 ForkJoinPool 的工作窃取算法。当一个线程完成当前任务后,会从其他线程的任务队列中窃取任务执行,从而提高 CPU 利用率。

下面我们通过一个简单的示例代码来理解 CompletableFuture 的基本用法。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 创建一个 CompletableFuture 实例
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            return "Hello, World!";
        });

        // 阻塞等待结果
        String result = future.get();
        System.out.println(result);
    }
}

在上面的示例中,我们创建了一个 CompletableFuture 实例,并使用 supplyAsync 方法异步执行任务。supplyAsync 方法会将任务提交到默认的 ForkJoinPool 中执行。最后,我们使用 get 方法阻塞等待结果并打印输出。

链式调用

CompletableFuture 的一个重要特性是支持链式调用。通过链式调用,我们可以将多个异步任务组合在一起,形成一个任务流。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureChainExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            return "Hello, World!";
        }).thenApply(result -> {
            return result + " from CompletableFuture";
        }).thenApply(String::toUpperCase);

        String finalResult = future.get();
        System.out.println(finalResult);
    }
}

在这个示例中,我们使用 thenApply 方法对前一个任务的结果进行处理,并返回一个新的 CompletableFuture 实例。通过链式调用,我们可以将多个任务串联在一起,形成一个任务流。

组合操作

CompletableFuture 提供了多种方法来组合多个异步任务。以下是一些常用的组合操作示例:

  1. thenCombine:组合两个 CompletableFuture,并将两个任务的结果进行处理。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureCombineExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 5);
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 10);

        CompletableFuture<Integer> combinedFuture = future1.thenCombine(future2, Integer::sum);

        System.out.println(combinedFuture.get());  // 输出 15
    }
}

2. thenAcceptBoth:组合两个 CompletableFuture,并对两个任务的结果进行消费处理。

import java.util.concurrent.CompletableFuture;

public class CompletableFutureAcceptBothExample {
    public static void main(String[] args) {
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 5);
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 10);

        future1.thenAcceptBoth(future2, (result1, result2) -> {
            System.out.println("Result: " + (result1 + result2));
        }).join();
    }
}

3. allOf:组合多个 CompletableFuture,并在所有任务完成后执行操作。

import java.util.concurrent.CompletableFuture;

public class CompletableFutureAllOfExample {
    public static void main(String[] args) {
        CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            System.out.println("Task 1 completed");
        });

        CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            System.out.println("Task 2 completed");
        });

        CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2);

        combinedFuture.join();
        System.out.println("All tasks completed");
    }
}

异常处理

在异步任务中处理异常是非常重要的。CompletableFuture 提供了多种方法来处理任务执行过程中的异常。

  1. exceptionally:在任务抛出异常时,提供一个默认值。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureExceptionallyExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            if (true) {
                throw new RuntimeException("Exception occurred");
            }
            return "Hello, World!";
        }).exceptionally(ex -> {
            System.out.println("Exception: " + ex.getMessage());
            return "Default Value";
        });

        System.out.println(future.get());  // 输出 Default Value
    }
}

2. handle:无论任务是否抛出异常,都进行处理。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureHandleExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            if (true) {
                throw new RuntimeException("Exception occurred");
            }
            return "Hello, World!";
        }).handle((result, ex) -> {
            if (ex != null) {
                return "Default Value";
            }
            return result;
        });

        System.out.println(future.get());  // 输出 Default Value
    }
}

实战案例:构建异步数据处理管道

为了更好地理解 CompletableFuture 的实际应用,我们来构建一个异步数据处理管道。假设我们有一个数据源,需要对数据进行一系列的处理操作,并将处理结果输出到文件中。

数据源模拟

我们首先模拟一个数据源,该数据源会生成一系列数据。

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class DataSource {
    public List<Integer> getData() {
        return IntStream.range(0, 10).boxed().collect(Collectors.toList());
    }
}

数据处理

接下来,我们定义数据处理操作。假设我们需要对数据进行两步处理:首先对每个数据乘以 2,然后对结果进行累加。

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

public class DataProcessor {
    public List<Integer> processStep1(List<Integer> data) {
        return data.stream().map(x -> x * 2).collect(Collectors.toList());
    }

    public Integer processStep2(List<Integer> data) {
        return data.stream().reduce(0, Integer::sum);
    }

    public CompletableFuture<List<Integer>> processStep1Async(List<Integer> data) {
        return CompletableFuture.supplyAsync(() -> processStep1(data));
    }

    public CompletableFuture<Integer> processStep2Async(List<Integer> data) {
        return CompletableFuture.supplyAsync(() -> processStep2(data));
    }
}

结果输出

我们定义一个方法将处理结果输出到文件中。

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.CompletableFuture;

public class ResultWriter {
    public void writeResult(String fileName, Integer result) throws IOException {
        Files.write(Paths.get(fileName), result.toString().getBytes());
    }

    public CompletableFuture<Void> writeResultAsync(String fileName, Integer result) {
        return CompletableFuture.runAsync(() -> {
            try {
                writeResult(fileName, result);
            } catch (IOException e) {
                throw new IllegalStateException(e);
            }
        });
    }
}

主程序

最后,我们在主程序中将上述组件组合在一起,构建异步数据处理管道。

import java.util.List;
import java.util.concurrent.CompletableFuture;

public class Main {
    public static void main(String[] args) {
        DataSource dataSource = new DataSource();
        DataProcessor dataProcessor = new DataProcessor();
        ResultWriter resultWriter = new ResultWriter();

        List<Integer> data = dataSource.getData();

        CompletableFuture<List<Integer>> step1Future = dataProcessor.processStep1Async(data);
        CompletableFuture<Integer> step2Future = step1Future.thenCompose(dataProcessor::processStep2Async);
        CompletableFuture<Void> writeFuture = step2Future.thenCompose(result -> resultWriter.writeResultAsync("result.txt", result));

        writeFuture.join();
        System.out.println("Data processing completed");
    }
}

在这个例子中,我们使用 CompletableFuture 将数据处理步骤和结果输出串联在一起,形成了一个完整的异步数据处理管道。通过 thenCompose 方法,我们将前一个任务的结果传递给下一个异步任务,从而实现了链式调用。

总结

本文深入探讨了 CompletableFuture 的底层原理,展示了其工作机制,并通过多个代码示例说明了如何在实际应用中使用 CompletableFuture。通过理解 CompletableFuture 的异步编程模型、状态管理、任务调度和异常处理机制,我们可以更好地利用这一强大的工具构建高效、非阻塞的 Java 应用程序。

希望这篇文章能够帮助你全面理解 CompletableFuture,并在实际开发中灵活应用。如果你有任何问题或建议,欢迎随时交流!

标签:异步,java,util,CompletableFuture,import,原理,public,底层
From: https://blog.csdn.net/m0_68570169/article/details/140255213

相关文章

  • 文件系统(九):一文看懂yaffs2文件系统原理
    liwen012024.07.07前言yaffs是专为nandflash设计的一款文件系统,与jffs类似,都是属于日志结构文件系统。与jffs不同的是,yaffs文件系统利用了nandflash一些特有属性,所以在数据读写擦除和回收上都有较大的差异。关于jffs2文件系统的介绍可以查看《文件系统(八):LinuxJFFS......
  • HashMap和ConcurrentHashMap的底层实现原理
    (1)HashMap底层实现原理在JDK1.7版本之前,HashMap数据结构是数组和链表,HashMap通过哈希算法将元素的键(Key)映射到数组中的槽位(Bucket)。如果多个键映射到同一个槽位,它们会以链表的形式存储在同一个槽位上,因为链表的查询时间是O(n),所以冲突很严重,一个索引上的链表非常长,......
  • 471、基于51单片机的自行车(速度,里程,电机,LCD1602)(程序+Proteus仿真+原理图+流程图+元器
    毕设帮助、开题指导、技术解答(有偿)见文未目录方案选择单片机的选择显示器选择方案一、设计功能二、Proteus仿真图单片机模块设计三、原理图四、程序源码资料包括:需要完整的资料可以点击下面的名片加下我,找我要资源压缩包的百度网盘下载地址及提取码。方案选择......
  • 成功实现FaceTime数据筛选,FaceTime蓝号检测,检测是否开通FaceTime功能的实现原理
    FaceTime是苹果公司iOS和macOS(以前称MacOSX或OSX)内置的一款视频通话软件,通过Wi-Fi或者蜂窝数据接入互联网,在两个装有FaceTime的设备之间实现视频通话。其要求通话双方均具有装有FaceTime的苹果设备,苹果ID以及可接入互联网的3G/4G/5G或者Wi-Fi网络。一、Windows电脑上部署......
  • CompletableFuture使用详解
    文章目录一、创建CompletableFuture1.1new关键字1.2静态工厂方法二、创建异步任务2.1supplyAsync2.2runAsync2.3获取任务结果的方法三、异步回调处理3.1thenApply和thenApplyAsync3.2thenAccept和thenAcceptAsync3.3thenRun和thenRunAsync3.4whenComplete和......
  • 计算机组成原理-第七章输入/输出系统
    2.I/O接口I/O接口是主机和外设之间的交接界面,通过接口可以实现主机和外设之间的信息交换。2.1I/O接口的功能进行地址译码和设备选择实现主机和外设的通信联络控制实现数据缓冲信号格式的转换传送控制命令和状态信息2.2I/O接口的基本结构I/O接口在主机侧通过I/O总线与......
  • 太原理工大学数值计算方法实验报告(计科)【计算机数值方法】(wz)
    实验目的和要求(必填)1.掌握方程求根数值方法的基本方法,算法设计,实验验证和结果分析。2.选择使用二分法、迭代法、牛顿法、割线法等方法对给定的方程进行根的求解;3.进行结果判断和误差分析。二、实验内容和原理(必填)熟悉使用二分法、迭代法、牛顿法、割线法等方法对给定的方程......
  • 太原理工数据结构实验报告(计科)
    实验一线性表一、实验目的和要求本次实验的主要目的是为了使学生熟练掌握线性表的基本操作在顺序存储结构和链式存储结构上的实现,提高分析和解决问题的能力。要求仔细阅读并理解下列例题,上机通过,并观察其结果,然后独立完成后面的实验题。(1学时)二、实验内容和原理1.建立如......
  • malloc实现原理【Liunx】
    malloc实现原理malloc是什么?malloc,calloc,realloc的区别malloc的实现原理malloc的两种实现方式为什么使用brk?为什么使用mmap?malloc怎么定界的malloc分配的是虚拟内存上的空间吗?malloc是什么?  通过malloc,我们可以开辟一个自定义大小的内存空间。  通过上图......
  • 大话光学原理:3.干涉与衍射
    一、干涉        这是一束孤独的光,在真空的无垠中悄无声息地穿行。忽然,一堵高耸的墙壁挡住了它的去路,它别无选择,只能硬着头皮冲撞而去。在摸索中,它意外地发现墙壁上竟有两道孔隙,笔直而细长,宛如量身定做,似乎在等待着它轻盈的穿透。然而,刚刚逃脱束缚,一道新的屏障又挡在......