首页 > 其他分享 >一文看懂什么是fork/join

一文看懂什么是fork/join

时间:2023-07-01 23:12:28浏览次数:33  
标签:fork join 一文 队列 ForkJoinPool ForkJoinTask 任务 线程 执行

什么是Fork/Join

Fork/Join 是JUC并发包下的一个并行处理框架,实现了ExecutorService接口的多线程处理器,它专为那些可以通过递归分解成更细小的任务而设计,最大化的利用多核处理器来提高应用程序的性能。

Fork/Join的运行流程大致如下所示:

需要注意的是,图里的次级子任务可以一直分下去,一直分到子任务足够小为止,这里体现了分而治之(divide and conquer) 的算法思想。

工作窃取算法

工作窃取算法指的是在多线程执行不同任务队列的过程中,某个线程执行完自己队列的任务后从其他线程的任务队列里窃取任务来执行。

工作窃取流程如下图所示:

值得注意的是,当一个线程窃取另一个线程的时候,为了减少两个任务线程之间的竞争,我们通常使用双端队列来存储任务。被窃取的任务线程都从双端队列的头部拿任务执行,而窃取其他任务的线程从双端队列的尾部执行任务。

另外,当一个线程在窃取任务时要是没有其他可用的任务了,这个线程会进入阻塞状态以等待再次“工作”。

Fork/Join 实践

前面说Fork/Join框架简单来讲就是对任务的分割与子任务的合并,所以要实现这个框架,先得有任务。在Fork/Join框架里提供了抽象类ForkJoinTask来实现任务。

ForkJoinTask

ForkJoinTask是一个类似普通线程的实体,但是比普通线程轻量得多。

fork()方法:使用线程池中的空闲线程异步提交任务

public final ForkJoinTask<V> fork() {
    Thread t;
    // ForkJoinWorkerThread是执行ForkJoinTask的专有线程,由ForkJoinPool管理
    // 先判断当前线程是否是ForkJoin专有线程,如果是,则将任务push到当前线程所负责的队列里去
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        // 如果不是则将线程加入队列
        // 没有显式创建ForkJoinPool的时候走这里,提交任务到默认的common线程池中
        ForkJoinPool.common.externalPush(this);
    return this;
}

其实fork()只做了一件事,那就是把任务推入当前工作线程的工作队列里。

join()方法:等待处理任务的线程处理完毕,获得返回值。

我们看下join()的源码:

public final V join() {
    int s;
    // doJoin()方法来获取当前任务的执行状态
    if ((s = doJoin() & DONE_MASK) != NORMAL)
        // 任务异常,抛出异常
        reportException(s);
    // 任务正常完成,获取返回值
    return getRawResult();
}

/**
 * doJoin()方法用来返回当前任务的执行状态
 **/
private int doJoin() {
    int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
    // 先判断任务是否执行完毕,执行完毕直接返回结果(执行状态)
    return (s = status) < 0 ? s :
    // 如果没有执行完毕,先判断是否是ForkJoinWorkThread线程
    ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        // 如果是,先判断任务是否处于工作队列顶端(意味着下一个就执行它)
        // tryUnpush()方法判断任务是否处于当前工作队列顶端,是返回true
        // doExec()方法执行任务
        (w = (wt = (ForkJoinWorkerThread)t).workQueue).
        // 如果是处于顶端并且任务执行完毕,返回结果
        tryUnpush(this) && (s = doExec()) < 0 ? s :
        // 如果不在顶端或者在顶端却没未执行完毕,那就调用awitJoin()执行任务
        // awaitJoin():使用自旋使任务执行完成,返回结果
        wt.pool.awaitJoin(w, this, 0L) :
    // 如果不是ForkJoinWorkThread线程,执行externalAwaitDone()返回任务结果
    externalAwaitDone();
}

我们在之前介绍过说Thread.join()会使线程阻塞,而ForkJoinPool.join()会使线程免于阻塞,下面是ForkJoinPool.join()的流程图:

RecursiveAction和RecursiveTask

通常情况下,在创建任务的时候我们一般不直接继承ForkJoinTask,而是继承它的子类RecursiveAction和RecursiveTask。

