首页 > 其他分享 >细说Fork/Join框架

细说Fork/Join框架

时间:2023-04-12 22:08:44浏览次数:37  
标签:Fork 细说 long start 任务 线程 Join


什么是Fork/Join框架?

Fork/Join框架是JDK1.7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干小任务,最终汇总每个小任务结果后得到大任务结果的框架。

Fork就是把一个大任务切分成若干个小任务并行执行;

Join就是合并这些小任务的执行结果,最后得到这个大任务的结果;

例子:

计算1+2+3+...+10000。可以分割成10个任务,每个任务分别执行1000个数进行求和。最终汇总这10个任务。

Fork/Join流程图

细说Fork/Join框架_java

采用什么算法?

Fork/Join采用的是工作窃取算法,指某个线程从其他线程里窃取任务出来执行。

细说Fork/Join框架_java_02

窃取算法优缺点

  • 优点:充分利用线程进行并行计算;
  • 缺点:在某些情况下还是存在竞争,比如:双向队列里只有一个任务时,并且该算法会消耗了更多的系统资源,比如创建多线程和多个双端队列。

设计方式

我们已经了解了Fork/Join框架的需求了,那么咱们可以继续思考一下,如果让我们来设计一个Fork/Join框架,该如何设计呢?这样会有助于咱们理解Fork/Join框架的设计。我们这里把Fork/Join框架分成两步:

  • 第一步,分割任务,首先我们需要有一个fork类来把大任务拆分成若干个子任务,也有你可能拆分出来的子任务依旧是很大,所以得不停地拆分,直到拆分为我们认为是小为止。
  • 第二步,执行任务并合并任务,分割的子任务分别放在双端队列里,然后启动几个线程分别从双端队列里获取任务并执行任务,子任务执行完成后的结果都统一放在一个队列里,启动一个线程从队列里那数据,然后合并。

Fork/Join使用两个类来完成以上两个步骤。

1,ForkJoinTask:我们使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork和join操作的机制。通常情况下,我们不需要直接继承ForkJoinTask类,只需要继承它的子类,Fork/Join框架提供了以下两个子类:

RecursiveAction:用于没有返回结果的任务。

RecursiveTask:用于又返回结果的任务

2,ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行。

任务拆分出来的子任务添加到当前工作现场所维护的双端队列中,进入队列的头部,当一个工作现场的队列里暂时没有任务时,他会随机从其他工作线程的队列的尾部获取一个任务。


Fork/Join框架的应用

计算:1+2+3+4+...+n

我们把这个大任务拆分成若干个小任务,每个任务负责两个数相加。1+2、3+4、4+5....

代码如下:

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
/**
 * 使用ForkJoin计算1+2=3+4
 */
