首页 > 其他分享 >玩转CompletableFuture线程异步编排,看这一篇就够了

玩转CompletableFuture线程异步编排,看这一篇就够了

时间:2024-02-18 20:12:20浏览次数:21  
标签:executor 就够 任务 CompletableFuture 返回值 线程 public

转载自:https://blog.csdn.net/w306026355/article/details/109707269

1、CompletableFuture介绍

CompletableFuture可用于线程异步编排,使原本串行执行的代码,变为并行执行,提高代码执行速度。

学习异步编排先需要学习线程池和lambda表达式相关知识,学习线程池可以移步我的另一篇博客

ThreadPoolExecutor线程池理解

2、CompletableFuture使用

说明:使用CompletableFuture异步编排大多方法都会有一个重载方法,会多出一个executor参数,用来传来自定义的线程池,如果不传就会使用默认的线程池。

下文举例都是使用自定义线程池,不再做特殊说明。

贴出自定义线程池的代码

ThreadPoolExecutor executor = new ThreadPoolExecutor(
        10,
        50,
        60,
        TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(100000),
        Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.AbortPolicy());
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

2.1、创建异步编排对象

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);

public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
 
  • 1
  • 2
  • 3
  • 4
  • 5

有两种格式,一种是supply开头的方法,一种是run开头的方法

  • supply开头:这种方法,可以返回异步线程执行之后的结果

  • run开头:这种不会返回结果,就只是执行线程任务

举例:

// 异步起线程执行业务 无返回值
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
    System.out.println("当前线程:" + Thread.currentThread().getId());
    int i = 10 / 2;
    System.out.println("运行结果:" + i);
}, executor);


//异步起线程执行业务 有返回值
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    System.out.println("当前线程:" + Thread.currentThread().getId());
    int i = 10 / 0;
    System.out.println("运行结果:" + i);
    return i;
}, executor).whenComplete((res,exc)->{
    // 可以接收到返回值和异常类型,但是无法处理异常
    System.out.println("异步任务成功完成了...结果是:" + res + ";异常是:" + exc);
}).exceptionally(throwable -> {
    // 处理异常,返回一个自定义的值,和上边返回值无关。
    return 10;
});

//方法执行完成后的处理
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    System.out.println("当前线程:" + Thread.currentThread().getId());
    int i = 10 / 0;
    System.out.println("运行结果:" + i);
    return i;
}, executor).handle((res,thr)->{
    // 无论线程是否正确执行,都会执行这里,可以对返回值进行操作。
    if(res != null){
        return res * 2;
    }
    if(thr != null){
        return 0;
    }
    return 0;
});
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

这里出现了三个新方法

CompletableFuture.whenComplete():用于接收带有返回值的CompletableFuture对象,无法修改返回值。

CompletableFuture.exceptionally():用于处理异常,只要异步线程中有抛出异常,则进入该方法,修改返回值。

CompletableFuture.handle():用于处理返回结果,可以接收返回值和异常,可以对返回值进行修改。

2.2、线程串行方法

// 使线程串行执行,无入参,无返回值
public CompletableFuture<Void> thenRun(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor);

// 使线程串行执行,有入参,无返回值
public CompletableFuture<Void> thenAccept(Consumer<? super T> action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor);

// 使线程串行执行,有入参,有返回值
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

解释:

  • 不以Async结尾的方法,都是在执行串行的时候,使用执行上一个方法的线程,也就是说从头串行到最后一个任务,使用的是同一个线程。
  • 而以Async结尾的方法,每串行一个方法,都会使用一个新线程。下文不再解释。

例子:

// 两个任务串行执行,任务2不用任务1的返回值,并且任务2无返回值。
CompletableFuture<Void> future1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务1:当前线程:" + Thread.currentThread().getId());
    int i = 10 / 4;
    System.out.println("任务1:运行结果:" + i);
    return i;
}, executor).thenRunAsync(() -> {
    System.out.println("任务2启动了");
}, executor);

// 两个任务串行执行,任务2要使用任务1的返回值,并且任务2无返回值。
CompletableFuture<Void> future2 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务1:当前线程:" + Thread.currentThread().getId());
    int i = 10 / 4;
    System.out.println("任务1:运行结果:" + i);
    return i;
}, executor).thenAcceptAsync(res -> {
    System.out.println("任务2启动了" + res);
}, executor);

// 两个任务串行执行,任务2要使用任务1的返回值,并且返回任务2的返回值。
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务1:当前线程:" + Thread.currentThread().getId());
    int i = 10 / 4;
    System.out.println("任务1:运行结果:" + i);
    return i;
}, executor).thenApplyAsync(res -> {
    System.out.println("任务2启动了" + res);
    return "Hello" + res;
}, executor);
// 有返回值时,需要使用CompletableFuture.get()方法,等待异步线程执行结束,从而获取到异步线程的返回值。
String s = future3.get();
System.out.println(s);
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

