首页 > 编程语言 >Java多线程 CompletionService和ExecutorCompletionService

Java多线程 CompletionService和ExecutorCompletionService

时间:2022-11-11 10:34:11浏览次数:46  
标签:Java Thread CompletionService Callable Future 线程 new 多线程 public


目录

  • ​​一、说明​​
  • ​​二、理解​​
  • ​​三、实现​​
  • ​​1.使用Future​​
  • ​​2.使用ExecutorCompletionService​​
  • ​​3.take()方法​​
  • ​​4.poll()方法​​
  • ​​5.poll(long timeout, TimeUnit unit)方法​​

一、说明

Future的不足

  • 当通过 ​​.get()​​ 方法获取线程的返回值时,会导致阻塞
  • 也就是和当前这个Future关联的计算任务真正执行完成的时候才返回结果
  • 新任务必须等待已完成任务的结果才能继续进行处理,会浪费很多时间,最好是谁最先执行完成谁最先返回

CompletionService的引入

  • 解决阻塞的问题
  • 以异步的方式一边处理新的线程任务,一边处理已完成任务的结果,将执行任务与处理任务分开进行处理

二、理解

CompletionService

  • ​java.util.concurrent​​​包下​​CompletionService<V>​​​接口,但并不继承​​Executor​​​接口,仅有一个实现类​​ExecutorCompletionService​​用于管理线程对象
  • 更加有效地处理Future的返回值,避免阻塞,使用​​.submit()​​​方法执行任务,使用​​.take()​​取得已完成的任务,并按照完成这些任务的时间顺序处理它们的结果
public interface CompletionService<V> {
Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
Future<V> take() throws InterruptedException;
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}
  • ​submit()​​方法用来执行线程任务
  • ​take()​​​方法从队列中获取完成任务的Future对象,谁最先执行完成谁最先返回,获取到的对象再调用​​.get()​​方法获取结果
  • ​poll()​​方法获取并删除代表下一个已完成任务的 Future,如果不存在,则返回null,此无阻塞的效果
  • ​poll(long timeout, TimeUnit unti)​​ timeout表示等待的最长时间,unit表示时间单位,在指定时间内还没获取到结果,则返回null

ExecutorCompletionService

  • ​java.util.concurrent​​​包下​​ExecutorCompletionService<V>​​​类实现​​CompletionService<V>​​接口,方法与接口相同
  • 比​​ExecutorService​​可以更精确和简便地完成异步任务的执行
  • ​executor​​​执行任务,​​completionQueue​​保存异步任务执行的结果
public class ExecutorCompletionService<V> implements CompletionService<V> {
private final Executor executor;
private final AbstractExecutorService aes;
private final BlockingQueue<Future<V>> completionQueue;
……
Future<V> submit(Callable<V> task)
Future<V> submit(Runnable task, V result)
Future<V> take() throws InterruptedException
Future<V> poll()
Future<V> poll(long timeout, TimeUnit unit)
……
}
  • ​completionQueue​​​初始化了一个​​LinkedBlockingQueue​​类型的先进先出阻塞队列
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
  • ​submit()​​​方法中​​QueueingFuture​​​是​​ExecutorCompletionService​​中的内部类
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
executor.execute(new QueueingFuture<V>(f, completionQueue));
return f;
}
  • ​QueueingFuture​​​将​​RunnableFuture​​​实例对象赋值给了​​task​​​,内部的​​done()​​​方法将​​task​​​添加到已完成阻塞队列中,调用​​take()​​​或​​poll()​​方法获取已完成的Future
private static class QueueingFuture<V> extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task,
BlockingQueue<Future<V>> completionQueue) {
super(task, null);
this.task = task;
this.completionQueue = completionQueue;
}
private final Future<V> task;
private final BlockingQueue<Future<V>> completionQueue;
protected void done() { completionQueue.add(task); }
}

三、实现

1.使用Future

创建​​CompletionServiceDemo​​​类,创建好的线程对象,使用​​Executors​​​工厂类来创建​​ExecutorService​​​的实例(即线程池),通过​​ThreadPoolExecutor​​​的​​.submit()​​方法提交到线程池去执行,线程执行后,返回值Future可被拿到

public class CompletionServiceDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 1.创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(5);

// 2.创建Callable子线程对象任务
Callable callable_1 = new Callable() {
@Override
public String call() throws Exception {
Thread.sleep(5000);
return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " );
}
};

Callable callable_2 = new Callable() {
@Override
public String call() throws Exception {
Thread.sleep(3000);
return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " );
}
};

Callable callable_3 = new Callable() {
@Override
public String call() throws Exception {
Thread.sleep(1000);
return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " );
}
};

// 3.使用Future提交三个任务到线程池
Future future_1 = executorService.submit(callable_1);
Future future_2 = executorService.submit(callable_2);
Future future_3 = executorService.submit(callable_3);

