首页 > 其他分享 >【揭秘】ForkJoinPool全面解析

【揭秘】ForkJoinPool全面解析

时间:2024-01-31 16:02:52浏览次数:16  
标签:执行 int ForkJoinPool 任务 线程 numbers 解析 揭秘

【揭秘】ForkJoinPool全面解析 - 程序员古德

文章摘要

ForkJoinPool是Java中的并行计算框架,其优点在于能够高效利用多核处理器资源,它采用分治策略将大任务拆分成小任务,通过工作窃取算法平衡负载,从而实现任务的并行执行和快速完成,此外,ForkJoinPool还提供了简洁的API和丰富的任务控制机制,支撑开发人员开发高效的并行代码。

核心概念

ForkJoinPool 是 Java 并发包 java.util.concurrent 中的一个类,它主要用于解决可以通过分治策略(Divide-and-Conquer)来并行处理的问题,这类问题通常可以被分解为更小的子问题,子问题和原问题在结构上相同或类似,只不过规模不同,通过递归地将问题分解为更小的部分,ForkJoinPool 可以利用多核处理器并行地处理这些子问题,然后再将结果合并起来,从而高效地解决问题。

ForkJoinPool 的主要特点包括:

  1. 工作窃取算法(Work-Stealing Algorithm):当一个线程完成了自己的任务后,它可以从其他线程的任务队列中“窃取”任务来执行,这有助于平衡负载和提高处理器的利用率。
  2. 递归分解与合并:非常适合处理可以递归分解的问题,如排序、搜索、数值计算等,开发者需要实现 ForkJoinTask 接口(通常使用它的子类 RecursiveAction 用于无返回值的任务,或使用 RecursiveTask 用于有返回值的任务)来定义问题的分解和结果的合并。
  3. 非阻塞设计:使用内部队列来管理任务,避免了使用锁或其他同步机制,从而减少了线程间的竞争和阻塞。
  4. 并行度控制:允许开发者控制并行执行的线程数量,可以根据处理器的核心数来优化性能。

ForkJoinPool 适用于那些可以自然分解为多个独立子任务,并且这些子任务之间不需要太多通信或同步的问题,常见的使用场景包括并行数组处理(如排序、过滤、映射)、并行集合处理(如归约操作)、科学计算中的并行算法(如矩阵乘法、快速傅里叶变换)等。

代码案例

import java.util.concurrent.ForkJoinPool;  
import java.util.concurrent.RecursiveAction;  
  
public class ForkJoinSumCalculator {  
  
    public static void main(String[] args) {  
        // 定义一个需要求和的数组  
        int[] numbers = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};  
  
        // 创建一个ForkJoinPool实例,它将使用可用的所有处理器  
        ForkJoinPool pool = new ForkJoinPool();  
  
        // 创建一个ForkJoinTask来执行求和操作  
        SumTask task = new SumTask(numbers, 0, numbers.length);  
  
        // 提交任务到ForkJoinPool并等待它的完成  
        pool.invoke(task);  
  
        // 输出最终求和结果  
        System.out.println("Sum of all numbers: " + task.getSum());  
  
        // 关闭ForkJoinPool(虽然在这个例子中它并不是严格必要的,因为程序即将退出)  
        pool.shutdown();  
    }  
  
    // 定义一个继承自RecursiveAction的任务类  
    static class SumTask extends RecursiveAction {  
        private static final long serialVersionUID = 1L;  
  
        // 阈值,当数组长度小于这个值时,直接计算结果而不再拆分  
        private static final int THRESHOLD = 5;  
  
        private int[] numbers;  
        private int startIndex;  
        private int endIndex;  
        private int sum; // 存储子数组的和  
  
        public SumTask(int[] numbers, int startIndex, int endIndex) {  
            this.numbers = numbers;  
            this.startIndex = startIndex;  
            this.endIndex = endIndex;  
        }  
  
        // 获取当前任务计算的和  
        public int getSum() {  
            return sum;  
        }  
  
        @Override  
        protected void compute() {  
            // 如果任务足够小,直接计算  
            if (endIndex - startIndex <= THRESHOLD) {  
                sum = calculateDirectly();  
            } else {  
                // 否则,拆分任务  
                int middleIndex = startIndex + (endIndex - startIndex) / 2;  
                SumTask leftTask = new SumTask(numbers, startIndex, middleIndex);  
                SumTask rightTask = new SumTask(numbers, middleIndex, endIndex);  
  
                // 递归执行任务  
                invokeAll(leftTask, rightTask);  
  
                // 合并结果  
                sum = leftTask.getSum() + rightTask.getSum();  
            }  
        }  
  
        // 直接计算子数组的和  
        private int calculateDirectly() {  
            int localSum = 0;  
            for (int i = startIndex; i < endIndex; i++) {  
                localSum += numbers[i];  
            }  
            return localSum;  
        }  
    }  
}

