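1. Introduction to CompletableFuture
In Java, CompletableFuture is used for asynchronous programming. Asynchronous programming means writing non-blocking code: a task runs on a separate thread, isolated from the main thread, and notifies the main thread of its progress, success, or failure.
With this approach the main thread is not blocked and does not have to wait for the worker thread to finish; it can carry on with other tasks in parallel.
Used well, this kind of parallelism can noticeably improve a program's throughput and responsiveness.
CompletableFuture implements the Future and CompletionStage interfaces. It provides a rich set of convenience methods for creating, chaining, and combining multiple Futures, along with extensive support for exception handling.
The CompletionStage interface represents one stage of an asynchronous computation; when a stage completes, it may trigger another stage.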
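2. Problem scenarios
CompletableFuture targets scenarios that are awkward to express with a plain Future:
- Combining two asynchronous computations into one, whether the two are independent of each other or the second depends on the result of the first.
- Waiting for all tasks in a collection of Futures to complete.
- Waiting only for the fastest task in a collection of Futures to complete (for example, because they try to compute the same value in different ways) and returning its result.
- Completing a Future programmatically (that is, setting the result of the asynchronous operation by hand).
- Reacting to a Future's completion (that is, being notified when the Future completes and using its result for the next step, rather than simply blocking while waiting for the result).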
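Several of these scenarios (waiting for all, waiting for the fastest, completing by hand) are not covered by the methods described later, so here is a minimal sketch of them using allOf, anyOf, and complete. The class name ScenarioSketch and its helper methods are made up for illustration; only the CompletableFuture calls come from the JDK.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class ScenarioSketch {

    // Wait for every future in the collection, then collect the results in order.
    static List<Integer> waitForAll(List<CompletableFuture<Integer>> futures) {
        CompletableFuture<Void> all =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        // Once allOf has completed, join() on each element returns immediately.
        return all.thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()))
                .join();
    }

    // Return the result of whichever future completes first.
    static Object waitForFirst(CompletableFuture<?>... futures) {
        return CompletableFuture.anyOf(futures).join();
    }

    // Complete a future by hand instead of from an asynchronous task.
    static String completeManually() {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.complete("set by hand");
        return future.join();
    }

    public static void main(String[] args) {
        List<CompletableFuture<Integer>> futures = Arrays.asList(
                CompletableFuture.supplyAsync(() -> 1),
                CompletableFuture.supplyAsync(() -> 2));
        System.out.println(waitForAll(futures)); // [1, 2]
        // The second future is never completed, so the first one always wins.
        System.out.println(waitForFirst(
                CompletableFuture.completedFuture("fast"),
                new CompletableFuture<String>())); // fast
        System.out.println(completeManually()); // set by hand
    }
}
```

Note that anyOf returns a CompletableFuture<Object>, so the caller has to cast the result back to the expected type.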
3. Method details
1. runAsync and supplyAsync
CompletableFuture provides four static methods for creating an asynchronous operation.
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
- runAsync does not return a value.
- supplyAsync returns a value.
public void runAsyncTest() throws ExecutionException, InterruptedException {
    CompletableFuture<Void> f1 = CompletableFuture.runAsync(new Runnable() {
        @Override
        public void run() {
            System.out.println("wpz");
        }
    });
    CompletableFuture<Void> f2 = CompletableFuture.runAsync(new Runnable() {
        @Override
        public void run() {
            System.out.println("pz");
        }
    });
    // Calling get() would block here until the task finishes
    //f1.get();
    //f2.get();
    System.out.println(Thread.currentThread().getName() + ":main");
}
public void supplyAsyncTest() throws ExecutionException, InterruptedException {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
        @Override
        public String get() {
            return "wpz";
        }
    });
    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(new Supplier<String>() {
        @Override
        public String get() {
            return "wpz";
        }
    });
    // get() returns the task's value, and likewise blocks the calling thread
    System.out.println(future.get());
    System.out.println(future1.get());
}
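The two overloads that take an Executor are not exercised above. As a rough sketch, the following runs a supplier on a dedicated pool instead of the default one (normally ForkJoinPool.commonPool()). The class name, the pool size, and the thread name "demo-pool" are illustrative assumptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CustomExecutorSketch {

    // Runs the supplier on a dedicated pool and reports which thread executed it.
    static String runOnCustomPool() {
        // "demo-pool" is an illustrative thread name, not something the API requires.
        ExecutorService pool = Executors.newFixedThreadPool(2, r -> {
            Thread t = new Thread(r, "demo-pool");
            t.setDaemon(true);
            return t;
        });
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), pool)
                    .join();
        } finally {
            // Shut the pool down so its threads do not linger.
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnCustomPool()); // demo-pool
    }
}
```

A dedicated executor is usually worth considering for blocking work such as I/O, so that it does not occupy the shared common pool.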
2. whenComplete
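When a CompletableFuture's computation completes, a given action can be run. The action receives both the result and the exception (one of which is null).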
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
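- whenComplete: the action runs on the thread that completed the previous task, or on the calling thread if that task has already completed by the time whenComplete is registered.
- whenCompleteAsync: the action is submitted to a thread pool for execution (the supplied Executor, or the default asynchronous executor when none is given).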
public void callback() throws ExecutionException, InterruptedException {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
        @Override
        public String get() {
            System.out.println("supplyAsync thread: " + Thread.currentThread().getName());
            return "z";
        }
    }).whenCompleteAsync(new BiConsumer<String, Throwable>() {
        @Override
        public void accept(String s, Throwable throwable) {
            System.out.println("whenCompleteAsync:" + Thread.currentThread().getName());
            System.out.println("whenCompleteAsync:" + s);
        }
    }).whenComplete(new BiConsumer<String, Throwable>() {
        @Override
        public void accept(String s, Throwable throwable) {
            System.out.println("whenComplete:" + Thread.currentThread().getName());
            System.out.println("whenComplete:" + s);
        }
    });
    // Wait for the chain so both callbacks get a chance to run before the method returns
    future.get();
}
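To make the behaviour of whenComplete more concrete, here is a small sketch showing that it passes the original result through unchanged and that it also sees a failure, which then still propagates to the returned future. The class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

class WhenCompleteSketch {

    // whenComplete observes the outcome but leaves the result untouched.
    static String observeSuccess() {
        StringBuilder log = new StringBuilder();
        String result = CompletableFuture.supplyAsync(() -> "z")
                .whenComplete((value, error) ->
                        log.append("value=").append(value).append(",error=").append(error))
                .join();
        return result + "|" + log;
    }

    // On failure the action receives a non-null Throwable, and the returned
    // future is still completed exceptionally.
    static boolean observeFailure() {
        AtomicBoolean sawError = new AtomicBoolean(false);
        CompletableFuture<String> future = CompletableFuture.<String>supplyAsync(() -> {
            throw new IllegalStateException("boom");
        }).whenComplete((value, error) -> sawError.set(error != null));
        try {
            future.join();
        } catch (CompletionException expected) {
            // whenComplete does not recover from the failure.
        }
        return sawError.get() && future.isCompletedExceptionally();
    }

    public static void main(String[] args) {
        System.out.println(observeSuccess()); // z|value=z,error=null
        System.out.println(observeFailure()); // true
    }
}
```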
3. exceptionally
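When a CompletableFuture's computation throws an exception, exceptionally runs a follow-up callback whose return value replaces the result.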
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
public void exceptionallyTest() throws ExecutionException, InterruptedException {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
        @Override
        public String get() {
            System.out.println("supplyAsync thread: " + Thread.currentThread().getName());
            System.out.println(1/0);
            return "wpz";
        }
    }).exceptionally(new Function<Throwable, String>() {
        @Override
        public String apply(Throwable throwable) {
            System.out.println(throwable.getMessage());
            return "xpz";
        }
    });
    // The value returned by get() here is the return value of exceptionally()
    System.out.println(future.get());
}
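A checked exception has to be handled, so inside the task it needs to be wrapped in an unchecked one, for example new RuntimeException(new IOException("wpz")). Alternatively, annotate the method with Lombok's @SneakyThrows to get it past the compiler.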
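The wrapping approach might look like the sketch below. readConfig is a hypothetical helper standing in for any call that declares a checked exception; the exceptionally callback walks the cause chain because the Throwable it receives may be a wrapper around the original exception.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;

class CheckedExceptionSketch {

    // Hypothetical helper that stands in for any call declaring a checked exception.
    static String readConfig() throws IOException {
        throw new IOException("wpz");
    }

    static String loadWithFallback() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return readConfig();
            } catch (IOException e) {
                // Supplier.get() cannot throw checked exceptions, so wrap it.
                throw new RuntimeException(e);
            }
        }).exceptionally(t -> {
            // Walk down to the innermost cause to find the original exception.
            Throwable cause = t;
            while (cause.getCause() != null) {
                cause = cause.getCause();
            }
            return "fallback after " + cause.getClass().getSimpleName()
                    + ": " + cause.getMessage();
        }).join();
    }

    public static void main(String[] args) {
        System.out.println(loadWithFallback()); // fallback after IOException: wpz
    }
}
```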
4. thenApply
When one task depends on the result of another, thenApply can be used to run the two in sequence.
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
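- thenApplyAsync: the function is submitted to a thread pool.
- thenApply: the function runs on the thread that completes the previous stage, or on the calling thread if that stage is already complete when thenApply is called; it is not necessarily the main thread.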
public void thenApplyTest() throws ExecutionException, InterruptedException {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
        @Override
        public String get() {
            return "wpz";
        }
    }).thenApply(new Function<String, String>() {
        @Override
        public String apply(String s) {
            System.out.println(Thread.currentThread().getName() + ":" + s);
            return s + "1994";
        }
    }).thenApplyAsync(new Function<String, String>() {
        @Override
        public String apply(String s) {
            System.out.println(Thread.currentThread().getName() + ":" + s);
            return s + "sx";
        }
    });
    System.out.println(future.get());
}
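Which thread runs a thenApply function depends on timing, which makes the example above hard to reason about. The sketch below removes the timing by starting from an already completed future, in which case thenApply runs on the calling thread. The class and method names are illustrative.

```java
import java.util.concurrent.CompletableFuture;

class ThenApplyThreadSketch {

    // The source future is already complete, so thenApply runs on the caller's thread.
    static String threadOfThenApply() {
        return CompletableFuture.completedFuture("wpz")
                .thenApply(s -> Thread.currentThread().getName())
                .join();
    }

    // thenApplyAsync hands the function to the default asynchronous executor instead.
    static String threadOfThenApplyAsync() {
        return CompletableFuture.completedFuture("wpz")
                .thenApplyAsync(s -> Thread.currentThread().getName())
                .join();
    }

    public static void main(String[] args) {
        String caller = Thread.currentThread().getName();
        System.out.println("caller:         " + caller);
        System.out.println("thenApply:      " + threadOfThenApply());
        System.out.println("thenApplyAsync: " + threadOfThenApplyAsync());
    }
}
```

When the source future is still running at the time thenApply is called, the function typically runs on the worker thread that finishes that future instead.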
5. handle
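handle processes the outcome of a task once it has completed.
handle works much like thenApply. The difference is that handle runs once the task has completed, whether normally or exceptionally, so it can also deal with a failed task. thenApply only runs after a task that completed normally; if the task throws, thenApply is skipped.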
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
public void handleTest() throws ExecutionException, InterruptedException {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
        @Override
        public String get() {
            System.out.println(1 / 0);
            return "wpz";
        }
    }).handle(new BiFunction<String, Throwable, String>() {
        @Override
        public String apply(String s, Throwable throwable) {
            if (throwable != null) {
                // On failure s is null, so return a fallback instead of concatenating it
                return "sx";
            }
            return s + "1994";
        }
    });
    System.out.println(future.get());
}
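The difference between handle and thenApply on a failed task can be checked with a short sketch like this one. The names HandleSketch, recover, and thenApplySkipped are made up for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

class HandleSketch {

    // handle runs in both cases and can turn a failure into a normal result.
    static String recover(boolean fail) {
        return CompletableFuture.supplyAsync(() -> {
            if (fail) {
                throw new IllegalStateException("boom");
            }
            return "wpz";
        }).handle((value, error) -> error == null ? value + "1994" : "recovered")
          .join();
    }

    // thenApply is skipped when the previous stage failed; the failure propagates.
    static boolean thenApplySkipped() {
        AtomicBoolean ran = new AtomicBoolean(false);
        CompletableFuture<String> future = CompletableFuture.<String>supplyAsync(() -> {
            throw new IllegalStateException("boom");
        }).thenApply(s -> {
            ran.set(true);
            return s;
        });
        try {
            future.join();
        } catch (CompletionException expected) {
            // The original failure surfaces here.
        }
        return !ran.get();
    }

    public static void main(String[] args) {
        System.out.println(recover(false));     // wpz1994
        System.out.println(recover(true));      // recovered
        System.out.println(thenApplySkipped()); // true
    }
}
```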
6. thenAccept
Receives the task's result and consumes it; nothing is returned.
public CompletableFuture<Void> thenAccept(Consumer<? super T> action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
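Compared with whenComplete:
- The result is consumed, so the returned type becomes CompletableFuture<Void>; whenComplete leaves the original result unchanged (apart from mutations to a referenced object, which is a separate matter).
- It focuses on the result only and is unaware of exceptions, whereas whenComplete also sees the exception.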
public void thenAcceptTest() throws ExecutionException, InterruptedException {
    CompletableFuture<Void> future = CompletableFuture.supplyAsync(new Supplier<String>() {
        @Override
        public String get() {
            // thenAccept cannot see exceptions: if this throws, thenAccept is skipped and get() fails with ExecutionException
            //System.out.print(1/0);
            return "wpz";
        }
    }).thenAccept(new Consumer<String>() {
        @Override
        public void accept(String s) {
            System.out.println(s);
        }
    });
    future.get();
}
7. thenRun
Unlike thenAccept, thenRun does not care about the task's result. As soon as the preceding task has completed, thenRun starts executing.
public CompletableFuture<Void> thenRun(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor);
public void thenRunTest() throws ExecutionException, InterruptedException {
    CompletableFuture<Void> future = CompletableFuture.supplyAsync(new Supplier<String>() {
        @Override
        public String get() {
            return "wpz";
        }
    }).thenRun(new Runnable() {
        @Override
        public void run() {
            System.out.println("No result here, but the previous task is done");
        }
    });
    future.get();
}
8. thenCombine
thenCombine waits until both CompletableFuture tasks have completed, then passes the two results together to the thenCombine function.
public <U,V> CompletableFuture <V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletableFuture <V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletableFuture <V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
public void thenCombineTest() throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
        @Override
        public Integer get() {
            System.out.println(Thread.currentThread().getName());
            return 1;
        }
    });
    CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
        @Override
        public Integer get() {
            System.out.println(Thread.currentThread().getName());
            return 2;
        }
    });
    CompletableFuture<Integer> future = future1.thenCombine(future2, new BiFunction<Integer, Integer, Integer>() {
        @Override
        public Integer apply(Integer integer, Integer integer2) {
            System.out.println(Thread.currentThread().getName());
            return integer + integer2;
        }
    });
    System.out.println(future.get());
}
9. thenAcceptBoth
Once both CompletionStages have completed, their results are passed together to thenAcceptBoth to be consumed.
public <U> CompletableFuture <Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletableFuture <Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletableFuture <Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor);
public void thenAcceptBothTest() throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
        @Override
        public Integer get() {
            System.out.println(Thread.currentThread().getName());
            return 1;
        }
    });
    CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
        @Override
        public Integer get() {
            System.out.println(Thread.currentThread().getName());
            return 2;
        }
    });
    CompletableFuture<Void> future = future1.thenAcceptBoth(future2, new BiConsumer<Integer, Integer>() {
        @Override
        public void accept(Integer integer, Integer integer2) {
            System.out.println(integer + integer2);
        }
    });
    future.get();
}
10. applyToEither
Given two CompletableFutures, whichever returns its result first has that result fed into the next transformation step, a bit like competitive bidding.
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);
public void applyToEither() throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
        @Override
        public Integer get() {
            return 1;
        }
    });
    CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
        @Override
        public Integer get() {
            return 2;
        }
    });
    CompletableFuture<Integer> future = future1.applyToEither(future2, new Function<Integer, Integer>() {
        @Override
        public Integer apply(Integer integer) {
            return integer;
        }
    });
    System.out.println(future.get());
}
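In the example above both suppliers return immediately, so either 1 or 2 may be printed. To see the "first one wins" rule deterministically, one option is to race an already completed future against one that never completes, as in this sketch (class and method names are illustrative):

```java
import java.util.concurrent.CompletableFuture;

class EitherSketch {

    static String firstOf() {
        // This future is never completed, so it can never win the race.
        CompletableFuture<String> slow = new CompletableFuture<>();
        CompletableFuture<String> fast = CompletableFuture.completedFuture("fast");
        // The function receives the result of whichever side finished first.
        return slow.applyToEither(fast, s -> s + " wins").join();
    }

    public static void main(String[] args) {
        System.out.println(firstOf()); // fast wins
    }
}
```

The same reasoning applies to acceptEither and runAfterEither, which differ only in what they do with the winning result.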
11. acceptEither
Given two CompletableFutures, whichever returns its result first has that result passed to the next consuming step.
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);
public void acceptEither() throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
        @Override
        public Integer get() {
            return 1;
        }
    });
    CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
        @Override
        public Integer get() {
            return 2;
        }
    });
    CompletableFuture<Void> future = future1.acceptEither(future2, new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) {
            System.out.println(integer);
        }
    });
    future.get();
}
12. runAfterEither
Given two CompletableFutures, the next step (a Runnable) runs as soon as either one completes.
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action);
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);
public void runAfterEither() throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
        @Override
        public Integer get() {
            return 1;
        }
    });
    CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
        @Override
        public Integer get() {
            return 2;
        }
    });
    CompletableFuture<Void> future = future1.runAfterEither(future2, new Runnable() {
        @Override
        public void run() {
            System.out.println("pz is here");
        }
    });
    future.get();
}
13. runAfterBoth
Given two CompletableFutures, the next step (a Runnable) runs only after both have completed.
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,Runnable action);
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);
public void runAfterBoth() throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
        @Override
        public Integer get() {
            return 1;
        }
    });
    CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
        @Override
        public Integer get() {
            return 2;
        }
    });
    CompletableFuture<Void> future = future1.runAfterBoth(future2, new Runnable() {
        @Override
        public void run() {
            System.out.println("both done");
        }
    });
    future.get();
}
14. thenCompose
thenCompose lets you pipeline two CompletableFutures: when the first operation completes, its result is passed as the argument to the second operation.
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) ;
public void thenCompose() throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
        @Override
        public Integer get() {
            return 1;
        }
    });
    CompletableFuture<Integer> future = future1.thenCompose(new Function<Integer, CompletionStage<Integer>>() {
        @Override
        public CompletionStage<Integer> apply(Integer integer) {
            return CompletableFuture.supplyAsync(new Supplier<Integer>() {
                @Override
                public Integer get() {
                    return integer + 2;
                }
            });
        }
    });
    System.out.println(future.get());
}
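A common point of confusion is how thenCompose differs from thenApply when the function itself returns a future. The sketch below contrasts the two: thenCompose flattens the result, while thenApply leaves a nested future. addTwoAsync is a hypothetical helper introduced only for this illustration.

```java
import java.util.concurrent.CompletableFuture;

class ComposeSketch {

    // Hypothetical asynchronous step that depends on the previous result.
    static CompletableFuture<Integer> addTwoAsync(int n) {
        return CompletableFuture.supplyAsync(() -> n + 2);
    }

    // thenCompose flattens the inner future into a CompletableFuture<Integer>.
    static int flat() {
        CompletableFuture<Integer> flat = CompletableFuture
                .supplyAsync(() -> 1)
                .thenCompose(ComposeSketch::addTwoAsync);
        return flat.join();
    }

    // thenApply with the same function yields a future of a future.
    static int nested() {
        CompletableFuture<CompletableFuture<Integer>> nested = CompletableFuture
                .supplyAsync(() -> 1)
                .thenApply(ComposeSketch::addTwoAsync);
        return nested.join().join();
    }

    public static void main(String[] args) {
        System.out.println(flat());   // 3
        System.out.println(nested()); // 3
    }
}
```

Both variants produce the same value, but only the thenCompose version can be chained further without unwrapping the inner future by hand.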
From: https://www.cnblogs.com/cnff/p/18373892