首页 > 其他分享 >CompletableFuture多任务组合回调

CompletableFuture多任务组合回调

时间:2023-08-10 21:56:47浏览次数:43  
标签:异步 System 任务 线程 println CompletableFuture 回调 多任务

 

1、AND组合关系

thenCombine / thenAcceptBoth / runAfterBoth都表示:「当任务一和任务二都完成再执行任务三」

区别在于:

  • 「runAfterBoth」 不会把执行结果当做方法入参,且没有返回值

  • 「thenAcceptBoth」: 会将两个任务的执行结果作为方法入参,传递到指定方法中,且无返回值

  • 「thenCombine」:会将两个任务的执行结果作为方法入参,传递到指定方法中,且有返回值

public void testCompletableThenCombine() throws ExecutionException, InterruptedException {  
    //创建线程池  
    ExecutorService executorService = Executors.newFixedThreadPool(10);  
    //开启异步任务1  
    CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {  
        System.out.println("异步任务1,当前线程是:" + Thread.currentThread().getId());  
        int result = 1 + 1;  
        System.out.println("异步任务1结束");  
        return result;  
    }, executorService);  

    //开启异步任务2  
    CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {  
        System.out.println("异步任务2,当前线程是:" + Thread.currentThread().getId());  
        int result = 1 + 1;  
        System.out.println("异步任务2结束");  
        return result;  
    }, executorService);  

    //任务组合  
    CompletableFuture<Integer> task3 = task.thenCombineAsync(task2, (f1, f2) -> {  
        System.out.println("执行任务3,当前线程是:" + Thread.currentThread().getId());  
        System.out.println("任务1返回值:" + f1);  
        System.out.println("任务2返回值:" + f2);  
        return f1 + f2;  
    }, executorService);  

    Integer res = task3.get();  
    System.out.println("最终结果:" + res);  
}  

运行结果

异步任务1,当前线程是:17  
异步任务1结束  
异步任务2,当前线程是:18  
异步任务2结束  
执行任务3,当前线程是:19  
任务1返回值:2  
任务2返回值:2  
最终结果:4  

2、OR组合关系

applyToEither / acceptEither / runAfterEither 都表示:「两个任务,只要有一个任务完成,就执行任务三」

区别在于:

  • 「runAfterEither」:不会把执行结果当做方法入参,且没有返回值

  • 「acceptEither」: 会将已经执行完成的任务,作为方法入参,传递到指定方法中,且无返回值

  • 「applyToEither」:会将已经执行完成的任务,作为方法入参,传递到指定方法中,且有返回值

public void testCompletableEitherAsync() {  
    //创建线程池  
    ExecutorService executorService = Executors.newFixedThreadPool(10);  
    //开启异步任务1  
    CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {  
        System.out.println("异步任务1,当前线程是:" + Thread.currentThread().getId());  

        int result = 1 + 1;  
        System.out.println("异步任务1结束");  
        return result;  
    }, executorService);  

    //开启异步任务2  
    CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {  
        System.out.println("异步任务2,当前线程是:" + Thread.currentThread().getId());  
        int result = 1 + 2;  
        try {  
            Thread.sleep(3000);  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
        System.out.println("异步任务2结束");  
        return result;  
    }, executorService);  

    //任务组合  
    task.acceptEitherAsync(task2, (res) -> {  
        System.out.println("执行任务3,当前线程是:" + Thread.currentThread().getId());  
        System.out.println("上一个任务的结果为:"+res);  
    }, executorService);  
}  

运行结果

//通过结果可以看出,异步任务2都没有执行结束,任务3获取的也是1的执行结果  
异步任务1,当前线程是:17  
异步任务1结束  
异步任务2,当前线程是:18  
执行任务3,当前线程是:19  
上一个任务的结果为:2  

注意

如果把上面的核心线程数改为1也就是

ExecutorService executorService = Executors.newFixedThreadPool(1);  
 

