首页 > 其他分享 >CompletableFuture使用详解

CompletableFuture使用详解

时间:2024-07-10 15:28:36浏览次数:14  
标签:System 任务 详解 CompletableFuture 使用 println 线程 out

文章目录

CompletableFuture是jdk8的新特性。CompletableFuture实现了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步会点、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。
在这里插入图片描述

Future接口请看Future详解
CompletionStage 接口中的方法比较多,CompletableFuture 的函数式能力就是这个接口赋予的。从这个接口的方法参数你就可以发现其大量使用了 Java8 引入的函数式编程。
在这里插入图片描述

CompletableFuture有如下使用场景:

  • 创建异步任务
  • 简单任务异步回调
  • 多个任务组合处理

一、创建 CompletableFuture

常见的创建 CompletableFuture 对象的方法如下:

  • 通过 new 关键字
  • 基于 CompletableFuture 自带的静态工厂方法:runAsync()supplyAsync()

1.1 new 关键字

通过 new 关键字创建 CompletableFuture 对象这种使用方式可以看作将CompletableFuture当做 Future 来使用。下面咱们来看一个简单的案例。

CompletableFuture<String> resultFuture = new CompletableFuture<>();

// 假设在未来的某个时刻,我们得到了最终的结果。
//这时,我们可以调用 complete() 方法为其传入结果,
//这表示 resultFuture 已经被完成了。complete() 方法只能调用一次,后续调用将被忽略。
resultFuture.complete(rpcResponse);

//获取异步计算的结果也非常简单,直接调用 get() 方法即可。
//调用 get() 方法的线程会阻塞直到 CompletableFuture 完成运算。
rpcResponse = completableFuture.get();

1.2 静态工厂方法

如果你已经知道计算的结果的话,可以使用静态方法 completedFuture() 来创建 CompletableFuture

CompletableFuture<String> future = CompletableFuture.completedFuture("hello!");
assertEquals("hello!", future.get());

completedFuture() 方法底层调用的是带参数的 new方法,只不过,这个方法不对外暴露。

public static <U> CompletableFuture<U> completedFuture(U value) {
    return new CompletableFuture<U>((value == null) ? NIL : value);
}

二、创建异步任务

CompletableFuture创建异步任务,一般有supplyAsyncrunAsync两个方法

2.1 supplyAsync

supplyAsync是创建带有返回值的异步任务。它有如下两个方法,一个是使用默认线程池(ForkJoinPool.commonPool())的方法,一个是带有自定义线程池的重载方法

// 带返回值异步请求,默认线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
 
// 带返回值的异步请求,可以自定义线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
 

演示代码(以supplyAsync(Supplier<U> supplier, Executor executor为例):


public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 自定义线程池
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
            System.out.println("do something....");
            return "result";
        }, executorService);
 
        //等待子任务执行完成get()会阻塞,join()不会
        System.out.println("结果->" + cf.get());
}

2.2 runAsync

runAsync是创建没有返回值的异步任务。它有如下两个方法,一个是使用默认线程池(ForkJoinPool.commonPool())的方法,一个是带有自定义线程池的重载方法

// 不带返回值的异步请求,默认线程池
public static CompletableFuture<Void> runAsync(Runnable runnable)
 
// 不带返回值的异步请求,可以自定义线程池
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)

示例代码:

public class FutureTest {

    public static void main(String[] args) {
        //可以自定义线程池
        ExecutorService executor = Executors.newCachedThreadPool();
        //runAsync的使用
        CompletableFuture<Void> runFuture = CompletableFuture.runAsync(() -> System.out.println("run,hello"), executor);
        //supplyAsync的使用
        CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(() -> {
                    System.out.print("supply,hello");
                    return "hello"; }, executor);
        //runAsync的future没有返回值,输出null
        System.out.println(runFuture.join());
        //supplyAsync的future,有返回值
        System.out.println(supplyFuture.join());
        executor.shutdown(); // 线程池需要关闭
    }
}

2.3 获取任务结果的方法

// 如果完成则返回结果,否则就抛出具体的异常
public T get() throws InterruptedException, ExecutionException 
 
// 最大时间等待返回结果,否则就抛出具体异常
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
 
// 完成时返回结果值,否则抛出unchecked异常。为了更好地符合通用函数形式的使用,如果完成此 CompletableFuture所涉及的计算引发异常,则此方法将引发unchecked异常并将底层异常作为其原因
public T join()
 
// 如果完成则返回结果值(或抛出任何遇到的异常),否则返回给定的 valueIfAbsent。
public T getNow(T valueIfAbsent)
 
// 如果任务没有完成,返回的值设置为给定值
public boolean complete(T value)
 