在上面代码中,SumTask类有一个sum字段来存储计算的和,以及一个getSum方法来检索它,在compute方法中,如果任务的大小超过阈值,任务将被拆分为两个子任务,并且递归地执行,然后,将子任务的结果合并以计算总和,如果任务的大小小于或等于阈值,将直接计算子数组的和。

核心API

ForkJoinPool 提供了一个框架,用于将大任务分解成小任务,然后并行地执行这些小任务,最后再将结果合并起来,它提供的方法主要涉及到任务的提交、执行、管理和配置等方面,下面是一些常用方法的简要说明。

构造方法

  1. ForkJoinPool(): 创建一个默认并行级别的 ForkJoinPool,通常使用可用的处理器数量作为并行级别。
  2. ForkJoinPool(int parallelism): 创建一个具有指定并行级别的 ForkJoinPool

任务提交

  1. invoke(ForkJoinTask<?> task): 同步执行指定的任务,并等待其完成。

  2. submit(ForkJoinTask<?> task): 异步提交一个任务以供执行,并返回一个表示该任务的 Future

  3. execute(ForkJoinTask<?> task): 安排一个任务的执行,但不等待其完成。

任务管理

  1. awaitQuiescence(long timeout, TimeUnit unit): 等待所有任务完成执行,或者直到超时。
  2. shutdown(): 可能启动有序关闭,在该过程中执行现有任务但不接受新任务。
  3. shutdownNow(): 试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。
  4. isShutdown(): 如果此池已关闭或正在关闭,则返回 true
  5. isTerminated(): 如果关闭后所有任务都已完成,则返回 true
  6. awaitTermination(long timeout, TimeUnit unit): 请求关闭并等待所有任务完成执行,或者直到超时。

获取任务结果

  1. 对于 RecursiveTask(有返回值的任务),通常会在调用任务的 join 方法时获取任务结果。

配置和状态

  1. getParallelism(): 返回此 ForkJoinPool 的并行级别。
  2. getPoolSize(): 返回此 ForkJoinPool 中的活动线程估计数。
  3. getActiveThreadCount(): 返回此 ForkJoinPool 中当前活动的线程数。
  4. getRunningThreadCount(): 返回此 ForkJoinPool 中正在运行任务的线程数。
  5. getQueuedTaskCount(): 返回此 ForkJoinPool 工作队列中待处理的任务数估计值。
  6. getStealCount(): 返回从此 ForkJoinPool 中成功窃取的任务数估计值。

核心总结

【揭秘】ForkJoinPool全面解析 - 程序员古德

ForkJoinPool是Java并行计算的利器,其优点在于能高效地将大任务拆成小任务,通过工作窃取机制充分利用多核处理器,加速任务执行,但它也有缺点,比如任务划分不均可能导致部分处理器闲置,且更适合计算密集型而非IO密集型任务,使用时,建议合理划分任务,保持任务均衡,同时注意异常处理和线程资源管理。

关注我,每天学习互联网编程技术 - 程序员古德

标签:执行,int,ForkJoinPool,任务,线程,numbers,解析,揭秘
From: https://blog.51cto.com/bytegood/9511658

