首页 > 编程语言 >Java 8: 异步利器 CompletableFuture vs Parallel Stream 选哪个

Java 8: 异步利器 CompletableFuture vs Parallel Stream 选哪个

时间:2023-11-03 11:55:05浏览次数:46  
标签:tasks Java Stream ForkJoinPool worker System vs duration commonPool

  • 应人们对性能和体验的要求,异步在项目中用的越来越多,CompletableFuture 和Parallel Stream无疑是异步并发的利器。既然两者都可以实现异步并发,那么带来一个问题:什么时候该使用哪个呢,哪个场景下使用哪个会更好呢?这篇文章因此出现,旨在当执行异步进行编程时CompletableFuture与Parallel Stream的比较,从而你可以由此知道什么场景下使用哪个

博客新地址:https://yaoyuanyy.github.io

实例场景
我们将使用下面的类去构建一个运行长时间的任务
class MyTask {
  private final int duration;
  public MyTask(int duration) {
    this.duration = duration;
  }
  public int calculate() {
    System.out.println(Thread.currentThread().getName());
    try {
      Thread.sleep(duration * 1000);
    } catch (final InterruptedException e) {
      throw new RuntimeException(e);
    }
    return duration;
  }
}
123456789101112131415

我们创建10个任务,每个持续1秒

List<MyTask> tasks = IntStream.range(0, 10)
                                    .mapToObj(i -> new MyTask(1))
                                    .collect(toList());
                                    
1234
我们怎样更有效的计算这个任务列表呢

方式 1: 串行

你可能第一个想到的是串行执行,正如下面

public static void runSequentially(List<MyTask> tasks) {
  long start = System.nanoTime();
  List<Integer> result = tasks.stream()
                              .map(MyTask::calculate)
                              .collect(toList());
  long duration = (System.nanoTime() - start) / 1_000_000;
  System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
  System.out.println(result);
}
123456789

正如你所预期的, 它花费了10秒, 因为每个任务在主线程一个接一个的执行

方式 2: 使用parallel stream

一个快速的改善方式是使用parallel stream, 如下代码:

public static void useParallelStream(List<MyTask> tasks) {
  long start = System.nanoTime();
  List<Integer> result = tasks.parallelStream()
                              .map(MyTask::calculate)
                              .collect(toList());
  long duration = (System.nanoTime() - start) / 1_000_000;
  System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
  System.out.println(result);
}
123456789

输出:

main
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-2
main
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-1
main
Processed 10 tasks in 3043 millis
1234567891011

它花费了3秒多,因为此次并发执行使用了4个线程 (3个是ForkJoinPool线程池中的, plus 加上main 线程).

方式 3: 使用CompletableFutures

让我们看看使用CompletableFutures是否执行的更有效率:

public static void useCompletableFuture(List<MyTask> tasks) {
  long start = System.nanoTime();
  List<CompletableFuture<Integer>> futures =
      tasks.stream()
           .map(t -> CompletableFuture.supplyAsync(() -> t.calculate()))
           .collect(Collectors.toList());
 
  List<Integer> result =
      futures.stream()
             .map(CompletableFuture::join)
             .collect(Collectors.toList());
  long duration = (System.nanoTime() - start) / 1_000_000;
  System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
  System.out.println(result);
}
123456789101112131415

以上代码,我们首先获取CompletableFutures集合,然后在每个future上调用join方法去等待他们逐一执行完。注意,join方法类似于get方法,唯一的不通点是前者不会抛出任何的受检查异常,所以在lambda表达式中更方便一些.

再有,你必须使用两个独立的stream(pipelines)管道,而不是将两个map操作放在一起,因为stream的中间操作都是懒加载的(intermediate stream operations are lazy),你最终必须按顺序处理你的任务。这就是为什么首先需要CompletableFuture在list中,然后允许他们开始执行,直到执行完毕.

输出:

ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-1
Processed 10 tasks in 4010 millis
1234567891011

它花费了4秒去处理这10个任务。你可以注意到这次仅仅使用了3个ForkJoinPool线程,不像parallel stream,main线程没有被使用.

方式 4: 使用带有自定义Executor的CompletableFuture

CompletableFutures比parallel streams优点之一是你可以指定不用的Executor去处理他们的任务。这意味着基于你的项目,你能选择更合适数量的线程。我的例子不是cpu密集型的任务,我能选择增加大于Runtime.getRuntime().getAvailableProcessors()数量的线程,如下所示

public static void useCompletableFutureWithExecutor(List<MyTask> tasks) {
  long start = System.nanoTime();
  ExecutorService executor = Executors.newFixedThreadPool(Math.min(tasks.size(), 10));
  List<CompletableFuture<Integer>> futures =
      tasks.stream()
           .map(t -> CompletableFuture.supplyAsync(() -> t.calculate(), executor))
           .collect(Collectors.toList());
 
  List<Integer> result =
      futures.stream()
             .map(CompletableFuture::join)
             .collect(Collectors.toList());
  long duration = (System.nanoTime() - start) / 1_000_000;
  System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
  System.out.println(result);
  executor.shutdown();
}
1234567891011121314151617

