首页 > 编程语言 >Java手写一个批量获取数据工具类

Java手写一个批量获取数据工具类

时间:2022-10-05 19:46:35浏览次数:59  
标签:Java name BatchFetcher List 获取数据 new return 手写 size

1. 背景

偶尔会在公司的项目里看到这样的代码

List<Info> infoList = new ArrayList<Info>();
if (infoidList.size() > 100) { 
    int size = infoidList.size();
    int batchSize = PER_IMC_INFO_MAX;
    int queryTimes = (size - 1) / batchSize + 1;
    for (int i = 0; i < queryTimes; i++) {
        int start = batchSize * i;
        int end = batchSize * (i + 1) > size ? size : batchSize * (i + 1);
        Long[] ids = new Long[end - start];
        for (int j = 0; j < end - start; j++) {
            ids[j] = infoidList.get(j + start);
        }
        List<Info> tmpList = null;
        try {
            tmpList = getInfos(Lists.newArrayList(ids));
        } catch (Exception e) {
            errorlog.error("error.", e);
        }
        if (null != tmpList) {
            infoList.addAll(tmpList);
        }
    }
}

2. 问题

这段代码是分批从其他服务获取帖子信息,功能上是没有问题的,但有以下缺点:

  1. 看起来有点繁琐,在业务逻辑中掺杂了分批获取数据的逻辑,看起来不太条理
  2. 性能可能有问题,分批的数据是在循环中一次一次的拿,耗时会随着数据的增长线性增长
  3. 从系统架构上考虑,这块代码是没办法复用的,也就是说,很有可能到处都是这样的分批获取数据的代码

3. 解决

其实在项目里面也有另外的一些同学的代码比这个写的更简洁和优雅

List<List<Long>> partitionInfoIdList = Lists.partition(infoIds, MAX_BATCH);
List<Future<List<JobRelevanceDTO>>> futureList = new ArrayList<>();
for(List<Long> infoIdList : partitionInfoIdList){
    futureList.add( FilterStrategyThreadPool.THREAD_POOL.submit(() -> {
        BatchJobRelevanceQuery batchJobRelevanceQuery = new BatchJobRelevanceQuery();
        batchJobRelevanceQuery.setInfoIds(infoIdList);

        Response<List<JobRelevanceDTO>> jobRelevanceResponse = jobRelevanceService.batchQueryJobRelevance(batchJobRelevanceQuery);

        if (jobRelevanceResponse == null || jobRelevanceResponse.getEntity() == null || jobRelevanceResponse.getEntity().isEmpty()) {
            LOG.info("DupJobIdShowUtil saveJobIdsToRedis jobRelevanceService return null, infoId size={}", infoIdList.size());
            return new ArrayList<>();
        }

        return jobRelevanceResponse.getEntity();
    }));
}
for (Future<List<JobRelevanceDTO>> future : futureList) {
    try {
        List<JobRelevanceDTO> jobRelevanceDTOList = future.get();
        for (JobRelevanceDTO jobRelevance : jobRelevanceDTOList) {
            infoJobMap.put(jobRelevance.getInfoId(), jobRelevance.getJobId());
        }
    } catch (InterruptedException e) {
        LOG.error("DupJobIdShowUtil getInfoJobMapFromInfo Exception", e);
    } catch ( ExecutionException e) {
        LOG.error("DupJobIdShowUtil getInfoJobMapFromInfo Exception", e);
    }
}

上面的代码

  1. 使用了guava的工具类Lists.partition,让分批次更简洁了;
  2. 使用了线程池,性能会更好,这也是java并行任务的最常见的用法

但因为线程池的引入,又变的复杂了起来,需要处理这些Futrue
而且也没有解决代码复用的问题,这些的相同逻辑的代码仍然会重复的出现在项目中

4. 工具类

4.1 分析

于是打算自己写一个批量数据获取工具类,我们需要首先想一下,这个工具类需要什么功能?可能有哪些属性

  1. http/rpc,支持传入HttpClient或者RPC Service
  2. totalSize,一共有多少数据要获取呢
  3. batchSize,每批次有多大
  4. oneFetchRetryCount,每批次请求时需要重试吗?重试几次?
  5. oneFetchRetryTimeout,每批次请求时需要设置超时时间吗?
  6. 应该需要合并每批次的返回结果
  7. 需不需要加缓存
  8. 当单批次任务失败时,整体任务算作成功还是失败

这些是使用者会遇到的问题,上面的代码可以自己来处理这些事件,如果你想让别的开发者使用你的工具类,你要尽可能的处理所有可能出现的情况

4.2 实现

