什么是Fork/Join框架?
Fork/Join框架是JDK1.7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干小任务,最终汇总每个小任务结果后得到大任务结果的框架。
Fork就是把一个大任务切分成若干个小任务并行执行;
Join就是合并这些小任务的执行结果,最后得到这个大任务的结果;
例子:
计算1+2+3+...+10000。可以分割成10个任务,每个任务分别执行1000个数进行求和。最终汇总这10个任务。
Fork/Join流程图
采用什么算法?
Fork/Join采用的是工作窃取算法,指某个线程从其他线程里窃取任务出来执行。
窃取算法优缺点
- 优点:充分利用线程进行并行计算;
- 缺点:在某些情况下还是存在竞争,比如:双向队列里只有一个任务时,并且该算法会消耗了更多的系统资源,比如创建多线程和多个双端队列。
设计方式
我们已经了解了Fork/Join框架的需求了,那么咱们可以继续思考一下,如果让我们来设计一个Fork/Join框架,该如何设计呢?这样会有助于咱们理解Fork/Join框架的设计。我们这里把Fork/Join框架分成两步:
- 第一步,分割任务,首先我们需要有一个fork类来把大任务拆分成若干个子任务,也有你可能拆分出来的子任务依旧是很大,所以得不停地拆分,直到拆分为我们认为是小为止。
- 第二步,执行任务并合并任务,分割的子任务分别放在双端队列里,然后启动几个线程分别从双端队列里获取任务并执行任务,子任务执行完成后的结果都统一放在一个队列里,启动一个线程从队列里那数据,然后合并。
Fork/Join使用两个类来完成以上两个步骤。
1,ForkJoinTask:我们使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork和join操作的机制。通常情况下,我们不需要直接继承ForkJoinTask类,只需要继承它的子类,Fork/Join框架提供了以下两个子类:
RecursiveAction:用于没有返回结果的任务。
RecursiveTask:用于又返回结果的任务
2,ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行。
任务拆分出来的子任务添加到当前工作现场所维护的双端队列中,进入队列的头部,当一个工作现场的队列里暂时没有任务时,他会随机从其他工作线程的队列的尾部获取一个任务。
Fork/Join框架的应用
计算:1+2+3+4+...+n
我们把这个大任务拆分成若干个小任务,每个任务负责两个数相加。1+2、3+4、4+5....
代码如下:
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
/**
* 使用ForkJoin计算1+2=3+4
*/
public class MyForkJoinTest extends RecursiveTask<Integer> {
//阈值
private static final int THRESHOLD = 2;
private int start;
private int end;
public MyForkJoinTest(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
boolean canCompute = (end - start) <= THRESHOLD;
if (canCompute) {
//只有两个数
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
//多个数,拆分任务
int middle = (start + end) / 2;
MyForkJoinTest left = new MyForkJoinTest(start, middle);
MyForkJoinTest right = new MyForkJoinTest(middle + 1, end);
//执行子任务
left.fork();
right.fork();
int leftResult = left.join();
int rightResult = right.join();
//合并任务
sum = leftResult + rightResult;
}
return sum;
}
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
//生成一个计算任务,负责计算1+2+3+...+100
MyForkJoinTest myForkJoinTest = new MyForkJoinTest(1, 100);
//执行任务
Future<Integer> result = forkJoinPool.submit(myForkJoinTest);
try {
System.out.println(result.get());
} catch (Exception e) {
e.printStackTrace();
}
}
}
计算结果:
通过上面这个例子,我们进一步了解ForkJoinTask,ForkJoinTask与一般任务的主要区别在于它需要实现compute方法,在这个方法里,首先需要判断任务是否足够小,如果足够小就直接执行任务。如果不够小,就必须分割成为足够小的任务,每个任务在调用fork方法时,又会进入compute方法,看看当前子任务是否需要继续拆分成子任务,如果不需要继续拆分,则执行当前子任务并返回结果。使用join方法会等待子任务执行完成并得到结果。
Fork/Join框架的异常处理
Fork/Join在执行的时候可能会抛出异常,但是我们没办法在主线程里直接捕获到异常,所以ForkJoinTask提供了isCompletedAbnormally()方法,用来检查任务是否已经抛出异常或者是否已经被取消了,并且可以通过ForkJoinTask的getException()方法来获取异常,代码如下:
if(task.isCompletedAbnormally()){
System.out.println(task.getException());
}
getException的源码:
public final Throwable getException() {
int s = status & DONE_MASK;
return ((s >= NORMAL) ? null :
(s == CANCELLED) ? new CancellationException() :
getThrowableException());
}
人如没有完成或者任务没有抛出异常返回null,
任务被取消返回CancellationException
其他情况返回Throwable
Fork/Join框架的实现原理
和传统的线程池使用AQS的实现逻辑不同,ForkJoin
引入全新的结构来标识:
- ForkJoinPool: 用于执行
ForkJoinTask
任务的执行池,不再是传统执行池 Worker+Queue 的组合模式,而是维护了一个队列数组WorkQueue
,这样在提交任务和线程任务的时候大幅度的减少碰撞。 - WorkQueue: 双向列表,用于任务的有序执行,如果
WorkQueue
用于自己的执行线程Thread
,线程默认将会从top端选取任务用来执行 - LIFO。因为只有owner的Thread才能从top端取任务,所以在设置变量时,int top;
不需要使用volatile
。 - ForkJoinWorkThread: 用于执行任务的线程,用于区别使用非ForkJoinWorkThread线程提交的task;启动一个该Thread,会自动注册一个WorkQueue到Pool,这里规定,拥有Thread的WorkQueue只能出现在WorkQueue数组的奇数位
- ForkJoinTask: 任务, 它比传统的任务更加轻量,不再对是RUNNABLE的子类,提供
fork
/join
方法用于分割任务以及聚合结果。 - 为了充分施展并行运算,该框架实现了复杂的 worker steal算法,当任务处于等待中,thread通过一定策略,不让自己挂起,充分利用资源,当然,它比其他语言的协程要重一些。
jdk1.8环境下的。下面再来一个0到10000000000L以内的数据相加耗时对比:
import java.util.concurrent.RecursiveTask;
public class ForkJoinCalculate extends RecursiveTask<Long> {
private static final long serialVersionUID = 13475679780L;
private long start;
private long end;
//阈值
private static final long THRESHOLD = 10000L;
public ForkJoinCalculate(long start, long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long length = end - start;
if (length <= THRESHOLD) {
long sum = 0;
for (long i = start; i <= end; i++) {
sum += i;
}
return sum;
} else {
long middle = (start + end) / 2;
ForkJoinCalculate left = new ForkJoinCalculate(start, middle);
left.fork(); //拆分,并将该子任务压入线程队列
ForkJoinCalculate right = new ForkJoinCalculate(middle + 1, end);
right.fork();
return left.join() + right.join();
}
}
}
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;
public class TestForkJoin {
public static void test1(){
long start = System.currentTimeMillis();
ForkJoinPool pool = new ForkJoinPool();
ForkJoinTask<Long> task = new ForkJoinCalculate(0L, 10000000000L);
long sum = pool.invoke(task);
System.out.println(sum);
long end = System.currentTimeMillis();
System.out.println("test1耗费的时间为: " + (end - start));
}
public static void test2(){
long start = System.currentTimeMillis();
long sum = 0L;
for (long i = 0L; i <= 10000000000L; i++) {
sum += i;
}
System.out.println(sum);
long end = System.currentTimeMillis();
System.out.println("test2耗费的时间为: " + (end - start));
}
public static void test3(){
long start = System.currentTimeMillis();
Long sum = LongStream.rangeClosed(0L, 10000000000L)
.parallel().sum();
System.out.println(sum);
long end = System.currentTimeMillis();
System.out.println("test3耗费的时间为: " + (end - start));
}
public static void main(String[] args) {
test1();
test2();
test3();
}
}
运行结果:
-5340232216128654848
test1耗费的时间为: 2060
-5340232216128654848
test2耗费的时间为: 5262
-5340232216128654848
test3耗费的时间为: 1939
我们观察上面可以看出来执行10000000000L的相加操作各自执行完毕的时间不同。观察到当数据很大的时候ForkJoin比普通线程实现有效的多,但是相比之下ForkJoin的实现实在是有点麻烦,这时候Java 8 就为我们提供了一个并行流来实现ForkJoin实现的功能。可以看到并行流比自己实现ForkJoin还要快。
Java 8 中将并行流进行了优化,我们可以很容易的对数据进行并行流的操作,Stream API可以声明性的通过parallel()与sequential()在并行流与穿行流中随意切换!
参考:《Java并发编程的艺术》