// 如果任务没有完成,就抛出给定异常
public boolean completeExceptionally(Throwable ex) 
 

三、异步回调处理

3.1 thenApply和thenApplyAsync

thenApply 表示某个任务执行完成后执行的动作,即回调方法,会将该任务的执行结果即方法返回值作为入参传递到回调方法中,带有返回值。
示例代码:

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf1 do something....");
            return 1;
        });
 
        CompletableFuture<Integer> cf2 = cf1.thenApplyAsync((result) -> {
            System.out.println(Thread.currentThread() + " cf2 do something....");
            result += 2;
            return result;
        });
        //等待任务1执行完成
        System.out.println("cf1结果->" + cf1.get());
        //等待任务2执行完成
        System.out.println("cf2结果->" + cf2.get());
}

//输出
先执行cf1再执行cf2
cf1结果->1
cf2结果->3

3.2 thenAccept和thenAcceptAsync

thenAccep表示某个任务执行完成后执行的动作,即回调方法,会将该任务的执行结果即方法返回值作为入参传递到回调方法中,无返回值。


public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf1 do something....");
            return 1;
        });
 
        CompletableFuture<Void> cf2 = cf1.thenAccept((result) -> {
            System.out.println(Thread.currentThread() + " cf2 do something....");
        });
 
        //等待任务1执行完成
        System.out.println("cf1结果->" + cf1.get());
        //等待任务2执行完成
        System.out.println("cf2结果->" + cf2.get());
//输出
cf1结果->1
cf2结果->null

3.3 thenRun和thenRunAsync

thenRun表示某个任务执行完成后执行的动作,即回调方法,无入参,无返回值。



public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf1 do something....");
            return 1;
        });
 
        CompletableFuture<Void> cf2 = cf1.thenRun(() -> {
            System.out.println(Thread.currentThread() + " cf2 do something....");
        });
 
        //等待任务1执行完成
        System.out.println("cf1结果->" + cf1.get());
        //等待任务2执行完成
        System.out.println("cf2结果->" + cf2.get());
}


//输出
cf1结果->1
cf2结果->null

3.4 whenComplete和whenCompleteAsync

whenComplete是当某个任务执行完成后执行的回调方法,会将执行结果或者执行期间抛出的异常传递给回调方法,如果是正常执行则异常为null,回调方法对应的CompletableFuture的result和该任务一致,如果该任务正常执行,则get方法返回执行结果,如果是执行异常,则get方法抛出异常。

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf1 do something....");
            int a = 1/0;
            return 1;
        });
 
        CompletableFuture<Integer> cf2 = cf1.whenComplete((result, e) -> {
            System.out.println("上个任务结果:" + result);
            System.out.println("上个任务抛出异常:" + e);
            System.out.println(Thread.currentThread() + " cf2 do something....");
        });
 
//        //等待任务1执行完成
//        System.out.println("cf1结果->" + cf1.get());
//        //等待任务2执行完成
        System.out.println("cf2结果->" + cf2.get());
        }

3.5 handle和handleAsync

whenComplete基本一致,区别在于handle的回调方法有返回值。

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf1 do something....");
            // int a = 1/0;
            return 1;
        });
 
        CompletableFuture<Integer> cf2 = cf1.handle((result, e) -> {
            System.out.println(Thread.currentThread() + " cf2 do something....");
            System.out.println("上个任务结果:" + result);
            System.out.println("上个任务抛出异常:" + e);
            return result+2;
        });
 
        //等待任务2执行完成
        System.out.println("cf2结果->" + cf2.get());
}
//输出
上个任务结果: 1
上个任务抛出异常:null
cf2结果->  3

3.6 有无Async的区别

使用then***方法时子任务与父任务使用的是同一个线程,而then***Async在子任务中是另起一个线程执行任务,并且then***Async可以自定义线程池,默认的使用ForkJoinPool.commonPool()线程池。

四、多任务组合处理

4.1 AND组合关系

thenCombinethenAcceptBothrunAfterBoth这三个方法都是将两个CompletableFuture组合起来处理,只有两个任务都正常完成时,才进行下阶段任务。

区别

  • thenCombine会将两个任务的执行结果作为所提供函数的参数,且该方法有返回值;
  • thenAcceptBoth同样将两个任务的执行结果作为方法入参,但是无返回值;
  • runAfterBoth没有入参,也没有返回值。注意两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果。

示例代码:

public class ThenCombineTest {

    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {

        CompletableFuture<String> first = CompletableFuture.completedFuture("第一个异步任务");
        ExecutorService executor = Executors.newFixedThreadPool(10);
        CompletableFuture<String> future = CompletableFuture
                //第二个异步任务
                .supplyAsync(() -> "第二个异步任务", executor)
                // (w, s) -> System.out.println(s) 是第三个任务
                .thenCombineAsync(first, (s, w) -> {
                    System.out.println(w);
                    System.out.println(s);
                    return "两个异步任务的组合";
                }, executor);
        System.out.println(future.join());
        executor.shutdown();

    }
}
//输出
第一个异步任务
第二个异步任务
两个异步任务的组合

