首页 > 其他分享 >并发-框架[老的,有时间我重新整理一下]

并发-框架[老的,有时间我重新整理一下]

时间:2022-12-29 10:03:45浏览次数:146  
标签:返回 框架 并发 CompletionStage 重新整理 CompletableFuture FutureTask 线程 执行


并发-框架[老的,有时间我重新整理一下]

文章是直接从我本地word笔记粘贴过来的,排版啥的可能有点乱,凑合看吧,有时间我会慢慢整理

Fork-Join

java下多线程的开发可以我们自己启用多线程,线程池,还可以使用forkjoin,forkjoin可以让我们不去了解诸如Thread,Runnable等相关的知识,只要遵循forkjoin的开发模式,就可以写出很好的多线程并发程序.

分而治之
分治法的设计思想是:将一个难以直接解决的大问题,分割成一些规模较小的相同问题(小问题之间无关联),以便各个击破,分而治之。
分治策略是:对于一个规模为n的问题,若该问题可以容易地解决(比如说规模n较小)则直接解决,否则将其分解为k个规模较小的子问题,这些子问题互相独立且与原问题形式相同(子问题相互之间有联系就会变为动态规范算法),递归地解这些子问题,然后将各子问题的解合并得到原问题的解。这种算法设计策略叫做分治法。

归并排序

归并排序是建立在归并操作上的一种有效的排序算法。该算法是采用分治法的一个非常典型的应用。将已有序的子序列合并,得到完全有序的序列;即先使每个子序列有序,再使子序列段间有序。

若将两个有序表合并成一个有序表,称为2-路归并,与之对应的还有多路归并。

对于给定的一组数据,利用递归与分治技术将数据序列划分成为越来越小的半子表,在对半子表排序后,再用递归方法将排好序的半子表合并成为越来越大的有序序列。

为了提升性能,有时我们在半子表的个数小于某个数(比如15)的情况下,对半子表的排序采用其他排序算法,比如插入排序。

归并排序(降序)示例

并发-框架[老的,有时间我重新整理一下]_工作线程


先讲数组划分为左右两个子表:

并发-框架[老的,有时间我重新整理一下]_数据_02


并发-框架[老的,有时间我重新整理一下]_数据_03


然后继续左右两个子表拆分:

并发-框架[老的,有时间我重新整理一下]_初始化_04


对最后的拆分的子表,两两进行排序

并发-框架[老的,有时间我重新整理一下]_初始化_05


对有序的子表进行排序和比较合并

并发-框架[老的,有时间我重新整理一下]_工作线程_06


对合并后的子表继续比较合并

并发-框架[老的,有时间我重新整理一下]_数据_07

Fork-Join原理
说白了就是把大任务拆分.每个小任务单独处理(交给线程去执行),处理完之后再一级一级的汇总,最后汇总成最终的结果

并发-框架[老的,有时间我重新整理一下]_工作线程_08

工作密取算法

即当前线程的Task已经全被执行完毕,则自动取到其他线程的Task池中取出Task继续执行。
ForkJoinPool中维护着多个线程(一般为CPU核数)在不断地执行Task,每个线程除了执行自己职务内的Task之外,还会根据自己工作线程的闲置情况去获取其他繁忙的工作线程的Task,如此一来就能能够减少线程阻塞或是闲置的时间,提高CPU利用率。

工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。那么,为什么需要使用工作窃取算法呢?
假如我们需要做一个比较大的任务,可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。比如A线程负责处理A队列里的任务。但是,有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

并发-框架[老的,有时间我重新整理一下]_数据_09

工作窃取算法的优点:充分利用线程进行并行计算,减少了线程间的竞争。
工作窃取算法的缺点:在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且该算法会消耗了更多的系统资源,比如创建多个线程和多个双端队列。

使用场景

Excel表的统计
sql统计(统计100万条记录,单线程一条一条的统计,然后发现很慢,后来拆分一万条一组去统计,效率就高了很多倍.)

还需要考虑线程上下文切换的时间问题 ,如果数据量少(几千个之内的)的话可能还不如单线程统计来的快. 但是数据量大的话,或者处理比较耗时,Fork-Join优势就起来了.

Fork/Join使用方式

