首页 > 编程语言 >CompletionService 源码解析

CompletionService 源码解析

时间:2022-11-04 14:06:57浏览次数:70  
标签:异步 执行 CompletionService task 任务 源码 executor 解析 completionQueue


​CompletionService​​​的主要作用是:按照异步任务的完成顺序,逐个获取到已经完成的异步任务。主要实现是在​​ExecutorCompletionService​​中。

类图

CompletionService 源码解析_异步任务

核心内部类

private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}

在​​CompletionService​​​的实现中,将任务​​FutureTask​​​做了扩展,实现了​​FutureTask​​​的​​done​​方法。当任务完成后会回调这个方法,这时我们在这个方法中将完成的任务放到队列中,就实现了按照异步任务完成的顺序,逐个处理任务的结果了。

核心属性

// 执行任务的线程池
private final Executor executor;
// 存放已完成的异步任务的阻塞队列,默认使用 LinkedBlockingQueue
private final BlockingQueue<Future<V>> completionQueue;

构造函数

public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}

public ExecutorCompletionService(Executor executor,
BlockingQueue<Future<V>> completionQueue) {
if (executor == null || completionQueue == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = completionQueue;
}

在构造函数中我们至少需要传入一个​​Executor​​​线程池的实现来执行异步任务,但是建议再传入一个阻塞队列,默认的​​LinkedBlockingQueue​​是一个无界队列,有内存溢出的风险。

submit 提交任务

public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
executor.execute(new QueueingFuture(f));
return f;
}

我们可以看到,在提交任务给线程池之前,我们会将任务封装成​​QueueingFuture​​​任务。当该任务执行完成后会回调执行​​done​​方法,将任务放到队列。

获取已完成的任务

public Future<V> take() throws InterruptedException {
return completionQueue.take();
}

public Future<V> poll() {
return completionQueue.poll();
}
  • ​take​​:如果没有任务,一直阻塞,直到有新任务进来
  • ​poll​​:如果没有任务返回NULL

示例

public class CompletionServiceTest {

@Test
public void test() throws ExecutionException, InterruptedException {
Random random = new Random();
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletionService<Integer> completionService = new ExecutorCompletionService<>(executor, new LinkedBlockingQueue<>(10));
for (int i = 0; i < 8; i++) {
completionService.submit(() -> {
int time = random.nextInt(1000);
sleep(time);
System.out.println(Thread.currentThread().getName() + " 执行异步任务执行耗时: " + time);
return time;
});
}

while (true) {
System.out.println(Thread.currentThread().getName() + " 主线程获取到任务结果 " + completionService.take().get());
}
}


public static void sleep(int probe) {
try {
Thread.sleep(probe);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
pool-1-thread-7 执行异步任务执行耗时: 153
main 主线程获取到任务结果 153
pool-1-thread-5 执行异步任务执行耗时: 208
main 主线程获取到任务结果 208
pool-1-thread-4 执行异步任务执行耗时: 242
main 主线程获取到任务结果 242
pool-1-thread-8 执行异步任务执行耗时: 456
main 主线程获取到任务结果 456
pool-1-thread-1 执行异步任务执行耗时: 567
main 主线程获取到任务结果 567
pool-1-thread-2 执行异步任务执行耗时: 782
main 主线程获取到任务结果 782
pool-1-thread-6 执行异步任务执行耗时: 796
main 主线程获取到任务结果 796
pool-1-thread-3 执行异步任务执行耗时: 976
main 主线程获取到任务结果 976

我的是8核机器,所以任务的结束时间一定会按照任务的结束时间排序。

源码

​https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases​

spring-boot-student-concurrent 工程

layering-cache

为监控而生的多级缓存框架 layering-cache这是我开源的一个多级缓存框架的实现,如果有兴趣可以看一下

标签:异步,执行,CompletionService,task,任务,源码,executor,解析,completionQueue
From: https://blog.51cto.com/u_15861563/5823705

相关文章

  • JAVA并发容器-ConcurrentHashMap 1.7和1.8 源码解析
    HashMap是一个线程不安全的类,在并发情况下会产生很多问题,详情可以参考​​HashMap源码解析​​;HashTable是线程安全的类,但是它使用的是synchronized来保证线程安全,线程竞争......
  • HashMap 源码解析
    源码学习,边看源码边加注释,边debug,边理解。基本属性常量DEFAULT_INITIAL_CAPACITY:默认数组的初始容量-必须是2的幂。MAXIMUM_CAPACITY:数组的最大容量DEFAULT_LOAD_FACTOR:哈......
  • s-sgdisk源码分析 “--set-alignment=value分区对齐参数”
    文章目录​​边界对齐子命令使用​​​​源码分析​​​​sgdisk.ccmain函数入口​​​​gptcl.ccDoOptions解析并执行具体命令函数​​​​gpt.ccCreatePartition创建分......
  • centos下将vim配置为强大的源码阅读器
    每日杂事缠身,让自己在不断得烦扰之后终于有了自己的清静时光来熟悉一下我的工具,每次熟悉源码都需要先在windows端改好,拖到linux端,再编译。出现问题,还得重新回到windows端,这......
  • WCNSS_qcom_cfg.ini WIFI配置文件解析
    STA相关的一般配置gChannelBondingMode5GHz=1gChannelBondingMode24GHz=0//通道绑定gStaKeepAlivePeriod=30//使用非零周期值启用保持活动状态gVhtMpduLen=2......
  • 如何正确学习vue3.0源码
    为什么要学源码技术是第一生产力学习API的设计目的、思路、取舍学习优秀的代码风格学习组织代码的方式学习实现方法的技巧学习ES67新API、TS高级用法不给自......
  • 上帝视角看Vue源码整体架构+相关源码问答
    前言这段时间利用课余时间夹杂了很多很多事把Vue2源码学习了一遍,但很多都是跟着视频大概过了一遍,也都画了自己的思维导图。但还是对详情的感念模糊不清,故这段时间对源码......
  • qt输出自定义的pdf文件源码详解
    qt中有两种方式可以输出pdf:方式1:使用QPrinter即打印机的方式打印pdf这种方式,在qt4成为唯一的方式。QPrinterprinter(QPrinter::HighResolution);//高清晰度printer.set......
  • 设计模式:责任链模式的应用场景及源码应用
    一、概述责任链模式(ChainofResponsibilityPattern)是将链中每一个节点看作是一个对象,每个节点处理的请求均不同,且内部自动维护一个下一节点对象。当一个请求从链式的首......
  • SpringMVC源码-创建RequestMappingHandlerAdapter
    一、RequestMappingHandlerAdapterRequestMappingHandlerAdapter所属BeanDifinition的属性。RequestMappingHandlerAdapter是将当前请求适配到@RequestMapping类型的Ha......