首页 > 其他分享 >CompletableFuture 异步编排的简单使用

CompletableFuture 异步编排的简单使用

时间:2024-03-31 20:34:34浏览次数:23  
标签:异步 System 编排 CompletableFuture future02 println future01 out

目录

1、创建异步对象

2、计算完成时回调方法

3、handle方法

4、线程串行化方法

5、两任务组合 - 都要完成

6、两任务组合 - 一个完成

7、多任务组合


如果在我们的业务中某些功能需要其他一些功能执行完成之后才能开始执行(比如获取其他功能的返回结果),这样就需要用到异步编排功能。

Future 是 Java 5 添加的类,用来描述一个异步计算的结果。你可以使用isDone方法检查计算是否完成,或者使用get阻塞住调用线程,直到计算完成返回结果,你也可以使用cancel 方法停止任务的执行。

虽然Future以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的 CPU 资源,而且也不能及时地得到计算结果。

Future的使用限制:

  • 并发执行多任务:Future只提供了get()方法获取结果,并且是阻塞的。所以,除了等待,别无他法
  • 无法对多个任务进行链式调度:如果我们希望在计算任务完成后执行特定的动作,比如发送邮件,但是Future中没有提供这样的能力
  • 无法组合多个任务:如果我们运行了10个任务,并且期望在他们全部执行结束后执行特定动作,在Future中是无法实现的。
  • 没有异常处理:Future接口中没有关于异常处理的方法

在 Java 8 中, 新增加了一个包含 50 个方法左右的类: CompletableFuture,提供了非常强大的 Future 的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合 CompletableFuture 的方法。 CompletableFuture 类实现了 Future 接口,所以你还是可以像以前一样通过get方法阻塞或者轮询的方式获得结果。

1、创建异步对象

CompletableFuture 提供了四个静态方法来创建一个异步操作。

runXxxx 都是没有返回结果的,supplyXxx 都是可以获取返回结果的 (使用get方法得到返回值)

可以传入自定义的线程池,否则就用默认的线程池

    @Test
    public void test3() throws ExecutionException, InterruptedException {

        System.out.println("主函数开始执行");

        CompletableFuture<Void> runAsync=CompletableFuture.runAsync(() -> {
            System.out.println("线程 "+Thread.currentThread().getName()+"=》开始执行");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("线程 "+Thread.currentThread().getName()+"=》执行结束");
        });

        CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {
            System.out.println("线程 "+Thread.currentThread().getName()+"=》开始执行");
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("线程 "+Thread.currentThread().getName()+"=》执行结束");
            return Thread.currentThread().getName()+"线程返回的信息";
        });

        runAsync.get();
        System.out.println(supplyAsync.get());

        System.out.println("主函数执行结束");

    }

2、计算完成时回调方法

whenCompleteXXX可以得到计算结果和异常,但是不能修改返回值。

exceptionally 可以得到异常结果,可以修改返回值。

带有 Async 默认是异步执行的.

示例代码:

@Test
void contextLoads() throws ExecutionException, InterruptedException {
​
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {
        @Override
        public Integer get() {
            System.out.println("当前线程:" + Thread.currentThread().getName());
            int i = 10/0;
            return 1024;
        }
    }).whenComplete(new BiConsumer<Integer, Throwable>() {
        @Override
        public void accept(Integer res, Throwable exception) {
            System.out.println("返回结果是:" + res);
            System.out.println("异常是:" + exception);
        }
    }).exceptionally(new Function<Throwable, Integer>() {
        @Override
        public Integer apply(Throwable exception) {
            System.out.println("异常是:" + exception);
            return 666;
        }
    });
​
    Integer integer = future.get();
    System.out.println(integer);
​
}

返回结果:

当前线程:pool-1-thread-1
返回结果是:null
异常是:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
异常是:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
666

3、handle方法

handle方法算是结合了上面的whenComplete方法和exceptionally方法。

示例代码