运行结果就是下面的了,会发现根本没有执行任务3,显然是任务3直接被丢弃了。

异步任务1,当前线程是:17  
异步任务1结束  
异步任务2,当前线程是:17  

3、多任务组合

  • 「allOf」:等待所有任务完成

  • 「anyOf」:只要有一个任务完成

示例

allOf:等待所有任务完成

public void testCompletableAallOf() throws ExecutionException, InterruptedException {  
    //创建线程池  
    ExecutorService executorService = Executors.newFixedThreadPool(10);  
    //开启异步任务1  
    CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {  
        System.out.println("异步任务1,当前线程是:" + Thread.currentThread().getId());  
        int result = 1 + 1;  
        System.out.println("异步任务1结束");  
        return result;  
    }, executorService);  

    //开启异步任务2  
    CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {  
        System.out.println("异步任务2,当前线程是:" + Thread.currentThread().getId());  
        int result = 1 + 2;  
        try {  
            Thread.sleep(3000);  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
        System.out.println("异步任务2结束");  
        return result;  
    }, executorService);  

    //开启异步任务3  
    CompletableFuture<Integer> task3 = CompletableFuture.supplyAsync(() -> {  
        System.out.println("异步任务3,当前线程是:" + Thread.currentThread().getId());  
        int result = 1 + 3;  
        try {  
            Thread.sleep(4000);  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
        System.out.println("异步任务3结束");  
        return result;  
    }, executorService);  

    //任务组合  
    CompletableFuture<Void> allOf = CompletableFuture.allOf(task, task2, task3);  

    //等待所有任务完成  
    allOf.get();  
    //获取任务的返回结果  
    System.out.println("task结果为:" + task.get());  
    System.out.println("task2结果为:" + task2.get());  
    System.out.println("task3结果为:" + task3.get());  
}  

anyOf: 只要有一个任务完成

 
public void testCompletableAnyOf() throws ExecutionException, InterruptedException {  
    //创建线程池  
    ExecutorService executorService = Executors.newFixedThreadPool(10);  
    //开启异步任务1  
    CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {  
        int result = 1 + 1;  
        return result;  
    }, executorService);  

    //开启异步任务2  
    CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {  
        int result = 1 + 2;  
        return result;  
    }, executorService);  

    //开启异步任务3  
    CompletableFuture<Integer> task3 = CompletableFuture.supplyAsync(() -> {  
        int result = 1 + 3;  
        return result;  
    }, executorService);  

    //任务组合  
    CompletableFuture<Object> anyOf = CompletableFuture.anyOf(task, task2, task3);  
    //只要有一个有任务完成  
    Object o = anyOf.get();  
    System.out.println("完成的任务的结果:" + o);  
}  

CompletableFuture使用有哪些注意点

 

使用的一些注意点。

1、Future需要获取返回值,才能获取异常信息

@Test  
public void testWhenCompleteExceptionally() {  
    CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {  
        if (1 == 1) {  
            throw new RuntimeException("出错了");  
        }  
        return 0.11;  
    });  

    //如果不加 get()方法这一行,看不到异常信息  
    //future.get();  
}  

Future需要获取返回值,才能获取到异常信息。如果不加 get()/join()方法,看不到异常信息。



2、CompletableFuture的get()方法是阻塞的

CompletableFutureget()方法是阻塞的,如果使用它来获取异步调用的返回值,需要添加超时时间。

//反例  
 CompletableFuture.get();  
//正例  
CompletableFuture.get(5, TimeUnit.SECONDS);  

3、不建议使用默认线程池

CompletableFuture代码中又使用了默认的 「ForkJoin线程池」,处理的线程个数是电脑 「CPU核数-1」。在大量请求过来的时候,处理逻辑复杂的话,响应会很慢。一般建议使用自定义线程池,优化线程池配置参数。

4、自定义线程池时,注意饱和策略

CompletableFuture的get()方法是阻塞的,我们一般建议使用future.get(5, TimeUnit.SECONDS)。并且一般建议使用自定义线程池。