输出:

pool-1-thread-2
pool-1-thread-4
pool-1-thread-3
pool-1-thread-1
pool-1-thread-5
pool-1-thread-6
pool-1-thread-7
pool-1-thread-8
pool-1-thread-9
pool-1-thread-10
Processed 10 tasks in 1009 millis
1234567891011

在这次改进之后,它花费了1秒去处理这10个任务.

正如你看到的,CompletableFutures可以更多的控制线程池的数量。如果你的任务是io密集型的,你应该使用CompletableFutures;否则如果你的任务是cpu密集型的,使用比处理器更多的线程是没有意义的,所以选择parallel stream,因为它更容易使用.

扩展:parallel stream有一些需要注意的点

  1. Stateless behaviors
  2. Side-effects
  3. Ordering
    https://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.html

博客新地址:https://yaoyuanyy.github.io
refer to:
http://fahdshariff.blogspot.com/

标签:tasks,Java,Stream,ForkJoinPool,worker,System,vs,duration,commonPool
From: https://www.cnblogs.com/xiondun/p/17807309.html

相关文章

  • APK检测管理系统 JAVA开源项目 毕业设计
    https://gf.bilibili.com/item/detail/1104293029为了帮助小白入门Java,博主录制了本项目配套的《项目手把手启动教程》,希望能给同学们带来帮助。一、摘要基于JAVA+Vue+SpringBoot+MySQL的APK检测管理系统,包含了软件档案模块、软件检测模块、软件举报模块、开放平台模块,还包含系统......
  • JAVA技术栈的有福啦!这款IDEA插件,写完代码即可调试
    国产API调试工具Apipost推出IDEA插件,写完代码就可以调试接口并一键生成接口文档!而且还可以根据已有的方法帮助您快速生成url和params。ApipostHelper=API调试工具+API管理工具+API搜索工具。在商店中搜索或直接点击下方链接即可下载:https://plugins.jetbrains.com......
  • JAVA技术栈的有福啦!这款IDEA插件,写完代码即可调试
    国产API调试工具Apipost推出IDEA插件,写完代码就可以调试接口并一键生成接口文档!而且还可以根据已有的方法帮助您快速生成url和params。ApipostHelper=API调试工具+API管理工具+API搜索工具。在商店中搜索或直接点击下方链接即可下载:https://plugins.jetbrains.com/p......
  • Java面试题:链表-合并两个排序的链表
    描述输入两个递增的链表,单个链表的长度为n,合并这两个链表并使新链表中的节点仍然是递增排序的。示例输入:{1,3,5},{2,4,6}返回值:{1,2,3,4,5,6}原题地址:https://www.nowcoder.com/practice/d8b6b4358f774294a89de2a6ac4d9337代码实现packagecom.example.demo.linked;......
  • JVS低代码表单引擎;轻松应对企业不同应用需求
    在日常的设计表单过程中,常常会有需要录入一大段文字的场景,例如评论、留言、产品介绍、内容说明等场景,那么简单的文本框组件就不满足了,这里JVS提供了两种描述类型的组件,多行文本框和富文本组件,如下图所示:多行文本框多行文本可允许用户在一个多行的文本框中输入较长的文本内容,但通常......
  • java基础学习:path,java_home环境变量配置
    1.path变量: 装jdk后会自动配置java和javac的path路径 2.JAVA_HOME环境变量:   ......
  • 数组 vs. 切片
    在Go编程语言中处理数据时,经常会遇到数组和切片。这两者是不同的数据结构,有各自的特性和用途。本文将对Go中的数组和切片进行比较,以帮助大家更好地理解它们。1.长度不同一个主要的区别是长度。在Go中,数组是具有固定长度的数据结构,一旦创建,其大小不可更改。相比之下,切片具有动态......
  • JAVA内存分配
    1.类(包含该类的方法)的字节码文件进入方法区处于候命状态2.虚拟机调用了该类的方法后,方法进入栈内存,并执行方法3.当运行方法过程中出现了“new”,就会在堆内存中开辟对应空间,并把该空间的地址返回给arr变量记录,因此就可以通过arr找到对应的堆内存空间 注意: ......
  • VS code 可以做什么?
    编写markdownVScode真的是非常好用的Markdown编写工具,我用他来编写Markdown的时间甚至比写代码还要多。比如,我每周写的公众号文章。相关插件:MarkdownMarkdownPreviewEnhancedMarkdownAllinOne编写python大多数同学写python选择pycharm,但他们经常会遇到很多问题,比如pychar......
  • java笔记_15_动态生成Excel文件
    //创建表头数据//内层List按纵向创建,外层List按横向添加,横向重复的名称会自动合并表格。List<List<String>>list=newArrayList<>();List<String>childList1=newArrayList<>();childList1.add("aaa");childList1.add("bbb");childList1.add......