// 4.获取返回值
System.out.println("开始获取结果 " + getStringDate());
System.out.println(future_1.get() + "" + getStringDate());
System.out.println(future_2.get() + "" + getStringDate());
System.out.println(future_3.get() + "" + getStringDate());
System.out.println("结束 " + getStringDate());

// 5.关闭线程池
executorService.shutdown();
}

// 获取时间函数
public static String getStringDate() {
Date currentTime = new Date();
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("HH:mm:ss.SSS");
String date = simpleDateFormat.format(currentTime);
return date;
}
}

​future_1.get()​​​会等待执行时间阻塞5秒再获取到结果,而在这5秒内​​future_2​​​和​​future_3​​的任务已完成,所以会立马得到结果

Java多线程 CompletionService和ExecutorCompletionService_ide

2.使用ExecutorCompletionService

创建一个​​ExecutorCompletionService​​​放入线程池实现​​CompletionService​​​接口,将创建好的线程对象通过​​CompletionService​​提交任务和获取结果

public class CompletionServiceDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 1.创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(5);

// 2.创建一个ExecutorCompletionService放入线程池实现CompletionService接口
CompletionService completionService = new ExecutorCompletionService(executorService);

// 3.创建Callable子线程对象任务
Callable callable_1 = new Callable() {
@Override
public String call() throws Exception {
Thread.sleep(5000);
return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " );
}
};

Callable callable_2 = new Callable() {
@Override
public String call() throws Exception {
Thread.sleep(3000);
return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " );
}
};

Callable callable_3 = new Callable() {
@Override
public String call() throws Exception {
Thread.sleep(1000);
return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " );
}
};

// 3.使用CompletionService提交三个任务到线程池
completionService.submit(callable_1);
completionService.submit(callable_2);
completionService.submit(callable_3);

// 4.获取返回值
System.out.println("开始获取结果 " + getStringDate());
System.out.println(completionService.take().get() + "" + getStringDate());
System.out.println(completionService.take().get() + "" + getStringDate());
System.out.println(completionService.take().get() + "" + getStringDate());
System.out.println("结束 " + getStringDate());

// 5.关闭线程池
executorService.shutdown();
}

// 获取时间函数
public static String getStringDate() {
Date currentTime = new Date();
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("HH:mm:ss.SSS");
String date = simpleDateFormat.format(currentTime);
return date;
}
}

提交顺序是1-2-3,按照完成这些任务的时间顺序处理它们的结果,返回顺序是3-2-1

Java多线程 CompletionService和ExecutorCompletionService_java_02

3.take()方法

​take()​​​方法从队列中获取完成任务的Future对象,会阻塞,一直等待线程池中返回一个结果,谁最先执行完成谁最先返回,获取到的对象再调用​​.get()​​方法获取结果

如果调用​​take()​​方法的次数大于任务数,会因为等不到有任务返回结果而阻塞,只有三个任务,第四次take等不到结果而阻塞

Java多线程 CompletionService和ExecutorCompletionService_线程池_03

4.poll()方法

​poll()​​方法不会去等结果造成阻塞,没有结果则返回null,接着程序继续往下运行

直接用​​completionService.poll().get()​​​会引发 ​​NullPointerException​

Java多线程 CompletionService和ExecutorCompletionService_java_04


创建一个循环,连续调用​​poll()​​方法,每次隔1秒调用,没有结果则返回null

Java多线程 CompletionService和ExecutorCompletionService_线程池_05

public class CompletionServiceDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 1.创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(5);
// 2.创建一个ExecutorCompletionService放入线程池实现CompletionService接口
CompletionService completionService = new ExecutorCompletionService(executorService);
// 3.创建Callable子线程对象任务
Callable callable_1 = new Callable() {
@Override
public String call() throws Exception {
Thread.sleep(5000);
return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " );
}
};

Callable callable_2 = new Callable() {
@Override
public String call() throws Exception {
Thread.sleep(3000);
return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " );
}
};

Callable callable_3 = new Callable() {
@Override
public String call() throws Exception {
Thread.sleep(1000);
return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " );
}
};

// 3.使用CompletionService提交三个任务到线程池
completionService.submit(callable_1);
completionService.submit(callable_2);
completionService.submit(callable_3);

// 4.获取返回值
System.out.println("开始获取结果 " + getStringDate());

// 5.创建一个循环,连续调用poll()方法,间隔1秒
for (int i = 0; i < 8; i++) {
Future future = completionService.poll();
if (future!=null){
System.out.println(future.get() + getStringDate());
}else {
System.out.println(future+" "+getStringDate());
}
Thread.sleep(1000);
}
System.out.println("结束 " + getStringDate());

// 6.关闭线程池
executorService.shutdown();
}

// 获取时间函数
public static String getStringDate() {
Date currentTime = new Date();
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("HH:mm:ss.SSS");
String date = simpleDateFormat.format(currentTime);
return date;
}
}

