首页 > 其他分享 >CompletableFuture使用说明

CompletableFuture使用说明

时间:2024-03-03 14:11:06浏览次数:41  
标签:System 说明 任务 CompletableFuture 使用 println 线程 out

前言
创建线程的方式只有两种:继承Thread或者实现Runnable接口。 但是这两种方法都存在一个缺陷,没有返回值

Java 1.5 以后,可以通过向线程池提交一个Callable来获取一个包含返回值的Future对象

Future接口的局限性
当Future的线程进行了一个非常耗时的操作,那我们的主线程也就阻塞了。

当我们在简单业务上,可以使用Future的另一个重载方法get(long,TimeUnit)来设置超时时间,避免我们的主线程被无穷尽地阻塞。

单纯使用Future接口或者FutureTask类并不能很好地完成以下我们所需的业务

将两个异步计算合并为一个,这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果
等待Future集合中的所有任务都完成。
仅等待Future集合种最快结束的任务完成,并返回它的结果。
通过编程方式完成一个Future任务的执行
当Future的完成时间完成时会收到通知,并能使用Future的计算结果进行下一步的的操作,不只是简单地阻塞等待操作的结果
什么是CompletableFuture
在Java 8中, 新增类: CompletableFuture,结合了Future的优点,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果

CompletableFuture被设计在Java中进行异步编程。主线程不用为了任务的完成而阻塞/等待,你可以用主线程去并行执行其他的任务。 使用这种并行方式,极大地提升了程序的表现。

CompletableFuture实现了Future接口,因此有异步执行返回结果的能力。
CompletableFuture实现了CompletionStage接口,该接口是Java8新增得一个接口,用于异步执行中的阶段处理,其大量用在Lambda表达式计算过程中,目前只有CompletableFuture一个实现类。
public class CompletableFuture implements Future, CompletionStage {
方法命名规则
带有Async后缀方法都是异步另外线程执行,没有就是复用之前任务的线程

带有Apply标识方法都是可以获取返回值+有返回值的

带有Accept标识方法都是可以获取返回值

带有run标识的方法不可以获取返回值和无返回值,只是运行

get方法和join方法
join:阻塞获取结果或抛出非受检异常。
get: 阻塞获取结果或抛出受检测异常,需要显示进行try...catch处理
不同线程池使用
默认线程池执行
/**

  • 默认线程池
  • 运行结果:
  • main.................start.....
  • main.................end......
  • 当前线程:ForkJoinPool.commonPool-worker-9
  • 运行结果:5
    */
    @Test
    public void defaultThread() {
    System.out.println("main.................start.....");
    CompletableFuture.runAsync(() -> {
    System.out.println("当前线程:" + Thread.currentThread().getName());
    int i = 10 / 2;
    System.out.println("运行结果:" + i);
    });
    System.out.println("main.................end......");
    }

默认使用 ForkJoinPool.commonPool(),commonPool是一个会被很多任务共享的线程池,commonPool 设计时的目标场景是运行 非阻塞的 CPU 密集型任务,为最大化利用 CPU,其线程数默认为 CPU 数量- 1

哪些地方使用了commonPool
CompletableFuture
Parallel Streams
为什么要引入commonPool
为了避免任何并行操作都引入一个线程池,最坏情况会导致在单个JVM上创建了太多的池线程,降低效率。
commonPool线程池是怎么创建和使用的
ForkJoinTask一定会运行在一个ForkJoinPool中,如果没有显式地交它提交到ForkJoinPool,会使用一个common池(全进程共享)来执行任务。
自定义线程池执行
自定义一个线程池

private ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue());
使用定义的线程池

/**

  • 自定义线程池
  • 运行结果:
  • main.................start.....
  • main.................end......
  • 当前线程:pool-1-thread-1
  • 运行结果:5
    */
    @Test
    public void myThread() {
    System.out.println("main.................start.....");
    CompletableFuture.runAsync(() -> {
    System.out.println("当前线程:" + Thread.currentThread().getName());
    int i = 10 / 2;
    System.out.println("运行结果:" + i);
    },executor);
    System.out.println("main.................end......");
    }

开启一个异步
runAsync-无返回值
使用runAsync开启一个异步任务线程,该方法无结果返回,适合一些不需要结果的异步任务

