首页 > 其他分享 >Spring Boot集成Spring Batch入门

Spring Boot集成Spring Batch入门

时间:2023-08-01 16:03:17浏览次数:35  
标签:Spring Boot batch springframework Batch item import org public

一、简介

Spring Batch是一个开源的、全面的、轻量级的批处理框架,通过Spring Batch可以实现强大的批处理应用程序开发。Spring Batch还提供记录/跟踪、事务管理、作业处理统计、作业重启以及资源管理等功能。

二、整合Spring Boot

1.添加依赖

<!--mysql-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.30</version>
        </dependency>
        <!--Spring Batch-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <!--jpa-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>

2.application.yml配置

server:
  port: 6666

spring:
  datasource:
    username: root
    password: 123456
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/mxg_member?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8
    type: com.alibaba.druid.pool.DruidDataSource
    # 配置项目启动时创建数据表的SQL脚本 由Spring Batch提供
    schema: classpath:/org/springframework/batch/core/schema-mysql.sql
  jpa:
    show-sql: true
    hibernate:
      ddl-auto: update
  batch:
    jdbc:
      # 项目启动时执行建表SQL
      initialize-schema: always
    job:
      # 禁止Spring Batch自动执行
      enabled: false
      names: parentJob
      # Spring batch相关表的前缀 默认为batch_
    table-prefix: batch_

3.定义数据源

我们在resources目录下创建一个data目录,data目录下创建一个users.csv文件。添加以下测试数据

姓名	性别	年龄	地址
张三	男	12	深圳
李四	男	32	广州
王雪	女	21	上海
孙云	女	23	北京
赵柳	女	42	成都
孙雪	女	15	武汉

创建一个用户实体类,用于数据的存取。

package com.mengxuegu.member.bean;

import lombok.Data;

import javax.persistence.*;

/**
 * @author qx
 * @date 2023/8/1
 * @des 用户实体
 */
@Entity
@Table(name = "t_user")
@Data
public class User {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private String name;

    private String sex;

    private Integer age;

    private String address;

}

4.Spring Batch配置类

package com.mengxuegu.member.config;

/**
 * @author qx
 * @date 2023/8/1
 * @des Spring Batch配置
 */

import com.mengxuegu.member.bean.User;
import org.springframework.batch.core.*;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.validator.Validator;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

import javax.sql.DataSource;
import java.io.FileNotFoundException;

@Configuration
@EnableBatchProcessing
public class CsvBatchJobConfig {

    // 用来读取数据
    @Bean
    public ItemReader<User> reader() {
        // FlatFileItemReader是一个用来加载文件的itemReader
        FlatFileItemReader<User> reader = new FlatFileItemReader<>();
        // 跳过第一行的标题
        reader.setLinesToSkip(1);
        // 设置csv的位置
        reader.setResource(new ClassPathResource("data/users.csv"));
        // 设置每一行的数据信息
        reader.setLineMapper(new DefaultLineMapper<User>() {{
            setLineTokenizer(new DelimitedLineTokenizer() {{
                // 配置了四行文件
                setNames(new String[]{"name", "sex", "age", "address"});
                // 配置列于列之间的间隔符,会通过间隔符对每一行进行切分
                setDelimiter("\t");
            }});

            // 设置要映射的实体类属性
            setFieldSetMapper(new BeanWrapperFieldSetMapper<User>() {{
                setTargetType(User.class);
            }});
        }});
        return reader;
    }

    // 用来处理数据
    @Bean
    public ItemProcessor<User, User> processor() {
        // 使用我们自定义的ItemProcessor的实现CsvItemProcessor
        CsvItemProcessor processor = new CsvItemProcessor();
        // 为processor指定校验器为CsvBeanValidator()
        processor.setValidator(csvBeanValidator());
        return processor;
    }