我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork和join的操作机制
通常我们不直接继承ForkjoinTask类,只需要直接继承其子类。

  1. RecursiveAction,用于没有返回结果的任务
  2. RecursiveTask,用于有返回值的任务

task要通过ForkJoinPool来执行,使用submit 或 invoke 提交,两者的区别是:
1.invoke是同步执行,调用之后需要等待任务完成,才能执行后面的代码;
2.submit是异步执行。调用之后可以继续执行下面代码,不用等任务完成.

join()和get方法当任务完成的时候返回计算结果。

并发-框架[老的,有时间我重新整理一下]_数据_10

在我们自己实现的compute方法里,首先需要判断任务是否足够小,如果足够小就直接执行任务。如果不足够小,就必须分割成两个子任务,每个子任务在调用invokeAll方法时,又会进入compute方法,看看当前子任务是否需要继续分割成孙任务,如果不需要继续分割,则执行当前子任务并返回结果。使用join方法会等待子任务执行完并得到其结果。

CountDownLatch

代码演示 zjj_parentzjj_parent-eb4130da-0744-b59f-0beb-a89ea23fb8f8

CountDownLatch是一个同步的辅助类,它可以允许一个或多个线程等待,直到一组在其它线程中的操作执行完成。

一个CountDownLatch会通过一个给定的count数来被初始化。其中await()方法会一直阻塞,直到当前的count被减到0,而这个过程是通过调用countDown()方法来实现的。在await()方法不再阻塞以后,所有等待的线程都会被释放,并且任何await()的子调用都会立刻返回。

CountDownLatch是通过一个计数器来实现的,计数器的初始值为初始任务的数量。每当完成了一个任务后,计数器的值就会减1(CountDownLatch.countDown()方法)。当计数器值到达0时,它表示所有的已经完成了任务,然后在闭锁上等待CountDownLatch.await()方法的线程就可以恢复执行任务。

需要注意,CountDownLatch计数器必须大于等于0,只有等于0的时候,计数器就是0,调用await方法时不会阻塞当前线程

应用场景

启动框架,框架有所谓的主线程,框架肯定有很多初始化工作(连接数据库,读取配置文件等等),如果放在主线程里面去做,会影响框架启动的性能,我们把初始化工作放在初始化线程里面去工作.我希望主线程必须要等待初始化线程里面的所有初始化工作完成以后再去执行主线程的任务(相关代码),

实现最大的并行性:有时我们想同时启动多个线程,实现最大程度的并行性。例如,我们想测试一个单例类。如果我们创建一个初始计数为1的CountDownLatch,并让所有线程都在这个锁上等待,那么我们可以很轻松地完成测试。我们只需调用 一次countDown()方法就可以让所有的等待线程同时恢复执行。
开始执行前等待n个线程完成各自任务:例如应用程序启动类要确保在处理用户请求前,所有N个外部系统已经启动和运行了,例如处理excel中多个表单。

TW1 TW2主线程一直wait等待中, 当 Ta Tb Tc Td 四个子线程都完成工作调用 countDown()就是把初始化的CNT计数器减一,直到CNT计数器变为0的时候,TW1和TW2两个工作线程就开始唤醒了,就开始继续运行了.

并发-框架[老的,有时间我重新整理一下]_初始化_11

特别注意和API

特别注意

1.countDown计数器和线程数并不是一对一关系,而是计数器是可以远远大于初始化线程数的. 一个线程完全可以调用多次countDown()方法对CNT计数器减一操作

2.工作线程(初始化线程)当扣减完CNT计数器之后完全可以继续运行,并不是初始化线程做完CNT计数器归零之后就一定要关闭退出等等

  1. await()可以是多个线程等待

API

await() 能够阻塞线程 直到调用N次countDown() 方法等待countDownLatch计数器为0的时候才唤醒线程

countDown() 可以在多个线程中调用 计算调用次数是所有线程调用次数的总和

getCount() 获取当前的countDownLatch计数器.

CyclicBarrier

| 代码

并发-框架[老的,有时间我重新整理一下]_工作线程_12

zjj_parent_4ce9fd69-e237-0781-b348-39acb324f6da

CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。

CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。

CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties,Runnable barrierAction),用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。

