首页 > 其他分享 >JUC-ForkJoin

JUC-ForkJoin

时间:2023-07-06 17:22:49浏览次数:37  
标签:JUC 队列 ForkJoinTask 任务 线程 Integer ForkJoin

1, ForkJoin 简介

ForkJoin是由JDK1.7之后提供的多线程并发处理框架。ForkJoin框架的基本思想是分而治之。什么是分而治之?分而治之就是将一个复杂的计算,按照设定的阈值分解成多个计算,然后将各个计算结果进行汇总。相应的,ForkJoin将复杂的计算当做一个任务,而分解的多个计算则是当做一个个子任务来并行执行。

2,工作窃取

  假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

工作窃取算法的优点:

充分利用线程进行并行计算,并减少了线程间的竞争。

工作窃取算法的缺点:

在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且该算法会消耗更多的系统资源,比如创建多个线程和多个双端队列。

3,Fork/Join框架局限性:

对于Fork/Join框架而言,当一个任务正在等待它使用Join操作创建的子任务结束时,执行这个任务的工作线程查找其他未被执行的任务,并开始执行这些未被执行的任务,通过这种方式,线程充分利用它们的运行时间来提高应用程序的性能。为了实现这个目标,Fork/Join框架执行的任务有一些局限性。

(1)任务只能使用Fork和Join操作来进行同步机制,如果使用了其他同步机制,则在同步操作时,工作线程就不能执行其他任务了。比如,在Fork/Join框架中,使任务进行了睡眠,那么,在睡眠期间内,正在执行这个任务的工作线程将不会执行其他任务了。
(2)在Fork/Join框架中,所拆分的任务不应该去执行IO操作,比如:读写数据文件。
(3)任务不能抛出检查异常,必须通过必要的代码来出来这些异常。

4,ForkJoin 相关类

ForkJoin框架中一些重要的类如下所示

1,ForkJoinPool

ForkJoin 框架中的线程池,可以通过 ForkJoinPool.commonPool() 或者 new ForkJoinPool() 获取。

2,ForkJoinWorkerThread

实现 ForkJoin 框架中的线程

3,ForkJoinTask

ForkJoinTask封装了数据及其相应的计算,并且支持细粒度的数据并行。ForkJoinTask比线程要轻量,ForkJoinPool中少量工作线程能够运行大量的ForkJoinTask。

ForkJoinTask类中主要包括两个方法fork()和join(),分别实现任务的分拆与合并。

fork()方法类似于Thread.start(),但是它并不立即执行任务,而是将任务放入工作队列中。跟Thread.join()方法不同,ForkJoinTask的join()方法并不简单的阻塞线程,而是利用工作线程运行其他任务,当一个工作线程中调用join(),它将处理其他任务,直到注意到目标子任务已经完成。

ForkJoinTask 三个子类:

  1. RecursiveTask

有返回结果的ForkJoinTask实现Callable。

  1. RecursiveAction类

无返回结果的ForkJoinTask实现Runnable。

  1. CountedCompleter

在任务完成执行后会触发执行一个自定义的钩子函数。

5,ForkJoin demo

package com.demo.base.thread.forkjoin;

import java.util.concurrent.*;

/**
 * @author gx
 * 有返回值的 forkjoin
 * 实现数据的累加
 * */
public class ForkJoinDemo extends RecursiveTask<Integer> {

    private Integer startValue;
    private Integer endValue;

    public ForkJoinDemo(Integer startValue, Integer endValue){
        this.startValue = startValue;
        this.endValue = endValue;
    }

    public static Integer getResult(Integer result , Integer num){
        result += num;
        return result;
    }

    //设置阈值
    private static final Integer max = 200;

    @Override
    protected Integer compute() {
        if(endValue - startValue < max){
            System.out.println(Thread.currentThread().getName() + "   开始计算:startValue = " + startValue + " ; endValue = " + endValue );

            Integer total = 0;
            for(int i = this.startValue; i  <= this.endValue; i++){
                total = getResult(total, i);
            }
            return total;
        }

        /**
         * 拆分任务
         * */
        ForkJoinDemo subTask = new ForkJoinDemo(startValue, (startValue + endValue) / 2);
        subTask.fork();

        ForkJoinDemo subTask1 = new ForkJoinDemo((startValue + endValue) / 2 + 1, endValue);
        subTask1.fork();

        return subTask.join() + subTask1.join();
    }