/***

  • 无返回值
  • runAsync
  • 结果:
  • main.................start.....
  • main.................end......
  • 当前线程:33
  • 运行结果:5
    */
    @Test
    public void runAsync() {
    System.out.println("main.................start.....");
    CompletableFuture.runAsync(() -> {
    System.out.println("当前线程:" + Thread.currentThread().getId());
    int i = 10 / 2;
    System.out.println("运行结果:" + i);
    }, executor);
    System.out.println("main.................end......");
    }

supplyAsync-有返回值
使用completableFuture.get()方法获取结果,这时程序会阻塞到这里直到结果返回。

/**

  • 有返回值
  • supplyAsync
  • 结果:
  • main.................start.....
  • 当前线程:33
  • 运行结果:5
  • main.................end.....5
    */
    @Test
    public void supplyAsync() throws ExecutionException, InterruptedException {
    System.out.println("main.................start.....");
    CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
    System.out.println("当前线程:" + Thread.currentThread().getId());
    int i = 10 / 2;
    System.out.println("运行结果:" + i);
    return i;
    }, executor);
    System.out.println("main.................end....." + completableFuture.get());
    }

如果要超时就得往下执行,请使用completableFuture.get(long timeout, TimeUnit unit)方法。

线程串行化方法
带有Async后缀方法都是异步另外线程执行,没有就是复用之前任务的线程

thenApply-上面任务执行完执行+获取返回值+有返回值
/**

  • 上面任务执行完执行+可以拿到结果+可以返回值

  • 结果:

  • thenApplyAsync当前线程:33

  • thenApplyAsync运行结果:5

  • thenApplyAsync任务2启动了。。。。。上步结果:5

  • main.................end.....hello10

  • @throws ExecutionException

  • @throws InterruptedException
    */
    @Test
    public void thenApplyAsync() throws ExecutionException, InterruptedException {

    CompletableFuture thenApplyAsync = CompletableFuture.supplyAsync(() -> {
    System.out.println("thenApplyAsync当前线程:" + Thread.currentThread().getId());
    int i = 10 / 2;
    System.out.println("thenApplyAsync运行结果:" + i);
    return i;
    }, executor).thenApplyAsync(result -> {
    System.out.println("thenApplyAsync任务2启动了。。。。。上步结果:" + result);
    return "hello" + result * 2;
    }, executor);
    System.out.println("main.................end....." + thenApplyAsync.get());
    }

thenAccept-上面任务执行完执行+获取返回值
/**

  • 上面任务执行完执行+可以拿到结果
  • 结果:
  • thenAcceptAsync当前线程:33
  • thenAcceptAsync运行结果:5
  • thenAcceptAsync任务2启动了。。。。。上步结果:5
  • @throws ExecutionException
  • @throws InterruptedException
    */
    @Test
    public void thenAcceptAsync() throws ExecutionException, InterruptedException {
    CompletableFuture thenAcceptAsync = CompletableFuture.supplyAsync(() -> {
    System.out.println("thenAcceptAsync当前线程:" + Thread.currentThread().getId());
    int i = 10 / 2;
    System.out.println("thenAcceptAsync运行结果:" + i);
    return i;
    }, executor).thenAcceptAsync(result -> {
    System.out.println("thenAcceptAsync任务2启动了。。。。。上步结果:" + result);
    }, executor);
    }

thenRun-上面任务执行完执行
/**

  • 上面任务执行完执行
  • 结果
  • main.................start.....
  • 当前线程:33
  • 运行结果:5
  • 任务2启动了。。。。。
    */
    @Test
    public void thenRunAsync() throws ExecutionException, InterruptedException {
    System.out.println("main.................start.....");
    CompletableFuture voidCompletableFuture = CompletableFuture.supplyAsync(() -> {
    System.out.println("当前线程:" + Thread.currentThread().getId());
    int i = 10 / 2;
    System.out.println("运行结果:" + i);
    return i;
    }, executor).thenRunAsync(() -> {
    System.out.println("任务2启动了。。。。。");
    }, executor);
    }
    thenCompose-接收返回值并生成新的任务
    当原任务完成后接收返回值,返回一个新的任务