CyclicBarrier可以用于多线程计算数据,最后合并计算结果的场景。

并发-框架[老的,有时间我重新整理一下]_初始化_13

await();方法是可以反复调用的, 调用一次之后 CyclicBarrier计数器就复位了.
而CountDownLatch 只能调用一次


使用场景

CyclicBarrier可以用于多线程计算数据,最后合并计算结果的场景。例如,用一个Excel保存了用户所有银行流水,每个Sheet保存一个账户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个sheet里的银行流水,都执行完之后,得到每个sheet的日均银行流水,最后,再用barrierAction用这些线程的计算结果,计算出整个Excel的日均银行流水.

CountDownLatch和CyclicBarrier辨析

CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置。所以CyclicBarrier能处理更为复杂的业务场景。例如,如果计算发生错误,可以重置计数器,并让线程重新执行一次。

CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得Cyclic-Barrier阻塞的线程数量。isBroken()方法用来了解阻塞的线程是否被中断。

CountDownLatch.await一般阻塞工作线程,所有的进行预备工作的线程执行countDown,而CyclicBarrier通过工作线程调用await从而自行阻塞,直到所有工作线程达到指定屏障,再大家一起往下走。
在控制多个线程同时运行上,CountDownLatch可以不限线程数量,而CyclicBarrier是固定线程数。
同时,CyclicBarrier还可以提供一个barrierAction,汇总合并多线程计算结果。

Semaphore

案例代码:zjj_SpringBoot_1f13d8d5-0bd1-e369-b316-f2416b5bbaeb

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
应用场景Semaphore可以用于做流量控制,特别是公用资源有限的应用场景,比如数据库连接。

假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程并发地读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这时我们必须控制只有10个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。

这个时候,就可以使用Semaphore来做流量控制,特别是公用资源有限的应用场景,比如数据库连接。。。

Semaphore的构造方法Semaphore(int permits)接受一个整型的数字,表示可用的许可证数量。Semaphore的用法也很简单,首先线程使用Semaphore的acquire()方法获取一个许可证,使用完之后调用release()方法归还许可证。还可以用tryAcquire()方法尝试获取许可证。

Semaphore还提供一些其他方法,具体如下。
•intavailablePermits():返回此信号量中当前可用的许可证数。
•intgetQueueLength():返回正在等待获取许可证的线程数。
•booleanhasQueuedThreads():是否有线程正在等待获取许可证。
•void reducePermits(int reduction):减少reduction个许可证,是个protected方法。
•Collection getQueuedThreads():返回所有等待获取许可证的线程集合,是个protected方法。

并发-框架[老的,有时间我重新整理一下]_数据_14

在实现数据库连接池的时候一定要定义两个Semaphore,不然会出现隐藏bug.

Exchange

案例:zjj_SpringBoot_c741f758-0067-d8bb-2573-df5f6df28ddf

主要是两个线程的数据交换,如果超过两个线程都不行

Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。

数据交换的过程中是线程安全的.

并发-框架[老的,有时间我重新整理一下]_工作线程_15

Callable、Future和FutureTask

演示代码 zjj_parent_f6d25231-2f62-44c3-2bff-3264364ab946

在实际开发中Runnable等等线程的run方法是没有返回值结果的,如果我需要拿到线程执行完的结果,就需要Callable方法了.

Callable位于java.util.concurrent包下,它也是一个接口,在它里面也只声明了一个方法,只不过这个方法叫做call(),这是一个泛型接口,call()函数返回的类型就是传递进来的V类型。

1.Future

Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。

它声明这样的五个方法:

当FutureTask处于未启动状态时,执行FutureTask.cancel()方法将导致此任务永远不会被执行;当FutureTask处于已启动状态时,执行FutureTask.cancel(true)方法将以中断执行此任务线程的方式来试图停止任务;当FutureTask处于已启动状态时,执行FutureTask.cancel(false)方法将不会对正在执行此任务的线程产生影响(让正在执行的任务运行完成);当FutureTask处于已完成状态时,执行FutureTask.cancel(…)方法将返回false。

isCancelled方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。
isDone方法表示任务是否已经完成,若任务完成,则返回true;
get()方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;
get(long timeout, TimeUnit unit)用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。同时会抛异常出来.

