首页 > 其他分享 >CompletableFuture异步多线程

CompletableFuture异步多线程

时间:2023-08-10 21:47:40浏览次数:40  
标签:异步 whenComplete System CompletableFuture println 多线程 public out

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public static void main(String[] args) throws InterruptedException, ExecutionException {
        long startTime = System.currentTimeMillis();
        //调用用户服务获取用户基本信息
        CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(() -> {
            try {
                //模拟查询商品耗时500毫秒
                Thread.sleep(600);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "用户A";
        });

        //调用商品服务获取商品基本信息
        CompletableFuture<String> goodsFuture = CompletableFuture.supplyAsync(() ->
                //模拟查询商品耗时500毫秒
        {
            try {
                Thread.sleep(400);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "商品A";
        });

        System.out.println("获取用户信息:" + userFuture.get());
        System.out.println("获取商品信息:" + goodsFuture.get());

        //模拟主程序耗时时间
        Thread.sleep(600);
        System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
    }

运行结果

 

CompletableFuture创建方式

1、常用的4种创建方式

CompletableFuture源码中有四个静态方法用来执行异步任务

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier){..}  
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor){..}  
public static CompletableFuture<Void> runAsync(Runnable runnable){..}  
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor){..}  
  • 「supplyAsync」执行任务,支持返回值。

  • 「runAsync」执行任务,没有返回值。

「supplyAsync方法」

//使用默认内置线程池ForkJoinPool.commonPool(),根据supplier构建执行任务  
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)  
//自定义线程,根据supplier构建执行任务  
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) 

「runAsync方法」

//使用默认内置线程池ForkJoinPool.commonPool(),根据runnable构建执行任务  
public static CompletableFuture<Void> runAsync(Runnable runnable)   
//自定义线程,根据runnable构建执行任务  
public static CompletableFuture<Void> runAsync(Runnable runnable,  Executor executor)  

2、结果获取的4种方式

对于结果的获取CompltableFuture类提供了四种方式

//方式一  
public T get()  
//方式二  
public T get(long timeout, TimeUnit unit)  
//方式三  
public T getNow(T valueIfAbsent)  
//方式四  
public T join()  

说明:

  • 「get()和get(long timeout, TimeUnit unit)」 => 在Future中就已经提供了,后者提供超时处理,如果在指定时间内未获取结果将抛出超时异常

  • 「getNow」 => 立即获取结果不阻塞,结果计算已完成将返回结果或计算过程中的异常,如果未计算完成将返回设定的valueIfAbsent值

  • 「join」 => 方法里不会抛出异常

public void testCompletableGet() throws InterruptedException, ExecutionException {  

    CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> {  
        try {  
            Thread.sleep(1000);  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
        return "商品A";  
    });  

    // getNow方法测试,如果未计算完,则返回设定的默认值valueIfAbsent
    System.out.println(cp1.getNow("商品B"));  

    //join方法测试   
    CompletableFuture<Integer> cp2 = CompletableFuture.supplyAsync((() -> 1 / 0));  //不会抛出异常
    System.out.println(cp2.join());  //抛出异常
   System.out.println("-----------------------------------------------------");  
    //get方法测试  
    CompletableFuture<Integer> cp3 = CompletableFuture.supplyAsync((() -> 1 / 0));  
    System.out.println(cp3.get());  
}  

「运行结果」

  • 第一个执行结果为 「商品B」,因为要先睡上1秒结果不能立即获取

  • join方法获取结果方法里不会抛异常,但是执行结果会抛异常,抛出的异常为CompletionException

  • get方法获取结果方法里将抛出异常,执行结果抛出的异常为ExecutionException

 

 

异步回调方法

 示例

