14、Forkjoin(分支合并)
什么是 ForkJoin
ForkJoin 在 JDK 1.7 , 并行执行任务!提高效率。在大数据量中!
大数据:Map Reduce (把大任务拆分为小任务)
Forkjoin 特点:工作窃取,这里面维护的是双端队列
接口
通过forkjoinPool来执行forkjoin
构造方法
使用forkjoin
package com.xing.forkjoin;
import java.util.concurrent.RecursiveTask;
/**
* 求和计算的任务
* 3000 6000(frokjoin) 9000(Stream并行流)
* 如何使用frokjoin
* 1.forkjoinPool通过它来执行
* 2.计算任务forkjoinPool.execute(ForkJoinTask task)
* 3.计算类要继承RecursiveTask(递归任务有返回值)
*/
// 重写方法的返回值类型
public class ForkJoinDemo extends RecursiveTask<Long> {
private Long start;
private Long end;
//临界值
private Long temp = 10000L;
public ForkJoinDemo(Long start, Long end) {
this.start = start;
this.end = end;
}
//重写的方法
@Override
protected Long compute() {
//正常计算
if((end-start) < temp){
Long sum = 0L;
for (long i = start; i <= end; i++) {
sum += i;
}
return sum;
}else {
//forkjoin 递归
long middle = (start + end) / 2; //中间值
// 将一个任务拆分成两个任务
ForkJoinDemo task1 = new ForkJoinDemo(start,middle);
task1.fork();//拆分任务,把任务压入线程队列
ForkJoinDemo task2 = new ForkJoinDemo(middle,end);
task2.fork();
//返回结果
return task1.join() + task2.join();
}
}
}
不同方法的执行速度 package com.xing.forkjoin;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;
public class Test {
public static void main(String[] args) {
test1();
try {
test2();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
test3();
}
//普通程序员
public static void test1(){
Long sum = 0L;
long start = System.currentTimeMillis();
for (long i = 1L; i <= 10_0000_0000; i++) {
sum += i;
}
long end = System.currentTimeMillis();
System.out.println("sum = " + sum + "时间:" + (end - start));
}
//使用forkjoin
public static void test2() throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Long> task = new ForkJoinDemo(0L,10_0000_0000L); //向下转型
ForkJoinTask<Long> submit = forkJoinPool.submit(task);//提交任务
Long sum = submit.get();//获得结果
long end = System.currentTimeMillis();
System.out.println("sum=" + sum +"时间:" + (end - start));
}
//用Stream并行流
public static void test3(){
long start = System.currentTimeMillis();
//stream并行流 包含10_0000_0000
Long sum = LongStream.rangeClosed(0L, 10_0000_0000L)
.parallel()//并行计算
.reduce(0,Long::sum);//调用Long下面的sum方法 输出结果
long end = System.currentTimeMillis();
System.out.println("sum =" + sum + "时间:" + (end - start));
}
}
15、异步回调(Future)
Future 设计的初衷: 对将来的某个事件的结果进行建模
同步回调
我们常用的一些请求都是同步回调的,同步回调是阻塞的,单个的线程需要等待结果的返回才能继续执行。
异步回调
有的时候,我们不希望程序在某个执行方法上一直阻塞,需要先执行后续的方法,那就是这里的异步回调。我们在调用一个方法时,如果执行时间比较长,我们可以传入一个回调的方法,当方法执行完时,让被调用者执行给定的回调方法。
package com.xing.future;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
* 异步调用:CompletableFuture
* 异步执行
* 成功回调
* 失败回调
*/
public class Demo01 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
/*
//发起一个请求
//异步回调 没有返回值的异步回调
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(()->{
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "runAsync=>void");
});
System.out.println("11111");
completableFuture.get();//获取执行结果
*/
//有返回值的异步回调
//Ajax 成功和失败的回调
//返回的是错误信息
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName() + "supplyAsync=>Integer");
//int i = 10/0;
return 1024;
});
System.out.println(completableFuture.whenComplete((t,u)->{//结果编译成功的时候返回
//成功的时候t为1024 u为null
System.out.println("t=>" + t);//错的是时候t为null,
System.out.println("u=>" + u);//错的时候u打印错误信息 java.util.concurrent.CompletionException:java.lang.ArithmeticException: / by zero
}).exceptionally((e)->{//编译失败的时候返回
System.out.println(e.getMessage());//打印异常信息 java.lang.ArithmeticException: / by zero
return 2333;//可以获取错误的返回结果
}).get());
}
}