    // 用来输出数据
    @Bean
    public ItemWriter<User> writer(@Qualifier("dataSource") DataSource dataSource) {
        // 通过Jdbc写入到数据库中
        JdbcBatchItemWriter writer = new JdbcBatchItemWriter();
        writer.setDataSource(dataSource);
        // setItemSqlParameterSourceProvider 表示将实体类中的属性和占位符一一映射
        writer.setItemSqlParameterSourceProvider(
                new BeanPropertyItemSqlParameterSourceProvider<>());

        // 设置要执行批处理的SQL语句。其中占位符的写法是 `:属性名`
        writer.setSql("insert into t_user(name, sex, age, address) " +
                "values(:name, :sex, :age, :address)");
        return writer;
    }


    // 配置一个Step
    @Bean
    public Step csvStep(
            StepBuilderFactory stepBuilderFactory,
            ItemReader<User> reader,
            ItemProcessor<User, User> processor,
            ItemWriter<User> writer) {

        return stepBuilderFactory.get("csvStep")
                // 批处理每次提交5条数据
                .<User, User>chunk(5)
                // 给step绑定 reader
                .reader(reader)
                // 给step绑定 processor
                .processor(processor)
                // 给step绑定 writer
                .writer(writer)
                .faultTolerant()
                // 设定一个我们允许的这个step可以跳过的异常数量,假如我们设定为3,则当这个step运行时,只要出现的异常数目不超过3,整个step都不会fail。注意,若不设定skipLimit,则其默认值是0
                .skipLimit(3)
                // 指定我们可以跳过的异常,因为有些异常的出现,我们是可以忽略的
                .skip(Exception.class)
                // 出现这个异常我们不想跳过,因此这种异常出现一次时,计数器就会加一,直到达到上限
                .noSkip(FileNotFoundException.class)
                .build();
    }

    /**
     * 配置一个要执行的Job任务, 包含一个或多个Step
     */
    @Bean
    public Job csvJob(JobBuilderFactory jobBuilderFactory, Step step) {
        // 为 job 起名为 csvJob
        return jobBuilderFactory.get("csvJob")
                .start(step)
//                .next(step)
                .listener(listener())
                .build();
    }

    @Bean
    public Validator<User> csvBeanValidator() {
        return new CsvBeanValidator<>();
    }

    @Bean
    public JobExecutionListener listener() {
        return new JobCompletionListener();
    }
}

5.自定义校验器

package com.mengxuegu.member.config;

import com.mengxuegu.member.bean.User;
import org.springframework.batch.item.validator.ValidatingItemProcessor;
import org.springframework.batch.item.validator.ValidationException;

/**
 * @author qx
 * @date 2023/8/1
 * @des 自定义校验器
 */
public class CsvItemProcessor extends ValidatingItemProcessor<User> {

    @Override
    public User process(User item) throws ValidationException {
        super.process(item);
        if ("男".equals(item.getSex())) {
            item.setSex("1");
        } else {
            item.setSex("2");
        }
        return item;
    }
}

6.数据校验类

package com.mengxuegu.member.config;

/**
 * @author qx
 * @date 2023/8/1
 * @des 数据校验类
 */

import org.springframework.batch.item.validator.ValidationException;
import org.springframework.batch.item.validator.Validator;
import org.springframework.beans.factory.InitializingBean;

import javax.validation.ConstraintViolation;
import javax.validation.Validation;
import javax.validation.ValidatorFactory;
import java.util.Set;

public class CsvBeanValidator<T> implements Validator<T>, InitializingBean {

    private javax.validation.Validator validator;

    @Override
    public void validate(T value) throws ValidationException {
        // 使用Validator的validate方法校验数据
        Set<ConstraintViolation<T>> constraintViolations =
                validator.validate(value);

        if (constraintViolations.size() > 0) {
            StringBuilder message = new StringBuilder();
            for (ConstraintViolation<T> constraintViolation : constraintViolations) {
                message.append(constraintViolation.getMessage() + "\n");
            }
            throw new ValidationException(message.toString());
        }
    }