    public static void main(String[] args) {
        System.out.println(Runtime.getRuntime().availableProcessors());
        long l = System.currentTimeMillis();
        ForkJoinDemo forkJoin = new ForkJoinDemo(1, 10000);

        //ForkJoinPool 默认的线程池
        //ForkJoinTask<Integer> future = ForkJoinPool.commonPool().submit(forkJoin);

        //自定义 forkJoinPool
        ForkJoinTask<Integer> future = new ForkJoinPool().submit(forkJoin);

        try {
            Integer result = future.get();
            System.out.println(result);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        long l1 = System.currentTimeMillis();
        System.out.println(l1 - l);
    }

}

标签:JUC,队列,ForkJoinTask,任务,线程,Integer,ForkJoin
From: https://www.cnblogs.com/cnff/p/17532681.html

相关文章

  • 【JUC进阶】06. 轻量级锁
    目录1、前言2、轻量级锁2.1、什么是轻量级锁2.2、工作原理2.2.1、回顾MarkWord2.2.2、工作流程2.3、解锁3、适用场景4、注意事项5、轻量级锁与偏向锁的对比6、小结1、前言前面一节我们讲到了偏向锁。当偏向锁被撤销,或其他线程竞争的时候,偏向锁会撤销并且升级为轻量级锁。轻量级锁(L......
  • JUC同步锁原理源码解析五----Phaser
    JUC同步锁原理源码解析五----PhaserPhaserPhaser的来源Areusablesynchronizationbarrier,similarinfunctionalityto{@linkjava.util.concurrent.CyclicBarrierCyclicBarrier}and{@linkjava.util.concurrent.CountDownLatchCountDownLatch}butsupportingmore......
  • JUC同步锁原理源码解析一 之ReentrantLock
    JUC同步锁原理1.锁的本质1.什么是锁?​ 通俗来讲,锁要保证的就是原子性,就是一个代码块不允许多线程同时执行,就是锁。从生活的角度上来说,就比如你要去上厕所,当你在上厕所期间,你会把门锁上,其他人只能排队。不允许多个人同时上厕所。2.锁的底层实现​ java语言是运行在jvm之上,jvm......
  • juc--三大接口
    文章目录juc一、为什么会有juc二、juc--三大接口1.lock2.condition3.ReadWriteLock二、juc--的默认实现类1.ReentrantLock--lock的默认实现类公平锁,非公平锁2.ReentrantReadWriteLock读写锁--ReadWriteLock的默认实现类1读写锁和排它锁2读写锁原则总结jucjuc:java.util.conc......
  • JUC是什么?
    JUC表示什么JUC是java.util.concurrent包的缩写,其包结构如下。JUC框架结构JUC是包的简称,JUC可能也是Java核心里最难的一块儿,JUC指的是Java的并发工具包,里边提供了各种各样的控制同步和线程通信的工具类。学习JUC之前,最重要的是了解JUC的结构是什么样的。就如同Java的集合框架的结构......
  • 【JAVA】集合、JUC包结构图
    发现Java基础还是不踏实,需要好好修炼一段时间,就从下面的三张图开始学习吧,加油!一、集合框架二、JUC包三、Socket......
  • JUC知识点框架
    JUC(java.util.concurrent)atomicAtomicBooleanAtomicIntegerAtomicLongAtomicReferencelocksAQS(AbstractQueuedSynchronizer)ReentrantLockConditionLockSupportReentrantReadWriteLockStampedLockAQS典型应用ReentrantLockCountDownLatchSemaphore......
  • 十一、JUC-synchronized与锁升级
    零、问题谈谈你对Synchronized的理解Synchronized的锁升级你聊聊Synchronized的性能是不是一定弱于Locksynchronized锁:由对象头中的MarkWord根据锁标志位的不同而被复用及锁升级策略一、Synchronized的性能变化1、java5之前java5以前,只有Synchronized,这个是操作系统级......
  • 十、JUC-Java对象内存布局和对象头
    零、问题Objectobject=newObject()谈谈你对这句话的理解?一般而言JDK8按照默认情况下,new一个对象占多少内存空间?位置所在:JVM里堆→新生区→伊甸园区一、对象的内存布局在HotSpot虚拟机里,对象在堆内存中的存储布局可以划分为三个部分:对象头、实例数据、对齐填充(保证8......
  • JUC:AQS
    AQS是JUC的基石,提供了数据结构和底层实现方法,比如获取锁的方式由子类实现完成出入队、唤醒线程由功能。这里只分析AQS已经实现了的功能逻辑,如果要分析完成的功能需要配合具体的子类比如ReentrantLock核心思想如果共享资源空闲,当前线程就工作,并锁住资源。如果共享资源被占......