首页 > 其他分享 >批处理操作优化(参考皮哥)

批处理操作优化(参考皮哥)

时间:2024-09-26 19:22:02浏览次数:3  
标签:题目 参考 批处理 数据库 皮哥 线程 new 题库 id

需求分析

在一个关于题目管理的网页,为了提高效率,需要给管理员提供批量操作的功能

例如:

  • 批量将题目添加到一个题库
  • 批量将题目从一个题库中移除
  • 批量删除题目

基础设计

相信大家都会觉得该需求挺简单的吧,这不随便写?然后细细研究,发现问题还不少,我们先来看看有手就行版本

基础后端开发

我们以批量将题目添加到一个题库为例写一个接口

首先设计请求对象

@Data
public class QuestionBankQuestionBatchAddRequest implements Serializable {

    /**
     * 题库 id
     */
    private Long questionBankId;

    /**
     * 题目 id 列表
     */
    private List<Long> questionIdList;

    private static final long serialVersionUID = 1L;
}

Controller

@PostMapping("/add/batch")
@AuthCheck(mustRole = UserConstant.ADMIN_ROLE)//只有管理员才能操作
public BaseResponse<Boolean> batchAddQuestionsToBank(
        @RequestBody QuestionBankQuestionBatchAddRequest questionBankQuestionBatchAddRequest,
        HttpServletRequest request
) {
    // 参数校验
    ThrowUtils.throwIf(questionBankQuestionBatchAddRequest == null, ErrorCode.PARAMS_ERROR);
    User loginUser = userService.getLoginUser(request);
    Long questionBankId = questionBankQuestionBatchAddRequest.getQuestionBankId();
    List<Long> questionIdList = questionBankQuestionBatchAddRequest.getQuestionIdList();
    questionBankQuestionService.batchAddQuestionsToBank(questionIdList, questionBankId, loginUser);
    return ResultUtils.success(true);
}

Service

@Override
@Transactional(rollbackFor = Exception.class)
public void batchAddQuestionsToBank(List<Long> questionIdList, Long questionBankId, User loginUser) {
    // 参数校验
    ThrowUtils.throwIf(CollUtil.isEmpty(questionIdList), ErrorCode.PARAMS_ERROR, "题目列表为空");
    ThrowUtils.throwIf(questionBankId == null || questionBankId <= 0, ErrorCode.PARAMS_ERROR, "题库非法");
    ThrowUtils.throwIf(loginUser == null, ErrorCode.NOT_LOGIN_ERROR);
    // 检查题目 id 是否存在
    List<Question> questionList = questionService.listByIds(questionIdList);
    // 合法的题目 id
    List<Long> validQuestionIdList = questionList.stream()
            .map(Question::getId)
            .collect(Collectors.toList());
    ThrowUtils.throwIf(CollUtil.isEmpty(validQuestionIdList), ErrorCode.PARAMS_ERROR, "合法的题目列表为空");
    // 检查题库 id 是否存在
    QuestionBank questionBank = questionBankService.getById(questionBankId);
    ThrowUtils.throwIf(questionBank == null, ErrorCode.NOT_FOUND_ERROR, "题库不存在");
    // 执行插入
    for (Long questionId : validQuestionIdList) {
        QuestionBankQuestion questionBankQuestion = new QuestionBankQuestion();
        questionBankQuestion.setQuestionBankId(questionBankId);
        questionBankQuestion.setQuestionId(questionId);
        questionBankQuestion.setUserId(loginUser.getId());
        boolean result = this.save(questionBankQuestion);
        if (!result) {
            throw new BusinessException(ErrorCode.OPERATION_ERROR, "向题库添加题目失败");
        }
    }
}

这不就完成了吗?这么一看,貌似确实功能已经实现了,但是这样真的就没问题了吗?

测试验证

一些明显的问题就跑出来了

1)已添加到题库的题目,重复添加就会报错

2)未添加到题库的题目,解除绑定关系也会报错

3)删除题目时,如果没有关联,也会报错。因为不存在题目关联,抛出我们自定义的业务异常。

而且,以上代码还有一些其他问题

  • 稳定性低,有一道题报错,就全部出错
  • 性能低,同时操作的题目较多时,执行时间会很长

所以,们以这些功能为例,来学习一些通用的 批处理操作 的优化方案。

优化

一般情况下,我们可以从以下多个角度对批处理任务进行优化。

  • 健壮性
  • 稳定性
  • 性能
  • 数据一致性
  • 可观测性

健壮性

