- 应人们对性能和体验的要求,异步在项目中用的越来越多,CompletableFuture 和Parallel Stream无疑是异步并发的利器。既然两者都可以实现异步并发,那么带来一个问题:什么时候该使用哪个呢,哪个场景下使用哪个会更好呢?这篇文章因此出现,旨在当执行异步进行编程时CompletableFuture与Parallel Stream的比较,从而你可以由此知道什么场景下使用哪个
博客新地址:https://yaoyuanyy.github.io
实例场景
我们将使用下面的类去构建一个运行长时间的任务
class MyTask {
private final int duration;
public MyTask(int duration) {
this.duration = duration;
}
public int calculate() {
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(duration * 1000);
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
return duration;
}
}
123456789101112131415
我们创建10个任务,每个持续1秒
List<MyTask> tasks = IntStream.range(0, 10)
.mapToObj(i -> new MyTask(1))
.collect(toList());
1234
我们怎样更有效的计算这个任务列表呢
方式 1: 串行
你可能第一个想到的是串行执行,正如下面
public static void runSequentially(List<MyTask> tasks) {
long start = System.nanoTime();
List<Integer> result = tasks.stream()
.map(MyTask::calculate)
.collect(toList());
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
System.out.println(result);
}
123456789
正如你所预期的, 它花费了10秒, 因为每个任务在主线程一个接一个的执行
方式 2: 使用parallel stream
一个快速的改善方式是使用parallel stream, 如下代码:
public static void useParallelStream(List<MyTask> tasks) {
long start = System.nanoTime();
List<Integer> result = tasks.parallelStream()
.map(MyTask::calculate)
.collect(toList());
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
System.out.println(result);
}
123456789
输出:
main
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-2
main
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-1
main
Processed 10 tasks in 3043 millis
1234567891011
它花费了3秒多,因为此次并发执行使用了4个线程 (3个是ForkJoinPool线程池中的, plus 加上main 线程).
方式 3: 使用CompletableFutures
让我们看看使用CompletableFutures是否执行的更有效率:
public static void useCompletableFuture(List<MyTask> tasks) {
long start = System.nanoTime();
List<CompletableFuture<Integer>> futures =
tasks.stream()
.map(t -> CompletableFuture.supplyAsync(() -> t.calculate()))
.collect(Collectors.toList());
List<Integer> result =
futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
System.out.println(result);
}
123456789101112131415
以上代码,我们首先获取CompletableFutures集合,然后在每个future上调用join方法去等待他们逐一执行完。注意,join方法类似于get方法,唯一的不通点是前者不会抛出任何的受检查异常,所以在lambda表达式中更方便一些.
再有,你必须使用两个独立的stream(pipelines)管道,而不是将两个map操作放在一起,因为stream的中间操作都是懒加载的(intermediate stream operations are lazy),你最终必须按顺序处理你的任务。这就是为什么首先需要CompletableFuture在list中,然后允许他们开始执行,直到执行完毕.
输出:
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-1
Processed 10 tasks in 4010 millis
1234567891011
它花费了4秒去处理这10个任务。你可以注意到这次仅仅使用了3个ForkJoinPool线程,不像parallel stream,main线程没有被使用.
方式 4: 使用带有自定义Executor的CompletableFuture
CompletableFutures比parallel streams优点之一是你可以指定不用的Executor去处理他们的任务。这意味着基于你的项目,你能选择更合适数量的线程。我的例子不是cpu密集型的任务,我能选择增加大于Runtime.getRuntime().getAvailableProcessors()数量的线程,如下所示
public static void useCompletableFutureWithExecutor(List<MyTask> tasks) {
long start = System.nanoTime();
ExecutorService executor = Executors.newFixedThreadPool(Math.min(tasks.size(), 10));
List<CompletableFuture<Integer>> futures =
tasks.stream()
.map(t -> CompletableFuture.supplyAsync(() -> t.calculate(), executor))
.collect(Collectors.toList());
List<Integer> result =
futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
System.out.println(result);
executor.shutdown();
}
1234567891011121314151617
输出:
pool-1-thread-2
pool-1-thread-4
pool-1-thread-3
pool-1-thread-1
pool-1-thread-5
pool-1-thread-6
pool-1-thread-7
pool-1-thread-8
pool-1-thread-9
pool-1-thread-10
Processed 10 tasks in 1009 millis
1234567891011
在这次改进之后,它花费了1秒去处理这10个任务.
正如你看到的,CompletableFutures可以更多的控制线程池的数量。如果你的任务是io密集型的,你应该使用CompletableFutures;否则如果你的任务是cpu密集型的,使用比处理器更多的线程是没有意义的,所以选择parallel stream,因为它更容易使用.
扩展:parallel stream有一些需要注意的点
- Stateless behaviors
- Side-effects
- Ordering
https://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.html
博客新地址:https://yaoyuanyy.github.io
refer to:
http://fahdshariff.blogspot.com/