@Test
void contextLoads() throws ExecutionException, InterruptedException {
​
    CompletableFuture<Object> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {
            @Override
            public Integer get() {
                System.out.println("当前线程:" + Thread.currentThread().getName());
                int i = 10/0;
                return 1024;
            }
        }).handle((res,exception)->{
            //如果发生异常,res就等于空
            if(res!=null){
                return "返回的结果为:"+res;
            }

            //如果没有发生异常,exception就为空
            if(exception!=null){
                return "发生了异常,异常为:"+exception.getCause();
            }

            return 0;
        });

        System.out.println(future. Get());
}
返回结果

4、线程串行化方法

thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。

thenAccept 方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。

thenRun 方法:只要上面的任务执行完成,就开始执行 thenRun,不能获得上一个任务的返回值。

Function<?super T,?extend U> :

T:上一个任务返回结果的类型 。

U:当前任务的返回值类型。

    @Test
    public void test3() throws ExecutionException, InterruptedException {
        CompletableFuture<Object> future = CompletableFuture.supplyAsync(new Supplier<Object>() {
            @Override
            public Object get() {
                return 123;
            }
        }).thenApply(new Function<Object, Object>() {
            @Override
            public Object apply(Object o) {
                System.out.println("上一个任务的返回值:"+o);
                return 456;
            }
        });

        System.out.println(future. Get());
    }

 执行结果

5、两任务组合 - 都要完成

两个任务必须都完成,才能触发该任务。

thenCombine:组合两个 future,获取两个 future 的返回结果,并返回当前任务的返回值

thenAcceptBoth:组合两个 future,获取两个 future 任务的返回结果,然后处理任务,没有返回值。

runAfterBoth:组合两个 future,不需要获取 future 的结果,只需两个 future 处理完任务后, 处理该任务。

示例代码

@Test
void contextLoads() throws ExecutionException, InterruptedException {
​
    CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
        @Override
        public Integer get() {
            System.out.println("future01开始");
            System.out.println("future01结束");
            return 1024;
        }
    }, service);
​
    CompletableFuture<String> future02 = CompletableFuture.supplyAsync(new Supplier<String>() {
        @Override
        public String get() {
            System.out.println("future02开始");
            System.out.println("future02结束");
            return "hello";
        }
    }, service);
​
​
    CompletableFuture<String> future03 = future01.thenCombine(future02, (f1, f2) -> {
        System.out.println("future01的返回结果:" + f1);
        System.out.println("future02的返回结果:" + f2);
        return "future03";
    });
​
    System.out.println("==============================");
​
    System.out.println(future03.get());
​
}

返回结果:

future01开始
future01结束
future02开始
future02结束
future01的返回结果:1024
future02的返回结果:hello
==============================
future03

6、两任务组合 - 一个完成

当两个任务中,任意一个 future 任务完成的时候,执行任务。

applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。

acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。

runAfterEither:两个任务有一个执行完成,不需要获取 future 的结果,处理任务,也没有返回值

示例代码:

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Object> future01 = CompletableFuture.supplyAsync(() -> {
            System.out.println("future01开始");
            System.out.println("future01结束");
            return 1024;
        }, service);
​
        CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
            System.out.println("future02开始");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("future02结束");
            return "hello";
        }, service);
​
​
        CompletableFuture<String> future03 = future01.applyToEither(future02, res -> {
​
            System.out.println("返回值:" + res);
​
            return "future03";
        });
​
        System.out.println("==============================");
​
        System.out.println(future03.get());
    }

返回结果:

future01开始
future01结束
future02开始
返回值:1024
==============================
future03
future02结束

7、多任务组合

 

allOf:等待所有任务完成

anyOf:只要有一个任务完成

示例代码:

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Object> future01 = CompletableFuture.supplyAsync(() -> {
            System.out.println("future01开始");
            System.out.println("future01结束");
            return 1024;
        }, service);
​
        CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
            System.out.println("future02开始");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("future02结束");
            return "hello";
        }, service);
​
​
        CompletableFuture<Object> future03 = CompletableFuture.supplyAsync(() -> {
            System.out.println("future03开始");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("future03结束");
            return "RedMi";
        }, service);
​
        CompletableFuture<Void> allOf = CompletableFuture.allOf(future01, future02, future03);
​
        allOf.get();//等待三个任务都完成
​
        System.out.println("==============================");
