1. Background
Every so often I come across code like this in our company's projects:
List<Info> infoList = new ArrayList<Info>();
if (infoidList.size() > 100) {
    int size = infoidList.size();
    int batchSize = PER_IMC_INFO_MAX;
    int queryTimes = (size - 1) / batchSize + 1;
    for (int i = 0; i < queryTimes; i++) {
        int start = batchSize * i;
        int end = batchSize * (i + 1) > size ? size : batchSize * (i + 1);
        Long[] ids = new Long[end - start];
        for (int j = 0; j < end - start; j++) {
            ids[j] = infoidList.get(j + start);
        }
        List<Info> tmpList = null;
        try {
            tmpList = getInfos(Lists.newArrayList(ids));
        } catch (Exception e) {
            errorlog.error("error.", e);
        }
        if (null != tmpList) {
            infoList.addAll(tmpList);
        }
    }
}
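One detail worth calling out in the snippet: (size - 1) / batchSize + 1 is the standard integer idiom for the ceiling of size / batchSize, i.e. the number of batches needed. A minimal illustration:

```java
public class BatchCountDemo {
    // Number of batches needed to cover `size` elements in chunks of at most
    // `batchSize`: integer ceiling division. The idiom assumes size >= 1
    // (for size == 0 it would still yield 1, since Java integer division
    // truncates toward zero).
    static int queryTimes(int size, int batchSize) {
        return (size - 1) / batchSize + 1;
    }

    public static void main(String[] args) {
        System.out.println(queryTimes(250, 100)); // 3 batches: 100 + 100 + 50
        System.out.println(queryTimes(200, 100)); // 2 batches: exact fit
        System.out.println(queryTimes(99, 100));  // 1 batch
    }
}
```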
2. Problems
This code fetches post information from another service in batches. It works, but it has a few drawbacks:
- It is verbose: the batching logic is tangled up with the business logic, which hurts readability
- Performance can suffer: the batches are fetched one at a time inside a loop, so latency grows linearly with the amount of data
- From an architectural standpoint, the code cannot be reused, which means near-identical batch-fetching code is probably scattered all over the project
3. A Better Approach
Elsewhere in the same project, some colleagues have written a more concise and elegant version:
List<List<Long>> partitionInfoIdList = Lists.partition(infoIds, MAX_BATCH);
List<Future<List<JobRelevanceDTO>>> futureList = new ArrayList<>();
for (List<Long> infoIdList : partitionInfoIdList) {
    futureList.add(FilterStrategyThreadPool.THREAD_POOL.submit(() -> {
        BatchJobRelevanceQuery batchJobRelevanceQuery = new BatchJobRelevanceQuery();
        batchJobRelevanceQuery.setInfoIds(infoIdList);
        Response<List<JobRelevanceDTO>> jobRelevanceResponse = jobRelevanceService.batchQueryJobRelevance(batchJobRelevanceQuery);
        if (jobRelevanceResponse == null || jobRelevanceResponse.getEntity() == null || jobRelevanceResponse.getEntity().isEmpty()) {
            LOG.info("DupJobIdShowUtil saveJobIdsToRedis jobRelevanceService return null, infoId size={}", infoIdList.size());
            return new ArrayList<>();
        }
        return jobRelevanceResponse.getEntity();
    }));
}
for (Future<List<JobRelevanceDTO>> future : futureList) {
    try {
        List<JobRelevanceDTO> jobRelevanceDTOList = future.get();
        for (JobRelevanceDTO jobRelevance : jobRelevanceDTOList) {
            infoJobMap.put(jobRelevance.getInfoId(), jobRelevance.getJobId());
        }
    } catch (InterruptedException e) {
        LOG.error("DupJobIdShowUtil getInfoJobMapFromInfo Exception", e);
    } catch (ExecutionException e) {
        LOG.error("DupJobIdShowUtil getInfoJobMapFromInfo Exception", e);
    }
}
This version:
- uses Guava's Lists.partition utility, which makes the batching much cleaner;
- uses a thread pool, which improves performance; this is also the most common way to run parallel tasks in Java.
But the thread pool brings its own complexity: the Futures now have to be handled explicitly.
And the reuse problem remains unsolved: the same boilerplate will still be repeated throughout the project.
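For reference, Lists.partition splits a list into consecutive sublists of at most the given size, with the last sublist possibly shorter. A plain-JDK sketch of the same behavior, for readers who don't have Guava at hand:

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionDemo {
    // Split `source` into consecutive chunks of at most `batchSize` elements,
    // mirroring the shape of Guava's Lists.partition result. (Guava returns
    // lazy sublist views; this sketch copies the chunks for simplicity.)
    static <T> List<List<T>> partition(List<T> source, int batchSize) {
        List<List<T>> result = new ArrayList<>();
        for (int start = 0; start < source.size(); start += batchSize) {
            int end = Math.min(start + batchSize, source.size());
            result.add(new ArrayList<>(source.subList(start, end)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> ids = List.of(1, 2, 3, 4, 5, 6, 7);
        System.out.println(partition(ids, 3)); // [[1, 2, 3], [4, 5, 6], [7]]
    }
}
```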
4. A Utility Class
4.1 Analysis
So I decided to write a batch-fetching utility class of my own. First, let's think about what such a class needs to do, and what properties it might have:
- http/rpc: accept an HttpClient or an RPC service as the data source
- totalSize: how much data is there to fetch in total?
- batchSize: how large is each batch?
- oneFetchRetryCount: should each batch request be retried, and how many times?
- oneFetchRetryTimeout: should each batch request have a timeout?
- the results of all batches need to be merged
- is caching needed?
- when a single batch fails, should the overall task count as a success or a failure?
These are the questions every caller will run into. The code above handles them inline; if you want other developers to use your utility class, you should handle as many of these cases as possible for them.
4.2 Implementation
Below is my implementation, BatchFetcher. It supports the following:
- a Function object (a Java lambda) that is invoked once per batch
- a caller-supplied thread pool, on which all the batch tasks are executed
- an overall timeout: once it is exceeded, the fetcher stops waiting and returns whatever results it has collected so far
- a name, which is logged together with timing information when the task finishes
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class BatchFetcher<P, R> {
    private static final Logger LOG = LoggerFactory.getLogger(BatchFetcher.class);

    // Invoked once per batch: takes the batch's parameters, returns its results.
    private Function<List<P>, List<R>> serivce;
    // Thread pool on which the batch tasks run.
    private ExecutorService executorService;
    // Name used in log output.
    private String name;
    // Overall timeout in milliseconds; -1 means wait indefinitely.
    private int timeout = -1;

    public List<R> oneFecth(List<P> partParams) {
        return serivce.apply(partParams);
    }

    public List<R> fetch(List<P> params, int batchSize) {
        long startTime = System.currentTimeMillis();
        ExecutorCompletionService<List<R>> completionService = new ExecutorCompletionService<>(executorService);
        List<List<P>> partition = Lists.partition(params, batchSize);
        List<R> rsList = new ArrayList<>();
        // Submit one task per batch.
        for (List<P> pList : partition) {
            completionService.submit(() -> this.oneFecth(pList));
        }
        // Collect results as they complete, stopping early once the overall timeout is hit.
        int getRsCount = 0;
        while (getRsCount < partition.size()) {
            try {
                List<R> rs;
                if (timeout != -1) {
                    long elapsed = System.currentTimeMillis() - startTime;
                    if (elapsed >= timeout) {
                        LOG.error("{} batchFetcher fetch timeout", name);
                        break;
                    }
                    Future<List<R>> poll = completionService.poll(timeout - elapsed, TimeUnit.MILLISECONDS);
                    if (poll == null) {
                        LOG.error("{} batchFetcher one fetch timeout", name);
                        continue;
                    } else {
                        rs = poll.get();
                    }
                } else {
                    rs = completionService.take().get();
                }
                rsList.addAll(rs);
            } catch (Exception e) {
                LOG.error("{} batchFetcher one fetch error", name, e);
            } finally {
                getRsCount += 1;
            }
        }
        LOG.info("[BatchFetcher]: {}, total elements size: {}, task num: {}, batch size: {}, rs size: {}, cost time: {}, fetch done",
                name, params.size(), partition.size(), batchSize, rsList.size(), System.currentTimeMillis() - startTime);
        return rsList;
    }

    public static final class BatchFetcherBuilder<P, R> {
        private Function<List<P>, List<R>> serivce;
        private ExecutorService executorService;
        private String name;
        private int timeout = -1;

        public BatchFetcherBuilder() {
        }

        public BatchFetcherBuilder<P, R> serivce(Function<List<P>, List<R>> serivce) {
            this.serivce = serivce;
            return this;
        }

        public BatchFetcherBuilder<P, R> executorService(ExecutorService executorService) {
            this.executorService = executorService;
            return this;
        }

        public BatchFetcherBuilder<P, R> name(String name) {
            this.name = name;
            return this;
        }

        public BatchFetcherBuilder<P, R> timeout(int timeout) {
            this.timeout = timeout;
            return this;
        }

        public BatchFetcher<P, R> build() {
            BatchFetcher<P, R> batchFetcher = new BatchFetcher<>();
            batchFetcher.executorService = this.executorService;
            batchFetcher.serivce = this.serivce;
            batchFetcher.name = this.name;
            batchFetcher.timeout = this.timeout;
            return batchFetcher;
        }
    }
}
4.3 Usage
- Example 1
BatchFetcher.BatchFetcherBuilder<Long, Map<Long, Map<String, Tag>>> builder = new BatchFetcher.BatchFetcherBuilder<>();
BatchFetcher<Long, Map<Long, Map<String, Tag>>> cUserTagBatchFetcher = builder
        .serivce(this::queryCUserTags)
        .name("cUserTagBatchFetcher")
        .executorService(ExecutorServiceHolder.batchExecutorService)
        .build();
List<Map<Long, Map<String, Tag>>> userIdToTags = cUserTagBatchFetcher.fetch(cuserIds, 200);
// Merge the per-batch maps into one.
Map<Long, Map<String, Tag>> cUserTag = new HashMap<>();
for (Map<Long, Map<String, Tag>> userIdToTag : userIdToTags) {
    cUserTag.putAll(userIdToTag);
}

private List<Map<Long, Map<String, Tag>>> queryCUserTags(List<Long> cuserIdList) {
    ...
}
- Example 2
public Map<Long, LinkResult> getBatchLinkResult(List<Long> cUserIds, Long bUserId) {
    List<LinkType> linkTypes = Lists.newArrayList();
    BatchFetcher.BatchFetcherBuilder<Long, Map<Long, LinkResult>> builder = new BatchFetcher.BatchFetcherBuilder<>();
    BatchFetcher<Long, Map<Long, LinkResult>> linkDataBatchFetcher = builder
            .serivce(getLinkResult(linkTypes, bUserId))
            .name("linkDataBatchFetcher")
            .executorService(ExecutorServiceHolder.batchExecutorService)
            .build();
    List<Map<Long, LinkResult>> fetchRs = linkDataBatchFetcher.fetch(cUserIds, BATCH_NUM);
    // Merge the per-batch maps, skipping the null placeholders produced by failed batches.
    Map<Long, LinkResult> rs = new HashMap<>();
    for (Map<Long, LinkResult> partFetch : fetchRs) {
        if (partFetch != null) {
            rs.putAll(partFetch);
        }
    }
    return rs;
}

private Function<List<Long>, List<Map<Long, LinkResult>>> getLinkResult(List<LinkType> linkTypes, Long bUserId) {
    return (partUserIds) -> {
        Map<Long, LinkResult> idToLinkResult = null;
        try {
            idToLinkResult = linkService.getLink(bUserId, partUserIds, linkTypes);
        } catch (Exception e) {
            logger.error("LinkData getLinkResult error cUserId: {} bUserId: {}", partUserIds, bUserId, e);
        }
        return Lists.newArrayList(idToLinkResult);
    };
}
4.4 Remaining Issues
In both examples, all the caller provides is a Function that fetches one batch, the parameters, and the maximum batch size. That is much simpler than either of the earlier approaches, but some issues remain:
- the Function's input and output are both Lists, which may not match the shape of the HTTP or RPC service; the data then has to be converted to a List before further processing
- failures of individual batch requests are silently ignored
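The first limitation can usually be papered over with a thin adapter. As a sketch (the queryNames service below is hypothetical, standing in for any per-batch HTTP/RPC call), a call that returns a Map per batch can be wrapped into the List-in/List-out shape that BatchFetcher's serivce Function expects:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class AdapterDemo {
    // Hypothetical per-batch service: takes ids, returns an id -> name map.
    static Map<Long, String> queryNames(List<Long> ids) {
        Map<Long, String> rs = new HashMap<>();
        for (Long id : ids) {
            rs.put(id, "name-" + id);
        }
        return rs;
    }

    // Adapt the Map-returning call into List<P> -> List<R> by emitting the
    // map entries as a list; the caller can rebuild a merged map afterwards.
    static Function<List<Long>, List<Map.Entry<Long, String>>> adapted() {
        return ids -> new ArrayList<>(queryNames(ids).entrySet());
    }

    public static void main(String[] args) {
        List<Map.Entry<Long, String>> out = adapted().apply(List.of(1L, 2L));
        System.out.println(out.size()); // 2 entries, one per id
    }
}
```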
4.5 Future Extensions
The utility class solves the reuse problem: callers supply only a minimal set of parameters, and the repetitive plumbing is encapsulated, making things far simpler than before. There is still room for improvement, for example:
- alerting: send an alert when a single batch fails or the overall task times out
- more flexible results: support returning a caller-defined result type
- a parameter that controls whether a single batch failure makes the overall task fail
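The last point could be a simple flag on the fetcher. A standalone sketch (the failFast parameter is hypothetical and not part of the BatchFetcher above) of how the result-collection loop would change:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

public class FailFastFetchDemo {
    // Hypothetical variant of fetch(): when failFast is true, any failed
    // batch aborts the whole call instead of being logged and skipped.
    static <P, R> List<R> fetch(List<List<P>> batches,
                                Function<List<P>, List<R>> service,
                                ExecutorService pool,
                                boolean failFast) throws Exception {
        ExecutorCompletionService<List<R>> cs = new ExecutorCompletionService<>(pool);
        for (List<P> batch : batches) {
            cs.submit(() -> service.apply(batch));
        }
        List<R> rs = new ArrayList<>();
        for (int i = 0; i < batches.size(); i++) {
            try {
                rs.addAll(cs.take().get());
            } catch (ExecutionException e) {
                if (failFast) {
                    throw e; // one batch failed -> the whole task fails
                }
                // otherwise: drop the failed batch and keep partial results
            }
        }
        return rs;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<List<Integer>> batches = List.of(List.of(1, 2), List.of(3));
        List<Integer> rs = fetch(batches, b -> b, pool, false);
        pool.shutdown();
        System.out.println(rs.size()); // 3: all three elements collected
    }
}
```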