Java 的 CompletableFuture
是处理异步编程的利器。它不仅简化了异步任务的执行,还提供了丰富的 API 来支持任务的组合、异常处理、以及多任务并行。
1. CompletableFuture
的基础概念
CompletableFuture
是 Java 8 中引入的,属于 java.util.concurrent
包。它实现了 Future
接口,允许在异步计算完成后获得结果。与 Future
不同,CompletableFuture
不仅提供了阻塞式获取结果的方法,还允许你以非阻塞方式继续执行后续操作。
2. CompletableFuture
的创建方式
CompletableFuture
可以通过多种方式创建:
-
CompletableFuture.supplyAsync(Supplier<U> supplier)
:用于异步执行任务,并在完成后返回结果。CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { // 模拟耗时任务 return "Task Result"; });
-
CompletableFuture.runAsync(Runnable runnable)
:用于异步执行任务,但不返回结果。CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { // 执行异步任务 System.out.println("Task Executed"); });
-
CompletableFuture.completedFuture(U value)
:返回一个已经完成的CompletableFuture
。CompletableFuture<String> future = CompletableFuture.completedFuture("Already Completed");
3. CompletableFuture
的链式操作
链式操作是 CompletableFuture
的核心功能之一。你可以将多个异步操作串联起来,以流式的方式处理复杂的异步任务流。
-
thenApply(Function<U, V> fn)
:在前一个任务完成后,使用其结果执行另一个任务,并返回一个新的CompletableFuture
。CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello") .thenApply(result -> result + ", World!");
-
thenAccept(Consumer<U> action)
:接受前一个任务的结果,并执行某个操作,但不返回新的CompletableFuture
。CompletableFuture.supplyAsync(() -> "Hello") .thenAccept(System.out::println); // 输出 "Hello"
-
thenRun(Runnable action)
:在前一个任务完成后,执行一个无参的操作。CompletableFuture.supplyAsync(() -> "Hello") .thenRun(() -> System.out.println("Task Completed"));
4. 任务的组合
CompletableFuture
提供了多种方式来组合多个异步任务。
-
thenCombine(CompletableFuture<U> other, BiFunction<T,U,V> fn)
:将两个CompletableFuture
的结果组合在一起。CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World"); CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> result1 + " " + result2);
-
thenCompose(Function<U, CompletableFuture<V>> fn)
:在一个异步任务完成后,启动另一个异步任务,并返回其结果。CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello") .thenCompose(result -> CompletableFuture.supplyAsync(() -> result + " World"));
-
allOf(CompletableFuture<?>... futures)
:等待所有传入的CompletableFuture
完成。CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2);
-
anyOf(CompletableFuture<?>... futures)
:一旦任意一个CompletableFuture
完成,就结束等待。CompletableFuture<Object> anyOfFutures = CompletableFuture.anyOf(future1, future2);
5. 异常处理
处理异步任务中的异常是确保程序健壮性的重要步骤。CompletableFuture
提供了以下几种方式来处理异常:
-
exceptionally(Function<Throwable, ? extends U> fn)
:当异常发生时,使用默认值或处理逻辑。CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { if (Math.random() > 0.5) { throw new RuntimeException("Failed Task"); } return "Success"; }).exceptionally(ex -> "Default Value");
-
handle(BiFunction<? super U, Throwable, ? extends V> fn)
:不论任务是否成功,都可以处理结果或异常。CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { if (Math.random() > 0.5) { throw new RuntimeException("Failed Task"); } return "Success"; }).handle((result, ex) -> { if (ex != null) { return "Error handled"; } return result; });
6. 使用自定义的 Executor
默认情况下,CompletableFuture
使用 ForkJoinPool.commonPool()
作为其线程池。你可以通过传递一个 Executor
来自定义线程池,以满足更复杂的性能需求。
Executor executor = Executors.newFixedThreadPool(5); CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { // 执行异步任务 }, executor);
7. 实际应用场景
CompletableFuture
在实际开发中有很多应用场景,以下是几个典型的例子:
-
并行处理多个 API 调用:在 Web 服务中,通常需要并行调用多个外部 API。使用
CompletableFuture
可以并行化这些调用,并在所有结果返回后进行处理。 -
异步处理大数据:在大数据处理场景中,
CompletableFuture
可以用于异步加载和处理数据,从而提高性能。 -
事件驱动架构:在事件驱动的架构中,
CompletableFuture
可以帮助处理异步事件流,确保事件之间的依赖关系正确处理。
8. 性能与优化建议
虽然 CompletableFuture
提供了强大的功能,但在使用时需要注意以下几点,以避免性能问题:
- 避免过度的嵌套:复杂的异步链可能导致代码难以维护,建议将逻辑拆分成小的、易于理解的任务。
- 合理使用线程池:确保使用适合的线程池配置,避免资源耗尽或线程饥饿。
- 捕获和处理所有异常:在异步操作中,未处理的异常可能会被静默丢弃,建议始终使用
handle
或exceptionally
来捕获异常。