也就是说Future提供了三种功能:
判断任务是否完成;
能够中断任务;
能够获取任务执行结果。

2.FutureTask

因为Future只是一个接口,所以是无法直接用来创建对象使用的,因此就有了下面的FutureTask。

并发-框架[老的,有时间我重新整理一下]_初始化_16


并发-框架[老的,有时间我重新整理一下]_数据_17

FutureTask类实现了RunnableFuture接口,RunnableFuture继承了Runnable接口和Future接口,而FutureTask实现了RunnableFuture接口。所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。

并发-框架[老的,有时间我重新整理一下]_初始化_18


因此我们通过一个线程运行Callable,但是Thread不支持构造方法中传递Callable的实例,所以我们需要通过FutureTask把一个Callable包装成Runnable,然后再通过这个FutureTask拿到Callable运行后的返回值。

要new一个FutureTask的实例,有两种方法

并发-框架[老的,有时间我重新整理一下]_初始化_19

/Task实现了Callable接口/

FutureTask futureTask = **new **FutureTask<>(**new **Task());

**new **Thread(futureTask).start();

三种状态

1)未启动。FutureTask.run()方法还没有被执行之前,FutureTask处于未启动状态。当创建一个FutureTask,且没有执行FutureTask.run()方法之前,这个FutureTask处于未启动状态。
2)已启动。FutureTask.run()方法被执行的过程中,FutureTask处于已启动状态。
3)已完成。FutureTask.run()方法执行完后正常结束,或被取消(FutureTask.cancel(…)),或执行FutureTask.run()方法时抛出异常而异常结束,FutureTask处于已完成状态。

当FutureTask处于未启动或已启动状态时,执行FutureTask.get()方法将导致调用线程阻塞;
当FutureTask处于已完成状态时,执行FutureTask.get()方法将导致调用线程立即返回结果或抛出异常。
当FutureTask处于未启动状态时,执行FutureTask.cancel()方法将导致此任务永远不会被执行;当FutureTask处于已启动状态时,执行FutureTask.cancel(true)方法将以中断执行此任务线程的方式来试图停止任务;当FutureTask处于已启动状态时,执行FutureTask.cancel(false)方法将不会对正在执行此任务的线程产生影响(让正在执行的任务运行完成);当FutureTask处于已完
成状态时,执行FutureTask.cancel(…)方法将返回false。

CompletableFuture

案例:ZJJ_JavaBasic_2020/02/09_11:43:44_6iz60

(一)Future的不足

在JDK1.8以前,通过调用线程池的submit方法可以让任务以异步的方式运行,该方法会返回一个Future对象.

Future是Java 5添加的类,用来描述一个异步计算的结果。你可以使用isDone方法检查计算是否完成,或者使用get阻塞住调用线程,直到计算完成返回结果,你也可以使用cancel方法停止任务的执行。

虽然Future以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源,而且也不能及时地得到计算结果,为什么不能用观察者设计模式当计算结果完成及时通知监听者呢?

Future缺点:

  1. 结果的获取不方便(要么调用get方法进入阻塞,要不轮询获取任务的结果.)
  2. 很难直接表述多个Future结果的依赖性(之前都是用CompletionService )

(二)CompletableFuture的使用

JDK1.8才新加入的一个实现类CompletableFuture,实现了Future, CompletionStage两个接口。实现了Future接口,意味着可以像以前一样通过阻塞或者轮询的方式获得结果。

CompletableFuture的应用场景
存在IO密集型的任务可以选择CompletableFuture,IO部分交由另外一个线程去执行。Logback、Log4j2异步日志记录的实现原理就是新起了一个线程去执行IO操作,这部分可以以CompletableFuture.runAsync(()->{ioOperation();})的方式去调用,有关Logback异步日志记录的原理可以参考这篇文章Logback异步日志记录。如果是CPU密集型就不推荐使用了推荐使用并行流.

除了直接new出一个CompletableFuture的实例,还可以通过工厂方法创建CompletableFuture的实例

工厂方法:

| static ​CompletableFuture​​<​Void​​> | ​runAsync​​(​Runnable​ runnable)

返回一个新的CompletableFuture,它在运行给定操作后由运行在 ForkJoinPool.commonPool()中的任务 ​异步​完成。

