首页 > 其他分享 >CompletableFuture多线程并发处理

CompletableFuture多线程并发处理

时间:2024-06-21 09:10:19浏览次数:27  
标签:异步 执行 异常 并发 任务 CompletableFuture 多线程 方法

CompletableFuture多线程并发处理

    概要
    一个接口可能需要调用 N 个其他服务的接口,这在项目开发中还是挺常见的。举个例子:用户请求获取订单信息,可能需要调用用户信息、商品详情、物流信息、商品推荐等接口,
    如果是串行(按顺序依次执行每个任务)执行的话,接口的响应速度会非常慢。考虑到这些接口之间有大部分都是 无前后顺序关联 的,可以 并行执行 ,就比如说调用获取商品详情的时候,
    可以同时调用获取物流信息。通过并行执行多个任务的方式,接口的响应速度会得到大幅优化。

    对于 Java 程序来说,Java 8 才被引入的 CompletableFuture 可以帮助我们来做多个任务的编排,功能非常强大。

    一、 Future
    Future 类是异步思想的典型运用,主要用在一些需要执行耗时任务的场景,避免程序一直原地等待耗时任务执行完成,执行效率太低。

    简单理解就是:我有一个任务,提交给了 Future 来处理。任务执行期间我自己可以去做任何想做的事情。并且,在这期间我还可以取消任务以及获取任务的执行状态。
   一段时间之后,我就可以 Future 那里直接取出任务执行结果。

    二、 CompletableFuture介绍

    Future 在实际使用过程中存在一些局限性比如不支持异步任务的编排组合、获取计算结果的 get() 方法为阻塞调用。

    虽然用CountDownLatch可以解决多个异步任务需要相互依赖的场景,但是在Java8以后我们不在认为这是一种优雅的解决方式,接下来来了解下CompletableFuture的使用。

   Java 8 才被引入CompletableFuture 类可以解决Future 的这些缺陷。CompletableFuture 除了提供了更为好用和强大的 Future 特性之外,还提供了函数式编程、异步任务编排组合(可以将多个异步任务串联起来,组成一个完整的链式调用)等能力。

   CompletableFuture是高级的多线程功能,支持自定义线程池和系统默认的线程池,是多线程,高并发里面,经常需要用到的比直接创建线程,要简单易用的方法。它能够简化异步任务的执行和结果处理。

   如下图:

    

   CompletableFuture 同时实现了 Future 和 CompletionStage 接口。

   CompletionStage 接口描述了一个异步计算的阶段。很多计算可以分成多个阶段或步骤,此时可以通过它将所有步骤组合起来,形成异步计算的流水线。

   三、CompletableFuture 常见操作

   1. 创建CompletableFuture对象

   常见的创建 CompletableFuture 对象的方法如下:

   1)通过 new 关键字

   2)基于 CompletableFuture 自带的静态工厂方法:runAsync()、supplyAsync() 

   runAsync()

   说明:runAsync()方法接受的参数是 Runnable ,这是一个函数式接口,不允许返回值。当你需要异步操作且不关心返回结果的时候可以使用 runAsync() 方法。

   supplyAsync()

   supplyAsync()方法接受的参数是 Supplier<U> ,这也是一个函数式接口,U 是返回结果值的类型。需要异步操作且关心返回结果时 可以使用supplyAsync()方法。

   2. 处理异步计算的结果

   当我们获取到异步计算的结果之后,还可以对其进行进一步的处理,比较常用的方法有下面几个:
   thenApply()
   thenAccept()
   thenRun()
   whenComplete()

   1)  thenRun()/thenRunAsync()

   不需要从回调函数中获取返回结果,可以使用 thenAccept() 或者 thenRun()。这两个方法的区别在于 thenRun() 不能访问异步计算的结果。

   通俗点讲就是,做完第一个任务后,再做第二个任务,第二个任务也没有返回值。

   2)  thenRun 和thenRunAsync有什么区别呢?

   如果你执行第一个任务的时候,传入了一个自定义线程池。

   调用thenRun方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池。

   调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池。

   说明: 后面介绍的thenAccept和thenAcceptAsync,thenApply和thenApplyAsync等,它们之间的区别也是这个。

   3)thenAccept/thenAcceptAsync

   第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,但是回调方法是没有返回值的。

   4)  whenComplete

   说明:whenComplete() 的方法的参数是 BiConsumer<? super T, ? super Throwable> 。相对于 Consumer , BiConsumer 可以接收 2 个输入对象然后进行“消费”。
   当CompletableFuture的任务不论是正常完成还是出现异常它都会调用「whenComplete」这回调函数。

   正常完成:whenComplete返回结果和上级任务一致,异常为null;

   出现异常:whenComplete返回结果为null,异常为上级任务的异常;

   即调用get()时,正常完成时就获取到结果,出现异常时就会抛出异常,需要你处理该异常。

   3. 异常处理

   1) handle()

   handle() 方法用来处理任务执行过程中可能出现的抛出异常的情况。

 1     CompletableFuture<String> future
 2         = CompletableFuture.supplyAsync(() -> {
 3       if (true) {
 4         throw new RuntimeException("Computation error!");
 5       }
 6       return "hello!";
 7     }).handle((res, ex) -> {
 8       // res 代表返回的结果
 9       // ex 的类型为 Throwable ,代表抛出的异常
10       return res != null ? res : "world!";
11     });
12    System.out.println(future.get());
13 
14    //测试结果
15    world!

   2) exceptionally()

   exceptionally() 方法来处理异常情况。

   3) completeExceptionally()

 

   如果想让 CompletableFuture 的结果就是异常的话,可以使用 completeExceptionally() 方法为其赋值。

   4.  组合CompletableFuture

   1) thenCompose