    /**
     * 使用JSR-303的Validator来校验我们的数据,在此进行JSR-303的Validator的初始化
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        ValidatorFactory validatorFactory =
                Validation.buildDefaultValidatorFactory();
        validator = validatorFactory.usingContext().getValidator();
    }
}

7.批处理监听类

package com.mengxuegu.member.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;

/**
 * @author qx
 * @date 2023/8/1
 * @des 批处理监听类
 */
@Slf4j
public class JobCompletionListener extends JobExecutionListenerSupport {
    // 用于批处理开始前执行
    @Override
    public void beforeJob(JobExecution jobExecution) {
        log.info("任务ID:{},开始于:{}", jobExecution.getJobId(), jobExecution.getStartTime());
    }

    // 用于批处理开始后执行
    @Override
    public void afterJob(JobExecution jobExecution) {
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            log.info("任务ID:{},结束于:{}", jobExecution.getJobId(), jobExecution.getEndTime());
        } else {
            log.error("任务ID:{},执行异常状态:{}", jobExecution.getJobId(), jobExecution.getStatus());
        }
    }

}

8.创建控制层测试

package com.mengxuegu.member.controller;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

/**
 * @author qx
 * @date 2023/8/1
 * @des Spring Batch测试
 */
@RestController
@RequestMapping("/job")
public class JobController {

    @Autowired
    private JobLauncher jobLauncher;
    @Autowired
    private Job job;

    @GetMapping("/do")
    public void doJob() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
        JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
        jobParametersBuilder.addDate("jobDate", new Date());

        // 执行一个批处理任务
        jobLauncher.run(job, jobParametersBuilder.toJobParameters());
    }
}

9.测试

我们启动项目,会在系统对应的数据库中新建以batch开头的Spring Batch 相关表。

Spring Boot集成Spring Batch入门_批处理

batch_job_execution: 表示Job执行的句柄(一次执行)

batch_job_execution_params: 通过Job参数区分不同的Job实例,实际使用hashMap存储参数(仅4种数据类型)

batch_job_instance: 作业实例,一个运行期概念(一次执行关联一个实例)

batch_job_execution_seq

batch_job_seq

batch_step_execution: 执行上下文,在job/Step执行时保存需要进行持久化的状态信息。

batch_job_execution_context: 执行上下文,在job/Step执行时保存需要进行持久化的状态信息。

batch_step_execution_context

batch_step_execution_seq

我们在postman上访问接口进行测试:

Spring Boot集成Spring Batch入门_批处理_02

控制台显示了我们批处理的一些操作日志:

15:41:51.278  INFO 9840--- [nio-6666-exec-5] o.s.b.c.l.support.SimpleJobLauncher      :Job: [SimpleJob: [name=csvJob]] launched with the following parameters: [{jobDate=1690875711218}]
15:41:51.306  INFO 9840--- [nio-6666-exec-5] c.m.member.config.JobCompletionListener  :任务ID:1,开始于:Tue Aug 01 15:41:51 CST 2023
15:41:51.324  INFO 9840--- [nio-6666-exec-5] o.s.batch.core.job.SimpleStepHandler     :Executing step: [csvStep]
15:41:51.382  INFO 9840--- [nio-6666-exec-5] o.s.batch.core.step.AbstractStep         :Step: [csvStep] executed in 58ms
15:41:51.391  INFO 9840--- [nio-6666-exec-5] c.m.member.config.JobCompletionListener  :任务ID:1,结束于:Tue Aug 01 15:41:51 CST 2023
15:41:51.398  INFO 9840--- [nio-6666-exec-5] o.s.b.c.l.support.SimpleJobLauncher      :Job: [SimpleJob: [name=csvJob]] completed with the following parameters: [{jobDate=1690875711218}] and the following status: [COMPLETED] in 95ms

最后我们刷新数据表t_user,发现csv中的数据已经添加到了我们的数据表。

Spring Boot集成Spring Batch入门_批处理_03

到这里我们实现了SpringBoot集成Spring Batch的批处理基本使用。

