前言
创建线程的方式只有两种:继承Thread或者实现Runnable接口。 但是这两种方法都存在一个缺陷,没有返回值
Java 1.5 以后,可以通过向线程池提交一个Callable来获取一个包含返回值的Future对象
Future接口的局限性
当Future的线程进行了一个非常耗时的操作,那我们的主线程也就阻塞了。
当我们在简单业务上,可以使用Future的另一个重载方法get(long,TimeUnit)来设置超时时间,避免我们的主线程被无穷尽地阻塞。
单纯使用Future接口或者FutureTask类并不能很好地完成以下我们所需的业务
将两个异步计算合并为一个,这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果
等待Future集合中的所有任务都完成。
仅等待Future集合种最快结束的任务完成,并返回它的结果。
通过编程方式完成一个Future任务的执行
当Future的完成时间完成时会收到通知,并能使用Future的计算结果进行下一步的的操作,不只是简单地阻塞等待操作的结果
什么是CompletableFuture
在Java 8中, 新增类: CompletableFuture,结合了Future的优点,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果
CompletableFuture被设计在Java中进行异步编程。主线程不用为了任务的完成而阻塞/等待,你可以用主线程去并行执行其他的任务。 使用这种并行方式,极大地提升了程序的表现。
CompletableFuture实现了Future接口,因此有异步执行返回结果的能力。
CompletableFuture实现了CompletionStage接口,该接口是Java8新增得一个接口,用于异步执行中的阶段处理,其大量用在Lambda表达式计算过程中,目前只有CompletableFuture一个实现类。
public class CompletableFuture
方法命名规则
带有Async后缀方法都是异步另外线程执行,没有就是复用之前任务的线程
带有Apply标识方法都是可以获取返回值+有返回值的
带有Accept标识方法都是可以获取返回值
带有run标识的方法不可以获取返回值和无返回值,只是运行
get方法和join方法
join:阻塞获取结果或抛出非受检异常。
get: 阻塞获取结果或抛出受检测异常,需要显示进行try...catch处理
不同线程池使用
默认线程池执行
/**
- 默认线程池
- 运行结果:
- main.................start.....
- main.................end......
- 当前线程:ForkJoinPool.commonPool-worker-9
- 运行结果:5
*/
@Test
public void defaultThread() {
System.out.println("main.................start.....");
CompletableFuture.runAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getName());
int i = 10 / 2;
System.out.println("运行结果:" + i);
});
System.out.println("main.................end......");
}
默认使用 ForkJoinPool.commonPool(),commonPool是一个会被很多任务共享的线程池,commonPool 设计时的目标场景是运行 非阻塞的 CPU 密集型任务,为最大化利用 CPU,其线程数默认为 CPU 数量- 1
哪些地方使用了commonPool
CompletableFuture
Parallel Streams
为什么要引入commonPool
为了避免任何并行操作都引入一个线程池,最坏情况会导致在单个JVM上创建了太多的池线程,降低效率。
commonPool线程池是怎么创建和使用的
ForkJoinTask一定会运行在一个ForkJoinPool中,如果没有显式地交它提交到ForkJoinPool,会使用一个common池(全进程共享)来执行任务。
自定义线程池执行
自定义一个线程池
private ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue
使用定义的线程池
/**
- 自定义线程池
- 运行结果:
- main.................start.....
- main.................end......
- 当前线程:pool-1-thread-1
- 运行结果:5
*/
@Test
public void myThread() {
System.out.println("main.................start.....");
CompletableFuture.runAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getName());
int i = 10 / 2;
System.out.println("运行结果:" + i);
},executor);
System.out.println("main.................end......");
}
开启一个异步
runAsync-无返回值
使用runAsync开启一个异步任务线程,该方法无结果返回,适合一些不需要结果的异步任务
/***
- 无返回值
- runAsync
- 结果:
- main.................start.....
- main.................end......
- 当前线程:33
- 运行结果:5
*/
@Test
public void runAsync() {
System.out.println("main.................start.....");
CompletableFuture.runAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
}, executor);
System.out.println("main.................end......");
}
supplyAsync-有返回值
使用completableFuture.get()方法获取结果,这时程序会阻塞到这里直到结果返回。
/**
- 有返回值
- supplyAsync
- 结果:
- main.................start.....
- 当前线程:33
- 运行结果:5
- main.................end.....5
*/
@Test
public void supplyAsync() throws ExecutionException, InterruptedException {
System.out.println("main.................start.....");
CompletableFuturecompletableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
return i;
}, executor);
System.out.println("main.................end....." + completableFuture.get());
}
如果要超时就得往下执行,请使用completableFuture.get(long timeout, TimeUnit unit)方法。
线程串行化方法
带有Async后缀方法都是异步另外线程执行,没有就是复用之前任务的线程
thenApply-上面任务执行完执行+获取返回值+有返回值
/**
-
上面任务执行完执行+可以拿到结果+可以返回值
-
结果:
-
thenApplyAsync当前线程:33
-
thenApplyAsync运行结果:5
-
thenApplyAsync任务2启动了。。。。。上步结果:5
-
main.................end.....hello10
-
@throws ExecutionException
-
@throws InterruptedException
*/
@Test
public void thenApplyAsync() throws ExecutionException, InterruptedException {CompletableFuture
thenApplyAsync = CompletableFuture.supplyAsync(() -> {
System.out.println("thenApplyAsync当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("thenApplyAsync运行结果:" + i);
return i;
}, executor).thenApplyAsync(result -> {
System.out.println("thenApplyAsync任务2启动了。。。。。上步结果:" + result);
return "hello" + result * 2;
}, executor);
System.out.println("main.................end....." + thenApplyAsync.get());
}
thenAccept-上面任务执行完执行+获取返回值
/**
- 上面任务执行完执行+可以拿到结果
- 结果:
- thenAcceptAsync当前线程:33
- thenAcceptAsync运行结果:5
- thenAcceptAsync任务2启动了。。。。。上步结果:5
- @throws ExecutionException
- @throws InterruptedException
*/
@Test
public void thenAcceptAsync() throws ExecutionException, InterruptedException {
CompletableFuturethenAcceptAsync = CompletableFuture.supplyAsync(() -> {
System.out.println("thenAcceptAsync当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("thenAcceptAsync运行结果:" + i);
return i;
}, executor).thenAcceptAsync(result -> {
System.out.println("thenAcceptAsync任务2启动了。。。。。上步结果:" + result);
}, executor);
}
thenRun-上面任务执行完执行
/**
- 上面任务执行完执行
- 结果
- main.................start.....
- 当前线程:33
- 运行结果:5
- 任务2启动了。。。。。
*/
@Test
public void thenRunAsync() throws ExecutionException, InterruptedException {
System.out.println("main.................start.....");
CompletableFuturevoidCompletableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
return i;
}, executor).thenRunAsync(() -> {
System.out.println("任务2启动了。。。。。");
}, executor);
}
thenCompose-接收返回值并生成新的任务
当原任务完成后接收返回值,返回一个新的任务
thenApply()转换的是泛型中的类型,相当于将CompletableFuture 转换生成新的CompletableFuture
thenCompose()用来连接两个CompletableFuture,是生成一个新的CompletableFuture。
/**
-
当原任务完成后接收返回值,返回一个新的任务
-
结果:
-
hello: thenCompose
/
@Test
public void thenCompose() {
CompletableFuture cf = CompletableFuture.completedFuture("hello")
.thenCompose(str -> CompletableFuture.supplyAsync(() -> {
return str + ": thenCompose";
},executor));
System.out.println(cf.join());
}
任务组合
thenCombine-消费两个结果+返回结果
/* -
两任务组合 都要完成
-
completableFuture.thenCombine()获取两个future返回结果,有返回值
-
结果:
-
任务1线程:33
-
任务1运行结果:5
-
任务2线程:34
-
任务2运行结果:
-
任务5启动。。。结果1:5。。。结果2:hello
-
任务5结果hello-->5
*/
@Test
public void thenCombine() throws ExecutionException, InterruptedException {
CompletableFuturefuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("任务1运行结果:" + i);
return i;
}, executor);CompletableFuture
future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2线程:" + Thread.currentThread().getId());
System.out.println("任务2运行结果:");
return "hello";
}, executor);
CompletableFuturethenCombineAsync = future1.thenCombineAsync(future2, (result1, result2) -> {
System.out.println("任务5启动。。。结果1:" + result1 + "。。。结果2:" + result2);
return result2 + "-->" + result1;
}, executor);
System.out.println("任务5结果" + thenCombineAsync.get());
}
thenAcceptBoth-消费两个结果+无返回
/** -
两任务组合 都要完成
-
completableFuture.thenAcceptBoth() 获取两个future返回结果,无返回值
-
结果:
-
任务1线程:33
-
任务1运行结果:5
-
任务2线程:34
-
任务2运行结果:
-
任务4启动。。。结果1:5。。。结果2:hello
*/
@Test
public void thenAcceptBothAsync() throws ExecutionException, InterruptedException {
CompletableFuturefuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("任务1运行结果:" + i);
return i;
}, executor);CompletableFuture
future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2线程:" + Thread.currentThread().getId());
System.out.println("任务2运行结果:");
return "hello";
}, executor);CompletableFuture
thenAcceptBothAsync = future1.thenAcceptBothAsync(future2, (result1, result2) -> {
System.out.println("任务4启动。。。结果1:" + result1 + "。。。结果2:" + result2);
}, executor);
}
runAfterBoth-两个任务完成接着运行
/**
-
两任务组合 都要完成
-
completableFuture.runAfterBoth() 组合两个future
-
结果:
-
任务1线程:33
-
任务1运行结果:5
-
任务2线程:34
-
任务2运行结果:
-
任务3启动。。。
*/
@Test
public void runAfterBothAsync() {
CompletableFuturefuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("任务1运行结果:" + i);
return i;
}, executor);CompletableFuture
future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2线程:" + Thread.currentThread().getId());
System.out.println("任务2运行结果:");
return "hello";
}, executor);CompletableFuture
runAfterBothAsync = future1.runAfterBothAsync(future2, () -> {
System.out.println("任务3启动。。。");
}, executor);
}
两任务完成一个就执行
applyToEither-其中一个执行完执行+获取返回值+有返回值
/**
-
两任务组合,一个任务完成就执行
-
objectCompletableFuture.applyToEither() 其中一个执行完执行+获取返回值+有返回值
-
结果:
-
任务1线程:33
-
任务2线程:34
-
任务2运行结果:
-
任务5开始执行。。。结果:hello
-
任务5结果:hello world
-
-
Process finished with exit code 0
*/
@Test
public void applyToEither() throws ExecutionException, InterruptedException {
CompletableFuture -
两任务组合,一个任务完成就执行
-
objectCompletableFuture.acceptEither() 其中一个执行完执行+获取返回值
-
结果:
-
任务1线程:33
-
任务2线程:34
-
任务2运行结果:
-
任务4开始执行。。。结果:hello
*/
@Test
public void acceptEither() {
CompletableFuture
}
runAfterEither-有一任务完成就执行
/**
-
两任务组合,一个任务完成就执行
-
-
objectCompletableFuture.runAfterEither() 其中一个执行完执行
-
结果:
-
任务1线程:33
-
任务2线程:34
-
任务2运行结果:
-
任务3开始执行。。。
*/
@Test
public void runAfterEither() {
CompletableFuture
多任务组合
allOf-等待全部完成后才执行
/**
-
多任务组合
-
allOf 等待所有任务完成
-
结果:
-
任务1
-
任务3
-
任务2
-
allOf任务1-------任务2-------任务3
-
@throws ExecutionException
-
@throws InterruptedException
*/
@Test
public void allOf() throws ExecutionException, InterruptedException {
CompletableFuturefuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1");
return "任务1";
}, executor);
CompletableFuturefuture2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
System.out.println("任务2");
} catch (InterruptedException e) {
e.printStackTrace();
}
return "任务2";
}, executor);
CompletableFuturefuture3 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务3");
return "任务3";
}, executor);CompletableFuture
allOf = CompletableFuture.allOf(future1, future2, future3);
//等待所有任务完成
//allOf.get();
allOf.join();
System.out.println("allOf" + future1.get() + "-------" + future2.get() + "-------" + future3.get());
}
anyOf-等待其中之一完成后就执行
/**
-
多任务组合
-
anyOf 只要一个任务完成
-
结果:
-
任务1
-
anyOf--最先完成的是任务1
-
任务3
-
等等任务2
-
任务2
-
@throws ExecutionException
-
@throws InterruptedException
*/
@Test
public void anyOf() throws ExecutionException, InterruptedException {
CompletableFuturefuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1");
return "任务1";
}, executor);
CompletableFuturefuture2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
System.out.println("任务2");
} catch (InterruptedException e) {
e.printStackTrace();
}
return "任务2";
}, executor);CompletableFuture
future3 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务3");
return "任务3";
}, executor);
CompletableFuture
/**
- 入参为结果或者异常,返回新结果
- 结果:
- main.................start.....
- 当前线程:33
- main.................end.....报错返回
*/
@Test
public void handle() throws ExecutionException, InterruptedException {
System.out.println("main.................start.....");
final CompletableFuturecompletableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 0;
System.out.println("运行结果:" + i);
return i;
}, executor).handleAsync((in, throwable) -> {
if (throwable != null) {
return "报错返回";
}
return "正确了";
});
System.out.println("main.................end....." + completableFuture.get());
}
whenComplete-感知结果或异常并返回相应信息
whenComplete虽然得到异常信息,但是不能修改返回信息
/**
-
有返回值并且有后续操作 whenComplete
-
-
结果:
-
main.................start.....
-
当前线程:33
-
异步完成。。。。结果是:null...异常是:java.util.concurrent.CompletionException: java.lang.ArithmeticException: 除以零
-
报错了2
-
@throws ExecutionException
-
@throws InterruptedException
*/
@Test
public void whenComplete() {
System.out.println("main.................start.....");
final CompletableFuturecompletableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 0;
System.out.println("运行结果:" + i);
return i;
}, executor).whenComplete((result, throwable) -> {
//whenComplete虽然得到异常信息,但是不能修改返回信息
System.out.println("异步完成。。。。结果是:" + result + "...异常是:" + throwable);
});try {
System.out.println("main.................end..T..." + completableFuture.get());
} catch (InterruptedException e) {
System.out.println("报错了1");
} catch (ExecutionException e) {
System.out.println("报错了2");
}
}
exceptionally-捕获异常并返回指定值
/** -
方法完成后的感知
-
感知错误并返回指定值 exceptionally
-
结果:
-
main.................start.....
-
当前线程:33
-
执行了exceptionally
-
main.................end.....0
-
@throws ExecutionException
-
@throws InterruptedException
*/
@Test
public void exceptionally() throws ExecutionException, InterruptedException {
System.out.println("main.................start.....");
CompletableFuturecompletableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 0;
System.out.println("运行结果:" + i);
return i;
}, executor).exceptionally(throwable -> {
//R apply(T t);
//exceptionally可以感知错误并返回指定值
System.out.println("执行了exceptionally");
return 0;
});
System.out.println("main.................end....." + completableFuture.get());
}