有先后依赖顺序:
可以使用 thenCompose() 按顺序链接两个 CompletableFuture 对象,实现异步的任务链。它的作用是将前一个任务的返回结果作为下一个任务的输入参数,从而形成一个依赖关系。

   2) thenCombine

   没有先后依赖顺序:

   thenCombine会在两个任务都执行完成后,把两个任务的结果合并。两个任务是并行执行的,它们之间并没有先后依赖顺序。

   3) acceptEither

  如果我们想要实现 task1 和 task2 中的任意一个任务执行完后就执行 task3 的话,可以使用 acceptEither()。

   5. 并行运行多个CompletableFuture

   我们可以通过 CompletableFuture 的 allOf()这个静态方法来并行运行多个 CompletableFuture 。实际项目中,我们经常需要并行运行多个互不相关的任务,这些任务之间没有依赖关系,可以互相独立地运行。

   比说我们要读取处理 6 个文件,这 6 个任务都是没有执行顺序依赖的任务,但是我们需要返回给用户的时候将这几个文件的处理的结果进行统计整理。像这种情况我们就可以使用并行运行多个 CompletableFuture 来处理。

   1)allOf

   allOf() 方法会等到所有的 CompletableFuture 都运行完成之后再返回。

   说明:调用 join() 可以让程序等future1 和 future2 都运行完了之后再继续执行。

   2)anyOf

   anyOf() 方法不会等待所有的 CompletableFuture 都运行完成之后再返回,只要有一个执行完成即可!

   三、CompletableFuture 使用建议

   1. 使用自定义线程池

   CompletableFuture 默认使用ForkJoinPool.commonPool() 作为执行器,这个线程池是全局共享的,可能会被其他任务占用,导致性能下降或者饥饿。
   因此,建议使用自定义的线程池来执行 CompletableFuture 的异步任务,可以提高并发度和灵活性。

   2. 尽量避免使用get

   CompletableFuture的get()方法是阻塞的,尽量避免使用。如果必须要使用的话,需要添加超时时间,否则可能会导致主线程一直等待,无法执行其他任务。

   3. 正确进行异常处理

  使用 CompletableFuture的时候一定要以正确的方式进行异常处理,避免异常丢失或者出现不可控问题。

  1)使用 whenComplete 方法可以在任务完成时触发回调函数,并正确地处理异常,而不是让异常被吞噬或丢失。

  2)使用 exceptionally 方法可以处理异常并重新抛出,以便异常能够传播到后续阶段,而不是让异常被忽略或终止。

  3)使用 handle 方法可以处理正常的返回结果和异常,并返回一个新的结果,而不是让异常影响正常的业务逻辑。

  4)使用 CompletableFuture.allOf 方法可以组合多个 CompletableFuture,并统一处理所有任务的异常,而不是让异常处理过于冗长或重复。

  4. 合理组合多个异步任务

  正确使用 thenCompose() 、 thenCombine() 、acceptEither()、allOf()、anyOf()等方法来组合多个异步任务,以满足实际业务的需求,提高程序执行效率。

 

  参考链接: 

  https://javaguide.cn/java/concurrent/completablefuture-intro.html

 