2.3、两任务并行执行完成,再执行新任务

解释:两任务并行执行,并且都执行完成之后,串行另一个任务,也就是说在两个任务执行并行执行完成之后,需要再执行另一个任务。

// 线程并行执行完成,并且执行新任务action,新任务无入参,无返回值
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action);
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action);
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor);

// 线程并行执行完成,并且执行新任务action,新任务有入参,无返回值
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action);
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action);
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor);

// 线程并行执行完成,并且执行新任务action,新任务有入参,有返回值
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor);
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

例子:

// 任务1
CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务1线程:" + Thread.currentThread().getId());
    int i = 10 / 4;
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("任务1结束:");

    return i;
}, executor);

// 任务2
CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务2线程:" + Thread.currentThread().getId());
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("任务2结束");

    return "Hello";
}, executor);

// 任务1和任务2都完成,在不使用任务1和任务2的返回值情况下执行任务3,并且任务3没有返回值
CompletableFuture<Void> future1 = future01.runAfterBothAsync(future02,
                                                             () -> System.out.println("任务3开始"), executor);

// 任务1和任务2都完成,使用任务1和任务2的返回值情况下执行任务3,并且任务3没有返回值
CompletableFuture<Void> future2 = future01.thenAcceptBothAsync(future02,
                                                               (f1, f2) -> System.out.println("任务3开始,之前的结果" + f1 + "-->" + f2),
                                                               executor);

// 任务1和任务2都完成,使用任务1和任务2的返回值情况下执行任务3,并且任务3有返回值
CompletableFuture<String> future3 = future01.thenCombineAsync(future02,
                                                              (f1, f2) -> f1 + ":" + f2 + "->haha", 
                                                              executor);
String str = future3.get();
System.out.println(str);
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

2.4 、两任务并行执行,其中一个执行完,就执行新任务。

解释:两任务并行执行,只要其中有一个执行完,就开始执行新任务。

// 任务并行执行,只要其中有一个执行完,就开始执行新任务action,新任务无入参,无返回值
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action);
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action);
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor);

// 任务并行执行,只要其中有一个执行完,就开始执行新任务action,新任务有入参(入参类型为Object,因为不确定是哪个任务先执行完成),无返回值
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action);
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action);
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action,Executor executor);

// 任务并行执行,只要其中有一个执行完,就开始执行新任务action,新任务有入参(入参类型为Object,因为不确定是哪个任务先执行完成),有返回值
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn);
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn);
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn,Executor executor);
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

例子:

// 任务1
CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务1线程:" + Thread.currentThread().getId());
    int i = 10 / 4;
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("任务1结束:");

    return i;
}, executor);

// 任务2
CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务2线程:" + Thread.currentThread().getId());
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("任务2结束");

    return "Hello";
}, executor);

// 任务1和任务2并行执行,只要有一个执行完成,就执行任务3,不使用任务1 或 任务2线程的结果,并且任务3没有返回值
future01.runAfterEitherAsync(future02,()-> System.out.println("任务3开始,之前的结果"), executor);

// 任务1和任务2并行执行,只要有一个执行完成,就执行任务3,使用任务1 或 任务2线程的结果,并且任务3没有返回值
future01.acceptEitherAsync(future02, (res)-> System.out.println("任务3开始,之前的结果" + res), executor);

// 任务1和任务2并行执行,只要有一个执行完成,就执行任务3,使用任务1 或 任务2线程的结果,并且任务3有返回值
CompletableFuture<Object> future = future01.applyToEitherAsync(future02, (res) -> {
    System.out.println("任务3开始,之前的结果" + res);
    return res.toString() + "->哈哈";
}, executor);
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

2.5、多任务组合(只要有一个执行完就返回)

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);
 
  • 1

例子:

CompletableFuture<String> future01 = CompletableFuture.supplyAsync(() -> "任务1", executorService);
CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> "任务2", executorService);
CompletableFuture<String> future03 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "任务3";
}, executorService);

// 只要异步线程队列有一个任务率先完成就返回,这个特性可以用来获取最快的那个线程结果。
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future01, future02, future03);

// 获取若干个任务中最快完成的任务结果
// .join()和.get()都会阻塞并获取线程的执行情况
// .join()会抛出未经检查的异常,不会强制开发者处理异常 .get()会抛出检查异常,需要开发者处理
Object o1 = anyOf.get();
Object o2 = anyOf.join();
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

2.6、多任务组合(全部执行完才返回)

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);
 
  • 1

例子:

CompletableFuture<String> future01 = CompletableFuture.supplyAsync(() -> "任务1", executorService);
CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> "任务2", executorService);
CompletableFuture<String> future03 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "任务3";
}, executorService);

// 串联起若干个线程任务, 没有返回值
CompletableFuture<Void> all = CompletableFuture.allOf(future01, future02, future03);
// 等待所有线程执行完成
// .join()和.get()都会阻塞并获取线程的执行情况
// .join()会抛出未经检查的异常,不会强制开发者处理异常 .get()会抛出检查异常,需要开发者处理
all.join();
all.get();
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
 

标签:executor,就够,任务,CompletableFuture,返回值,线程,public
From: https://www.cnblogs.com/wanghengbin/p/18019890

相关文章

  • C# 多线程
    什么是线程线程是操作系统中能够独立运行的最小单位,也是程序中能够并发执行的一段指令序列。线程是进程的一部分,一个进程可以包含多个线程,这些线程共享进程的资源。进程有入口线程,也可以创建更多的线程。为什么要多线程批量重复任务希望同时进行多个不同任务希望同时进行......
  • 监控Java虚拟线程
    目录监控Java虚拟线程简介虚拟线程监控的具体细节跟踪牵制线程(pinnedthreads)我的框架如何使用虚拟线程?监控ForkJoinPool结论参考监控Java虚拟线程开发便利性与运行高效性简介在我之前的文章中,我们已经讨论了什么是虚拟线程(VTs),他们与物理线程(PTs)之间的区别,以及如......
  • 多线程系列(一) -线程技术入门知识讲解
    一、简介在很多场景下,我们经常听到采用多线程编程,能显著的提升程序的执行效率。例如执行大批量数据的插入操作,采用单线程编程进行插入可能需要30分钟,采用多线程编程进行插入可能只需要5分钟就够了。既然多线程编程技术如此厉害,那什么是多线程呢?在介绍多线程之前,我们还得先......
  • Java21 + SpringBoot3使用Spring Security时如何在子线程中获取到认证信息
    目录前言原因分析解决方案方案1:手动设置线程中的认证信息方案2:使用DelegatingSecurityContextRunnable创建线程方案3:修改SpringSecurity安全策略通过设置JVM参数修改安全策略通过SecurityContextHolder修改安全策略总结前言近日心血来潮想做一个开源项目,目标是做一款可以适配多......
  • C#多线程精解:优雅终止线程的实用方法与技巧
     概述:在C#多线程编程中,合理终止线程是关键挑战。通过标志位或CancellationToken,实现安全、协作式的线程终止,确保在适当时机终止线程而避免资源泄漏。应用场景:在C#多线程编程中,有时需要终止正在运行的线程,例如在用户取消操作、程序关闭等情况下。思路:线程终止通常涉及到合......
  • 线程池参数
    1,线程池创建importjava.util.concurrent.ThreadPoolExecutor;ThreadPoolExecutorthreadPool=newThreadPoolExecutor(7个参数); 2,线程池参数介绍intcorePoolSize:核心线程数;intmaximumPoolSize:最大线程数;longkeepAliveTime:线程空闲的存活时间;Ti......
  • 关于多线程的介绍
    一、进程与线程1.进程:进程是操作系统中一种非常重要的软件资源,当我们把一个可执行程序exe运行起来的时候,系统就会随之创建一个进程,如果这个程序结束系统会随之销毁对应的进程。当运行exe文件时,exe文件中的很多内容都加载到内存中,通过分配资源来执行这个程序包含的指令的过程叫......
  • 线程池和进程池
    线程池和进程池(1)简介在Python中,线程池(ThreadPoolExecutor)和进程池(ProcessPoolExecutor)是用于并发执行任务的两种机制。它们都可以有效地管理并发执行的任务,并且能够自动管理线程或进程的生命周期,从而简化了并发编程。(1)线程池基于线程的并发:线程池利用了线程的并发执行来......
  • C++多线程 第五章 C++内存模型和原子类型
    第五章C++内存模型和原子类型无论其他语言如何,C++是一门系统编程语言.委员会希望不再需要一个比C++低级的语言.内存模型基础C++程序中所有的数据均是由对象(object)组成的.C++标准定义对象为"存储区域",经管它会为这些对象分配属于它们的类型和生存期.无论什么类型,对象......
  • 线程池工作流程 工厂流水线打比方
    线程池工作原理关于线程池的工作原理,我用下面的7幅图来展示。1.通过execute方法提交任务时,当线程池中的线程数小于corePoolSize时,新提交的任务将通过创建一个新线程来执行,即使此时线程池中存在空闲线程。2.通过execute方法提交任务时,当线程池中线程数量达到corePoolSize时,新......