public void testCompletableThenRunAsync() throws InterruptedException, ExecutionException {  
    long startTime = System.currentTimeMillis();  
      
    CompletableFuture<Void> cp1 = CompletableFuture.runAsync(() -> {  
        try {  
            //执行任务A  
            Thread.sleep(600);  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  

    });  

    CompletableFuture<Void> cp2 =  cp1.thenRun(() -> {  
        try {  
            //执行任务B  
            Thread.sleep(400);  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
    });  

    // get方法测试  
    System.out.println(cp2.get());  

    //模拟主程序耗时时间  
    Thread.sleep(600);  
    System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");  
}  
  
//运行结果    
/**  
 *  null  
 *  总共用时1610ms  
 */  

「thenRun 和thenRunAsync有什么区别呢?」

如果你执行第一个任务的时候,传入了一个自定义线程池:

  • 调用thenRun方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池。

  • 调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池。

说明: 后面介绍的thenAcceptthenAcceptAsyncthenApplythenApplyAsync等,它们之间的区别也是这个。

2、thenAccept/thenAcceptAsync

 第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,但是回调方法是没有返回值的。
public void testCompletableThenAccept() throws ExecutionException, InterruptedException {  
    long startTime = System.currentTimeMillis();  
    CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> {  
        return "dev";  

    });  
    CompletableFuture<Void> cp2 =  cp1.thenAccept((a) -> {  
        System.out.println("上一个任务的返回结果为: " + a);  //dev
    });  
   
    cp2.get();  //返回null
}  

3、 thenApply/thenApplyAsync

表示第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,并且回调方法是有返回值的。

示例

public void testCompletableThenApply() throws ExecutionException, InterruptedException {  
    CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> {  
        return "dev";  

    }).thenApply((a) -> {  
        if(Objects.equals(a,"dev")){  
            return "dev";  
        }  
        return "prod";  
    });  

    System.out.println("当前环境为:" + cp1.get());  

    //输出: 当前环境为:dev  
}  

异常回调

CompletableFuture的任务不论是正常完成还是出现异常它都会调用 「whenComplete」这回调函数。

  • 「正常完成」:whenComplete返回结果和上级任务一致,异常为null;

  • 「出现异常」:whenComplete返回结果为null,异常为上级任务的异常;

即调用get()时,正常完成时就获取到结果,出现异常时就会抛出异常,需要你处理该异常。

1、只用whenComplete

public void testCompletableWhenComplete() throws ExecutionException, InterruptedException {  
    CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {  

        if (Math.random() < 0.5) {  
            throw new RuntimeException("出错了");  
        }  
        System.out.println("正常结束");  
        return 0.11;  

    }).whenComplete((aDouble, throwable) -> {  
        if (aDouble == null) {  
            System.out.println("whenComplete aDouble is null");  
        } else {  
            System.out.println("whenComplete aDouble is " + aDouble);  
        }  
        if (throwable == null) {  
            System.out.println("whenComplete throwable is null");  
        } else {  
            System.out.println("whenComplete throwable is " + throwable.getMessage());  
        }  
    });  
    System.out.println("最终返回的结果 = " + future.get());  
}  

正常完成,没有异常时:

正常结束  
whenComplete aDouble is 0.11  
whenComplete throwable is null  
最终返回的结果 = 0.11  

 

出现异常时:get()会抛出异常 

whenComplete aDouble is null  
whenComplete throwable is java.lang.RuntimeException: 出错了  
  
java.util.concurrent.ExecutionException: java.lang.RuntimeException: 出错了  
 at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)  
 at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)  

 

2、whenComplete + exceptionally示例

public void testWhenCompleteExceptionally() throws ExecutionException, InterruptedException {  
    CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {  
        if (Math.random() < 0.5) {  
            throw new RuntimeException("出错了");  
        }  
        System.out.println("正常结束");  
        return 0.11;  

    }).whenComplete((aDouble, throwable) -> {  
        if (aDouble == null) {  
            System.out.println("whenComplete aDouble is null");  
        } else {  
            System.out.println("whenComplete aDouble is " + aDouble);  
        }  
        if (throwable == null) {  
            System.out.println("whenComplete throwable is null");  
        } else {  
            System.out.println("whenComplete throwable is " + throwable.getMessage());  
        }  
    }).exceptionally((throwable) -> {  
        System.out.println("exceptionally中异常:" + throwable.getMessage());  
        return 0.0;  
    });  

    System.out.println("最终返回的结果 = " + future.get());  
}  

当出现异常时,exceptionally中会捕获该异常,给出默认返回值0.0。

 
whenComplete aDouble is null  
whenComplete throwable is java.lang.RuntimeException: 出错了  
exceptionally中异常:java.lang.RuntimeException: 出错了  
最终返回的结果 = 0.0  

 





 
 
 
 
 
 
 
 

 

 

 

 

 

 

 

 

 

 

 