标签:异步,执行,异常,并发,任务,CompletableFuture,多线程,方法
From: https://www.cnblogs.com/hld123/p/18259868

相关文章

  • C#设计:实现文件的多线程下载
    一、程序设计要求能够在下载过程中显示进度信息(如总大小、已下载大小、进度、下载速度、剩余大小、剩余时间、状态、下载的网址等)。支持从指定的URL下载文件。支持多线程并发下载文件。提供友好的用户界面(UI)来下载。具有良好的可扩展性,能够方便地添加新功能或修改现有功能。代......
  • MVCC多版本并发控制
    MVCC(MultiVersionConcurrencyControl)多版本并发控制,是指在使用READCOMMITTED、REPEATABLEREAD这两种隔离级别的事务执行SELECT操作时访问记录的版本链的过程,使不同事务的读写操作能够并发执行,提升系统性能。MVCC机制的核心是在做SELECT操作前会生产一个ReadView,READCO......
  • java多线程
    目录多线程的实现方式多线程的第一种实现方式 继承Thread类的方式进行实现多线程的第二种实现方式 实现Runnable接口的方式进行实现利用Callable接口和Future接口方式实现 多线程中常用的成员方法 StringgetName()                返回此线程的名......
  • Java学习基础笔记——多线程基础部分
    第十三章多线程基础13.1线程介绍13.1.1线程相关概念13.2线程创建13.2.1创建线程的两种方式13.2.2继承Threadvs实现Runnable的区别13.2.3线程终止13.3线程方法13.3.1常用方法第一组13.3.2常用方法第二组13.3.3用户线程和守护线程13.4Synchronized13......
  • 如何实现多线程下载大文件
    如何实现多线程下载大文件在应用开发中,实现多线程下载大文件是一个常见的需求,可以有效提高下载效率和用户体验。以下是实现多线程下载大文件的详细步骤和知识点:实现步骤:使用request模块:request模块提供了文件上传下载的基础能力,支持任务管理系统的默认并发功能,简化了下......
  • 深入理解AQS:Java并发编程中的核心组件
    目录AQS简介AQS的设计思路AQS的核心组成部分状态(State)同步队列(SyncQueue)条件队列(ConditionQueue)AQS的内部实现节点(Node)锁的获取与释放独占锁共享锁条件变量AQS的应用案例ReentrantLockCountDownLatchSemaphore总结参考文献AQS简介AbstractQueuedSynchronizer(AQ......
  • 【JavaEE精炼宝库】多线程(7)定时器
    目录一、定时器的概念二、标准库中的定时器三、自己实现一个定时器3.1MyTimerTask实现:3.2MyTimer实现:一、定时器的概念定时器也是软件开发中的⼀个重要组件。类似于一个"闹钟"。达到一个设定的时间之后,就执行某个指定好的代码(可以用来完成线程池里面的非核心线程......
  • 多线程设计模式之Future模式
    在JDK中实现线程同步等待闭环(FutureTask/Future)中已经涉及到了Future模式,相对与多线程设计模式之WorkerThread模式有何异同呢?在多线程设计模式之WorkerThread模式中client和worker之间没有任何直接联系,即worker执行的结果client是不关心的;Future模式与之的差别就在于此线......
  • 鸿蒙内核源码分析(并发并行篇) | 听过无数遍的两个概念
    理解并发概念并发(Concurrent):多个线程在单个核心运行,同一时间只能一个线程运行,内核不停切换线程,看起来像同时运行,实际上是线程被高速的切换.通俗好理解的比喻就是高速单行道,单行道指的是CPU的核数,跑的车就是线程(任务),进程就是管理车的公司,一个公司可以有很多台车.并发......
  • Java项目开发中异步调用场景控制并发数
    场景项目基于SpringBoot搭建,默认使用TomcatWeb容器,对于每个HTTP请求,TomcatWeb容器会分配1个线程来处理请求。在pom.xml里查看依赖关系:spring-boot-starter-web添加了tomcat-embed-core依赖Tomcat线程池配置可在application.yml配置:server:tomcat:max-threads:5......