5.poll(long timeout, TimeUnit unit)方法

​poll(long timeout, TimeUnit unit)​​方法设置了等待时间,等待超时还没有结果就返回null

不使用 ​​Thread.sleep(1000)​​​,将等待时间设置成0.5秒,由于只有8次循环,也就是4秒执行时间,而​​callable_1​​需要执行5秒,获取不到结果则返回null

Java多线程 CompletionService和ExecutorCompletionService_ide_06

public class CompletionServiceDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 1.创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(5);
// 2.创建一个ExecutorCompletionService放入线程池实现CompletionService接口
CompletionService completionService = new ExecutorCompletionService(executorService);
// 3.创建Callable子线程对象任务
Callable callable_1 = new Callable() {
@Override
public String call() throws Exception {
Thread.sleep(5000);
return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " );
}
};

Callable callable_2 = new Callable() {
@Override
public String call() throws Exception {
Thread.sleep(3000);
return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " );
}
};

Callable callable_3 = new Callable() {
@Override
public String call() throws Exception {
Thread.sleep(1000);
return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " );
}
};

// 3.使用CompletionService提交三个任务到线程池
completionService.submit(callable_1);
completionService.submit(callable_2);
completionService.submit(callable_3);

// 4.获取返回值
System.out.println("开始获取结果 " + getStringDate());

// 5.创建一个循环,连续调用poll()方法,间隔1秒
for (int i = 0; i < 8; i++) {
Future future = completionService.poll(500, TimeUnit.MILLISECONDS);
if (future!=null){
System.out.println(future.get() + getStringDate());
}else {
System.out.println(future+" "+getStringDate());
}
}
System.out.println("结束 " + getStringDate());

// 6.关闭线程池
executorService.shutdown();
}

// 获取时间函数
public static String getStringDate() {
Date currentTime = new Date();
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("HH:mm:ss.SSS");
String date = simpleDateFormat.format(currentTime);
return date;
}
}


标签:Java,Thread,CompletionService,Callable,Future,线程,new,多线程,public
From: https://blog.51cto.com/u_15872973/5843050

相关文章

  • 5:SpringBoot-Actuator-Java Spring
    目录​​5.1SpringBoot-Actuator介绍​​​​5.2Endpoints介绍​​​​5.3Actuator原理​​​​5.4Actuator依赖引入​​5.1SpringBoot-Actuator介绍Actuator是Spring......
  • Java Lambda 表达式
    目录​​一、说明​​​​二、理解​​​​三、演示​​​​1.常规方法实现​​​​2.静态内部类​​​​3.局部内部类​​​​4.匿名内部类​​​​5.Lambda表达式​​​​......
  • Java多线程 Callable和Future
    目录​​一、说明​​​​二、理解​​​​三、实现​​​​1.实现接口​​​​2.执行线程​​一、说明Java提供了三种创建线程的方法实现​​Runnable​​接口继承​​T......
  • Java多线程 Future和FutureTask的区别
    目录​​一、说明​​​​二、理解​​​​三、实现​​​​1.实现接口​​​​2.使用Future​​​​3.使用FutureTask​​一、说明Future和FutureTask的关系Future是一个......
  • Java多线程 ThreadPoolExecutor-RejectedExecutionHandler拒绝执行策略
    目录​​一、说明​​​​二、理解​​​​三、实现​​​​1.AbortPolicy​​​​2.DiscardPolicy​​​​3.DiscardOldestPolicy​​​​4.CallerRunsPolicy​​​​5.自......
  • Java多线程 线程池Executor框架
    目录​​一、说明​​​​二、理解​​​​Executor​​​​ExecutorService​​​​Executors​​​​三、实现​​​​1.newSingleThreadExecutor​​​​2.newFixedThr......
  • 8:Spring MVC-Java Spring
    目录​​8.1WEB开发模式一​​​​8.2WEB开发模式二​​​​8.3SpringMVC介绍​​​​8.4SpringMVC主要组件​​​​8.5SpringMVC处理流程​​​​8.6SpringMVC的......
  • Java Web项目中使用RSA加密数据
    在Web项目中有些时候需要对传输的数据加密后再传输到服务端进行解密使用,这里采用RSA进行公钥加密私钥解密的模式会有较高的安全性。这里选用的工具库是 JSEncrypt.js ......
  • 硬核剖析Java锁底层AQS源码,深入理解底层架构设计
    我们常见的并发锁ReentrantLock、CountDownLatch、Semaphore、CyclicBarrier都是基于AQS实现的,所以说不懂AQS实现原理的,就不能说了解Java锁。上篇文章讲了AQS的加锁流程,这......
  • Java并发编程——基础知识(一)
    1.进程与线程1.1基本概念进程:对运行时程序的封装,是系统进行资源调度和分配的的基本单位,实现了操作系统的并发线程:进程的子任务,是CPU调度和分派的基本单位,用于保证程序......