thenApply()转换的是泛型中的类型,相当于将CompletableFuture 转换生成新的CompletableFuture
thenCompose()用来连接两个CompletableFuture,是生成一个新的CompletableFuture。
/**

  • 当原任务完成后接收返回值,返回一个新的任务

  • 结果:

  • hello: thenCompose
    /
    @Test
    public void thenCompose() {
    CompletableFuture cf = CompletableFuture.completedFuture("hello")
    .thenCompose(str -> CompletableFuture.supplyAsync(() -> {
    return str + ": thenCompose";
    },executor));
    System.out.println(cf.join());
    }
    任务组合
    thenCombine-消费两个结果+返回结果
    /
    *

  • 两任务组合 都要完成

  • completableFuture.thenCombine()获取两个future返回结果,有返回值

  • 结果:

  • 任务1线程:33

  • 任务1运行结果:5

  • 任务2线程:34

  • 任务2运行结果:

  • 任务5启动。。。结果1:5。。。结果2:hello

  • 任务5结果hello-->5
    */
    @Test
    public void thenCombine() throws ExecutionException, InterruptedException {
    CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务1线程:" + Thread.currentThread().getId());
    int i = 10 / 2;
    System.out.println("任务1运行结果:" + i);
    return i;
    }, executor);

    CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务2线程:" + Thread.currentThread().getId());
    System.out.println("任务2运行结果:");
    return "hello";
    }, executor);
    CompletableFuture thenCombineAsync = future1.thenCombineAsync(future2, (result1, result2) -> {
    System.out.println("任务5启动。。。结果1:" + result1 + "。。。结果2:" + result2);
    return result2 + "-->" + result1;
    }, executor);
    System.out.println("任务5结果" + thenCombineAsync.get());
    }
    thenAcceptBoth-消费两个结果+无返回
    /**

  • 两任务组合 都要完成

  • completableFuture.thenAcceptBoth() 获取两个future返回结果,无返回值

  • 结果:

  • 任务1线程:33

  • 任务1运行结果:5

  • 任务2线程:34

  • 任务2运行结果:

  • 任务4启动。。。结果1:5。。。结果2:hello
    */
    @Test
    public void thenAcceptBothAsync() throws ExecutionException, InterruptedException {
    CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务1线程:" + Thread.currentThread().getId());
    int i = 10 / 2;
    System.out.println("任务1运行结果:" + i);
    return i;
    }, executor);

    CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务2线程:" + Thread.currentThread().getId());
    System.out.println("任务2运行结果:");
    return "hello";
    }, executor);

    CompletableFuture thenAcceptBothAsync = future1.thenAcceptBothAsync(future2, (result1, result2) -> {
    System.out.println("任务4启动。。。结果1:" + result1 + "。。。结果2:" + result2);
    }, executor);

}
runAfterBoth-两个任务完成接着运行
/**

  • 两任务组合 都要完成

  • completableFuture.runAfterBoth() 组合两个future

  • 结果:

  • 任务1线程:33

  • 任务1运行结果:5

  • 任务2线程:34

  • 任务2运行结果:

  • 任务3启动。。。
    */
    @Test
    public void runAfterBothAsync() {
    CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务1线程:" + Thread.currentThread().getId());
    int i = 10 / 2;
    System.out.println("任务1运行结果:" + i);
    return i;
    }, executor);

    CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务2线程:" + Thread.currentThread().getId());
    System.out.println("任务2运行结果:");
    return "hello";
    }, executor);

    CompletableFuture runAfterBothAsync = future1.runAfterBothAsync(future2, () -> {
    System.out.println("任务3启动。。。");
    }, executor);

}
两任务完成一个就执行
applyToEither-其中一个执行完执行+获取返回值+有返回值
/**

  • 两任务组合,一个任务完成就执行

  • objectCompletableFuture.applyToEither() 其中一个执行完执行+获取返回值+有返回值

  • 结果:

  • 任务1线程:33

  • 任务2线程:34

  • 任务2运行结果:

  • 任务5开始执行。。。结果:hello

  • 任务5结果:hello world

  • Process finished with exit code 0
    */
    @Test
    public void applyToEither() throws ExecutionException, InterruptedException {
    CompletableFuture

  • 两任务组合,一个任务完成就执行

  • objectCompletableFuture.acceptEither() 其中一个执行完执行+获取返回值

  • 结果:

  • 任务1线程:33

  • 任务2线程:34

  • 任务2运行结果:

  • 任务4开始执行。。。结果:hello
    */
    @Test
    public void acceptEither() {
    CompletableFuture

}

runAfterEither-有一任务完成就执行
/**

  • 两任务组合,一个任务完成就执行

  • objectCompletableFuture.runAfterEither() 其中一个执行完执行

  • 结果:

  • 任务1线程:33

  • 任务2线程:34

  • 任务2运行结果:

  • 任务3开始执行。。。
    */
    @Test
    public void runAfterEither() {
    CompletableFuture

多任务组合
allOf-等待全部完成后才执行
/**

  • 多任务组合

  • allOf 等待所有任务完成

  • 结果:

  • 任务1

  • 任务3

  • 任务2

  • allOf任务1-------任务2-------任务3

  • @throws ExecutionException

  • @throws InterruptedException
    */
    @Test
    public void allOf() throws ExecutionException, InterruptedException {
    CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务1");
    return "任务1";
    }, executor);
    CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
    try {
    Thread.sleep(2000);
    System.out.println("任务2");
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    return "任务2";
    }, executor);
    CompletableFuture future3 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务3");
    return "任务3";
    }, executor);

    CompletableFuture allOf = CompletableFuture.allOf(future1, future2, future3);
    //等待所有任务完成
    //allOf.get();
    allOf.join();
    System.out.println("allOf" + future1.get() + "-------" + future2.get() + "-------" + future3.get());

}
anyOf-等待其中之一完成后就执行