但是如果线程池拒绝策略是DiscardPolicy或者DiscardOldestPolicy,当线程池饱和时,会直接丢弃任务,不会抛弃异常。因此建议,CompletableFuture线程池策略最好使用AbortPolicy,然后耗时的异步线程,做好线程池隔离哈。

 

标签:异步,System,任务,线程,println,CompletableFuture,回调,多任务
From: https://www.cnblogs.com/ixtao/p/17621602.html

相关文章

  • CompletableFuture异步多线程
    importjava.util.concurrent.CompletableFuture;importjava.util.concurrent.ExecutionException;publicstaticvoidmain(String[]args)throwsInterruptedException,ExecutionException{longstartTime=System.currentTimeMillis();//调用用户服......
  • CompletableFuture supplyAsync()
    CompletableFuture中的方法publicstaticCompletableFuture<Void>runAsync(Runnablerunnable)publicstaticCompletableFuture<Void>runAsync(Runnablerunnable,Executorexecutor)publicstatic<U>CompletableFuture<U>supplyAsync(Supplie......
  • PageOffice 在线编辑 office文件,回调父页面
    一、子页面调用父页面的方法varvalue=window.external.CallParentFunc("ParentFunName(Arguments);");//父页面的JS函数有返回值window.external.CallParentFunc("ParentFunName(Arguments);");//父页面的JS函数无返回值二、是否需要传递参数(子页面关闭窗口的同时刷新父页面......
  • 微信公众号授权回调 vue网址中带#号的处理
    1、改变vue模式为history,小编没有试2、通过配置nginx实现   A、替换跳转网址中的#为其他字符串,例如我的    consturl=this._getUrl("https://open.weixin.qq.com/connect/oauth2/authorize",{appid:this.appId,......
  • Java8 高级功能CompletableFuture
    CompletableFuture功能测试CompletableFuture类实现了CompletionStage和Future接口。Future是Java5添加的类,用来描述一个异步计算的结果,但是获取一个结果时方法较少,要么通过轮询isDone,确认完成后,调用get()获取值,要么调用get()设置一个超时时间。但是这个get()方法会阻塞住调用......
  • HAL库回调机制
    初始化回调HAL_PPP_Init()  调用↓HAL_PPP_MspInit()配置外设HAL中断回调中断信号 中断服务函数PPP_IRQHandler()中断处理公用函数HAL_PPP_IRQHandler()中断处理回调函数__weakvoidHAL_PPP_xxxCallback()......
  • quarkus依赖注入之七:生命周期回调
    欢迎访问我的GitHub这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos本篇概览本篇的知识点是bean的生命周期回调:在bean生命周期的不同阶段,都可以触发自定义代码的执行触发自定义代码执行的具体方式,是用对应的注解去修饰要执行的方法,如下图......
  • Qt TwinCAT3中的变量回调函数的时间戳读取方式
    官网提供了例程,官网真是个宝库。基本ADS的操作都里面有例程了,但是可能会稍微分散一点,不过多看几遍,也就慢慢整理你所需要的东西出来了。https://infosys.beckhoff.com/index_en.htm1#include<Windows.h>2#include<conio.h>3#include<winbase.h>45#include<TcA......
  • 微信支付回调
    在微信支付的回调中,常用参数列举如下:$resultArray['out_trade_no']:商户订单号。$resultArray['transaction_id']:订单号。$resultArray['amount']['total']:订单金额。$resultArray['mch_id']:商户号,即微信支付分配的商户号。$resultArray['appid']:公众账号ID,......
  • 多任务派发线程处理示例supplyAsync
    packagecom.cytc.test;importjava.util.ArrayList;importjava.util.List;importjava.util.Random;importjava.util.concurrent.CompletableFuture;importjava.util.concurrent.LinkedBlockingQueue;importjava.util.concurrent.ThreadPoolExecutor;importjava......