健壮性是指系统在面对 异常情况或不合法输入 时仍能表现出合理的行为。一个健壮的系统能够 预见和处理异常,并且即使发生错误,也不会崩溃或产生不可预期的行为。

1、参数校验提前

可以在调用数据库之前就对参数进行校验,这样可以减少不必要的数据库操作开销,不用等到数据库操作时再抛出异常。

在现有的添加题目到题库的代码中,我们已经提前对参数进行了非空校验,并且会提前检查题目和题库是否存在,这是很好的。但是我们还没有校验哪些题目已经添加到题库中,对于这些题目,不必再执行插入关联记录的数据库操作。

需要补充的代码如下:

// 检查题库 id 是否存在
// ...

// 检查哪些题目还不存在于题库中,避免重复插入
LambdaQueryWrapper<QuestionBankQuestion> lambdaQueryWrapper = Wrappers.lambdaQuery(QuestionBankQuestion.class)
        .eq(QuestionBankQuestion::getQuestionBankId, questionBankId)
        .in(QuestionBankQuestion::getQuestionId, validQuestionIdList);
List<QuestionBankQuestion> existQuestionList = this.list(lambdaQueryWrapper);
// 已存在于题库中的题目 id
Set<Long> existQuestionIdSet = existQuestionList.stream()
        .map(QuestionBankQuestion::getId)
        .collect(Collectors.toSet());
// 已存在于题库中的题目 id,不需要再次添加
validQuestionIdList = validQuestionIdList.stream().filter(questionId -> {
    return !existQuestionIdSet.contains(questionId);
}).collect(Collectors.toList());
ThrowUtils.throwIf(CollUtil.isEmpty(validQuestionIdList), ErrorCode.PARAMS_ERROR, "所有题目都已存在于题库中");

// 执行插入
// ...

2、异常处理

目前虽然已经对每一次插入操作的结果都进行了判断,并且抛出自定义异常,但是有些特殊的异常并没有被捕获。

可以进一步细化异常处理策略,考虑更细粒度的异常分类,不同的异常类型可以通过不同的方式处理,例如:

  • 数据唯一键重复插入问题,会抛出 DataIntegrityViolationException
  • 数据库连接问题、事务问题等导致操作失败时抛出 DataAccessException
  • 其他的异常可以通过日志记录详细错误信息,便于后期追踪(全局异常处理器也有这个能力)。

示例代码如下:

try {
    boolean result = this.save(questionBankQuestion);
    if(!result) {
        throw new BusinessException(ErrorCode.OPERATION_ERROR, "向题库添加题目失败");
    }
} catch (DataIntegrityViolationException e) {
    log.error("数据库唯一键冲突或违反其他完整性约束,题目 id: {}, 题库 id: {}, 错误信息: {}",questionId, questionBankId, e.getMessage());
    throw new BusinessException(ErrorCode.OPERATION_ERROR, "题目已存在于该题库,无法重复添加");
} catch (DataAccessException e) {
    log.error("数据库连接问题、事务问题等导致操作失败,题目 id: {}, 题库 id: {}, 错误信息: {}",
            questionId, questionBankId, e.getMessage());
    throw new BusinessException(ErrorCode.OPERATION_ERROR, "数据库操作失败");
} catch (Exception e) {
    // 捕获其他异常,做通用处理
    log.error("添加题目到题库时发生未知错误,题目 id: {}, 题库 id: {}, 错误信息: {}",
            questionId, questionBankId, e.getMessage());
    throw new BusinessException(ErrorCode.OPERATION_ERROR, "向题库添加题目失败");
}

稳定性

1、避免长事务问题

批量操作中,一次性处理过多数据会导致事务过长,影响数据库性能。可以通过 分批处理 来避免长事务问题,确保部分数据异常不会影响整个批次的数据保存。

假设操作 10w 条数据,其中有 1 条数据操作异常,如果是长事务,那么修改的 10w 条数据都需要回滚,而分批事务仅需回滚一批既可,降低长事务带来的资源消耗,同时也提升了稳定性。

编写一个新的方法,用于对某一批操作进行事务管理:

@Override
@Transactional(rollbackFor = Exception.class)
public void batchAddQuestionsToBankInner(List<QuestionBankQuestion> questionBankQuestions) {
    for (QuestionBankQuestion questionBankQuestion : questionBankQuestions) {
        long questionId = questionBankQuestion.getQuestionId();
        long questionBankId = questionBankQuestion.getQuestionBankId();
        try {
            boolean result = this.save(questionBankQuestion);
            ThrowUtils.throwIf(!result, ErrorCode.OPERATION_ERROR, "向题库添加题目失败");
        } catch (DataIntegrityViolationException e) {
            log.error("数据库唯一键冲突或违反其他完整性约束,题目 id: {}, 题库 id: {}, 错误信息: {}",
                    questionId, questionBankId, e.getMessage());
            throw new BusinessException(ErrorCode.OPERATION_ERROR, "题目已存在于该题库,无法重复添加");
        } catch (DataAccessException e) {
            log.error("数据库连接问题、事务问题等导致操作失败,题目 id: {}, 题库 id: {}, 错误信息: {}",
                    questionId, questionBankId, e.getMessage());
            throw new BusinessException(ErrorCode.OPERATION_ERROR, "数据库操作失败");
        } catch (Exception e) {
            // 捕获其他异常,做通用处理
            log.error("添加题目到题库时发生未知错误,题目 id: {}, 题库 id: {}, 错误信息: {}",
                    questionId, questionBankId, e.getMessage());
            throw new BusinessException(ErrorCode.OPERATION_ERROR, "向题库添加题目失败");
        }
    }
}

在原方法中批量生成题目,并且调用上述事务方法:

// 分批处理避免长事务,假设每次处理 1000 条数据
int batchSize = 1000;
int totalQuestionListSize = validQuestionIdList.size();
for (int i = 0; i < totalQuestionListSize; i += batchSize) {
    // 生成每批次的数据
    List<Long> subList = validQuestionIdList.subList(i, Math.min(i + batchSize, totalQuestionListSize));
    List<QuestionBankQuestion> questionBankQuestions = subList.stream().map(questionId -> {
        QuestionBankQuestion questionBankQuestion = new QuestionBankQuestion();
        questionBankQuestion.setQuestionBankId(questionBankId);
        questionBankQuestion.setQuestionId(questionId);
        questionBankQuestion.setUserId(loginUser.getId());
        return questionBankQuestion;
    }).collect(Collectors.toList());
    // 使用事务处理每批数据
    QuestionBankQuestionService questionBankQuestionService = (QuestionBankQuestionServiceImpl) AopContext.currentProxy();
    questionBankQuestionService.batchAddQuestionsToBankInner(questionBankQuestions);
}

需要注意的是,上述代码中,我们通过 AopContext.currentProxy() 方法获取到了当前实现类的代理对象,来调用事务方法。

为什么要这么做呢? 因为 Spring 事务依赖于代理机制,而内部调用通过 this 直接调用方法,不会通过 Spring 的代理,因此不会触发事务。

注意,使用 AopContext.currentProxy() 方法时必须要在启动类添加下面的注解开启切面自动代理:

@EnableAspectJAutoProxy(proxyTargetClass = true, exposeProxy = true)

(关于事务失效的场景,这是其中的一个,类内部的调用,还有非Public修饰的方法等等,大家可以自己去了解一下,而且本人在面试当中已经被这道题拷打过了)

2、重试

对于可能由于网络不稳定等临时原因偶发失败的操作,可以设计 重试机制 提高系统的稳定性,适用于执行时间很长的任务。

注意,重试的过程中要记录日志,并且重试次数要有一个上限 。示例代码如下:

int retryCount = 3;
for (int i = 0; i < retryCount; i++) {
    try {
        // 执行插入操作
        // 成功则跳出重试循环
        break; 
    } catch (Exception e) {
        log.warn("插入失败,重试次数: {}", i + 1);
        if (i == retryCount - 1) {
            throw new BusinessException(ErrorCode.OPERATION_ERROR, "多次重试后操作仍然失败");
        }
    }
}

但对于我们目前的题目管理功能,执行时间不会特别长,增加重试反而一定程度上增加了系统的不确定性和复杂度,可以不用添加。

3、中断恢复

如果在批量插入过程中由于某种原因(如数据库宕机、服务器重启)导致批处理中断,建议设计一种机制来进行 增量恢复。比如可以为每次操作打上批次标记,在操作未完成时记录操作状态(如部分题目成功添加),并在恢复时继续执行未完成的操作。

可以设计一个数据库表存储批次的状态:

create table question_batch_status (
  batch_id bigint primary key,
  question_bank_id bigint,
  total_questions int,
  processed_questions int,
  status varchar(20) -- running, completed, failed
);

通过该表可以跟踪每次批处理的进度,并在失败时根据批次继续处理。

但对于我们的题目管理功能,不用那么复杂,可以直接通过判断数据是否已经满足要求来对要新处理的数据进行过滤。比如添加题目到题库前,先查一下是否已经添加到题库里了,如果已添加就不用重复添加了。(前面 参数校验提前 就已经实现了这个功能)