下面是我实现的工具类BatchFetcher,它支持以下功能:

  1. 支持传一个Function对象,也就是java的lambda函数,每一个批次执行会调用一次
  2. 支持传入线程池,会使用次线程池来执行所有的批次任务
  3. 支持整体超时时间,也就是说一旦超过这个时间,将不再等待结果,将目前获取到的结果返回
  4. 传入一个名称,同时会在任务结束后打印名称,任务耗时相关信息
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class BatchFetcher<P, R> {
    private static final Logger LOG = LoggerFactory.getLogger(BatchFetcher.class);

    private Function<List<P>, List<R>> serivce;
    private ExecutorService executorService;
    private String name;
    private int timeout = -1;


    public List<R> oneFecth(List<P> partParams) {
        return serivce.apply(partParams);
    }

    public List<R> fetch(List<P> params, int batchSize) {
        long startTime = System.currentTimeMillis();

        ExecutorCompletionService<List<R>> completionService = new ExecutorCompletionService<>(executorService);

        List<List<P>> partition = Lists.partition(params, batchSize);

        List<R> rsList = new ArrayList<>();
        for (List<P> pList : partition) {
            completionService.submit(() -> this.oneFecth(pList));
        }

        int getRsCount = 0;
        while (getRsCount < partition.size()) {
            try {
                List<R> rs;
                if (timeout != -1) {
                    long elapsed = System.currentTimeMillis() - startTime;
                    if (elapsed >= timeout) {
                        LOG.error("{} batchFetcher fetch timout", name);
                        break;
                    }
                    Future<List<R>> poll = completionService.poll(timeout - elapsed, TimeUnit.MILLISECONDS);
                    if (poll == null) {
                        LOG.error("{} batchFetcher one fetch timout", name);
                        continue;
                    } else {
                        rs = poll.get();
                    }
                } else {
                    rs = completionService.take().get();
                }
                rsList.addAll(rs);
            } catch (Exception e) {
                LOG.error("{} batchFetcher one fetch error", name, e);
            } finally {
                getRsCount += 1;
            }
        }
        LOG.info("[BatchFetcher]: {} , total elements size: {}, task num: {}, batch size: {}, rs size: {}, cost time: {}, fetch done",
                name, params.size(), partition.size(), batchSize, rsList.size(), System.currentTimeMillis() - startTime);
        return rsList;
    }

    public static final class BatchFetcherBuilder<P, R> {
        private Function<List<P>, List<R>> serivce;
        private ExecutorService executorService;
        private String name;
        private int timeout = -1;

        public BatchFetcherBuilder() {
        }

        public BatchFetcherBuilder<P, R> serivce(Function<List<P>, List<R>> serivce) {
            this.serivce = serivce;
            return this;
        }

        public BatchFetcherBuilder<P, R> executorService(ExecutorService executorService) {
            this.executorService = executorService;
            return this;
        }

        public BatchFetcherBuilder<P, R> name(String name) {
            this.name = name;
            return this;
        }

        public BatchFetcherBuilder<P, R> timeout(int timeout) {
            this.timeout = timeout;
            return this;
        }

        public BatchFetcher<P, R> build() {
            BatchFetcher<P, R> batchFetcher = new BatchFetcher<>();
            batchFetcher.executorService = this.executorService;
            batchFetcher.serivce = this.serivce;
            batchFetcher.name = this.name;
            batchFetcher.timeout = this.timeout;

            return batchFetcher;
        }
    }
}

4.3 使用

  • 案例一
BatchFetcher.BatchFetcherBuilder<Long, Map<Long, Map<String, Tag>>> builder = new BatchFetcher.BatchFetcherBuilder<>();

BatchFetcher<Long, Map<Long, Map<String, Tag>>> cUserTagBatchFetcher = builder
        .serivce(this::queryCUserTags)
        .name("cUserTagBatchFetcher")
        .executorService(ExecutorServiceHolder.batchExecutorService)
        .build();

List<Map<Long, Map<String, Tag>>> userIdToTags = cUserTagBatchFetcher.fetch(cuserIds, 200);

Map<Long, Map<String, Tag>> cUserTag = new HashMap<>();

for (Map<Long, Map<String, Tag>> userIdToTag : userIdToTags) {
    cUserTag.putAll(userIdToTag);
}
private List<Map<Long, Map<String, Tag>>> queryCUserTags(List<Long> cuserIdList) {
        ...
    }
  • 案例二
