目录
1 ExecutorCompletionService
1.1 简介
当我们向Executor
提交一组任务,并且希望任务在完成后获得结果,此时可以考虑使用ExecutorCompletionService
。
ExecutorCompletionService
实现了CompletionService
接口。ExecutorCompletionService
将Executor
和BlockingQueue
功能融合在一起,使用它可以提交我们的Callable
任务。这个任务委托给Executor
执行,可以使用ExecutorCompletionService
对象的take
和poll
方法获取结果。
ExecutorCompletionService
的设计目的在于提供一个可获取线程池执行结果的功能,这个类采用了装饰器模式,需要用户提供一个自定义的线程池,在ExecutorCompletionService
内部持有该线程池进行线程执行,在原有的线程池功能基础上装饰额外的功能。
ExecutorCompletionService
相比之前 Future
相比 ,提供了一个通知机制,将结果统一到一个队列,当前提交任务不会阻塞获取,从另一个队列中阻塞获取。
1.2 原理
执行原理:
- 在使用
ExecutorCompletionService
时需要提供一个自定义的线程池Executor
,构造ExecutorCompletionService
。同时,也可以指定一个自定义的队列作为线程执行结果的容器,当线程执行完成时,通过重写FutureTask#done()
将结果压入队列中。 - 当用户把所有的任务都提交了以后,可通过
ExecutorCompletionService#poll
方法来弹出已完成的结果,这样做的好处是可以节省获取完成结果的时间。
1.3 Demo示例
1.3.1 未使用ExecutorCompletionService
public class ExecutorCompletionServiceDemo {
public static void main(String[] args) {
//这里只是为了方便,真正项目中不要这样创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(5);
List<Future<String>> list = new ArrayList<>();
Future<String> future1 = executorService.submit(() -> {
System.out.println("执行任务1开始");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("执行任务1结束");
return "任务1执行成功";
});
list.add(future1);
Future<String> future2 = executorService.submit(() -> {
System.out.println("执行任务2开始");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("执行任务2结束");
return "任务2执行成功";
});
list.add(future2);
Future<String> future3 = executorService.submit(() -> {
System.out.println("执行任务3开始");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("执行任务3结束");
return "任务3执行成功";
});
list.add(future3);
for (int i = 0; i < list.size(); i++) {
String s = null;
try {
s = list.get(i).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println(s);
}
executorService.shutdown();
}
}
我们可以看到三个任务的执行结果会按照提交顺序的任务执行时间进行堵塞依次获取结果;我们提交到线程池中,通过Futrue类的get()方法,会造成堵塞,需要先等执行任务1的线程结束返回结果,才会进行获取下一个任务的执行的结果,那边后面的任务先于任务一执行结束;当然如果工作中我们不需要获取多个任务执行的结果,我们可以采用上面的实现方式去进行并行处理任务;
1.3.2 使用ExecutorCompletionService
如果我们要获取到并行处理任务的结果快慢来进行一些处理,我们就可以使用到ExecutorCompletionService
来进行实现;我们来使用ExecutorCompletionService
类将线程池进行包装处理下,然后进行提交任务;
public class ExecutorCompletionServiceDemo {
public static void main(String[] args) {
//这里只是为了方便,真正项目中不要这样创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(5);
ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(executorService);
completionService.submit(() -> {
System.out.println("执行任务1开始");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("执行任务1结束");
return "任务1执行成功";
});
completionService.submit(() -> {
System.out.println("执行任务2开始");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("执行任务2结束");
return "任务2执行成功";
});
completionService.submit(() -> {
System.out.println("执行任务3开始");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("执行任务3结束");
return "任务3执行成功";
});
for (int i = 0; i < 3; i++) {
try {
String result = completionService.take().get();
System.out.println(result);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
executorService.shutdown();
}
}
1.4 深入分析说明
解决批量提交任务办法就是使用使用 ExecutorCompletionService
,异步通知返回。
1.4.1 所有方法
当前类提供的方法
1.4.2 构造方法
提供的两个构造函数,一个可以指定返回阻塞队列,另一个使用默认的。另外都需要提供一个线程池进来
1.4.3 获取方法
提供了三个获取方法,可以看到都是从队列中获取
take
获取:谁先执行完 谁先出来 take()
获取时候回阻塞 也可以通过Poll
方法获取
poll
和 take
区别在于 poll
可以执行超时时间,可以看到,谁先执行结束 谁先出来。
1.4.4 提交方法
两个提交任务方法
如何执行任务结果放入队列呢?
可以看到是将 执行结果放入队列中。
内部实现了异步执行接口,以及重写了它的done方法