首页 > 编程语言 >CompletableFuture异步编程

CompletableFuture异步编程

时间:2023-12-28 17:56:22浏览次数:36  
标签:异步 编程 任务 CompletableFuture 完成 返回值 线程

一、基本介绍

1.1 多线程编程的发展过程

  1. 创建线程的方式
  • 继承 Thread 类
  • 实现 Runnable 接口

特点:没有参数,没有返回值,没办法抛出异常

  1. JDK 1.5 进阶版Callable + Future

Callable接口中定义的 V call() throws Exception,该方法可以返回泛型值 V,并能够抛出异常。 Callable 只能在 ExecutorService 中使用, 直接通过executorService.submit(new CallAbleTask(i)),返回的结果是Future,结果信息从Future里面取出,具体的业务逻辑在call中执行。

特点:有返回值在Future中,能抛出异常,只能通过阻塞或者轮询的方式获取结果,不支持设置回调方法

  1. JDK1.8 高级版CompletableFuture

CompletableFuture是实现异步化的工具类,上手难度较低,且功能强大。

在之前我们一般通过Future实现异步。若要设置回调一般会使用guava的ListenableFuture,回调的引入又会导致回调地狱。

 

 

CompletableFuture实现了两个接口:Future、CompletionStage。Future表示异步计算的结果,CompletionStage用于表示异步执行过程中的一个步骤(Stage),这个步骤可能是由另外一个CompletionStage触发的,随着当前步骤的完成,也可能会触发其他一系列CompletionStage的执行。从而我们可以根据实际业务对这些步骤进行多样化的编排组合,CompletionStage接口正是定义了这样的能力,我们可以通过其提供的thenApply、thenCompose等函数式编程方法来组合编排这些步骤。

因此completableFuture能实现整个异步调用接口的扁平化和流式处理,支持通过函数式编程的方式对各类操作进行组合编排,解决原有Future处理一系列链式异步请求时的复杂编码。

特点:不需要手工分配线程,JDK 自动分配;代码语义清晰,异步任务链式调用;支持编排异步任务

二、CompletableFuture常用方法

 

CompletableFuture提供了四个静态方法来创建一个异步操作:

 

 

这四个方法的区别:

1)runAsync() 以Runnable函数式接口类型为参数,没有返回结果,supplyAsync() 以Supplier函数式接口类型为参数,返回结果类型为U;Supplier接口的 get()是有返回值的(会阻塞);

2)使用没有指定Executor的方法时,内部使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。

3)默认情况下CompletableFuture会使用公共的ForkJoinPool线程池,这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置ForkJoinPool线程池的线程数)。如果所有CompletableFuture共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。

所以,这里建议强制传线程池,且根据实际不同的业务类型做线程池隔离。

 

常用的组合依赖关系

  1. 依赖关系

thenApply():把前面任务的执行结果,交给后面的Function

thenCompose():用来连接两个有依赖关系的任务,结果由第二个任务返回

  1. and集合关系

thenCombine():合并任务,有返回值

thenAccepetBoth():两个任务执行完成后,将结果交给thenAccepetBoth处理,无返回值

runAfterBoth():两个任务都执行完成后,执行下一步操作(Runnable类型任务)

  1. or聚合关系

applyToEither():两个任务哪个执行的快,就使用哪一个结果,有返回值

acceptEither():两个任务哪个执行的快,就消费哪一个结果,无返回值

runAfterEither():任意一个任务执行完成,进行下一步操作(Runnable类型任务)

  1. 并行执行

allOf():当所有给定的 CompletableFuture 完成时,返回一个新的 CompletableFuture

anyOf():当任何一个给定的CompletablFuture完成时,返回一个新的CompletableFuture

  1. 结果处理

whenComplete:当任务完成时,将使用结果(或 null)和此阶段的异常(或 null如果没有)执行给定操作

exceptionally:返回一个新的CompletableFuture,当前面的CompletableFuture完成时,它也完成,当它异常完成时,给定函数的异常触发这个CompletableFuture的完成

 

 