标签:异步,whenComplete,System,CompletableFuture,println,多线程,public,out
From: https://www.cnblogs.com/ixtao/p/17621570.html

相关文章

  • java多线程:死锁
    一、死锁的定义   多线程以及多进程改善了系统资源的利用率并提高了系统的处理能力。然而,并发执行也带来了新的问题——死锁。所谓死锁是指多个线程因竞争资源而造成的一种僵局(互相等待),若无外力作用,这些进程都将无法向前推进。   所谓死锁是指两个或两个以上的线程在......
  • CompletableFuture supplyAsync()
    CompletableFuture中的方法publicstaticCompletableFuture<Void>runAsync(Runnablerunnable)publicstaticCompletableFuture<Void>runAsync(Runnablerunnable,Executorexecutor)publicstatic<U>CompletableFuture<U>supplyAsync(Supplie......
  • 多线程(三)
    1、线程安全的懒汉式1.1、线程安全的懒汉式,代码如下://一个单一设计模式的类如下:publicclassBank{privatedoubleaccount;privateStringname;privatestaticBankinstance=null;//私有化构造器privateBank(){}privateBank(dou......
  • python多线程学习记录
    Python多线程参考文章:python多线程详解(超详细)、Python线程池(threadpool)创建及使用+实例代码、第二十章多线程1、多线程的概念2、python多线程的基本使用方法3、多线程的优点及与多进程的关系1、多线程的概念线程也叫轻量级进程,是操作系统能够进行运算调度......
  • 异步编程和多线程的关系
    引用自“https://zhuanlan.zhihu.com/p/570792890中bluecyan的留言”异步编程,它允许我们多个任务(Task)可以同时执行。多线程技术就是CPU利用多个线程来并发地运行多段逻辑。任务是逻辑层面的,线程是操作系统层面的,由线程ID标识,任务比线程抽象层级更高。异步任务可由线程实现,也可......
  • java定时任务中创建多线程却只有一个线程运行的问题
    在定时任务中开启了多线程。。但是却只有第一个线程运行。。原因是?参考:https://www.cnpython.com/java/515558在您的例子中,它是MyRunnable的单个实例,因此当一个线程在synchronized块内执行工作时,所有其他线程将等待工作完成。因此,有效地说,一次只有一个线程在做真正的工作......
  • .NET Core多线程 (2) 异步 - 上
    去年换工作时系统复习了一下.NETCore多线程相关专题,学习了一线码农老哥的《.NET5多线程编程实战》课程,我将复习的知识进行了总结形成本专题。本篇,我们来复习一下异步的相关知识点,预计阅读时间10分钟。理解异步的本质(1)异步是什么?举个例子,在高峰期去餐厅吃饭,会先排队拿个小票,......
  • 最新版 redis-py 操作 redis(同步、异步、集群、连接池)
    现在的Python异步操作redis,有三种(aredis、aioredis、asynio_redis)但是都不推荐背景从redis.py4.2.0rc1+开始,Aioredis已经集成到redis-py中,并且Aioredis将不再更新维护,导入方式:fromredisimportasyncioasaioredis,本次验证的是redis==4.6.0#!/usr/bin/e......
  • 在使用异步请求后把值存入localstore并且在其他变量中首次通过localstore获取不到值
    问题写了一个方法A,里面有使用AXIOS进行请求(异步),并且把请求后的数据存入localstore,此方法在onMounted中进行调用,之后在其他地方使用时(通过读取localstore来获取值并给变量赋值),发现首次调用是获取不了值进行展示的,刷新后或者切换页面回来后就是按预期展示了。推断方法A是异步执行......
  • Linux异步通知---fasync_helper()、kill_fasync()函数介绍与使用
    转载:Linux异步通知---fasync_helper()、kill_fasync()函数介绍与使用_面朝大海0902的博客-CSDN博客一、fasync_helper()与kill_fasync()函数应用程序通过fcntl置FASYNC标志位,触发对应驱动文件的fasync()函数执行(上节有解释原因Linux异步通知—signal()、fcntl()函数介绍与使用),该......