public Map<Long, LinkResult> getBatchLinkResult(List<Long> cUserIds, Long bUserId) {

        List<LinkType> linkTypes = Lists.newArrayList();

        BatchFetcher.BatchFetcherBuilder<Long, Map<Long, LinkResult>> builder = new BatchFetcher.BatchFetcherBuilder<>();

        BatchFetcher<Long, Map<Long, LinkResult>> linkDataBatchFetcher = builder
                .serivce(getLinkResult(linkTypes, bUserId))
                .name("linkDataBatchFetcher")
                .executorService(ExecutorServiceHolder.batchExecutorService)
                .build();

        List<Map<Long, LinkResult>> fetchRs = linkDataBatchFetcher.fetch(cUserIds, BATCH_NUM);

        Map<Long, LinkResult> rs = new HashMap<>();

        for (Map<Long, LinkResult> partFetch : fetchRs) {
            if (partFetch != null) {
                rs.putAll(partFetch);
            }
        }

        return rs;
}
private Function<List<Long>, List<Map<Long, LinkResult>>> getLinkResult(List<LinkType> linkTypes, Long bUserId) {
        return (partUserIds) -> {
            Map<Long, LinkResult> idToLinkResult = null;
            try {
                idToLinkResult = linkService.getLink(bUserId, partUserIds, linkTypes);
            } catch (Exception e) {
                logger.error("LinkData getLinkResult error cUserId: {} bUserId: {}", partUserIds, bUserId);
            }
            return Lists.newArrayList(idToLinkResult);
        };
    }

4.4 问题

这两个使用的例子,只需要提供一个单次获取数据的Function、参数、最大批次就可以拿到数据,相比最初的两种做法是比较简单的,但也有一些别的问题

  1. Function的入参和返回结果都是List,有可能和Http或者RPC Service的不一致,需要转为List后在进行处理
  2. 忽略了单次请求失败

4.5 后续扩展

这个工具类目前解决了代码复用的问题,而且使用起来只需提供最小化的参数,封装了重复性的繁琐工作,相比之前更为简单。但是仍然有优化的空间,例如:

  1. 报警,当单次任务失败或者整体任务超时发送报警
  2. 更优雅的返回结果,支持返回自定义的结果
  3. 支持传递参数,用来确认是不是单次失败就算作整体任务失败

标签:Java,name,BatchFetcher,List,获取数据,new,return,手写,size
From: https://www.cnblogs.com/songjiyang/p/16756194.html

相关文章

  • Java学习 三大循环语句和switch语句
    Java学习三大循环语句和switch语句 while循环语句(当)只要布尔表达式为true,循环就回一直执行下去。**我们大多是情况会让循环停止下来的,我们需要一个让表达式时效......
  • Java第三讲动手动脑
    1以上代码无法通过编译主要是由于在Foo类中自定义了有参的构造函数,系统不在提供默认的构造函数(无参),而在上述的引用中并没有提供参数导致无法通过编译。 ......
  • Java了解知识点
    Java了解#编辑器java:idea(收费)、eclipse(免费)、MyEclipse(收费)#编译型语言java:一处编码,处处运行#java是编译型还是解释型?编译型#编译过程:......
  • 【java基础】接口和抽象类,static和final
    接口和抽象类接口:主要定义方法,让子类去实现,作为标准只有方法,方法都是public,修饰符都被省去没有构造器使用implement关键字继承只能定义静态常量抽象类:父类不实现......
  • java如何将字符串转换为json格式字符串呢?
    转自:http://www.java265.com/JavaJingYan/202206/16540828373607.htmlJSON简介:   JSON(JavaScriptObjectNotation,JS对象简谱)是一种轻量级的数据交换格式。它基......
  • 一篇文章让你彻底理解Java的单例设计模式
    下文是笔者编写的单例模式实现的八种方式,如下所示:单例模式的简介我们将一个类在当前进程中只有一个实例的这种模式,称之为“单例模式”那么Java代码如何实现一个单例模式呢?......
  • Java 中 Set 的4中遍历方式
    Set和List遍历方式基本一致,Set没有for的遍历方式主测试方法@Testpublicvoidtest(){Set<Integer>set=newHashSet<>();intn=100......
  • Java 中 Map 的5种遍历方式
    主测试方法@Testpublicvoidtest1(){Map<Integer,Integer>map=newHashMap<Integer,Integer>();intn=1000_0000;for(int......
  • Java 中 List 的5种遍历方式
    本测试没有具体业务,单纯遍历,也许会有误差。主单元测试方法@Testpublicvoidtest(){List<Integer>list=newArrayList<>();intn=100......
  • Effective+Java+中文版 pdf
    高清扫描版下载链接:https://pan.baidu.com/s/1lYgUZopqC5MqRFQSiLHW6w点击这里获取提取码 ......