标签:Spring,Boot,batch,springframework,Batch,item,import,org,public
From: https://blog.51cto.com/u_13312531/6923367

相关文章

  • 在非Spring类中获取Spring管理的对象
    在非Spring类中获取Spring管理的对象工具类:packagecom.xxx.task.util;importorg.springframework.context.ApplicationContext;importorg.springframework.context.ApplicationContextAware;importorg.springframework.stereotype.Component;@Componentpublicclass......
  • 掌握Spring条件装配的秘密武器
    本文分享自华为云社区《Spring高手之路9——掌握Spring条件装配的秘密武器》,作者:砖业洋__。在Spring框架中,条件装配是一个强大的功能,可以帮助我们更好地管理和控制Bean的创建过程。本文详细解释了如何使用Spring的@Profile和@Conditional注解实现条件装配,通过具体的示例可以更好地......
  • 掌握Spring条件装配的秘密武器
    本文分享自华为云社区《Spring高手之路9——掌握Spring条件装配的秘密武器》,作者:砖业洋__。在Spring框架中,条件装配是一个强大的功能,可以帮助我们更好地管理和控制Bean的创建过程。本文详细解释了如何使用Spring的@Profile和@Conditional注解实现条件装配,通过具体的示例可以更好......
  • springboot 集成 onlyoffice 实现文档预览、编辑、pdf转化、缩略图生成
    开源地址https://gitee.com/lboot/lucy-onlyoffice介绍lucy-onlyoffice是依赖于onlyoffice的springboot文档预览编辑集成解决方案,该解决方案实现了了onlyoffice的访问使用,支持对常见文档类型的预览,编辑和转化。该解决方案提供了功能的拓展实现,用户可以基于拓展接口,实现业务系统......
  • Spring Cloud Alibaba 2022 正式发布,启动速度提升 10 倍,各方面直接起飞!
    大家好,我是栈长。经过SpringCloudAlibaba2022的第一个候选版本2022.0.0.0-RC1发布7个多月后,中间还有一个2022.0.0.0-RC2版本,就在前几天,SpringCloudAlibaba2022.0.0.0正式版终于正式发布了。SpringCloudAlibaba2022.0.0.0依赖更新由于SpringBoot各个版本......
  • springboot 使用log4j2配置
      pom配置如下:<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope>......
  • spring boot学习(7)— 自定义中的 HttpMessageConverter
    在我们开发自己的应用时,有时候,我们可能需要自定义一些自己的数据格式来传输,这时,自定义的数据传输和类的实例之间进行转化就需要统一起来了, SpringMVC 中的 HttpMessageConverter 就派上用场了。HttpMessageConverter 的声明:publicinterfaceHttpMessageConverter<T>{......
  • linux sbatch 提交jupyter
    点击查看代码#!/bin/bash#SBATCH-pBatch2#SBATCH-N1#SBATCH-n1#SBATCH-c1#SBATCH--job-name=jupyter#SBATCH--output=jupy.out#SBATCH--error=jupy.err#SBATCH--parsableport=8003ip=`ifconfig|grep-m1inet|awk'{print$2}'`jup......
  • 智慧校园源码:vue2+Java+springboot+MySQL+elmentui+jpa+jwt
    智慧校园综合管理云平台源码系统主要以校园安全、智慧校园综合管理云平台为核心,以智慧班牌为学生智慧之窗,以移动管理平台、家校沟通为辅。教师—家长一学校—学生循环的无纸化管理模式及教学服务,实现多领域的信息互联互通以及校园管理一体化、信息数据化、数据自动化。智慧班牌融合......
  • SpringCloud Gateway 在微服务架构下的最佳实践
    作者:徐靖峰(岛风)前言本文整理自云原生技术实践营广州站Meetup的分享,其中的经验来自于我们团队开发的阿里云CSB2.0这款产品,其基于开源SpringCloudGateway开发,在完全兼容开源用法的前提下,做了诸多企业级的改造,涉及功能特性、稳定性、安全、性能等方面。为什么需要微服务网......