文章目录
CompletableFuture是jdk8的新特性。CompletableFuture实现了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步会点、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。
Future接口请看Future详解
CompletionStage 接口中的方法比较多,CompletableFuture 的函数式能力就是这个接口赋予的。从这个接口的方法参数你就可以发现其大量使用了 Java8 引入的函数式编程。
CompletableFuture有如下使用场景:
- 创建异步任务
- 简单任务异步回调
- 多个任务组合处理
一、创建 CompletableFuture
常见的创建 CompletableFuture 对象的方法如下:
- 通过
new
关键字 - 基于 CompletableFuture 自带的静态工厂方法:
runAsync()
、supplyAsync()
1.1 new 关键字
通过 new
关键字创建 CompletableFuture 对象这种使用方式可以看作将CompletableFuture当做 Future 来使用。下面咱们来看一个简单的案例。
CompletableFuture<String> resultFuture = new CompletableFuture<>();
// 假设在未来的某个时刻,我们得到了最终的结果。
//这时,我们可以调用 complete() 方法为其传入结果,
//这表示 resultFuture 已经被完成了。complete() 方法只能调用一次,后续调用将被忽略。
resultFuture.complete(rpcResponse);
//获取异步计算的结果也非常简单,直接调用 get() 方法即可。
//调用 get() 方法的线程会阻塞直到 CompletableFuture 完成运算。
rpcResponse = completableFuture.get();
1.2 静态工厂方法
如果你已经知道计算的结果的话,可以使用静态方法 completedFuture()
来创建 CompletableFuture
CompletableFuture<String> future = CompletableFuture.completedFuture("hello!");
assertEquals("hello!", future.get());
completedFuture()
方法底层调用的是带参数的 new
方法,只不过,这个方法不对外暴露。
public static <U> CompletableFuture<U> completedFuture(U value) {
return new CompletableFuture<U>((value == null) ? NIL : value);
}
二、创建异步任务
CompletableFuture创建异步任务,一般有supplyAsync
和runAsync
两个方法
2.1 supplyAsync
supplyAsync是创建带有返回值的异步任务。它有如下两个方法,一个是使用默认线程池(ForkJoinPool.commonPool()
)的方法,一个是带有自定义线程池的重载方法
// 带返回值异步请求,默认线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
// 带返回值的异步请求,可以自定义线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
演示代码(以supplyAsync(Supplier<U> supplier, Executor executor
为例):
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 自定义线程池
ExecutorService executorService = Executors.newSingleThreadExecutor();
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
System.out.println("do something....");
return "result";
}, executorService);
//等待子任务执行完成get()会阻塞,join()不会
System.out.println("结果->" + cf.get());
}
2.2 runAsync
runAsync是创建没有返回值的异步任务。它有如下两个方法,一个是使用默认线程池(ForkJoinPool.commonPool()
)的方法,一个是带有自定义线程池的重载方法
// 不带返回值的异步请求,默认线程池
public static CompletableFuture<Void> runAsync(Runnable runnable)
// 不带返回值的异步请求,可以自定义线程池
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
示例代码:
public class FutureTest {
public static void main(String[] args) {
//可以自定义线程池
ExecutorService executor = Executors.newCachedThreadPool();
//runAsync的使用
CompletableFuture<Void> runFuture = CompletableFuture.runAsync(() -> System.out.println("run,hello"), executor);
//supplyAsync的使用
CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(() -> {
System.out.print("supply,hello");
return "hello"; }, executor);
//runAsync的future没有返回值,输出null
System.out.println(runFuture.join());
//supplyAsync的future,有返回值
System.out.println(supplyFuture.join());
executor.shutdown(); // 线程池需要关闭
}
}
2.3 获取任务结果的方法
// 如果完成则返回结果,否则就抛出具体的异常
public T get() throws InterruptedException, ExecutionException
// 最大时间等待返回结果,否则就抛出具体异常
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
// 完成时返回结果值,否则抛出unchecked异常。为了更好地符合通用函数形式的使用,如果完成此 CompletableFuture所涉及的计算引发异常,则此方法将引发unchecked异常并将底层异常作为其原因
public T join()
// 如果完成则返回结果值(或抛出任何遇到的异常),否则返回给定的 valueIfAbsent。
public T getNow(T valueIfAbsent)
// 如果任务没有完成,返回的值设置为给定值
public boolean complete(T value)
// 如果任务没有完成,就抛出给定异常
public boolean completeExceptionally(Throwable ex)
三、异步回调处理
3.1 thenApply和thenApplyAsync
thenApply 表示某个任务执行完成后执行的动作,即回调方法,会将该任务的执行结果即方法返回值作为入参传递到回调方法中,带有返回值。
示例代码:
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread() + " cf1 do something....");
return 1;
});
CompletableFuture<Integer> cf2 = cf1.thenApplyAsync((result) -> {
System.out.println(Thread.currentThread() + " cf2 do something....");
result += 2;
return result;
});
//等待任务1执行完成
System.out.println("cf1结果->" + cf1.get());
//等待任务2执行完成
System.out.println("cf2结果->" + cf2.get());
}
//输出
先执行cf1再执行cf2
cf1结果->1
cf2结果->3
3.2 thenAccept和thenAcceptAsync
thenAccep表示某个任务执行完成后执行的动作,即回调方法,会将该任务的执行结果即方法返回值作为入参传递到回调方法中,无返回值。
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread() + " cf1 do something....");
return 1;
});
CompletableFuture<Void> cf2 = cf1.thenAccept((result) -> {
System.out.println(Thread.currentThread() + " cf2 do something....");
});
//等待任务1执行完成
System.out.println("cf1结果->" + cf1.get());
//等待任务2执行完成
System.out.println("cf2结果->" + cf2.get());
//输出
cf1结果->1
cf2结果->null
3.3 thenRun和thenRunAsync
thenRun表示某个任务执行完成后执行的动作,即回调方法,无入参,无返回值。
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread() + " cf1 do something....");
return 1;
});
CompletableFuture<Void> cf2 = cf1.thenRun(() -> {
System.out.println(Thread.currentThread() + " cf2 do something....");
});
//等待任务1执行完成
System.out.println("cf1结果->" + cf1.get());
//等待任务2执行完成
System.out.println("cf2结果->" + cf2.get());
}
//输出
cf1结果->1
cf2结果->null
3.4 whenComplete和whenCompleteAsync
whenComplete是当某个任务执行完成后执行的回调方法,会将执行结果或者执行期间抛出的异常传递给回调方法,如果是正常执行则异常为null,回调方法对应的CompletableFuture的result和该任务一致,如果该任务正常执行,则get方法返回执行结果,如果是执行异常,则get方法抛出异常。
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread() + " cf1 do something....");
int a = 1/0;
return 1;
});
CompletableFuture<Integer> cf2 = cf1.whenComplete((result, e) -> {
System.out.println("上个任务结果:" + result);
System.out.println("上个任务抛出异常:" + e);
System.out.println(Thread.currentThread() + " cf2 do something....");
});
// //等待任务1执行完成
// System.out.println("cf1结果->" + cf1.get());
// //等待任务2执行完成
System.out.println("cf2结果->" + cf2.get());
}
3.5 handle和handleAsync
跟whenComplete基本一致,区别在于handle的回调方法有返回值。
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread() + " cf1 do something....");
// int a = 1/0;
return 1;
});
CompletableFuture<Integer> cf2 = cf1.handle((result, e) -> {
System.out.println(Thread.currentThread() + " cf2 do something....");
System.out.println("上个任务结果:" + result);
System.out.println("上个任务抛出异常:" + e);
return result+2;
});
//等待任务2执行完成
System.out.println("cf2结果->" + cf2.get());
}
//输出
上个任务结果: 1
上个任务抛出异常:null
cf2结果-> 3
3.6 有无Async的区别
使用then***
方法时子任务与父任务使用的是同一个线程,而then***Async
在子任务中是另起一个线程执行任务,并且then***Async
可以自定义线程池,默认的使用ForkJoinPool.commonPool()
线程池。
四、多任务组合处理
4.1 AND组合关系
thenCombine
、thenAcceptBoth
和runAfterBoth
这三个方法都是将两个CompletableFuture组合起来处理,只有两个任务都正常完成时,才进行下阶段任务。
区别:
thenCombine
会将两个任务的执行结果作为所提供函数的参数,且该方法有返回值;thenAcceptBoth
同样将两个任务的执行结果作为方法入参,但是无返回值;runAfterBoth
没有入参,也没有返回值。注意两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果。
示例代码:
public class ThenCombineTest {
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
CompletableFuture<String> first = CompletableFuture.completedFuture("第一个异步任务");
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> future = CompletableFuture
//第二个异步任务
.supplyAsync(() -> "第二个异步任务", executor)
// (w, s) -> System.out.println(s) 是第三个任务
.thenCombineAsync(first, (s, w) -> {
System.out.println(w);
System.out.println(s);
return "两个异步任务的组合";
}, executor);
System.out.println(future.join());
executor.shutdown();
}
}
//输出
第一个异步任务
第二个异步任务
两个异步任务的组合
4.2 OR 组合的关系
applyToEither
/ acceptEither
/ runAfterEither
都表示:将两个CompletableFuture组合起来,只要其中一个执行完了,就会执行某个任务。
区别在于:
applyToEither
:会将已经执行完成的任务,作为方法入参,传递到指定方法中,且有返回值acceptEither
: 会将已经执行完成的任务,作为方法入参,传递到指定方法中,且无返回值runAfterEither
: 不会把执行结果当做方法入参,且没有返回值。
public class AcceptEitherTest {
public static void main(String[] args) {
//第一个异步任务,休眠2秒,保证它执行晚点
CompletableFuture<String> first = CompletableFuture.supplyAsync(()->{
try{
Thread.sleep(2000L);
System.out.println("执行完第一个异步任务");}
catch (Exception e){
return "第一个任务异常";
}
return "第一个异步任务";
});
ExecutorService executor = Executors.newSingleThreadExecutor();
CompletableFuture<Void> future = CompletableFuture
//第二个异步任务
.supplyAsync(() -> {
System.out.println("执行完第二个任务");
return "第二个任务";}
, executor)
//第三个任务
.acceptEitherAsync(first, System.out::println, executor);
executor.shutdown();
}
}
//输出
执行完第二个任务
第二个任务
4.3 AllOf
所有任务都执行完成后,才执行allOf
返回的CompletableFuture。如果任意一个任务异常,allOf
的CompletableFuture,执行get
方法,会抛出异常
public class allOfFutureTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> cf1 = CompletableFuture.runAsync(()->{
System.out.println("我执行完了");
});
CompletableFuture<Void> cf2 = CompletableFuture.runAsync(() -> {
System.out.println("我也执行完了");
});
CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(cf1, cf2).whenComplete((m,k)->{
System.out.println("finish");
});
}
}
//输出
我执行完了
我也执行完了
finish
4.4 AnyOf
任意一个任务执行完,就执行anyOf
返回的CompletableFuture。如果执行的任务异常,anyOf
的CompletableFuture,执行get
方法,会抛出异常
public class AnyOfFutureTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> a = CompletableFuture.runAsync(()->{
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("我执行完了");
});
CompletableFuture<Void> b = CompletableFuture.runAsync(() -> {
System.out.println("我也执行完了");
});
CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(a, b).whenComplete((m,k)->{
System.out.println("finish");
// return "捡田螺的小男孩";
});
anyOfFuture.join();
}
}
//输出
我也执行完了
finish
4.5 thenCompose
thenCompose
方法会在某个任务执行完成后,将该任务的执行结果,作为方法入参,去执行指定的方法。该方法会返回一个新的CompletableFuture实例
- 如果该CompletableFuture实例的
result
不为null,则返回一个基于该result
新的CompletableFuture实例; - 如果该CompletableFuture实例为null,然后就执行这个新任务
public class ThenComposeTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> f = CompletableFuture.completedFuture("第一个任务");
//第二个异步任务
ExecutorService executor = Executors.newSingleThreadExecutor();
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "第二个任务", executor)
.thenComposeAsync(data -> {
System.out.println(data); return f; //使用第一个任务作为返回
}, executor);
System.out.println(future.join());
executor.shutdown();
}
}
//输出
第二个任务
第一个任务
五、使用注意点
5.1 Future需要获取返回值,才能获取异常信息
ExecutorService executorService = new ThreadPoolExecutor(5, 10, 5L,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
int a = 0;
int b = 666;
int c = b / a;
return true;
},executorService).thenAccept(System.out::println);
//如果不加 get()方法这一行,看不到异常信息
//future.get();
5.2 CompletableFuture的get()方法是阻塞的
CompletableFuture的get()方法是阻塞的,如果使用它来获取异步调用的返回值,需要添加超时时间
//反例
CompletableFuture.get();
//正例
CompletableFuture.get(5, TimeUnit.SECONDS);`
所以在实际开发中如果没有特殊需求尽量通过join()方法异步获取结果
5.3 默认线程池的注意点
CompletableFuture代码中又使用了默认的线程池,处理的线程个数是电脑CPU核数-1。在大量请求过来的时候,处理逻辑复杂的话,响应会很慢。一般建议使用自定义线程池
,优化线程池配置参数。
5.4 自定义线程池时,注意饱和策略
CompletableFuture的get()
方法是阻塞的,我们一般建议使用future.get(3, TimeUnit.SECONDS)
。并且一般建议使用自定义线程池。但是如果线程池拒绝策略是DiscardPolicy或者DiscardOldestPolicy,当线程池饱和时,会直接丢弃任务,不会抛弃异常。因此建议,CompletableFuture线程池策略最好使用AbortPolicy,然后耗时的异步线程,做好线程池隔离哈。