当使用ExecutorService启动了多个Callable后,每个Callable会产生一个Future,我们需要将多个Future存入一个线性表,用于之后处理数据。当然,还有更复杂的情况,有5个生产者线程,每个生产者线程都会创建任务,所有任务的Future都存放到同一个线性表中。另有一个消费者线程,从线性表中取出Future进行处理。
CompletionService正是为此而存在,它是一个更高级的ExecutorService,它本身自带一个线程安全的线性表,无需用户额外创建。它提供了2种方法从线性表中取出结果,poll()是非阻塞的,若目前无结果,返回一个null,线程继续运行不阻塞。take()是阻塞的,若当前无结果,则线程阻塞,直到产生一个结果,被取出返回,线程才继续运行。
public class Test {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executor = Executors.newCachedThreadPool();
CompletionService<Integer> comp = new ExecutorCompletionService<>(executor);
for(int i = 0; i<5; i++) {
comp.submit(new Task());
}
executor.shutdown();
int count = 0, index = 1;
while(count<5) {
Future<Integer> f = comp.poll();
if(f == null) {
System.out.println(index + " 没发现有完成的任务");
}else {
System.out.println(index + "产生了一个随机数: " + f.get());
count++;
}
index++;
TimeUnit.MILLISECONDS.sleep(500);
}
}
}
class Task implements Callable<Integer> {
@Override
public Integer call() throws Exception {
Random rand = new Random();
TimeUnit.SECONDS.sleep(rand.nextInt(7));
return rand.nextInt();
}
}
实际运用小案例:
模拟页面渲染
package com.thread;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* CompletionService的运用:当Executor遇见BlockingQueue时使用。
* 使用场景:
* 如果向Executor提交了一个批处理任务,并且希望在他们完成后获得结果。
*
* CompletionService整合了Executor和BlockingQueue的功能,你可以将Callable任务提交给它执行,
* 然后使用类似队列中的take和poll方法,在结果完整可用时(只是等待任意一个future的返回值),获得这个结果。就像一个大包的Future。
*
* completionService.take()的说明是:检索并移除已完成的任务,如果没有任何一个任务完成的,则继续等待
*
* 从案例的结果可以看出,每当图片下载完毕后,就会执行渲染操作。
* take方法只是检索completionService中所有future,看是否有执行完的任务,并获得结果。
*
* @author hadoop
*
*/
public class CompletionThread {
static ExecutorService mExecutor = Executors.newFixedThreadPool(5);
static int totalTimeDownPhoto =0;
/**
* 模拟页面渲染场景
*/
static void renderPage(){
final List<String> info = new ArrayList<String>();
for (int i = 0; i < 20; i++) {
info.add("图片" + i);
}
CompletionService<String> completionService = new ExecutorCompletionService<String>(mExecutor);
/**
* 开启多线程处理下载图片的任务
*/
for(final String str : info){
completionService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
//下载图片download(str)
int randomTime = new Random().nextInt(9) + 1;//限制耗时不会出现0s,不会大于10s
Thread.sleep(randomTime*1000);
System.out.println("下载" + str +"耗费了" + randomTime + "s");
computeTime(randomTime);
return str;
}
});
}
try {
System.out.println("处理文字渲染的逻辑");
int randomTime = new Random().nextInt(9) + 1;
Thread.sleep(1000*randomTime);
computeTime(randomTime);
System.out.println("处理文字渲染的逻辑耗费了" + randomTime + "s");
/**
* 如果渲染图片耗时也比较久,也可以使用多线程。这里只是模拟,没有使用多线程处理渲染图片的过程
*/
for (int i = 0; i < info.size(); i++) {
//take检索并移除已完成的任务,如果没有任何一个任务完成的,则继续等待
Future<String> f = completionService.take();
//处理渲染图片的逻辑
randomTime = new Random().nextInt(3) + 1;
Thread.sleep(1000*randomTime);
computeTime(randomTime);
System.out.println("渲染"+f.get() +"耗时"+randomTime+"s");
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
/**
* 只有执行了shutdown方法,执行isTerminated才有效。否则isTerminated一直为ture
*/
mExecutor.shutdown();
while(true){
if(mExecutor.isTerminated()){
System.out.println("所有任务都执行完了,关闭线程池");
break;
}
}
}
/**
* 统计下载图片所花费的总时间
* @param randomTime
*/
static void computeTime(int randomTime){
synchronized (mExecutor) {
totalTimeDownPhoto += randomTime;
}
}
public static void main(String[] args) {
long start = System.currentTimeMillis();
renderPage();
long end = System.currentTimeMillis();
System.out.println("渲染页面总耗时:"+(end - start));
System.out.println("下载每张图片,渲染每张图片以及渲染文字的合计耗时是:"+ totalTimeDownPhoto);
int saveTime = (int) (totalTimeDownPhoto - (end - start)/1000);
System.out.println("总节约时间:"+ saveTime+"s");
}
}