API过多,整体可以进行如下分类:

  • 带run的方法,无入参,无返回值;
  • 带supply的方法,无入参,有返回值;
  • 带accept的方法,有入参,无返回值;
  • 带apply的方法,有入参,有返回值;
  • 带handle的方法,有入参,有返回值,并且带异常处理;
  • 以Async结尾的方法,都是异步的,否则是同步的; ​方法不以 Async 结尾,意味着 Action 使用相同的线程执行,而 Async 可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)
  • 以Either结尾的方法,只需完成任意一个;
  • 以Both/Combine结尾的方法,必须所有都完成。 ​
  • join 阻塞等待,不会抛异常;
  • get 阻塞等待,会抛异常;
  • complete(T value) 不阻塞,如果任务已完成,返回处理结果。如果没完成,则返回传参value;
  • completeExceptionally(Throwable ex) 不阻塞,如果任务已完成,返回处理结果。如果没完成,抛异常。

 

三、CompletableFuture的使用场景

1、异步组合任务一 + 任务二都完成,再执行任务三,并返回任务三的结果

 1 ExecutorService executorService = Executors.newFixedThreadPool(10);//线程池
 2 CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
 3     System.out.println("任务一开始");
 4     return 1;
 5 },executorService);
 6 CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
 7     System.out.println("任务二开始");
 8     return 2;
 9 });
10 //thenCombine明确任务3要等待任务1和任务2都完成后才能开始;
11 CompletableFuture<Integer> future3 = future1.thenCombine(future2,(res1,res2)->{
12     System.out.println("感知任务完成,合并两个任务,返回一个新的结果");
13     return res1+","+res2; 
14 },executorService);

 

2、异步组合任务一 、任务二、任务三全部完成,再执行任务四

这种多元依赖可以通过allOf或anyOf方法来实现,区别是当需要多个依赖全部完成时使用allOf,当多个依赖中的任意一个完成即可时使用anyOf,如下代码:

ExecutorService executorService = Executors.newFixedThreadPool(10);//线程池
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务一开始");
    return 1;
},executorService);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务二开始");
    return 2;
});
CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务三开始");
    return 3;
});
CompletableFuture<Void> future4 = CompletableFuture.allOf(future1, future2, future3);
CompletableFuture<String> result = future4.thenApply(v -> {
  //这里的join并不会阻塞,因为传给thenApply的函数是在future1、future2、future3全部完成时,才会执行 。
  result1 = future1.join();
  result2 = future2.join();
  result3 = future3.join();
  //根据result1、result2、result3组装最终result;
  return "result";
});

 

四、案例分析

以下代码哪些地方使用不当?哪些地方可以使用其他方法来实现?

 1 public void syncBiddingByProjects(List<Long> projectIds) {
 2         if (CollectionUtils.isEmpty(projectIds)) {
 3             return;
 4         }
 5         //多线程并发处理
 6         BiddingServiceImpl biddingService = (BiddingServiceImpl) AopContext.currentProxy();
 7         AtomicInteger finishTask = new AtomicInteger(0);
 8         CompletableFuture[] futures = projectIds.stream().map(projectId -> CompletableFuture
 9                 .supplyAsync(() -> syncBiddingByProjectId(biddingService, projectId), biddingSyncPool)
10                 .whenComplete((result, exc) -> {
11                     if (null != result && result) {
12                         finishTask.incrementAndGet();
13                     }
14                 })).toArray(CompletableFuture[]::new);
15 
16         //开始等待所有任务执行完成
17         CompletableFuture<Void> headerFuture = CompletableFuture.allOf(futures);
18         try {
19             headerFuture.join();
20         } catch (Exception ex) {
21             log.error("syncBiddingByProjects fail, allCnt={}, sucCnt={}", projectIds.size(), finishTask.get(), ex);
22             throw BusinessException.create("拉取数据的任务执行失败,allCnt=" +  projectIds.size() +", sucCnt=" + finishTask.get());
23         }
24     }

 

标签:异步,编程,任务,CompletableFuture,完成,返回值,线程
From: https://www.cnblogs.com/selinamee/p/17933235.html

