使用 CompletableFuture 可以大大简化处理多线程之间的异步调用关系,如串行依赖、并行、聚合等等。
CompletableFuture 是对 Future 接口的扩展和增强,进行了丰富的接口方法扩展,完美的弥补了 Future 的不足。
本篇博客通过代码的方式,展示 CompletableFuture 的常用方法,体验其强大灵活的异步任务编排能力,在博客的最后会提供源代码的下载。
一、搭建工程
新建一个名称为 springboot_completablefuture 的 SpringBoot 工程,其结构如下:
为了能够更简单直观的展示 CompletableFuture 的用法,本篇博客的 demo 也是尽可能的简单简化,示例代码都是在测试代码中。
首先看一下 pom 文件,非常简单,由于不需要对外提供 web 接口,因此只引入了基本的 springboot 依赖包,以及测试依赖包。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.jobs</groupId>
<artifactId>springboot_completablefuture</artifactId>
<version>1.0</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.5</version>
</parent>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
</project>
二、最基本最常用的方法
最基本最常用的方法,就是 runAsync 和 supplyAsync 了,详细的介绍内容,可以查看代码中的注释。
package com.jobs;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.concurrent.*;
//展示常用的 runAsync 和 supplyAsync 的用法
@SpringBootTest
public class AppTest1 {
/*
//runAsync 方法以Runnable函数式接口类型为参数,没有返回结果
public static CompletableFuture<Void> runAsync(Runnable runnable)
//supplyAsync 方法以Supplier函数式接口类型为参数,调用其 get() 方法是有返回值的(会阻塞)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
*/
@Test
void test1() throws Exception {
CompletableFuture.runAsync(() -> {
System.out.println("调用runAsync没有返回结果");
});
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("调用supplyAsync有返回结果");
return "调用supplyAsync成功";
});
//最好使用带有超时限制的 get 方法获取返回值,否则程序会一直被阻塞
System.out.println(future.get(3, TimeUnit.SECONDS));
}
/*
//没有指定线程池的话,CompletableFuture 会默认使用公共的 ForkJoinPool.commonPool() 作为它的线程池执行异步代码。
//如果指定线程池,则使用指定的线程池运行。如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,
//就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。
//所以,强烈建议你要根据不同的业务类型创建不同的自定义线程池,以避免互相干扰。
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
*/
@Test
void test2() {
ThreadPoolExecutor executor
= new ThreadPoolExecutor(1, 5, 20, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
CompletableFuture<Void> runFuture = CompletableFuture.runAsync(() -> {
System.out.println("使用线程池执行runAsync...");
}, executor);
CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("使用线程池执行supplyAsync...");
return "调用supplyAsync成功...";
}, executor);
//使用 join 和 get 都可以获取到返回值,区别在于:
//使用 join 不需要使用 try catch 包裹,去处理异常。
//使用 get 必须要使用 try catch 处理异常,或者将异常抛给调用者处理
System.out.println(supplyFuture.join());
executor.shutdown();
}
}
三、常用的任务连接方法
常用的主要有:thenRun 、thenAccept 、thenApply 、exceptionally 等,具体详细介绍可查看代码中的注释
package com.jobs;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.concurrent.CompletableFuture;
//展示 thenRun 、thenAccept 、 thenApply 、 exceptionally 等用法
@SpringBootTest
public class AppTest2 {
/*
//thenRun 会在上一个 CompletableFuture 计算完成的时候执行一个 Runnable
//Runnable 并不使用上一个 CompletableFuture 计算的结果。
//因此使用 thenRun 方法,相当于进行了一个异步回调操作
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
*/
@Test
void test1() {
CompletableFuture.supplyAsync(() -> {
System.out.println("第一阶段已经调用...");
return "第一阶段调用成功...";
}).thenRun(() -> System.out.println("thenRun被调用..."));
//调用 thenRun 方法执行时,则第二个任务和第一个任务是共用同一个线程池。
//调用 thenRunAsync 执行时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是公共的 ForkJoin 线程池
//下面介绍的 thenAccept 和 thenAcceptAsync,thenApply 和 thenApplyAsync 等等,都是这个区别,后面不再赘述
}
/*
第一个任务执行完成后,thenAccept 方法会将执行结果,作为入参,执行第二个任务,但是第二个任务不允许有返回结果。
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
*/
@Test
void test2() {
CompletableFuture.supplyAsync(() -> {
System.out.println("第一个任务执行了...");
return "第一个任务执行完了";
}
).thenAccept((r) -> {
System.out.println("第一个任务的执行结果是:" + r);
System.out.println("第二个任务执行完了");
});
}
/*
第一个任务执行完成后,thenApply 方法会将执行结果,作为入参,执行第二个任务,第二个任务是有返回结果的。
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
*/
@Test
void test3() {
CompletableFuture<String> future =
CompletableFuture.supplyAsync(() -> {
System.out.println("第一个任务已经执行了...");
int num = 1 + 2;
return num;
}
).thenApply((r) -> {
int result = 10 + r;
return "计算的最终结果是:" + result;
});
System.out.println(future.join());
}
/*
thenCompose 与 thenApply 使用的效果相同,只是换了一种编码方式,建议还是使用 thenApply 比较直观一些
我个人认为,只需要会用 thenApply 即可,thenCompose 没啥用处,只需要了解即可,不需要掌握。
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;
*/
@Test
void test4() {
CompletableFuture<String> future =
CompletableFuture.supplyAsync(() -> {
System.out.println("第一个任务已经执行了...");
int num = 1 + 2;
return num;
}
).thenCompose(r ->
CompletableFuture.supplyAsync(() -> {
int result = 10 + r;
return "计算的最终结果是:" + result;
})
);
System.out.println(future.join());
}
/*
whenComplete 在功能上与 thenRun 很相似,唯一的区别是:
whenComplete 有返回值,但是它的返回值是上一个任务的返回值,自己的方法中不能有返回值。
我个人认为,whenComplete 没啥用处,只需要了解即可,不需要掌握。
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
*/
@Test
void test5() {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("第一个任务已经执行了...");
int num = 1 + 2;
return num;
}
);
CompletableFuture<Integer> future2 = future1.whenComplete((r, throwable) -> {
System.out.println("第二个任务已经执行了...");
});
System.out.println(future2.join());
}
/*
当某个任务执行异常时,可以使用 exceptionally 方法捕获异常信息
*/
@Test
void test6() {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
int result = 5 / 0;
System.out.println("执行方法完成");
return "运行结果是:" + result;
}
).exceptionally((e) -> {
return "异常信息:" + e.getMessage();
});
//必须调用 CompletableFuture 的 get 或 join 方法,才能获取到异常信息
System.out.println(future.join());
}
/*
handle方法,可以获取到某个任务的异常信息,handle 方法也可以返回处理结果
*/
@Test
void test7() {
CompletableFuture<String> future =
CompletableFuture.supplyAsync(() -> {
int result = 5 / 0;
System.out.println("执行方法完成");
return "运行结果是:" + result;
}
).handle((r, exception) -> {
System.out.println("上个任务执行完,传过来的结果是:" + r);
if (exception != null) {
System.out.println(exception.getMessage());
}
return "handle 已经处理完了...";
});
System.out.println(future.join());
}
}
四、任务之间的 and 连接和 or 连接
And 关系连接的方法主要是:thenCombine 、thenAcceptBoth 、runAfterBoth
Or 关系连接的方法主要是:applyToEither 、 acceptEither 、 runAfterEither
package com.jobs;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
//展示 and 关系 和 or 关系 的用法
@SpringBootTest
public class AppTest3 {
//and关系:thenCombine:会将两个任务的执行结果作为方法入参,传递到指定方法中,且有返回值
@Test
void test1() {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
int number1 = 5;
System.out.println("第一个任务:" + number1);
return number1;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
int number2 = 6;
System.out.println("第二个任务:" + number2);
return number2;
});
CompletableFuture<Integer> result = future1
.thenCombine(future2, (number1, number2) -> number1 + number2);
System.out.println("最终结果:" + result.join());
}
//and关系:thenAcceptBoth: 会将两个任务的执行结果作为方法入参,传递到指定方法中,且无返回值
@Test
void test2() {
CompletableFuture<Integer> futrue1 = CompletableFuture.supplyAsync(() -> {
int number1 = 5;
System.out.println("第一个任务:" + number1);
return number1;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
int number2 = 7;
System.out.println("第二个任务:" + number2);
return number2;
});
futrue1.thenAcceptBoth(future2,
(number1, number2) -> System.out.println("最终结果:" + (number1 + number2)));
}
//and关系:runAfterBoth 不会把执行结果当做方法入参,且没有返回值。
@Test
void test3() {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("第一个任务:1");
return 1;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("第二个任务:2");
return 2;
});
future1.runAfterBoth(future2, () -> System.out.println("上面两个任务都执行完成了。"));
}
//-------------------------------------------------
//or关系:applyToEither:会将已经执行完成的任务,作为方法入参,传递到指定方法中,且有返回值
@Test
void test4() throws InterruptedException {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
int number = new Random().nextInt(3);
System.out.println("第一个任务:" + number);
try {
TimeUnit.SECONDS.sleep(number);
} catch (InterruptedException e) {
e.printStackTrace();
}
return number;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
int number = new Random().nextInt(3);
System.out.println("第二个任务:" + number);
try {
TimeUnit.SECONDS.sleep(number);
} catch (InterruptedException e) {
e.printStackTrace();
}
return number;
});
//使用最新执行完的那个任务的返回结果,有返回结果
future1.applyToEither(future2, number -> {
System.out.println("最快结果:" + number);
return number;
});
TimeUnit.SECONDS.sleep(3);//线程阻塞的方法
}
//or关系:acceptEither: 会将已经执行完成的任务,作为方法入参,传递到指定方法中,且无返回值
@Test
void test5() throws InterruptedException {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
int number = new Random().nextInt(3);
System.out.println("第一个任务:" + number);
try {
TimeUnit.SECONDS.sleep(number);
} catch (InterruptedException e) {
e.printStackTrace();
}
return number;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
int number = new Random().nextInt(3);
System.out.println("第二个任务:" + number);
try {
TimeUnit.SECONDS.sleep(number);
} catch (InterruptedException e) {
e.printStackTrace();
}
return number;
});
//使用最新执行完的那个任务的返回结果,无返回结果
future1.acceptEither(future2, number -> System.out.println("最快结果:" + number));
TimeUnit.SECONDS.sleep(3);//线程阻塞的方法
}
//or关系:runAfterEither:不会把执行结果当做方法入参,且没有返回值。
@Test
void test6() throws InterruptedException {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
int number = new Random().nextInt(3);
System.out.println("第一个任务:" + number);
try {
TimeUnit.SECONDS.sleep(number);
} catch (InterruptedException e) {
e.printStackTrace();
}
return number;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
int number = new Random().nextInt(3);
System.out.println("第二个任务:" + number);
try {
TimeUnit.SECONDS.sleep(number);
} catch (InterruptedException e) {
e.printStackTrace();
}
return number;
});
//只要其中一个任务完成,就执行该方法,没有参数,也无返回结果
future1.runAfterEither(future2, () -> System.out.println("已经有一个任务完成了")).join();
TimeUnit.SECONDS.sleep(3);//线程阻塞的方法
}
}
五、多个任务之间的连接处理
主要有 anyOf 、 allOf ,以及在循环中使用 CompletableFuture ,具体细节看代码和相关注释
package com.jobs;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
//展示多个任务之间的连接处理
@SpringBootTest
public class AppTest4 {
/*
anyOf 方法的参数是多个给定的 CompletableFuture,当其中的任何一个完成时,方法返回这个 CompletableFuture。
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
*/
@Test
void test1() throws Exception {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
int number = new Random().nextInt(3);
System.out.println("第一个任务:" + number);
try {
TimeUnit.SECONDS.sleep(number);
} catch (InterruptedException e) {
e.printStackTrace();
}
return number;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
int number = new Random().nextInt(3);
System.out.println("第二个任务:" + number);
try {
TimeUnit.SECONDS.sleep(number);
} catch (InterruptedException e) {
e.printStackTrace();
}
return number;
});
CompletableFuture<Object> result = CompletableFuture.anyOf(future1, future2);
System.out.println(result.get());
TimeUnit.SECONDS.sleep(3);//线程阻塞的方法
}
/*
anyOf 方法的参数是多个给定的 CompletableFuture,其作用是等待所有的任务都完成,没有返回结果。
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
*/
@Test
void test2() throws InterruptedException {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
int number = new Random().nextInt(3);
System.out.println("第一个任务:" + number);
try {
TimeUnit.SECONDS.sleep(number);
} catch (InterruptedException e) {
e.printStackTrace();
}
return number;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
int number = new Random().nextInt(3);
System.out.println("第二个任务:" + number);
try {
TimeUnit.SECONDS.sleep(number);
} catch (InterruptedException e) {
e.printStackTrace();
}
return number;
});
CompletableFuture.allOf(future1, future2);
TimeUnit.SECONDS.sleep(3);//线程阻塞的方法
}
//在 for 循环中,批量使用 CompletableFuture 最后等待所有的执行结果
@Test
void test3() throws Exception {
List<CompletableFuture<Integer>> futureList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
futureList.add(CompletableFuture.supplyAsync(() -> {
List<Integer> numlist = getRandomNumber(10);
Collections.sort(numlist, (n1, n2) -> n1 > n2 ? -1 : 1);
//打印出倒序排列后的列表
System.out.println(numlist);
int max = numlist.get(0);
return max;
}));
}
//等待所有任务完成
CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]));
//获取每个任务的执行结果,然后求和
int sum = 0;
for (CompletableFuture<Integer> cf : futureList) {
Integer max = cf.get();
System.out.println("最大值为:" + max);
sum = sum + max;
}
System.out.println("求和后的结果为:" + sum);
}
//获取 num 个随机整数
List<Integer> getRandomNumber(Integer num) {
if (num <= 0) {
num = 10;
}
List<Integer> numlist = new ArrayList<>();
for (Integer i = 0; i < num; i++) {
Random rd = new Random();
int val = rd.nextInt(100);
numlist.add(val);
}
return numlist;
}
}
本篇博客的源代码下载地址为:https://files.cnblogs.com/files/blogs/699532/springboot_completablefuture.zip