由于普通的线程池,返回的Future,功能比较单一;Guava 定义了 ListenableFuture接口并继承了JDK concurrent包下的Future 接口,ListenableFuture 允许你注册回调方法(callbacks),在运算(多线程执行)完成的时候进行调用。
1.使用方法如下:
- 1.创建线程池
- 2.装饰线程池
- 3.任务处理
- 4.回调函数处理
- 5.所有任务完成后处理
场景模拟:
导入一张1211条数据的Excel表格:
- 1.每条数据处理较慢
- 2.处理完后需要汇总数据
- 3.处理汇总成功的数据
2.代码示例如下:
2.1接口和调用
package com.java4all.test11;
import org.python.google.common.collect.ImmutableList;
import org.python.google.common.util.concurrent.*;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Author: yunqing
* Date: 2018/9/19
* Description:
*/
@RestController
@RequestMapping(value = "testThread")
public class TestThread {
/**线程池*/
static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4, 10, 60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(200),
new ThreadPoolExecutor.CallerRunsPolicy()
);
/**
* 数据处理
* @return
* @throws Exception
*/
@RequestMapping(value = "parse",method = RequestMethod.GET)
public String parse() throws Exception{
List<String> result = new ArrayList<>();
List<String> list = new ArrayList<>();
//模拟原始数据
for(int i = 0; i < 1211;i ++){
list.add(i+"-");
System.out.println("添加原始数据:"+i);
}
int size = 50;//切分粒度,每size条数据,切分一块,交由一条线程处理
int countNum = 0;//当前处理到的位置
int count = list.size()/size;//切分块数
int threadNum = 0;//使用线程数
if(count*size != list.size()){
count ++;
}
final CountDownLatch countDownLatch = new CountDownLatch(count);
//使用Guava的ListeningExecutorService装饰线程池
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(threadPoolExecutor);
while (countNum < count*size){
//切割不同的数据块,分段处理
threadNum ++;
countNum += size;
MyCallable myCallable = new MyCallable();
myCallable.setList(ImmutableList.copyOf(
list.subList(countNum-size,list.size() > countNum ? countNum : list.size())));
ListenableFuture listenableFuture = executorService.submit(myCallable);
//回调函数
Futures.addCallback(listenableFuture, new FutureCallback<List<String>>() {
//任务处理成功时执行
@Override
public void onSuccess(List<String> list) {
countDownLatch.countDown();
System.out.println("第h次处理完成");
result.addAll(list);
}
//任务处理失败时执行
@Override
public void onFailure(Throwable throwable) {
countDownLatch.countDown();
System.out.println("处理失败:"+throwable);
}
});
}
//设置时间,超时了直接向下执行,不再阻塞
countDownLatch.await(3,TimeUnit.SECONDS);
result.stream().forEach(s -> System.out.println(s));
System.out.println("------------结果处理完毕,返回完毕,使用线程数量:"+threadNum);
return "处理完了";
}
}
2.2任务处理
package com.java4all.test11;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
/**
* Author: yunqing
* Date: 2018/9/19
* Description:任务处理逻辑
*/
public class MyCallable implements Callable{
private List<String> list ;
@Override
public Object call() throws Exception {
List<String> listReturn = new ArrayList<>();
//模拟对数据处理,然后返回
for(int i = 0;i < list.size();i++){
listReturn.add(list.get(i)+":处理时间:"+System.currentTimeMillis()+"---:处理线程:"+Thread.currentThread());
}
return listReturn;
}
public void setList(List<String> list) {
this.list = list;
}
}