​
        System.out.println("三个任务都执行完成了");
​
    }

返回结果:

future01开始
future01结束
future02开始
future03开始
future03结束
future02结束
==============================
三个任务都执行完成了

 

标签:异步,System,编排,CompletableFuture,future02,println,future01,out
From: https://blog.csdn.net/stduyc/article/details/137176973

相关文章

  • 中间件 ZK分布式专题与Dubbo微服务入门 6-5 同步异步删除zk节点
    0课程地址https://coding.imooc.com/lesson/201.html#mid=12721 1重点关注1.1本节内容javaapi客户端删除节点,包含同步修改和异步修改,只做了异步,同步不通用(因为没有回调函数,不知道是否删除成功)也可以参照视频看下 1.2javaapi删除节点......
  • 中间件 ZK分布式专题与Dubbo微服务入门 6-3 同步异步创建zk节点
    0课程地址https://coding.imooc.com/lesson/201.html#mid=12719 1重点关注1.1本节内容javaapi客户端新增临时节点和永久节点 1.2javaapi新增节点同步调用/***同步或者异步创建节点,都不支持子节点的递归......
  • Vuex的核心组成、版本问题及store.js的使用、 Vuex中存值、取值以及获取变量值、异步
    Vuex的核心组成、版本问题及store.js的使用、Vuex中存值、取值以及获取变量值、异步同步操作和Vuex后台交互  //store//初始值//设置值mutations  ---this.$store.commit('setDemoValue方法名',value); //更新值action --this.$store.disp......
  • 基于任务的异步模式和基于时间/回调的异步模式
    问题场景描述webapi:需要向另一个服务器发送http请求,等待服务器的回调结果,若指定时间内比如10分钟没有收到回调则返回失败,否则处理回调返回。典型的基于时间/回调的异步模式,和经常使用的await模式不同,await是基于任务的异步模式,任务完成返回。而前面这种应用场景依赖回调处理......
  • 掌握C#中异步魔法:同步方法如何优雅调用异步方法
     概述:上述C#示例演示了如何在同步方法中调用异步方法。通过使用`async`和`await`关键字,实现了同步方法对异步方法的调用。建议使用`await`而不是`Result`来避免潜在的死锁问题。这种模式在处理异步任务时能够提高代码的可读性和性能。在C#中,从同步方法调用异步方法的过程涉及......
  • 协程&异步编程
    协程,也可以被称为微线程,是一种用户态内的上下文切换技术。简而言之,其实就是通过一个线程实现代码块相互切换执行协程一般应用在有IO操作的程序中,因为协程可以利用IO等待的时间去执行一些其他的代码,从而提升代码执行效率。async事件循环事件循环,可以理解为while循环,在周期性......
  • 12-Ajax异步交互技术
     同步与异步操作最主要的区别:同步操作必须按照以上步骤执行,而异步操作在第四步响应客户端时,可以继续执行第二步请求服务器,即客户端可以执行其它操作 数据地址:console-mock.apipost.cn/mock/4250f8d4-b605-47eb-9777-07e29548dbb8/list <!DOCTYPEhtml><htmllang="......
  • Ajax 与 Axios 异步请求
    Ajax与Axios异步请求一、服务器对外提供了哪些资源1.网页中如何请求数据 数据,也是服务器对外提供的一种资源。只要是资源,必然要通过请求–处理–响应的方式进行获取。如果要在网页中请求服务器上的数据资源,则需要用到XMLHttpRequest对象。XMLHttpRequest(简称xhr)是......
  • 23.异步模式-生产者、消费者
    1.与保护性暂停GuardedObject不同,不需要产生结果与消费结果的线程一一对应。2.消费队列可以用来平衡生产和消费的线程资源。3.生产者负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据。4.消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据。5.jdk......
  • C# 异步与 Unity 协程(实例讲解)
    C#异步编程实例:假设我们有一个需要从Web获取数据的简单应用。我们可以使用C#的异步编程模型来避免UI线程被HTTP请求阻塞1usingSystem.Net.Http;2usingSystem.Threading.Tasks;34publicclassAsyncExample5{6publicasyncTask<string>FetchDataFromWebAsync(st......