两个都是ForkJoinTask的子类,RecursiveAction可以看做是无返回值的ForkJoinTask,RecursiveTask是有返回值的ForkJoinTask。

此外,两个子类都有执行主要计算的方法compute(),当然,RecursiveAction的compute()返回void,RecursiveTask的compute()有具体的返回值。

ForkJoinPool

ForkJoinPool是用于执行ForkJoinTask任务的执行(线程)池。

ForkJoinPool管理着执行池中的线程和任务队列,此外,执行池是否还接受任务,显示线程的运行状态也是在这里处理。

我们来大致看下ForkJoinPool的源码:

@sun.misc.Contended
public class ForkJoinPool extends AbstractExecutorService {
    // 任务队列
    volatile WorkQueue[] workQueues;   
    
    // 线程的运行状态
    volatile int runState;  
    
    // 创建ForkJoinWorkerThread的默认工厂,可以通过构造函数重写
    public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory;
    
    // 公用的线程池,其运行状态不受shutdown()和shutdownNow()的影响
    static final ForkJoinPool common;
    
    // 私有构造方法,没有任何安全检查和参数校验,由makeCommonPool直接调用
    // 其他构造方法都是源自于此方法
    // parallelism: 并行度,
    // 默认调用java.lang.Runtime.availableProcessors() 方法返回可用处理器的数量
    private ForkJoinPool(int parallelism,
                         ForkJoinWorkerThreadFactory factory, // 工作线程工厂
                         UncaughtExceptionHandler handler, // 拒绝任务的handler
                         int mode, // 同步模式
                         String workerNamePrefix) { // 线程名prefix
        this.workerNamePrefix = workerNamePrefix;
        this.factory = factory;
        this.ueh = handler;
        this.config = (parallelism & SMASK) | mode;
        long np = (long)(-parallelism); // offset ctl counts
        this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
    }

}

WorkQueue
双端队列,ForkJoinTask存放在这里。

runState
ForkJoinPool的运行状态。SHUTDOWN状态用负数表示,其他用2的幂次表示。

当工作线程在处理自己的工作队列时,会从队列首取任务来执行(FIFO);如果是窃取其他队列的任务时,窃取的任务位于所属任务队列的队尾(LIFO)。

ForkJoinPool与传统线程池最显著的区别就是它维护了一个工作队列数组(volatile WorkQueue[] workQueues,ForkJoinPool中的每个工作线程都维护着一个工作队列)。

Fork/Join的使用

上面我们说ForkJoinPool负责管理线程和任务,ForkJoinTask实现fork和join操作,所以要使用Fork/Join框架就离不开这两个类了,只是在实际开发中我们常用ForkJoinTask的子类RecursiveTask 和RecursiveAction来替代ForkJoinTask。

下面我们用一个计算斐波那契数列第n项的例子来看一下Fork/Join的使用:

斐波那契数列数列是一个线性递推数列,从第三项开始,每一项的值都等于前两项之和:

1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89······

如果设f(n)为该数列的第n项(n∈N*),那么有:f(n) = f(n-1) + f(n-2)。

public class FibonacciTest {

    class Fibonacci extends RecursiveTask<Integer> {

        int n;

        public Fibonacci(int n) {
            this.n = n;
        }

        // 主要的实现逻辑都在compute()里
        @Override
        protected Integer compute() {
            // 这里先假设 n >= 0
            if (n <= 1) {
                return n;
            } else {
                // f(n-1)
                Fibonacci f1 = new Fibonacci(n - 1);
                f1.fork();
                // f(n-2)
                Fibonacci f2 = new Fibonacci(n - 2);
                f2.fork();
                // f(n) = f(n-1) + f(n-2)
                return f1.join() + f2.join();
            }
        }
    }

    @Test
    public void testFib() throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        System.out.println("CPU核数:" + Runtime.getRuntime().availableProcessors());
        long start = System.currentTimeMillis();
        Fibonacci fibonacci = new Fibonacci(40);
        Future<Integer> future = forkJoinPool.submit(fibonacci);
        System.out.println(future.get());
        long end = System.currentTimeMillis();
        System.out.println(String.format("耗时:%d millis", end - start));
    }
}