相关文章

  • 云邮件服务器,mail服务器,邮箱群发,全自动邮件群发,http协议群发实现解析
    /*对于部署mail服务器的VPS机器笔者建议最好使用腾讯云或阿里云,毕竟在很多时候群发被被拦截的几率能大大的降低。从而达到理想的状态。域名建议联盟绿色认证或全新无任何黑历史的域名最佳。*/ 一、Mail服务器架设1:Mail服务器使用腾讯云VPS服务器(开通以后切记先解封25端口)......
  • 如何对混合日志进行自动化解析
    本文分享自华为云社区《【AIOps】日志分析领域难题:如何对混合日志进行自动化解析?》,作者:DevAI。AIOps智能运维,是指将AI应用于运维领域,基于已有的运维数据通过AI的方式来解决传统运维没办法解决的问题;本文聚焦在AIOps领域的日志解析子课题。现有的日志解析器 (logparser) 在实际......
  • DNS轮询解析是什么?
    传统的负载均衡技术通常需要专门的硬件或软件,但DNS轮询解析是一种负载分配、负载平衡或容错技术,通过管理域名系统(DNS)对来自客户计算机的地址请求的响应,按照适当的统计模型,提供多个冗余的互联网协议服务主机,将流量分散到多个服务器上。例如,Web服务器、FTP服务器。一、DNS轮询解析的......
  • 深入解析CompletableFuture的功能和用法
    https://zhuanlan.zhihu.com/p/650390185?utm_id=01.CompletableFuture简介1.1概述CompletableFuture是Java8中引入的一个类,它实现了CompletionStage接口,提供了一组丰富的方法来处理异步操作和多个任务的结果。它支持链式操作,可以方便地处理任务的依赖关系和结果转换......
  • 解决gpt返回json Python没法解析的情况
    importreimportjsondefreplace_newlines(match):#在匹配的字符串中替换\n和\rreturnmatch.group(0).replace('\n','\\n').replace('\r','\\r')defclean_json_str(json_str:str)->str:""&......
  • 如何对混合日志进行自动化解析
    本文分享自华为云社区《【AIOps】日志分析领域难题:如何对混合日志进行自动化解析?》,作者:DevAI。AIOps智能运维,是指将AI应用于运维领域,基于已有的运维数据通过AI的方式来解决传统运维没办法解决的问题;本文聚焦在AIOps领域的日志解析子课题。现有的日志解析器(logparser)在实际......
  • nginx-go-crossplane crossplane golang 版本的nginx 配置解析包
    nginx-go-crossplane属于python版本crossplanenginx配置解析包的golang移植可以实现nginx配置解析转换为json格式的数据,当然也支持将json转换为nginx配置格式说明对于希望基于nginx搞自己的流量统一平台,同时希望基于api管理的,nginx-go-crossplane是一个很不错的选择......
  • MyBatis 源码系列:MyBatis 解析配置文件、二级缓存、SQL
    解析全局配置文件启动流程分析Stringresource="mybatis-config.xml";//将XML配置文件构建为Configuration配置类reader=Resources.getResourceAsReader(resource);//通过加载配置文件流构建一个SqlSessionFactoryDefaultSqlSessionFactorySqlSessionFactorysqlMapp......
  • vs+qt中使用opengl及关键报错“无法打开包括文件: no such file or directory”与“err
    参考链接https://blog.csdn.net/qq_22533607/article/details/79792083http://t.csdnimg.cn/T8II5http://t.csdnimg.cn/JP8k7基础准备:vs中配置qt插件(略)关键步骤:创建QtWidgetApplication项目将BaseClass修改成QWidget,方框中的内容可以不勾,个人习惯ui文件中添加open......
  • 揭秘财务数据可视化大屏的五大优势
    在大数据时代,财务数据的管理与分析对于企业的决策和发展至关重要。然而,面对海量的数据,如何快速、准确地获取有价值的信息,一直是企业面临的挑战。这时,财务数据可视化大屏的出现,为企业提供了一个全新的解决方案。 山海鲸可视化搭建的财务数据可视化大屏是一个集成的、直观的、交......