首页 > 编程语言 > JAVA-- 在Java8 Parallel Stream中如何自定义线程池?

JAVA-- 在Java8 Parallel Stream中如何自定义线程池?

时间:2023-07-19 14:02:26浏览次数:34  
标签:JAVA 自定义 Stream ForkJoinPool 线程 new customThreadPool forkJoinPool

使用Parallel Stream时,在适当的环境中,通过适当地使用并行度级别,可以在某些情况下获得性能提升。
如果程序创建一个自定义ThreadPool,必须记住调用它的shutdown()方法来避免内存泄漏。

Parallel Stream默认使用的线程池

如下代码示例,Parallel Stream并行处理使用的线程池是ForkJoinPool.commonPool(),这个线程池是由整个应用程序共享的线程池。

@Test
    public void givenList_whenCallingParallelStream_shouldBeParallelStream(){
        List<Long> aList = new ArrayList<>();
        Stream<Long> parallelStream = aList.parallelStream();

        assertTrue(parallelStream.isParallel());
    }
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

如何自定义线程池

简单示例

如下代码示例说明如下:

  • 使用的ForkJoinPool构造函数的并行级别为4。为了确定不同环境下的最佳值,需要进行一些实验,但一个好的经验法则是根据CPU的核数选择数值。
  • 接下来,处理并行流的内容,在reduce调用中对它们进行汇总。

这个简单的示例可能不能充分说明使用自定义线程池的用处,但是在不希望将公共线程池与长时间运行的任务绑定在一起(例如处理来自网络源的数据)或应用程序中的其他组件正在使用公共线程池的情况下,其好处就很明显了。

    @Test
    public void giveRangeOfLongs_whenSummedInParallel_shouldBeEqualToExpectedTotal()
        throws InterruptedException, ExecutionException {

        long firstNum = 1;
        long lastNum = 1_000_000;

        List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
            .collect(Collectors.toList());

        ForkJoinPool customThreadPool = new ForkJoinPool(4);
        long actualTotal = customThreadPool.submit(
            () -> aList.parallelStream().reduce(0L, Long::sum)).get();

        assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
    }
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

复杂点的示例

通过查看日志,可以看到使用了自定义的线程池,提高了并发处理的效率

	@Test
    public void testCustomThreadPool() throws ExecutionException, InterruptedException {
        List<Long> firstRange = LongStream.rangeClosed(1, 10).boxed()
            .collect(Collectors.toList());

        List<Long> secondRange = LongStream.rangeClosed(5000, 6000).boxed()
            .collect(Collectors.toList());

        ForkJoinPool forkJoinPool = new ForkJoinPool(3);
        Future<Long> future = forkJoinPool.submit(() -> {
            return firstRange.parallelStream().map((number) -> {
                try {
                    print(Thread.currentThread().getName() +" 正在处理 "+number);
                    Thread.sleep(5);
                } catch (InterruptedException e) {
                }finally {
                    return number;
                }
            }).reduce(0L, Long::sum);
        });
        assertEquals((1 + 10) * 10 / 2, future.get());

        forkJoinPool.shutdown();

        ForkJoinPool forkJoinPool2 = new ForkJoinPool(10);

        forkJoinPool2.submit(() -> {
            secondRange.parallelStream().forEach((number) -> {
                try {
                    print(Thread.currentThread().getName() +" 正在处理 "+number);
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                }
            });
        });
        forkJoinPool2.shutdown();
        TimeUnit.SECONDS.sleep(2);
    }
    
 	private static void print(String msg){
        System.out.println(msg);
    }
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

求质数

实现方案有如下2种:

  • 将Parallel task 直接提交给自定义的ForkJoinPool中
  • 将自定义线程池传递到完整的future.supplyAsync方法中
public class StreamTest {
    @Test
    public void testCompletableFuture()throws InterruptedException, ExecutionException {
        ForkJoinPool forkJoinPool = new ForkJoinPool(2);
        CompletableFuture<List<Long>> primes = CompletableFuture.supplyAsync(() ->
                //parallel task here, for example
                range(1, 1_000_000).parallel().filter(StreamTest::isPrime).boxed().collect(toList()),
            forkJoinPool
        );
         forkJoinPool.shutdown();
        System.out.println(primes.get());
    }

    @Test
    public void testCustomForkJoinPool() throws InterruptedException {
        final int parallelism = 4;
        ForkJoinPool forkJoinPool = null;
        try {
            forkJoinPool = new ForkJoinPool(parallelism);
            final List<Integer> primes = forkJoinPool.submit(() ->
                // Parallel task here, for example
                IntStream.range(1, 1_000_000).parallel()
                    .filter(StreamTest::isPrime)
                    .boxed().collect(Collectors.toList())
            ).get();
            System.out.println(primes);
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            if (forkJoinPool != null) {
                forkJoinPool.shutdown();
            }
        }
    }

    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }

    private static void print(String msg){
        System.out.println(msg);
    }
}
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

注意事项:小心内存泄漏【Memory Leak】

正如前面所讨论的,整个应用程序默认使用公共线程池。公共线程池是一个静态ThreadPool实例。
因此,如果使用默认线程池,就不会发生内存泄漏。