性能优化

1、批量操作

当前代码中,每个题目是单独插入数据库的,这会产生频繁的数据库交互。

大多数 ORM 框架和数据库驱动都支持批量插入,可以通过批量插入来优化性能,比如 MyBatis Plus 提供了 saveBatch 方法。

优化后的代码如下:

@Override
@Transactional(rollbackFor = Exception.class)
public void batchAddQuestionsToBankInner(List<QuestionBankQuestion> questionBankQuestions) {
    try {
        boolean result = this.saveBatch(questionBankQuestions);
        ThrowUtils.throwIf(!result, ErrorCode.OPERATION_ERROR, "向题库添加题目失败");
    } catch (DataIntegrityViolationException e) {
        log.error("数据库唯一键冲突或违反其他完整性约束, 错误信息: {}", e.getMessage());
        throw new BusinessException(ErrorCode.OPERATION_ERROR, "题目已存在于该题库,无法重复添加");
    } catch (DataAccessException e) {
        log.error("数据库连接问题、事务问题等导致操作失败, 错误信息: {}", e.getMessage());
        throw new BusinessException(ErrorCode.OPERATION_ERROR, "数据库操作失败");
    } catch (Exception e) {
        // 捕获其他异常,做通用处理
        log.error("添加题目到题库时发生未知错误,错误信息: {}", e.getMessage());
        throw new BusinessException(ErrorCode.OPERATION_ERROR, "向题库添加题目失败");
    }
}

批量操作的好处:

  • 降低了数据库连接和提交的频率。
  • 避免频繁的数据库交互,减少 I/O 操作,显著提高性能。

2、SQL 优化

我们在操作数据库时,可以使用一些 SQL 优化的技巧。

其中,有一个最基本的 SQL 优化原则,不要使用 select * 来查询数据,只查出需要的字段即可。由于框架封装地太好了,可能大多数同学都不会注意这点,其实我们上述的代码就需要对此进行优化,来减少查询的数据量。

比如:

// 检查题目 id 是否存在
LambdaQueryWrapper<Question> questionLambdaQueryWrapper = Wrappers.lambdaQuery(Question.class)
        .select(Question::getId)
        .in(Question::getId, questionIdList);
List<Question> questionList = questionService.list(questionLambdaQueryWrapper);

由于返回的值只有 id 一列,还可以直接转为 Long 列表,不需要让框架封装结果为 Question 对象了,减少内存占用:

// 合法的题目 id
List<Long> validQuestionIdList = questionService.listObjs(questionLambdaQueryWrapper, obj -> (Long) obj);
ThrowUtils.throwIf(CollUtil.isEmpty(validQuestionIdList), ErrorCode.PARAMS_ERROR, "合法的题目列表为空");

3、并发编程

由于我们已经将操作分批处理,在操作较多、追求处理时间的情况下,可以通过并发编程让每批操作同时执行,而不是一批处理完再执行下一批,能够大幅提升性能。

Java 中,可以利用并发包中的 CompletableFuture + 线程池 来并发处理多个任务。

CompletableFuture 是 Java 8 中引入的一个类,用于表示异步操作的结果。它是 Future 的增强版本,不仅可以表示一个异步计算,还可以对异步计算的结果进行组合、转换和处理,实现异步任务的编排

比如下列代码,将任务拆分为多个子任务,并发执行,最后通过 CompletableFuture.allOf 方法阻塞等待,只有所有的子任务都完成,才会执行后续代码:

List<CompletableFuture<Void>> futures = new ArrayList<>();

for (List<Long> subList : splitList(validQuestionIdList, 1000)) {
    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
        processBatch(subList, questionBankId, loginUser);
    });
    futures.add(future);
}

// 等待所有任务完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

CompletableFuture 默认使用 Java 7 引入的 ForkJoinPool 线程池来并发执行任务。该线程池特别适合需要分治法来处理的大量并发任务,支持递归任务拆分。Java 8 中的并行流默认也是使用了 ForkJoinPool 进行并发处理

ForkJoinPool 的主要特性:

  • 工作窃取算法(Work-Stealing):线程可以从其他线程的工作队列中“窃取”任务,以提高 CPU 的使用率和程序的并行性。
  • 递归任务处理:支持将大任务拆分为多个小任务并行执行,然后再将结果合并。

标签:题目,参考,批处理,数据库,皮哥,线程,new,题库,id
From: https://blog.51cto.com/u_15858858/12120850

相关文章