/**

  • 多任务组合

  • anyOf 只要一个任务完成

  • 结果:

  • 任务1

  • anyOf--最先完成的是任务1

  • 任务3

  • 等等任务2

  • 任务2

  • @throws ExecutionException

  • @throws InterruptedException
    */
    @Test
    public void anyOf() throws ExecutionException, InterruptedException {
    CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务1");
    return "任务1";
    }, executor);
    CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
    try {
    Thread.sleep(2000);
    System.out.println("任务2");
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    return "任务2";
    }, executor);

    CompletableFuture future3 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务3");
    return "任务3";
    }, executor);
    CompletableFuture

/**

  • 入参为结果或者异常,返回新结果
  • 结果:
  • main.................start.....
  • 当前线程:33
  • main.................end.....报错返回
    */
    @Test
    public void handle() throws ExecutionException, InterruptedException {
    System.out.println("main.................start.....");
    final CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
    System.out.println("当前线程:" + Thread.currentThread().getId());
    int i = 10 / 0;
    System.out.println("运行结果:" + i);
    return i;
    }, executor).handleAsync((in, throwable) -> {
    if (throwable != null) {
    return "报错返回";
    }
    return "正确了";
    });
    System.out.println("main.................end....." + completableFuture.get());

}
whenComplete-感知结果或异常并返回相应信息
whenComplete虽然得到异常信息,但是不能修改返回信息

/**

  • 有返回值并且有后续操作 whenComplete

  • 结果:

  • main.................start.....

  • 当前线程:33

  • 异步完成。。。。结果是:null...异常是:java.util.concurrent.CompletionException: java.lang.ArithmeticException: 除以零

  • 报错了2

  • @throws ExecutionException

  • @throws InterruptedException
    */
    @Test
    public void whenComplete() {
    System.out.println("main.................start.....");
    final CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
    System.out.println("当前线程:" + Thread.currentThread().getId());
    int i = 10 / 0;
    System.out.println("运行结果:" + i);
    return i;
    }, executor).whenComplete((result, throwable) -> {
    //whenComplete虽然得到异常信息,但是不能修改返回信息
    System.out.println("异步完成。。。。结果是:" + result + "...异常是:" + throwable);
    });

    try {
    System.out.println("main.................end..T..." + completableFuture.get());
    } catch (InterruptedException e) {
    System.out.println("报错了1");
    } catch (ExecutionException e) {
    System.out.println("报错了2");
    }
    }
    exceptionally-捕获异常并返回指定值
    /**

  • 方法完成后的感知

  • 感知错误并返回指定值 exceptionally

  • 结果:

  • main.................start.....

  • 当前线程:33

  • 执行了exceptionally

  • main.................end.....0

  • @throws ExecutionException

  • @throws InterruptedException
    */
    @Test
    public void exceptionally() throws ExecutionException, InterruptedException {
    System.out.println("main.................start.....");
    CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
    System.out.println("当前线程:" + Thread.currentThread().getId());
    int i = 10 / 0;
    System.out.println("运行结果:" + i);
    return i;
    }, executor).exceptionally(throwable -> {
    //R apply(T t);
    //exceptionally可以感知错误并返回指定值
    System.out.println("执行了exceptionally");
    return 0;
    });
    System.out.println("main.................end....." + completableFuture.get());
    }