相关文章

  • 跟着王洋老师学编程 - 1.7 键盘控制小球
    一、抽象方法/类和接口的定义抽象方法-无法清晰描述的方法,比如动物类的吃方法;抽象类-如果一个类中含有抽象方法,那这个类也必须要定义成抽象类;接口-如果一个类中只有抽象方法,没有属性,这就是一个纯抽象类,即接口。1abstractclassAnimal{//抽象类2pu......
  • 运维和编程语言
    1.脚本注释,脚本开发规范1.1.在shell脚本中,#后面的内容代表注释掉的内容,提供给开发者或使用者观看,系统会忽略此行1.2.注释可以单独写一行,也可以跟在命令后面1.3.尽量保持爱写注释的习惯,便于以后回顾代码的含义,尽量使用英文,而非中文 2.执行shell脚本的方式:2......
  • Python编程该怎么实现socket文件传输
    在网络编程中,Socket是一种常用的通信协议,它可以在计算机之间进行数据传输。在Python中,我们可以使用内置的socket模块来实现Socket文件传输。本文将介绍如何使用Python编程实现Socket文件传输的步骤和示例代码。步骤一:创建服务器端首先,我们需要创建一个服务器端来接收文件。以下是创......
  • Linux shell编程学习笔记36:read命令
     *更新日志 *2023-12-181.根据[美]威廉·肖特斯(Willian shotts)所著《Linux命令行大全(第2版)》            更新了-e、-i、-r选项的说明           2.更新了2.8的实例,增加了gif动图           3.补充......
  • 【AI编程工具】目前市面上常见的AI代码助手(AI Coding Assistant)
    目前市面上常见的AI代码助手(AICodingAssistant)有:GithubCopilot:提供更高效的代码编写、学习新的语言和框架以及更快的调试通义灵码_智能编码助手_AI编程_人工智能-阿里云AmazonCodeWhisper:实时代码建议CodeGeeX:国产免费编程AI助手iFlyCode:科大讯飞发布的编程新时代的智能助手Com......
  • java8 新特性(二)CompletableFuture类
    CompletableFuture是Java8中引入的一个新特性,它表示异步计算的结果。通过使用CompletableFuture,可以方便地处理异步计算,并能够在计算完成后执行回调函数。CompletableFuture是Java8中引入的一个功能强大的类,它实现了Future接口,并在此基础上进行了丰富的扩展,以简化异步编程的复......
  • 推荐编程学习的微信小程序工具
    CJavaPY编程之路网站微信小程序是一个非常值得推荐的编程学习工具。它可以帮助初学者快速掌握编程基础,是编程学习的必备工具。它包含了C、Java、Python等多种编程语言的学习内容,包括教程、示例代码等。该微信小程序是CJavaPY编程之路(www.cjavapy.com)网站的小程序版本,主要是编程......
  • 使用aiohttp异步调用API+request上传文件中文文档名乱码解决方案
    有时候在调用需要用异步调用API接口。在python中有很多框架,比如asyncio,Celery,Quart等。这里我选择了asyncio。Python3.5以上版本内置了asyncio库,可以用来编写单线程的并发代码。可以使用此库与aiohttp结合来发送异步HTTP请求。Python调用案例GETimportasyncioimportaio......
  • 牛客OJ在线编程常见输入输出练习
    练习链接:https://www.nowcoder.com/exam/test/76850250/detail?pid=27976983&examPageSource=Search 题目:A+B(4)输入数据包括多组。每组数据一行,每行的第一个整数为整数的个数n(1<=n<=100),n为0的时候结束输入。接下来n个正整数,即需要求和的每个正整数。示例:输入......
  • 关于 async 和 await 两个关键字(C#)【并发编程系列_5】
    关于async和await两个关键字(C#)【并发编程系列_5】 阅读目录〇、前言一、先通过一个简单的示例来互相认识下二、关于async关键字三、关于await关键字3.1await的用法示例3.2awaitforeach()示例3.3关于awaitusing()四、awaitTask和Task.GetAwai......