上面例子的输出:

  • CPU核数:4
  • 计算结果:102334155
  • 耗时:9490 ms
  • 需要注意的是,上述计算时间复杂度为O(2^n),随着n的增长计算效率会越来越低,这也是上面的例子中n不敢取太大的原因。

总结

并不是所有的任务都适合Fork/Join框架,比如上面的例子任务划分过于细小反而体现不出效率。因为Fork/Join是使用多个线程协作来计算的,所以会有线程通信和线程切换的开销。

如果要计算的任务比较简单,直接使用单线程会更快一些。但如果要计算的东西比较复杂,计算机又是多核的情况下,就可以充分利用多核CPU来提高计算速度。

标签:fork,join,一文,队列,ForkJoinPool,ForkJoinTask,任务,线程,执行
From: https://www.cnblogs.com/GentleJim/p/17520137.html

相关文章

  • 一文读懂 Paxos 算法
    博主介绍:✌博主从事应用安全和大数据领域,有8年研发经验,5年面试官经验,Java技术专家,WEB架构师,阿里云专家博主,华为云云享专家✌......
  • 一文搞懂什么是“注解”
    博主介绍:✌博主从事应用安全和大数据领域,有8年研发经验,5年面试官经验,Java技术专家,阿里云专家博主,华为云云享专家✌......
  • 一文让你轻松拿捏 Spring MVC
    博主介绍:✌博主从事应用安全和大数据领域,有8年研发经验,5年面试官经验,Java技术专家✌Java知识图谱点击链接:体系化学习Java(Java面试专题)......
  • 一文拆解“复杂软件”的无代码配置逻辑
    最近教研组的小伙伴们收到了一些用户在试用smardaten过程中的困惑。为了解答大家的疑问,今天特别邀请了教研组的美女小姐姐,以一个比较简单易理解的场景“疫情填报”,来拆解一下smardaten如何支撑应用的搭建逻辑,以及smardaten作为企业级无代码与轻量级无代码的配置逻辑差异。课程视频......
  • 一文读懂 Mysql MVCC
    ......
  • 一文读懂什么是AIGC?
    目录AIGC概念AIGC发展历史在早期萌芽阶段(1950s~1990s)在沉淀累积阶段(1990s~2010s)在快速发展阶段(2010s~至今)ChatGPTAIGC能做什么?电子商务办公游戏娱乐&影视&动漫艺术教育设计&媒体&生活AIGC应用示例AI文本生成写周报写诗写小说写广告词写剧本安排学习计划设定减肥计划写代码AI图片生......
  • 一文详解:大数据分析工具有哪些?
    想要回答“大数据分析工具有哪些?”,首先必须了解什么是“大数据”根据麦肯锡全球研究所给出的定义,大数据通常指具有数据规模大(Volume)、高速(Velocity)、类型多(Variety)、价值密度低(Veracity)和真实性(Veracity)五V特征的数据资料。这类数据资料将超出传统数据处理软件的能力范围,因......
  • 一文搞懂什么是@Component和@Bean注解以及如何使用(包括与@Controller、@Service、@Re
      来源  https://blog.csdn.net/m0_51358164/article/details/126120731一文搞懂什么是@Component和@Bean注解以及如何使用......
  • 【HarmonyOS】一文教你快速解决低代码连接器返参数据结构嵌套错误问题
    ​【关键字】低代码平台、连接器、返参数据结构嵌套 【写在前面】关于低代码平台中的连接器如何使用,请参考以下内容:https://blog.51cto.com/u_15687416/6414269下文将会介绍连接器在实际使用中遇到的一个常见的问题。 【问题描述】1、云侧接口定义首先来一起看一下云......
  • 一文读懂火山引擎A/B测试的实验类型(3)——多链接实验
    更多技术交流、求职机会,欢迎关注字节跳动数据平台微信公众号,回复【1】进入官方交流群一.概述多链接实验,也称为Spliturl实验,用户根据分流结果访问不同版本的url。举个例子:当您有两个不同样式的落地页https://example.com/1.html和https://example.com/2.html,想要......