4.2 OR 组合的关系

applyToEither / acceptEither / runAfterEither 都表示:将两个CompletableFuture组合起来,只要其中一个执行完了,就会执行某个任务。
区别在于:

  • applyToEither:会将已经执行完成的任务,作为方法入参,传递到指定方法中,且有返回值
  • acceptEither: 会将已经执行完成的任务,作为方法入参,传递到指定方法中,且无返回值
  • runAfterEither: 不会把执行结果当做方法入参,且没有返回值。
public class AcceptEitherTest {
    public static void main(String[] args) {
        //第一个异步任务,休眠2秒,保证它执行晚点
        CompletableFuture<String> first = CompletableFuture.supplyAsync(()->{
            try{

                Thread.sleep(2000L);
                System.out.println("执行完第一个异步任务");}
                catch (Exception e){
                    return "第一个任务异常";
                }
            return "第一个异步任务";
        });
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture<Void> future = CompletableFuture
                //第二个异步任务
                .supplyAsync(() -> {
                            System.out.println("执行完第二个任务");
                            return "第二个任务";}
                , executor)
                //第三个任务
                .acceptEitherAsync(first, System.out::println, executor);

        executor.shutdown();
    }
}
//输出
执行完第二个任务
第二个任务

4.3 AllOf

所有任务都执行完成后,才执行allOf返回的CompletableFuture。如果任意一个任务异常,allOfCompletableFuture,执行get方法,会抛出异常

public class allOfFutureTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Void> cf1 = CompletableFuture.runAsync(()->{
            System.out.println("我执行完了");
        });
        CompletableFuture<Void> cf2 = CompletableFuture.runAsync(() -> {
            System.out.println("我也执行完了");
        });
        CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(cf1, cf2).whenComplete((m,k)->{
            System.out.println("finish");
        });
    }
}
//输出
我执行完了
我也执行完了
finish

4.4 AnyOf

任意一个任务执行完,就执行anyOf返回的CompletableFuture。如果执行的任务异常,anyOfCompletableFuture,执行get方法,会抛出异常

public class AnyOfFutureTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Void> a = CompletableFuture.runAsync(()->{
            try {
                Thread.sleep(3000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("我执行完了");
        });
        CompletableFuture<Void> b = CompletableFuture.runAsync(() -> {
            System.out.println("我也执行完了");
        });
        CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(a, b).whenComplete((m,k)->{
            System.out.println("finish");
//            return "捡田螺的小男孩";
        });
        anyOfFuture.join();
    }
}
//输出
我也执行完了
finish

4.5 thenCompose

thenCompose方法会在某个任务执行完成后,将该任务的执行结果,作为方法入参,去执行指定的方法。该方法会返回一个新的CompletableFuture实例

  • 如果该CompletableFuture实例的result不为null,则返回一个基于该result新的CompletableFuture实例;
  • 如果该CompletableFuture实例为null,然后就执行这个新任务
public class ThenComposeTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<String> f = CompletableFuture.completedFuture("第一个任务");
        //第二个异步任务
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture<String> future = CompletableFuture
                .supplyAsync(() -> "第二个任务", executor)
                .thenComposeAsync(data -> {
                    System.out.println(data); return f; //使用第一个任务作为返回
                }, executor);
        System.out.println(future.join());
        executor.shutdown();

    }
}
//输出
第二个任务
第一个任务

五、使用注意点

5.1 Future需要获取返回值,才能获取异常信息

ExecutorService executorService = new ThreadPoolExecutor(5, 10, 5L,
    TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
      int a = 0;
      int b = 666;
      int c = b / a;
      return true;
   },executorService).thenAccept(System.out::println);
   
 //如果不加 get()方法这一行,看不到异常信息
 //future.get();

5.2 CompletableFuture的get()方法是阻塞的

CompletableFuture的get()方法是阻塞的,如果使用它来获取异步调用的返回值,需要添加超时时间

//反例
 CompletableFuture.get();
