1, ForkJoin 简介
ForkJoin是由JDK1.7之后提供的多线程并发处理框架。ForkJoin框架的基本思想是分而治之。什么是分而治之?分而治之就是将一个复杂的计算,按照设定的阈值分解成多个计算,然后将各个计算结果进行汇总。相应的,ForkJoin将复杂的计算当做一个任务,而分解的多个计算则是当做一个个子任务来并行执行。
2,工作窃取
假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
工作窃取算法的优点:
充分利用线程进行并行计算,并减少了线程间的竞争。
工作窃取算法的缺点:
在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且该算法会消耗更多的系统资源,比如创建多个线程和多个双端队列。
3,Fork/Join框架局限性:
对于Fork/Join框架而言,当一个任务正在等待它使用Join操作创建的子任务结束时,执行这个任务的工作线程查找其他未被执行的任务,并开始执行这些未被执行的任务,通过这种方式,线程充分利用它们的运行时间来提高应用程序的性能。为了实现这个目标,Fork/Join框架执行的任务有一些局限性。
(1)任务只能使用Fork和Join操作来进行同步机制,如果使用了其他同步机制,则在同步操作时,工作线程就不能执行其他任务了。比如,在Fork/Join框架中,使任务进行了睡眠,那么,在睡眠期间内,正在执行这个任务的工作线程将不会执行其他任务了。
(2)在Fork/Join框架中,所拆分的任务不应该去执行IO操作,比如:读写数据文件。
(3)任务不能抛出检查异常,必须通过必要的代码来出来这些异常。
4,ForkJoin 相关类
ForkJoin框架中一些重要的类如下所示
1,ForkJoinPool
ForkJoin 框架中的线程池,可以通过 ForkJoinPool.commonPool()
或者 new ForkJoinPool()
获取。
2,ForkJoinWorkerThread
实现 ForkJoin 框架中的线程
3,ForkJoinTask
ForkJoinTask封装了数据及其相应的计算,并且支持细粒度的数据并行。ForkJoinTask比线程要轻量,ForkJoinPool中少量工作线程能够运行大量的ForkJoinTask。
ForkJoinTask类中主要包括两个方法fork()和join(),分别实现任务的分拆与合并。
fork()方法类似于Thread.start(),但是它并不立即执行任务,而是将任务放入工作队列中。跟Thread.join()方法不同,ForkJoinTask的join()方法并不简单的阻塞线程,而是利用工作线程运行其他任务,当一个工作线程中调用join(),它将处理其他任务,直到注意到目标子任务已经完成。
ForkJoinTask 三个子类:
- RecursiveTask
类
有返回结果的ForkJoinTask实现Callable。
- RecursiveAction类
无返回结果的ForkJoinTask实现Runnable。
- CountedCompleter
类
在任务完成执行后会触发执行一个自定义的钩子函数。
5,ForkJoin demo
package com.demo.base.thread.forkjoin;
import java.util.concurrent.*;
/**
* @author gx
* 有返回值的 forkjoin
* 实现数据的累加
* */
public class ForkJoinDemo extends RecursiveTask<Integer> {
private Integer startValue;
private Integer endValue;
public ForkJoinDemo(Integer startValue, Integer endValue){
this.startValue = startValue;
this.endValue = endValue;
}
public static Integer getResult(Integer result , Integer num){
result += num;
return result;
}
//设置阈值
private static final Integer max = 200;
@Override
protected Integer compute() {
if(endValue - startValue < max){
System.out.println(Thread.currentThread().getName() + " 开始计算:startValue = " + startValue + " ; endValue = " + endValue );
Integer total = 0;
for(int i = this.startValue; i <= this.endValue; i++){
total = getResult(total, i);
}
return total;
}
/**
* 拆分任务
* */
ForkJoinDemo subTask = new ForkJoinDemo(startValue, (startValue + endValue) / 2);
subTask.fork();
ForkJoinDemo subTask1 = new ForkJoinDemo((startValue + endValue) / 2 + 1, endValue);
subTask1.fork();
return subTask.join() + subTask1.join();
}
public static void main(String[] args) {
System.out.println(Runtime.getRuntime().availableProcessors());
long l = System.currentTimeMillis();
ForkJoinDemo forkJoin = new ForkJoinDemo(1, 10000);
//ForkJoinPool 默认的线程池
//ForkJoinTask<Integer> future = ForkJoinPool.commonPool().submit(forkJoin);
//自定义 forkJoinPool
ForkJoinTask<Integer> future = new ForkJoinPool().submit(forkJoin);
try {
Integer result = future.get();
System.out.println(result);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
long l1 = System.currentTimeMillis();
System.out.println(l1 - l);
}
}
标签:JUC,队列,ForkJoinTask,任务,线程,Integer,ForkJoin
From: https://www.cnblogs.com/cnff/p/17532681.html