public class MyForkJoinTest extends RecursiveTask<Integer> {
//阈值
    private static final int THRESHOLD = 2;
    private int start;
    private int end;
    public MyForkJoinTest(int start, int end) {
this.start = start;
        this.end = end;
    }
@Override
    protected Integer compute() {
int sum = 0;
        boolean canCompute = (end - start) <= THRESHOLD;
        if (canCompute) {
//只有两个数
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
//多个数,拆分任务
            int middle = (start + end) / 2;
            MyForkJoinTest left = new MyForkJoinTest(start, middle);
            MyForkJoinTest right = new MyForkJoinTest(middle + 1, end);
            //执行子任务
            left.fork();
            right.fork();
            int leftResult = left.join();
            int rightResult = right.join();
            //合并任务
            sum = leftResult + rightResult;
        }
return sum;
    }

public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        //生成一个计算任务,负责计算1+2+3+...+100
        MyForkJoinTest myForkJoinTest = new MyForkJoinTest(1, 100);
        //执行任务
        Future<Integer> result = forkJoinPool.submit(myForkJoinTest);
        try {
            System.out.println(result.get());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

计算结果:

细说Fork/Join框架_java_03

通过上面这个例子,我们进一步了解ForkJoinTask,ForkJoinTask与一般任务的主要区别在于它需要实现compute方法,在这个方法里,首先需要判断任务是否足够小,如果足够小就直接执行任务。如果不够小,就必须分割成为足够小的任务,每个任务在调用fork方法时,又会进入compute方法,看看当前子任务是否需要继续拆分成子任务,如果不需要继续拆分,则执行当前子任务并返回结果。使用join方法会等待子任务执行完成并得到结果。

Fork/Join框架的异常处理

Fork/Join在执行的时候可能会抛出异常,但是我们没办法在主线程里直接捕获到异常,所以ForkJoinTask提供了isCompletedAbnormally()方法,用来检查任务是否已经抛出异常或者是否已经被取消了,并且可以通过ForkJoinTask的getException()方法来获取异常,代码如下:

if(task.isCompletedAbnormally()){
    System.out.println(task.getException());
}

getException的源码:

public final Throwable getException() {
int s = status & DONE_MASK;
    return ((s >= NORMAL)    ? null :
            (s == CANCELLED) ? new CancellationException() :
            getThrowableException());
}

人如没有完成或者任务没有抛出异常返回null,

任务被取消返回CancellationException

其他情况返回Throwable

Fork/Join框架的实现原理

和传统的线程池使用AQS的实现逻辑不同,ForkJoin引入全新的结构来标识:

  • ForkJoinPool: 用于执行ForkJoinTask任务的执行池,不再是传统执行池 Worker+Queue 的组合模式,而是维护了一个队列数组WorkQueue,这样在提交任务和线程任务的时候大幅度的减少碰撞。
  • WorkQueue: 双向列表,用于任务的有序执行,如果WorkQueue用于自己的执行线程Thread,线程默认将会从top端选取任务用来执行 - LIFO。因为只有owner的Thread才能从top端取任务,所以在设置变量时, int top; 不需要使用 volatile
  • ForkJoinWorkThread: 用于执行任务的线程,用于区别使用非ForkJoinWorkThread线程提交的task;启动一个该Thread,会自动注册一个WorkQueue到Pool,这里规定,拥有Thread的WorkQueue只能出现在WorkQueue数组的奇数位
  • ForkJoinTask: 任务, 它比传统的任务更加轻量,不再对是RUNNABLE的子类,提供fork/join方法用于分割任务以及聚合结果。
  • 为了充分施展并行运算,该框架实现了复杂的 worker steal算法,当任务处于等待中,thread通过一定策略,不让自己挂起,充分利用资源,当然,它比其他语言的协程要重一些。

jdk1.8环境下的。下面再来一个0到10000000000L以内的数据相加耗时对比:

import java.util.concurrent.RecursiveTask;
public class ForkJoinCalculate extends RecursiveTask<Long> {
    private static final long serialVersionUID = 13475679780L;
    private long start;
    private long end;
    //阈值
    private static final long THRESHOLD = 10000L;
    public ForkJoinCalculate(long start, long end) {
this.start = start;
        this.end = end;
    }

@Override
    protected Long compute() {
long length = end - start;
        if (length <= THRESHOLD) {
long sum = 0;
            for (long i = start; i <= end; i++) {
                sum += i;
            }
return sum;
        } else {
long middle = (start + end) / 2;
            ForkJoinCalculate left = new ForkJoinCalculate(start, middle);
            left.fork(); //拆分,并将该子任务压入线程队列
            ForkJoinCalculate right = new ForkJoinCalculate(middle + 1, end);
            right.fork();
            return left.join() + right.join();
        }

    }
}


import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;
public class TestForkJoin {
public static void test1(){
long start = System.currentTimeMillis();
        ForkJoinPool pool = new ForkJoinPool();
        ForkJoinTask<Long> task = new ForkJoinCalculate(0L, 10000000000L);
        long sum = pool.invoke(task);
        System.out.println(sum);
        long end = System.currentTimeMillis();
        System.out.println("test1耗费的时间为: " + (end - start));
    }
public static void test2(){
long start = System.currentTimeMillis();
        long sum = 0L;
        for (long i = 0L; i <= 10000000000L; i++) {
            sum += i;
        }
        System.out.println(sum);
        long end = System.currentTimeMillis();
        System.out.println("test2耗费的时间为: " + (end - start));
    }

public static void test3(){
long start = System.currentTimeMillis();
        Long sum = LongStream.rangeClosed(0L, 10000000000L)
                .parallel().sum();
        System.out.println(sum);
        long end = System.currentTimeMillis();
        System.out.println("test3耗费的时间为: " + (end - start));
    }
public static void main(String[] args) {
test1();
        test2();
        test3();
    }
}


运行结果:

-5340232216128654848

test1耗费的时间为: 2060

-5340232216128654848

test2耗费的时间为: 5262

-5340232216128654848

test3耗费的时间为: 1939

 

我们观察上面可以看出来执行10000000000L的相加操作各自执行完毕的时间不同。观察到当数据很大的时候ForkJoin比普通线程实现有效的多,但是相比之下ForkJoin的实现实在是有点麻烦,这时候Java 8 就为我们提供了一个并行流来实现ForkJoin实现的功能。可以看到并行流比自己实现ForkJoin还要快。

Java 8 中将并行流进行了优化,我们可以很容易的对数据进行并行流的操作,Stream API可以声明性的通过parallel()与sequential()在并行流与穿行流中随意切换!

参考:《Java并发编程的艺术》


标签:Fork,细说,long,start,任务,线程,Join
From: https://blog.51cto.com/u_11702014/6186298

相关文章

  • fork 了别人的仓库后如何保持同步更新
    给fork配置一个remotegitremote-v2.配置远程分支地址[email protected].将上游分支提交点提交到本地。gitfetchupstream4.切换到本地分支gitcheckouttrunk-develop-v2.0-liwei找到对应的远程分支进行合并gitmergeupstream/trunk-develop......
  • Qt——.pro文件及详细说明
    使用过QT框架进行项目开发的小伙伴,也许都知道.pro文件,但文件里面的具体配置方式可能比较模糊。本文将详细说明文件里各配置项的含义,并对其编写方法进行归纳总结,以便各位读者可以在未来的项目开发中快速而准确地编写.pro文件。1.配置项详细说明注释:注释是从一行的#开始,到这......
  • 【并发编程】Java7 - ForkJoin,将大任务拆分成小任务
    1.简介  Java7提供了可以将大任务拆分成小任务执行再合并结果的框架——Fork/Join。其中,将大任务拆分成足够执行的小任务并发执行的过程称为Fork,将这些小任务结果整合后形成最终的结果的过程称为Join。  Fork/Join框架的具体体现为ForkJoinTask抽象类,该类继承了Future,运行......
  • ERROR 658 (HY000): Proxy ERROR: Join internal error: Table 'mysql.proc' doesn'te
    ERROR658(HY000):ProxyERROR:Joininternalerror:Table'mysql.proc'doesn'texist迁移数据库至TDSQL,版本5.0到8.0,执行sql报错现象  查了资料发现mysql8.0的mysql的proc表确实淘汰不用了解决方法使用其他函数替换,JSON_CONTAINS替换为 locate,JSON_Array>>......
  • 线程插队Join
    publicclasstest04{publicstaticvoidmain(String[]args)throwsInterruptedException{Qq=newQ();Threadthread=newThread(q);for(inti=0;i<10;i++){Thread.sleep(1000);System.out.println......
  • Disjoint-Set-Union Sum (诈骗题)(区间DP, 位置顺序!!!!)
    题目大意: 给出一个序列P,n个点每次可以选择2个相邻区间进行合并,会产生一个贡献值,当然合并n-1就合并完了,问在所有的情况下,贡献和是多少  思路:易错点:这个所有情况,你枚举的合并的那个先后顺序是有关系的!!!因此直接去区间dp只能把各个合并的情况给弄......
  • MyBatis-Plus 写 Join 联表查询
    效果展示背景众所周知,MybatisPlus封装的mapper不支持join,如果需要支持就必须自己去实现。但是对于大部分的业务场景来说,都需要多表join,要不然就没必要采用关系型数据库了。使用方法仓库地址https://gitee.com/best_handsome/mybatis-plus-join安装Maven<depend......
  • FOR ALL ENTRIES IN 与 INNER JOIN 内表
    1、区别FORALLENTRIESIN与INNERJOIN内表,目的都是通过内表找数据库表与之对应的数据,但是有区别。1.1、写法FORALLENTRIESIN"--------------------@斌将军--------------------SELECTacdoca~rldnr,"总账会计中的分类账acdoca~rbukrs,"公司代码acdo......
  • SQL: Join的用法
    在SQL中,JOIN是将两个或多个表中的行连接起来的方法。JOIN的基本思想是将两个表中的数据按照某些条件进行匹配,然后将匹配的结果合并成一个新的表。常见的JOIN类型有INNERJOIN、LEFTJOIN、RIGHTJOIN和FULLOUTERJOIN。INNERJOININNERJOIN是最常用的JOIN类型,它......
  • ForkJoin
    ForkJoinPool是JDK7中,@authorDougLea加入的一个线程池类。Fork/Join框架的核心原理就是分治算法(Divide-and-Conquer)和工作窃取算法(work-stealingalgorithm)。Fork分解任务成独立的子任务,用多线程去执行这些子任务,Join合并子任务的结果。这样就能使用多线程的方式来执行一......