//正例
CompletableFuture.get(5, TimeUnit.SECONDS);`

所以在实际开发中如果没有特殊需求尽量通过join()方法异步获取结果

5.3 默认线程池的注意点

CompletableFuture代码中又使用了默认的线程池,处理的线程个数是电脑CPU核数-1。在大量请求过来的时候,处理逻辑复杂的话,响应会很慢。一般建议使用自定义线程池,优化线程池配置参数。

5.4 自定义线程池时,注意饱和策略

CompletableFutureget()方法是阻塞的,我们一般建议使用future.get(3, TimeUnit.SECONDS)。并且一般建议使用自定义线程池。但是如果线程池拒绝策略是DiscardPolicy或者DiscardOldestPolicy,当线程池饱和时,会直接丢弃任务,不会抛弃异常。因此建议,CompletableFuture线程池策略最好使用AbortPolicy,然后耗时的异步线程,做好线程池隔离哈。

标签:System,任务,详解,CompletableFuture,使用,println,线程,out
From: https://blog.csdn.net/SRY12240419/article/details/140295289

相关文章

  • 打造个性化科学工具箱:使用conda-build自定义软件包
    打造个性化科学工具箱:使用conda-build自定义软件包引言Conda是一个强大的包管理系统,广泛用于Python社区,尤其在数据科学和机器学习领域。除了安装现成的包,Conda还允许用户通过conda-build工具构建和分享自己的软件包。本文将详细介绍如何在Conda环境中使用conda-build构建......
  • git 基本使用
    一、基本概念工作区workspace:本地项目存放文件的位置暂存区index/stage:暂时存放文件的地方,通过add命令将工作区的文件添加到缓冲区本地仓库repository:通常使用commit命令将暂存区的文件添加到本地仓库远程仓库remote:通常使用clone命令将远程仓库代码拷贝下来,本地代码......
  • 基于springboot+layui+thymeleaf的学生成绩管理系统设计与实现(源码+SQL+使用说明)
    本项目适合做计算机相关专业的毕业设计,课程设计,技术难度适中、工作量比较充实。完整资源获取点击下载完整资源1、资源项目源码均已通过严格测试验证,保证能够正常运行;2、项目问题、技术讨论,可以给博主私信或留言,博主看到后会第一时间与您进行沟通;3、本项目比较适合计算......
  • VMware出现虚拟机似乎正在使用,但就是无法打开
    如果电脑上的VMware还在运行虚拟机时电脑突然关机,再次打开电脑会发现之前运行的虚拟机突然无法打开了,点击运行时会出现该虚拟机似乎还在运行中,请按“获取所有权(T)”按钮获取它的所有权。这个时候千万不要慌,先点取消,千万不能以为是虚拟机坏了,然后直接选择从磁盘中删除。 点击......
  • 首次使用DevEcoStudio
    1、双击桌面快捷方式,进入首次运行的欢迎页面由于咱们之前电脑上没有安装过此软件,所以直接保持默认选项不导入配置,然后点击......
  • 修改Word文档的创建时间我使用简鹿文件重命名工具
    在日常生活或工作中,有时我们需要修改文件的创建时间以符合特定的需求或避免时间上的混淆。对于Word文档而言,虽然Word软件本身不提供直接修改文件创建时间的功能,但我们可以借助第三方工具来实现这一目的。简鹿文件重命名工具就是这样一款实用的软件,它不仅支持文件重命名,还......
  • STM32时钟详解(基于STM32F429)
    目录前言一、时钟源组成二、时钟树三、时钟代码分析前言STM32的时钟就像是这个微控制器(MCU)的“心跳”或者“节拍器”。它决定了STM32内部各个部分(比如CPU、GPIO端口、串口通信等)的运行速度和时序。想象一下,如果你有一个机器人在做动作,时钟就是控制它每一步动作的速度......
  • SQL Server Compact的简单使用
    今天遇到公司之前一个项目,使用的是SQLServerCompact(SQLCE)数据库,记录一下创建SDF数据库:1.VisualStudio2022安装扩展,SQLiteandSQLServerCompactToolbox2.工具-->SQLite/SQLServerCompactToolbox2.添加一个连接 3.选择路径或者创建一个新的数据库4.已经......
  • Nuxt框架中内置组件详解及使用指南(五)
    title:Nuxt框架中内置组件详解及使用指南(五)date:2024/7/10updated:2024/7/10author:cmdragonexcerpt:摘要:本文详细介绍了Nuxt框架中和组件的使用方法与配置,包括安装、基本用法、属性详解、示例代码以及高级功能如事件处理、自定义图片属性和图片格式回退策略。同时,还......
  • 从零学习大模型——使用GLM-4-9B-Chat + BGE-M3 + langchain + chroma建立的本地RAG应
    BGE-M3是第一个具有多功能、多语言和多粒度特性的文本检索模型。多功能:可以同时执行三种检索功能:单向量检索、多向量检索和稀疏检索。多语言:支持100多种工作语言。多粒度:它能够处理不同粒度的输入,从短句子到长达8192个词汇的长文档。为了构建RAG应用,我们需要用到向量数......