static ​CompletableFuture​​<​Void​>

返回一个新的CompletableFuture,它在运行给定操作之后由在给定执行程序中运行的任务异步完成。

static CompletableFuture

返回一个新的CompletableFuture,它通过在 ForkJoinPool.commonPool()中运行的任务与通过调用给定的供应商获得的值 ​异步​完成。

static CompletableFuture

返回一个新的CompletableFuture,由给定执行器中运行的任务异步完成,并通过调用给定的供应商获得的值。

Asynsc表示异步,而supplyAsync与runAsync不同在与前者异步返回一个结果,后者是void.第二个函数第二个参数表示是用我们自己创建的线程池,否则采用默认的ForkJoinPool.commonPool()作为它的线程池。

获得结果的方法
public T get()
public T get(long timeout, TimeUnit unit)
public T getNow(T valueIfAbsent)
public T join()
getNow有点特殊,如果结果已经计算完则返回结果或者抛出异常,否则返回给定的valueIfAbsent值。
join返回计算的结果或者抛出一个unchecked异常(CompletionException),它和get对抛出的异常的处理有些细微的区别。


辅助方法

| static ​CompletableFuture​​<​Void​​> | ​allOf​​(​CompletableFuture​<?>… cfs)

返回一个新的CompletableFuture,当所有给定的CompletableFutures完成时,完成。

static ​CompletableFuture​​<​Object​>

返回一个新的CompletableFuture,当任何一个给定的CompletableFutures完成时,完成相同的结果。

allOf方法是当所有的CompletableFuture都执行完后执行计算。
anyOf方法是当任意一个CompletableFuture执行完后就会执行计算,计算的结果相同。

zjj_parent_2019/09/20_14:51:16_zqzz9pyp7gnal4bztiobvmn5uksbhg

CompletionStage是一个接口,从命名上看得知是一个完成的阶段,它代表了一个特定的计算的阶段,可以同步或者异步的被完成。你可以把它看成一个计算流水线上的一个单元,并最终会产生一个最终结果,这意味着几个CompletionStage可以串联起来,一个完成的阶段可以触发下一阶段的执行,接着触发下一次,再接着触发下一次,……….。
总结CompletableFuture几个关键点:
1、计算可以由 Future ,Consumer 或者 Runnable 接口中的 apply,accept 或者 run等方法表示。
2、计算的执行主要有以下
a. 默认执行
b. 使用默认的CompletionStage的异步执行提供者异步执行。这些方法名使用someActionAsync这种格式表示。
c. 使用 Executor 提供者异步执行。这些方法同样也是someActionAsync这种格式,但是会增加一个Executor 参数。
CompletableFuture里大约有五十种方法,但是可以进行归类,

变换类 thenApply:

| CompletableFuture | thenApply(Function<? super T,? extends U> fn)

返回一个新的CompletionStage,当此阶段正常完成时,将以该阶段的结果作为所提供函数的参数执行。

CompletableFuture

返回一个新的CompletionStage,当该阶段正常完成时,将使用此阶段的默认异步执行工具执行此阶段的结果作为所提供函数的参数。

CompletableFuture

返回一个新的CompletionStage,当此阶段正常完成时,将使用提供的执行程序执行此阶段的结果作为提供函数的参数。

关键入参是函数式接口Function。它的入参是上一个阶段计算后的结果,返回值是经过转化后结果。
消费类 thenAccept:

| ​CompletableFuture​​<​Void​​> | ​thenAccept​​(​Consumer​​<? super ​T​> action)

返回一个新的CompletionStage,当此阶段正常完成时,将以该阶段的结果作为提供的操作的参数执行。

CompletableFuture​​<​Void​>

返回一个新的CompletionStage,当此阶段正常完成时,将使用此阶段的默认异步执行工具执行,此阶段的结果作为提供的操作的参数。

CompletableFuture​​<​Void​>

返回一个新的CompletionStage,当此阶段正常完成时,将使用提供的执行程序执行此阶段的结果作为提供的操作的参数。

关键入参是函数式接口Consumer。它的入参是上一个阶段计算后的结果, 没有返回值。

执行操作类 thenRun:

| | ​CompletableFuture​​<​Void​​> | ​thenRun​​(​Runnable​ action)

返回一个新的CompletionStage,当此阶段正常完成时,执行给定的操作。

CompletableFuture​​<​Void​>

返回一个新的CompletionStage,当此阶段正常完成时,使用此阶段的默认异步执行工具执行给定的操作。

CompletableFuture​​<​Void​>

返回一个新的CompletionStage,当此阶段正常完成时,使用提供的执行程序执行给定的操作。

对上一步的计算结果不关心,执行下一个操作,入参是一个Runnable的实例,表示上一步完成后执行的操作。
结合转化类:

| | <U,V> ​CompletableFuture​​ | ​thenCombine​​(​CompletionStage​​<? extends U> other, ​BiFunction​​<? super ​T​,? super U,? extends V> fn)

返回一个新的CompletionStage,当这个和另一个给定的阶段都正常完成时,两个结果作为提供函数的参数执行。

<U,V> ​CompletableFuture

返回一个新的CompletionStage,当这个和另一个给定阶段正常完成时,将使用此阶段的默认异步执行工具执行,其中两个结果作为提供函数的参数。

<U,V> ​CompletableFuture

返回一个新的CompletionStage,当这个和另一个给定阶段正常完成时,使用提供的执行器执行,其中两个结果作为提供的函数的参数

需要上一步的处理返回值,并且other代表的CompletionStage 有返回值之后,利用这两个返回值,进行转换后返回指定类型的值。
两个CompletionStage是并行执行的,它们之间并没有先后依赖顺序,other并不会等待先前的CompletableFuture执行完毕后再执行。
结合转化类

| | CompletableFuture | thenCompose(Function<? super T,? extends CompletionStage> fn)

返回一个新的CompletionStage,当这个阶段正常完成时,这个阶段将作为提供函数的参数执行。

CompletableFuture

返回一个新的CompletionStage,当此阶段正常完成时,将使用此阶段的默认异步执行工具执行,此阶段作为提供的函数的参数。

CompletableFuture

返回一个新的CompletionStage,当此阶段正常完成时,将使用提供的执行程序执行此阶段的结果作为提供函数的参数。

对于Compose可以连接两个CompletableFuture,其内部处理逻辑是当第一个CompletableFuture处理没有完成时会合并成一个CompletableFuture,如果处理完成,第二个future会紧接上一个CompletableFuture进行处理。
第一个CompletableFuture 的处理结果是第二个future需要的输入参数。

结合消费类:

| | CompletableFuture<Void> | thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)

返回一个新的CompletionStage,当这个和另一个给定的阶段都正常完成时,两个结果作为提供的操作的参数被执行。

CompletableFuture​<Void>

返回一个新的CompletionStage,当这个和另一个给定阶段正常完成时,将使用此阶段的默认异步执行工具执行,其中两个结果作为提供的操作的参数。

CompletableFuture​<Void>

返回一个新的CompletionStage,当这个和另一个给定阶段正常完成时,使用提供的执行器执行,其中两个结果作为提供的函数的参数。

需要上一步的处理返回值,并且other代表的CompletionStage 有返回值之后,利用这两个返回值,进行消费

运行后执行类:

| | ​CompletableFuture​​<​Void​​> | ​runAfterBoth​​(​CompletionStage​​<?> other, ​Runnable​ action)

返回一个新的CompletionStage,当这个和另一个给定的阶段都正常完成时,执行给定的动作。

CompletableFuture​​<​Void​>

返回一个新的CompletionStage,当这个和另一个给定阶段正常完成时,使用此阶段的默认异步执行工具执行给定的操作。

CompletableFuture​​<​Void​>

返回一个新CompletionStage,当这和其他特定阶段正常完成,使用附带的执行见执行给定的动作​CompletionStage​覆盖特殊的完成规则的文档。

不关心这两个CompletionStage的结果,只关心这两个CompletionStage都执行完毕,之后再进行操作(Runnable)。
取最快转换类:

| | CompletableFuture | applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)

返回一个新的CompletionStage,当这个或另一个给定阶段正常完成时,执行相应的结果作为提供的函数的参数。

CompletableFuture

