首页 > 编程语言 > java parallelStream 线程堵塞问题笔记

java parallelStream 线程堵塞问题笔记

时间:2023-07-19 13:55:06浏览次数:54  
标签:java ForkJoinPool System 线程 println new parallelStream out

定义:

Stream(流)是JDK8中引入的一种类似与迭代器(Iterator)的单向迭代访问数据的工具。ParallelStream则是并行的流,它通过Fork/Join 框架(JSR166y)来拆分任务,加速流的处理过程。最开始接触parallelStream很容易把其当做一个普通的线程池使用,因此也出现了上面提到的开始的时候打标,结束的时候去掉标的动作。

ForkJoinPool又是什么

ForkJoinPool是在Java 7中引入了一种新的线程池,其简单类图如下:

可以看到ForkJoinPool是ExecutorService的实现类,是一种线程池。创建了ForkJoinPool实例之后,可以通过调用submit(ForkJoinTask task) 或invoke(ForkJoinTask task)方法来执行指定任务。 ForkJoinTask表示线程池中执行的任务,其有两个主要的抽象子类:RecusiveAction和RecusiveTask。其中RecusiveTask代表有返回值的任务,而RecusiveAction代表没有返回值的任务。它们的类图如下:

 

ForkJoinPool来支持使用分治法(Divide-and-Conquer Algorithm)来解决问题,即将一个任务拆分成多个“小任务”并行计算,再把多个“小任务”的结果合并成总的计算结果。相比于ThreadPoolExecutor,ForkJoinPool能够在任务队列中不断的添加新任务,在线程执行完任务后可以再从任务列表中选择其他任务来执行;并且可以选择子任务的执行优先级,因此能够方便的执行具有父子关系的任务。ForkJoinPool内部维护了一个无限队列来保存需要执行的任务,而线程的数量则是通过构造函数传入,如果没有向构造函数中传入希望的线程数量,那么当前计算机可用的CPU数量会被设置为线程数量作为默认值(最大为MAX_CAP = 0x7fff)。

ParallelStream可能引起阻塞

对CPU密集型的任务来说,并行流使用ForkJoinPool,为每个CPU分配一个任务,这是非常有效率的,但是如果任务不是CPU密集的,而是I/O密集的,并且任务数相对线程数比较大,那么直接用ParallelStream并不是很好的选择。

 

  1.   package com.chenkang.test.util;
  2.    
  3.   import com.google.common.collect.Lists;
  4.    
  5.   import java.util.Date;
  6.   import java.util.List;
  7.   import java.util.concurrent.CountDownLatch;
  8.   import java.util.concurrent.TimeUnit;
  9.    
  10.   /**
  11.   * @author chenkang
  12.   * @date 2022/9/27 13:03
  13.   */
  14.   public class ParallelStream {
  15.    
  16.   /* public static void main(String[] args) throws InterruptedException {
  17.   List<Integer> lists = Lists.newArrayList();
  18.   TimeUnit.SECONDS.sleep(20);
  19.   //获取jvm 核数
  20.   System.out.println(Runtime.getRuntime().availableProcessors());
  21.   for (int i = 0; i < Runtime.getRuntime().availableProcessors(); i++) {
  22.   lists.add(i);
  23.   }
  24.   Date start = new Date();
  25.   System.out.println(lists.size());
  26.   CountDownLatch countDownLatch= new CountDownLatch(3);
  27.   new Thread(()->{
  28.   ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
  29.   forkJoinPool.submit(() -> {
  30.   lists.parallelStream().forEach(e -> {
  31.   try {
  32.   TimeUnit.SECONDS.sleep(10);
  33.   } catch (InterruptedException e1) {
  34.   e1.printStackTrace();
  35.   }
  36.   });
  37.   System.out.println("执行1down");
  38.   countDownLatch.countDown();
  39.   });
  40.   }).start();
  41.    
  42.   new Thread(()->{
  43.   ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
  44.   forkJoinPool.submit(() -> {
  45.   lists.parallelStream().forEach(e -> {
  46.   try {
  47.   TimeUnit.SECONDS.sleep(10);
  48.   } catch (InterruptedException e1) {
  49.   e1.printStackTrace();
  50.   }
  51.   });
  52.   System.out.println("执行2down");
  53.   countDownLatch.countDown();
  54.   });
  55.   }).start();
  56.    
  57.   new Thread(()->{
  58.   ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
  59.   forkJoinPool.submit(() -> {
  60.   lists.parallelStream().forEach(e -> {
  61.   try {
  62.   TimeUnit.SECONDS.sleep(10);
  63.   } catch (InterruptedException e1) {
  64.   e1.printStackTrace();
  65.   }
  66.   });
  67.   System.out.println("执行3down");
  68.   countDownLatch.countDown();
  69.   });
  70.   }).start();
  71.    
  72.   countDownLatch.await();
  73.   Date end = new Date();
  74.    
  75.   System.out.println((end.getTime() - start.getTime()) / 1000);
  76.   }*/
  77.    
  78.   public static void main(String[] args) throws InterruptedException {
  79.   List<Integer> lists = Lists.newArrayList();
  80.   TimeUnit.SECONDS.sleep(20);
  81.   //获取jvm 核数
  82.   System.out.println(Runtime.getRuntime().availableProcessors());
  83.   for (int i = 0; i < Runtime.getRuntime().availableProcessors(); i++) {
  84.   lists.add(i);
  85.   }
  86.   Date start = new Date();
  87.   System.out.println(lists.size());
  88.   CountDownLatch countDownLatch= new CountDownLatch(3);
  89.   new Thread(()->{
  90.   lists.parallelStream().forEach(e -> {
  91.   try {
  92.   TimeUnit.SECONDS.sleep(10);
  93.   } catch (InterruptedException e1) {
  94.   e1.printStackTrace();
  95.   }
  96.   });
  97.   System.out.println("执行1down");
  98.   countDownLatch.countDown();
  99.   }).start();
  100.    
  101.   new Thread(()->{
  102.   lists.parallelStream().forEach(e -> {
  103.   try {
  104.   TimeUnit.SECONDS.sleep(10);
  105.   } catch (InterruptedException e1) {
  106.   e1.printStackTrace();
  107.   }
  108.    
  109.   });
  110.   System.out.println("执行2down");
  111.   countDownLatch.countDown();
  112.   }).start();
  113.    
  114.   new Thread(()->{
  115.   lists.parallelStream().forEach(e -> {
  116.   try {
  117.   TimeUnit.SECONDS.sleep(10);
  118.   } catch (InterruptedException e1) {
  119.   e1.printStackTrace();
  120.   }
  121.   });
  122.   System.out.println("执行3down");
  123.   countDownLatch.countDown();
  124.   }).start();
  125.    
  126.   countDownLatch.await();
  127.   Date end = new Date();
  128.    
  129.   System.out.println((end.getTime() - start.getTime()) / 1000);
  130.   }
  131.    
  132.    
  133.    
  134.    
  135.   }