标签:System,说明,任务,CompletableFuture,使用,println,线程,out
From: https://www.cnblogs.com/pp531/p/18049986

相关文章

  • 使用 CSS 如何禁用浏览器打印页面 All In One
    使用CSS如何禁用浏览器打印页面AllInOneprint.css禁用PDF导出网页有时,你的网站或应用程序可能希望改善用户在打印内容时的体验。有几种可能的情况:你希望根据纸张的大小和形状调整布局。你希望使用不同的样式来增强纸上内容的外观。你希望使用更高分辨率的图像以......
  • PowerShell中,你可以使用以下命令来操作Windows防火墙并记录流量信息
    在PowerShell中,你可以使用以下命令来操作Windows防火墙并记录流量信息:操作Windows防火墙:查看当前的防火墙规则:powershellCopyCodeGet-NetFirewallRule创建新的防火墙规则:powershellCopyCodeNew-NetFirewallRule-DisplayName"MyFirewallRule"-DirectionInbound-A......
  • 无法创建spring2.X版本,无法使用JDK8, 用idea创建spring2.X版本,使用JDK8解决方案
    1、解释原因spring2.X版本在2023年11月24日停止维护了,因此创建spring项目时不再有2.X版本的选项,只能从3.1.X版本开始选择而Spring3.X版本不支持JDK8,最低支持JDK17,因此JDK8也无法主动选择了当然,停止维护只代表我们无法用idea主动创建spring2.X版本的项目了,不代表我们无法使用,该......
  • 在Avalonia项目中使用MediatR和MS.DI库实现事件驱动通信
    大家好,我是沙漠尽头的狼!AvaloniaUI是一个强大的跨平台.NET客户端开发框架,让开发者能够针对Windows、Linux、macOS、Android和iOS等多个平台构建应用程序。在构建复杂的应用程序时,模块化和组件间的通信变得尤为重要。Prism框架提供了模块化的开发方式,支持插件的热拔插,而MediatR则......
  • 使用git
    1.使用gitGit是以仓库为单位。先创建仓库才能使用Git,不同的仓库之间相互隔离仓库分为2种:本地:存储文件夹中远程:存储在互联网中初级使用流程从远程克隆(clone)本地在本地修改代码,完成之后进行提交(commit)从本地推送(push)到远程中级使用流程从远程克隆(clone)本地在本地创......
  • Go语言精进之路读书笔记第43条——使用testdata管理测试依赖的外部数据文件
    43.1testdata目录Go语言规定:Go工具链将忽略名为testdata的目录。开发者可以在名为testdata的目录下存放和管理测试代码依赖的数据文件,数据文件可作为输入也可作为输出gotest命令在执行时会将被测试程序包源码所在目录设置为其工作目录,可以这样使用f,err:=os.Open("testda......
  • Blazor使用QuickGrid
    @usingMicrosoft.AspNetCore.Components.QuickGrid<PageTitle>PromotionGrid</PageTitle><h1>PromotionGridExample</h1><QuickGridItems="@people"><PropertyColumnProperty="@(p=>p.PersonId)"Sor......
  • go1.22 新特性(日常使用相关)
    for循环循环共享变量问题Go在1.22版本之前,for循环迭代器的变量是一个单一变量,使用不当,会导致意想不到的行为,可能会造成共享循环变量的问题。如依旧要使用旧版本,可以主动配置GOEXPERIMENT=loopvarpackagemainimport( "fmt" "time")funcmain(){ nums:=[]int{1......
  • Ehcache 介绍(2)--Ehcache2 基本使用
    本文主要介绍Ehacche2的基本使用,文中所使用到的软件版本:Java1.8.0_341、Ehcache2.10.9.2。1、引入依赖<dependency><groupId>net.sf.ehcache</groupId><artifactId>ehcache</artifactId><version>2.10.9.2</version></dependency>......
  • 使用TensorRT-LLM进行生产环境的部署指南
    TensorRT-LLM是一个由Nvidia设计的开源框架,用于在生产环境中提高大型语言模型的性能。该框架是基于TensorRT深度学习编译框架来构建、编译并执行计算图,并借鉴了许多FastTransformer中高效的Kernels实现,并且可以利用NCCL完成设备之间的通讯。虽然像vLLM和TGI这样的框架是......