返回一个新的CompletionStage,当这个或另一个给定阶段正常完成时,将使用此阶段的默认异步执行工具执行,其中相应的结果作为提供函数的参数。

CompletableFuture

返回一个新的CompletionStage,当这个或另一个给定阶段正常完成时,将使用提供的执行器执行,其中相应的结果作为参数提供给函数。

两个CompletionStage,谁计算的快,我就用那个CompletionStage的结果进行下一步的转化操作。现实开发场景中,总会碰到有两种渠道完成同一个事情,所以就可以调用这个方法,找一个最快的结果进行处理。

取最快消费类:

| | ​CompletableFuture​​<​Void​​> | ​acceptEither​​(​CompletionStage​​<? extends ​T​​> other, ​Consumer​​<? super ​T​> action)

返回一个新的CompletionStage,当这个或另一个给定阶段正常完成时,执行相应的结果作为提供的操作的参数。

CompletableFuture​​<​Void​>

返回一个新的CompletionStage,当这个或另一个给定阶段正常完成时,将使用此阶段的默认异步执行工具执行,其中相应的结果作为提供的操作的参数。

CompletableFuture​​<​Void​>

返回一个新的CompletionStage,当这个或另一个给定阶段正常完成时,将使用提供的执行器执行,其中相应的结果作为参数提供给函数。

两个CompletionStage,谁计算的快,我就用那个CompletionStage的结果进行下一步的消费操作。


取最快运行后执行类:

| | ​CompletableFuture​​<​Void​​> | ​runAfterEither​​(​CompletionStage​​<?> other, ​Runnable​ action)

返回一个新的CompletionStage,当这个或另一个给定阶段正常完成时,执行给定的操作。

CompletableFuture​​<​Void​>

返回一个新的CompletionStage,当这个或另一个给定阶段正常完成时,使用此阶段的默认异步执行工具执行给定的操作。

CompletableFuture​​<​Void​>

返回一个新的CompletionStage,当这个或另一个给定阶段正常完成时,使用提供的执行器执行给定的操作。

两个CompletionStage,任何一个完成了都会执行下一步的操作(Runnable)。

异常补偿类:

并发-框架[老的,有时间我重新整理一下]_初始化_20


当运行时出现了异常,可以通过exceptionally进行补偿。

运行后记录结果类:

| | ​CompletableFuture​​<​T​​> | ​whenComplete​​(​BiConsumer​​<? super ​T​​,? super ​Throwable​> action)

返回与此阶段相同的结果或异常的新的CompletionStage,当此阶段完成时,使用结果(或 null如果没有))和此阶段的异常(或 null如果没有))执行给定的操作。

CompletableFuture​​<​T​>

返回一个与此阶段相同结果或异常的新CompletionStage,当此阶段完成时,执行给定操作将使用此阶段的默认异步执行工具执行给定操作,结果(或 null如果没有))和异常(或 null如果没有)这个阶段作为参数。

CompletableFuture​​<​T​>

返回与此阶段相同的结果或异常的新的CompletionStage,当此阶段完成时,使用提供的执行者执行给定的操作,如果没有,则使用结果(或 null如果没有))和异常(或 null如果没有))作为论据。

action执行完毕后它的结果返回原始的CompletableFuture的计算结果或者返回异常。所以不会对结果产生任何的作用。
运行后处理结果类:

| | CompletableFuture | handle(BiFunction<? super T,Throwable,? extends U> fn)

返回一个新的CompletionStage,当此阶段正常或异常完成时,将使用此阶段的结果和异常作为所提供函数的参数执行。

CompletableFuture

返回一个新的CompletionStage,当该阶段完成正常或异常时,将使用此阶段的默认异步执行工具执行,此阶段的结果和异常作为提供函数的参数。

CompletableFuture

返回一个新的CompletionStage,当此阶段完成正常或异常时,将使用提供的执行程序执行此阶段的结果和异常作为提供的函数的参数。

运行完成时,对结果的处理。这里的完成时有两种情况,一种是正常执行,返回值。另外一种是遇到异常抛出造成程序的中断。


标签:返回,框架,并发,CompletionStage,重新整理,CompletableFuture,FutureTask,线程,执行
From: https://blog.51cto.com/u_14861909/5976387

相关文章