1.分别执行查看执行时间 链接jconsole 查看线程数量  这个是第二种 

 第一种

 可以看到线程数量提升 执行效率提高 第二种是因为commonpool 线程限制的原因

原文:https://www.csdn.net/tags/MtjaEg1sODI5MzctYmxvZwO0O0OO0O0O.html

https://blog.csdn.net/weixin_38845058/article/details/127070935

 

标签:java,ForkJoinPool,System,线程,println,new,parallelStream,out
From: https://www.cnblogs.com/tiancai/p/17565381.html

相关文章

  • Java基础 变量、常量、作用域
    Java基础变量、常量、作用域变量-变量是什么:就是可以变化的量!-Java是一种强类型的语言,每个变量都必须声明其类型-Java变量是程序中最基本的存储单元,其要素包括变量名,变量类型和作用域**注意事项:1每个变量都有类型,类型可以是基本类型,也可以是引用类型2......
  • java协程线程之虚拟线程
    前言众所周知,java是没有协程线程的,在我们如此熟知的jdk1.8时代,大佬们想出来的办法就是异步io,甚至用并行的stream流来实现,高并发也好,缩短事件处理时间也好;大家都在想着自己认为更好的实现方式;在来说说吧,我为什么会在今天研究这个破b玩意儿呢,这事情还的从一个月前的版本维护说......
  • 影响 Java 程序的性能的因素和性能指标
    有哪些因素会影响Java程序的性能?执行速度:程序的反应是否迅速,响应时间是否足够短内存分配:内存分配是否合理,是否过多地消耗内存或者存在泄漏启动时间:程序从运行到可以正常处理业务需要花费多少时间负载承受能力:当系统压力上升时,系统的执行速度、响应时间的上升曲线是否平缓......
  • JavaScript - 支持word上传的富文本编辑器
    ​ 在之前在工作中遇到在富文本编辑器中粘贴图片不能展示的问题,于是各种网上扒拉,终于找到解决方案,在这里感谢一下知乎中众大神以及TheViper。通过知乎提供的思路找到粘贴的原理,通过TheViper找到粘贴图片的方法。其原理为一下步骤:监听粘贴事件;【用于插入图片】获取光标位置;【......
  • 浅谈Java容器
    Java容器容器类是Java以类库的形式供用户开发程序时可直接使用的各种数据结构。所谓数据结构就是以某种方式将数据组织在一起,并存储在计算机中。数据结构不仅可以存储数据,还支持访问和处理数据的操作。在面向对象思想里,一种数据结构被认为是一个容器。数组是一种简单的数据结构,......
  • Java基础入门
    一、注释方式标识符单行注释//多行注释/**/文档注释/***/二、基础1、进制进制前缀二进制0b八进制0十进制无十六进制0x2、数据类型typevarName[=value][{,varName[=value]}];bytenum1=127;shortnum2=32767;intn......
  • Java高并发之CyclicBarrier简介(转)
    原文:https://juejin.cn/post/7209617649885184058作者:xindoo来源:稀土掘金  Java中的CyclicBarrier是一种同步工具,它可以让多个线程在一个屏障处等待,直到所有线程都到达该屏障处后,才能继续执行。CyclicBarrier可以用于协调多个线程的执行,以便它们可以在某个点上同步执行......
  • JavaScript 的优雅编程技巧:Singleton Pattern
    JavaScript的优雅编程技巧:SingletonPattern定义单例模式:保证一个类仅有一个实例,并提供一个访问的全局访问点。特点仅有一个实例对象全局都可访问该实例主动实例化延迟实例化类似单例模式的使用实践jQuery,lodash,moment....电商中的购物车(因为一个用户只有一......
  • javaSe笔试题
    1 ==和equals区别 ......
  • 【Javascript】数组扩展方法:根据key重新分组
    1//数组扩展:根据key重新分组2//field:按什么字段分组3Array.prototype.GroupByKey=function(field)4{5varoriginalArr=this6lettempArr=[]7letresultData=[]8for(leti=0;i<originalArr.length;i++)9{10......