但是针对使用自定义线程池的场景下,customThreadPool对象不会被解引用和垃圾收集——相反,它将等待分配新任务【the customThreadPool object won’t be dereferenced and garbage collected — instead, it will be waiting for new tasks to be assigned】
也就是说,每次调用测试方法时,都会创建一个新的customThreadPool对象,并且它不会被释放。

解决这个问题很简单:在执行了这个方法之后关闭customThreadPool对象:

@Test
    public void giveRangeOfLongs_whenSummedInParallel_shouldBeEqualToExpectedTotal()
        throws InterruptedException, ExecutionException {

        long firstNum = 1;
        long lastNum = 1_000_000;

        List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
            .collect(Collectors.toList());
        ForkJoinPool customThreadPool = new ForkJoinPool(4);
        try {
            long actualTotal = customThreadPool.submit(
                () -> aList.parallelStream().reduce(0L, Long::sum)).get();

            assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
        }finally {
            customThreadPool.shutdown();
        }
    }
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
转自:https://blog.csdn.net/penriver/article/details/130186662

标签:JAVA,自定义,Stream,ForkJoinPool,线程,new,customThreadPool,forkJoinPool
From: https://www.cnblogs.com/tiancai/p/17565390.html

相关文章

  • Java 生成旋螺矩阵
    @TestpublicvoidvirtualMain(){int[][]matrix=generateMatrix(9);MyArray.printSquareArray(matrix,2);}publicint[][]generateMatrix(intn){int[][]res=newint[n][n];intsquare=n*n,i=(int)......
  • 【技术积累】Java中的常用类【一】
    Math类Math类是Java中的一个数学工具类,提供了一系列常用的数学方法。下面是Math类的常用方法及其案例:abs()返回一个数的绝对值。intnum=-10;intabsNum=Math.abs(num);System.out.println(absNum);//输出:10解释:abs()方法返回num的绝对值,即10。ceil()返回大于或等......
  • java parallelStream 线程堵塞问题笔记
    定义:Stream(流)是JDK8中引入的一种类似与迭代器(Iterator)的单向迭代访问数据的工具。ParallelStream则是并行的流,它通过Fork/Join框架(JSR166y)来拆分任务,加速流的处理过程。最开始接触parallelStream很容易把其当做一个普通的线程池使用,因此也出现了上面提到的开始的时候打标,结束......
  • Java基础 变量、常量、作用域
    Java基础变量、常量、作用域变量-变量是什么:就是可以变化的量!-Java是一种强类型的语言,每个变量都必须声明其类型-Java变量是程序中最基本的存储单元,其要素包括变量名,变量类型和作用域**注意事项:1每个变量都有类型,类型可以是基本类型,也可以是引用类型2......
  • java协程线程之虚拟线程
    前言众所周知,java是没有协程线程的,在我们如此熟知的jdk1.8时代,大佬们想出来的办法就是异步io,甚至用并行的stream流来实现,高并发也好,缩短事件处理时间也好;大家都在想着自己认为更好的实现方式;在来说说吧,我为什么会在今天研究这个破b玩意儿呢,这事情还的从一个月前的版本维护说......
  • 影响 Java 程序的性能的因素和性能指标
    有哪些因素会影响Java程序的性能?执行速度:程序的反应是否迅速,响应时间是否足够短内存分配:内存分配是否合理,是否过多地消耗内存或者存在泄漏启动时间:程序从运行到可以正常处理业务需要花费多少时间负载承受能力:当系统压力上升时,系统的执行速度、响应时间的上升曲线是否平缓......
  • JavaScript - 支持word上传的富文本编辑器
    ​ 在之前在工作中遇到在富文本编辑器中粘贴图片不能展示的问题,于是各种网上扒拉,终于找到解决方案,在这里感谢一下知乎中众大神以及TheViper。通过知乎提供的思路找到粘贴的原理,通过TheViper找到粘贴图片的方法。其原理为一下步骤:监听粘贴事件;【用于插入图片】获取光标位置;【......
  • 浅谈Java容器
    Java容器容器类是Java以类库的形式供用户开发程序时可直接使用的各种数据结构。所谓数据结构就是以某种方式将数据组织在一起,并存储在计算机中。数据结构不仅可以存储数据,还支持访问和处理数据的操作。在面向对象思想里,一种数据结构被认为是一个容器。数组是一种简单的数据结构,......
  • Java基础入门
    一、注释方式标识符单行注释//多行注释/**/文档注释/***/二、基础1、进制进制前缀二进制0b八进制0十进制无十六进制0x2、数据类型typevarName[=value][{,varName[=value]}];bytenum1=127;shortnum2=32767;intn......
  • 带你玩转自定义view系列--Android画笔的详解
    View的简介View是Android所有控件的基类,接下来借鉴网上的一张图片让大家一目了然(图片出自:http://blog.51cto.com/wangzhaoli/1292313)imageAndroid画笔的详解Android提供了2D图形绘制的各种工具,如Canvas(画布)、Point(点)、Paint(画笔)、Rectangles(矩形)等,利用这些工具可以直接在......