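I. Introduction
Spring Batch is an open-source, comprehensive, lightweight batch-processing framework for building robust batch applications. Besides reading, processing and writing data, it provides logging/tracing, transaction management, job processing statistics, job restart and resource management.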
II. Integrating with Spring Boot
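1. Add the dependencies
<!-- MySQL JDBC driver -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.30</version>
</dependency>
<!-- Spring Batch -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<!-- JPA: lets Hibernate create the t_user table from the User entity -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<!-- Web: needed for the JobController that triggers the job -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Bean Validation (JSR-303) implementation used by CsvBeanValidator -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<!-- Lombok, for @Data and @Slf4j -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>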
2. Configure application.yml
server:
  port: 6666
spring:
  datasource:
    username: root
    password: 123456
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/mxg_member?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8
    # Druid connection pool; requires the Druid dependency (not listed above) on the classpath
    type: com.alibaba.druid.pool.DruidDataSource
  jpa:
    show-sql: true
    hibernate:
      ddl-auto: update
  batch:
    jdbc:
      # Run the table-creation SQL shipped with Spring Batch at startup
      # (for MySQL: classpath:/org/springframework/batch/core/schema-mysql.sql)
      initialize-schema: always
      # Prefix of the Spring Batch metadata tables; the default is BATCH_
      table-prefix: batch_
    job:
      # Don't run jobs automatically at startup; the job is launched from a controller
      enabled: false
      # Jobs to run at startup (only used when enabled is true)
      names: csvJob
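Spring Batch's JDBC DAOs write their SQL against a `%PREFIX%` placeholder and substitute the configured table prefix, so `table-prefix: batch_` turns `%PREFIX%JOB_INSTANCE` into `batch_JOB_INSTANCE`. The minimal sketch below reproduces only that substitution; `TablePrefixSketch` and `resolve` are hypothetical names, and the fallback to `BATCH_` for an unset prefix is an assumption of the sketch. It matters because the bundled MySQL script creates upper-case `BATCH_...` tables, and MySQL table names are case-sensitive on Linux by default (`lower_case_table_names=0`) but case-insensitive on Windows.

```java
/**
 * Hypothetical helper mimicking how Spring Batch's JDBC DAOs resolve table names:
 * queries contain a %PREFIX% placeholder that is replaced by the configured prefix.
 */
final class TablePrefixSketch {
    // Spring Batch's default table prefix
    static final String DEFAULT_PREFIX = "BATCH_";

    private TablePrefixSketch() {
    }

    /** Replaces every %PREFIX% placeholder; falls back to the default when no prefix is set. */
    static String resolve(String query, String tablePrefix) {
        String prefix = (tablePrefix == null || tablePrefix.isEmpty()) ? DEFAULT_PREFIX : tablePrefix;
        return query.replace("%PREFIX%", prefix);
    }
}
```

With `batch_`, the query targets `batch_JOB_INSTANCE`, which only finds the `BATCH_JOB_INSTANCE` table on a case-insensitive MySQL server.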
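3. Prepare the input data
Create a data directory under resources and add a users.csv file to it with the following test data. The reader configured in step 4 skips the first (header) line and splits every other line on the tab character ("\t"), so separate the columns with tabs despite the .csv extension. In the sex column, 男 means male and 女 means female.
name sex age address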
张三 男 12 深圳
李四 男 32 广州
王雪 女 21 上海
孙云 女 23 北京
赵柳 女 42 成都
孙雪 女 15 武汉
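The reader in step 4 splits each line on the tab character and copies the four fields, by name, onto a `User`. The dependency-free sketch below reproduces that mapping for a single line, which makes it easy to check the file format; `UserRow` and `UserLineParser` are hypothetical stand-ins for the entity, `DelimitedLineTokenizer` and `BeanWrapperFieldSetMapper`, not Spring Batch API.

```java
import java.util.Arrays;

/** Hypothetical stand-in for the User entity (no JPA annotations). */
final class UserRow {
    String name;
    String sex;
    Integer age;
    String address;
}

/** Hypothetical parser that maps one tab-separated line the way the configured reader does. */
final class UserLineParser {
    // Column names in file order, as passed to setNames(...) in the reader
    private static final String[] NAMES = {"name", "sex", "age", "address"};

    private UserLineParser() {
    }

    static UserRow parse(String line) {
        // Split on tabs; the -1 limit keeps trailing empty columns
        String[] tokens = line.split("\t", -1);
        if (tokens.length != NAMES.length) {
            // The real tokenizer also rejects lines with the wrong column count (strict mode)
            throw new IllegalArgumentException("Expected " + NAMES.length + " columns but got "
                    + tokens.length + ": " + Arrays.toString(tokens));
        }
        UserRow row = new UserRow();
        row.name = tokens[0];
        row.sex = tokens[1];
        // The age text is converted to an Integer, as the field-set mapper does for User.age
        row.age = Integer.valueOf(tokens[2].trim());
        row.address = tokens[3];
        return row;
    }
}
```

A line typed with spaces instead of tabs yields a single column, so `parse` throws; this mirrors the column-count error the real reader reports when the delimiter does not match the file.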
Next, create a User entity for storing and retrieving the data. Because ddl-auto is set to update, Hibernate creates the t_user table from it at startup.
package com.mengxuegu.member.bean;

import lombok.Data;

import javax.persistence.*;

/**
 * @author qx
 * @date 2023/8/1
 * @des User entity
 */
@Entity
@Table(name = "t_user")
@Data
public class User {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private String name;
    private String sex;
    private Integer age;
    private String address;
}
4. Spring Batch configuration class
package com.mengxuegu.member.config;

import com.mengxuegu.member.bean.User;
import org.springframework.batch.core.*;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.validator.Validator;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

import javax.sql.DataSource;
import java.io.FileNotFoundException;

/**
 * @author qx
 * @date 2023/8/1
 * @des Spring Batch configuration
 */
@Configuration
@EnableBatchProcessing
public class CsvBatchJobConfig {

    // Reads the input data
    @Bean
    public ItemReader<User> reader() {
        // FlatFileItemReader is an ItemReader that reads a flat file line by line
        FlatFileItemReader<User> reader = new FlatFileItemReader<>();
        // Skip the header line
        reader.setLinesToSkip(1);
        // Location of the CSV file on the classpath
        reader.setResource(new ClassPathResource("data/users.csv"));
        // Describe how each line is turned into a User
        reader.setLineMapper(new DefaultLineMapper<User>() {{
            setLineTokenizer(new DelimitedLineTokenizer() {{
                // Names of the four columns, in file order
                setNames(new String[]{"name", "sex", "age", "address"});
                // Column separator used to split each line into fields
                setDelimiter("\t");
            }});
            // Map each named field onto the User property with the same name
            setFieldSetMapper(new BeanWrapperFieldSetMapper<User>() {{
                setTargetType(User.class);
            }});
        }});
        return reader;
    }

    // Processes (validates and transforms) each item
    @Bean
    public ItemProcessor<User, User> processor() {
        // Our own ItemProcessor implementation, CsvItemProcessor
        CsvItemProcessor processor = new CsvItemProcessor();
        // Validate every item with CsvBeanValidator before it is transformed
        processor.setValidator(csvBeanValidator());
        return processor;
    }

    // Writes the processed data
    @Bean
    public ItemWriter<User> writer(@Qualifier("dataSource") DataSource dataSource) {
        // Write to the database with JDBC batch updates
        JdbcBatchItemWriter<User> writer = new JdbcBatchItemWriter<>();
        writer.setDataSource(dataSource);
        // Bind each named placeholder to the entity property of the same name
        writer.setItemSqlParameterSourceProvider(
                new BeanPropertyItemSqlParameterSourceProvider<>());
        // SQL executed for each item; placeholders are written as :propertyName
        writer.setSql("insert into t_user(name, sex, age, address) " +
                "values(:name, :sex, :age, :address)");
        return writer;
    }

    // Configure a Step
    @Bean
    public Step csvStep(
            StepBuilderFactory stepBuilderFactory,
            ItemReader<User> reader,
            ItemProcessor<User, User> processor,
            ItemWriter<User> writer) {
        return stepBuilderFactory.get("csvStep")
                // Commit 5 items per chunk
                .<User, User>chunk(5)
                // Bind the reader to the step
                .reader(reader)
                // Bind the processor to the step
                .processor(processor)
                // Bind the writer to the step
                .writer(writer)
                .faultTolerant()
                // How many skippable exceptions the step tolerates: with 3, the step does not fail
                // as long as no more than 3 items are skipped. If skipLimit is not set, it defaults to 0
                .skipLimit(3)
                // Exceptions that may be skipped, because some failures can safely be ignored
                .skip(Exception.class)
                // Exceptions that must never be skipped: they fail the step immediately
                // and are not counted against the skip limit
                .noSkip(FileNotFoundException.class)
                .build();
    }

    /**
     * Configure the Job to execute; a job contains one or more Steps
     */
    @Bean
    public Job csvJob(JobBuilderFactory jobBuilderFactory, Step step) {
        // Name the job "csvJob"
        return jobBuilderFactory.get("csvJob")
                .start(step)
                // further steps would be chained here with .next(...)
                .listener(listener())
                .build();
    }

    @Bean
    public Validator<User> csvBeanValidator() {
        return new CsvBeanValidator<>();
    }

    @Bean
    public JobExecutionListener listener() {
        return new JobCompletionListener();
    }
}
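The step above is chunk-oriented: up to 5 items are read, each one is processed, and the surviving items are written (and committed) together, while `faultTolerant()` allows up to 3 items that throw a skippable exception to be dropped. The plain-Java sketch below models that control flow so the numbers are easy to reason about. It is a simplification: it only models skips during processing and has no transactions, rollback or retry, and `ChunkLoopSketch`, `run` and `NoSkipException` are hypothetical names, not Spring Batch classes.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical stand-in for an exception type registered with noSkip(...). */
class NoSkipException extends RuntimeException {
    NoSkipException(String message) {
        super(message);
    }
}

/** Hypothetical, simplified model of a fault-tolerant chunk-oriented step. */
final class ChunkLoopSketch {
    private ChunkLoopSketch() {
    }

    /** Returns how many items were skipped; fails once the skip limit is exceeded. */
    static <I, O> int run(Iterator<I> reader, Function<I, O> processor,
                          Consumer<List<O>> writer, int chunkSize, int skipLimit) {
        int skipped = 0;
        while (true) {
            // 1. Read up to chunkSize items: the chunk size counts items read
            List<I> inputs = new ArrayList<>();
            while (inputs.size() < chunkSize && reader.hasNext()) {
                inputs.add(reader.next());
            }
            if (inputs.isEmpty()) {
                return skipped;
            }
            // 2. Process each item; a null result filters the item out
            List<O> outputs = new ArrayList<>();
            for (I item : inputs) {
                try {
                    O out = processor.apply(item);
                    if (out != null) {
                        outputs.add(out);
                    }
                } catch (NoSkipException e) {
                    // Like noSkip(...): fail immediately, without counting a skip
                    throw e;
                } catch (RuntimeException e) {
                    // Like skip(Exception.class): drop the item and count the skip
                    skipped++;
                    if (skipped > skipLimit) {
                        throw new IllegalStateException("Skip limit of " + skipLimit + " exceeded", e);
                    }
                }
            }
            // 3. Write the chunk's surviving items together (the real step also commits here)
            if (!outputs.isEmpty()) {
                writer.accept(outputs);
            }
        }
    }
}
```

With 7 input items, one of which fails processing, a chunk size of 5 gives two writes of 4 and 2 items and a skip count of 1; a fourth failing item with a limit of 3 fails the whole run.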
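5. Custom item processor
CsvItemProcessor extends ValidatingItemProcessor: super.process(item) runs the validator set in step 4 and throws a ValidationException for invalid items; the item is then transformed by replacing the sex text with a code.
package com.mengxuegu.member.config;

import com.mengxuegu.member.bean.User;
import org.springframework.batch.item.validator.ValidatingItemProcessor;
import org.springframework.batch.item.validator.ValidationException;

/**
 * @author qx
 * @date 2023/8/1
 * @des Custom processor: validates, then transforms each User
 */
public class CsvItemProcessor extends ValidatingItemProcessor<User> {
    @Override
    public User process(User item) throws ValidationException {
        // Run the configured Validator; throws ValidationException for invalid items
        super.process(item);
        // Convert the sex text to a code: 男 (male) -> "1", anything else -> "2"
        if ("男".equals(item.getSex())) {
            item.setSex("1");
        } else {
            item.setSex("2");
        }
        return item;
    }
}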
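`CsvItemProcessor` maps 男 to "1" and every other value, including a blank or misspelled one, to "2". If unexpected values should instead be treated as bad data, a stricter mapping can throw, so the item is skipped and counted against the step's skip limit (`skip(Exception.class)` covers it). The sketch below is a hypothetical, dependency-free version of that stricter mapping; `SexCodes` and `toCode` are illustrative names, and throwing `IllegalArgumentException` is a design choice the original processor does not make.

```java
/** Hypothetical stricter version of the sex-to-code conversion in CsvItemProcessor. */
final class SexCodes {
    private SexCodes() {
    }

    /** Maps 男 (male) to "1" and 女 (female) to "2"; rejects anything else. */
    static String toCode(String sex) {
        if ("男".equals(sex)) {
            return "1";
        }
        if ("女".equals(sex)) {
            return "2";
        }
        // Unknown values become a skippable error instead of silently turning into "2"
        throw new IllegalArgumentException("Unknown sex value: " + sex);
    }
}
```

Inside `process`, the if/else would then become `item.setSex(SexCodes.toCode(item.getSex()));`, still called after `super.process(item)`.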
6. Data validator
package com.mengxuegu.member.config;

import org.springframework.batch.item.validator.ValidationException;
import org.springframework.batch.item.validator.Validator;
import org.springframework.beans.factory.InitializingBean;

import javax.validation.ConstraintViolation;
import javax.validation.Validation;
import javax.validation.ValidatorFactory;
import java.util.Set;

/**
 * @author qx
 * @date 2023/8/1
 * @des Data validator
 */
public class CsvBeanValidator<T> implements Validator<T>, InitializingBean {
    private javax.validation.Validator validator;

    @Override
    public void validate(T value) throws ValidationException {
        // Check the item against its JSR-303 constraint annotations
        Set<ConstraintViolation<T>> constraintViolations =
                validator.validate(value);
        if (!constraintViolations.isEmpty()) {
            // Combine all violation messages into a single exception
            StringBuilder message = new StringBuilder();
            for (ConstraintViolation<T> constraintViolation : constraintViolations) {
                message.append(constraintViolation.getMessage()).append("\n");
            }
            throw new ValidationException(message.toString());
        }
    }

    /**
     * Initialize the JSR-303 Validator used to check our data.
     * Spring calls this method because the class is registered as a @Bean.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        ValidatorFactory validatorFactory =
                Validation.buildDefaultValidatorFactory();
        validator = validatorFactory.usingContext().getValidator();
    }
}
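Note that the `User` entity declares no constraint annotations, so as written `validate()` never finds a violation and every row passes; annotations from `javax.validation.constraints`, for example `@NotBlank` on `name` or `@Min`/`@Max` on `age`, are what give it something to check. The plain-Java sketch below mimics what `CsvBeanValidator` does once violations exist: each failed rule contributes one message line, and all of them are reported in a single exception. `RuleValidator` and its rules are hypothetical stand-ins for JSR-303 annotations, and `IllegalArgumentException` stands in for Spring's `ValidationException`, only to keep the example dependency-free.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical dependency-free stand-in for the JSR-303 checks run by CsvBeanValidator. */
final class RuleValidator<T> {
    /** One constraint: a condition the value must satisfy and the message used when it doesn't. */
    private static final class Rule<V> {
        final Predicate<V> check;
        final String message;

        Rule(Predicate<V> check, String message) {
            this.check = check;
            this.message = message;
        }
    }

    private final List<Rule<T>> rules = new ArrayList<>();

    /** Registers a rule and returns this validator for chaining. */
    RuleValidator<T> rule(Predicate<T> check, String message) {
        rules.add(new Rule<>(check, message));
        return this;
    }

    /** Collects every violated rule's message and throws them together, one per line. */
    void validate(T value) {
        StringBuilder message = new StringBuilder();
        // Rules are checked in registration order (the real validator iterates a Set,
        // so the order of its messages is unspecified)
        for (Rule<T> rule : rules) {
            if (!rule.check.test(value)) {
                message.append(rule.message).append("\n");
            }
        }
        if (message.length() > 0) {
            throw new IllegalArgumentException(message.toString());
        }
    }
}
```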
7. Job execution listener
package com.mengxuegu.member.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;

/**
 * @author qx
 * @date 2023/8/1
 * @des Job execution listener
 */
@Slf4j
public class JobCompletionListener extends JobExecutionListenerSupport {
    // Called before the job starts
    @Override
    public void beforeJob(JobExecution jobExecution) {
        log.info("Job ID: {}, started at: {}", jobExecution.getJobId(), jobExecution.getStartTime());
    }

    // Called after the job has finished, whether it succeeded or not
    @Override
    public void afterJob(JobExecution jobExecution) {
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            log.info("Job ID: {}, finished at: {}", jobExecution.getJobId(), jobExecution.getEndTime());
        } else {
            log.error("Job ID: {}, ended with status: {}", jobExecution.getJobId(), jobExecution.getStatus());
        }
    }
}
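A common extension of this listener is to log how long the job took. In Spring Batch 4 (the generation that provides `JobBuilderFactory`), `JobExecution.getStartTime()` and `getEndTime()` return `java.util.Date`, so `afterJob` can subtract them. The helper below is a hypothetical sketch of that calculation; `JobTiming` and `describe` are illustrative names, not part of Spring Batch.

```java
import java.util.Date;

/** Hypothetical helper that afterJob could use to report a job's duration. */
final class JobTiming {
    private JobTiming() {
    }

    /** Returns a log-friendly summary, or a notice when either timestamp is missing. */
    static String describe(Long jobId, Date start, Date end) {
        if (start == null || end == null) {
            return "Job ID: " + jobId + ", timing unavailable";
        }
        // Elapsed wall-clock time in milliseconds
        long millis = end.getTime() - start.getTime();
        return "Job ID: " + jobId + ", took " + millis + " ms";
    }
}
```

In `afterJob`, this could be used as `log.info(JobTiming.describe(jobExecution.getJobId(), jobExecution.getStartTime(), jobExecution.getEndTime()));`.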
8. Create a controller to trigger the job
package com.mengxuegu.member.controller;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

/**
 * @author qx
 * @date 2023/8/1
 * @des Spring Batch test endpoint
 */
@RestController
@RequestMapping("/job")
public class JobController {
    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job job;

    @GetMapping("/do")
    public void doJob() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException,
            JobParametersInvalidException, JobRestartException {
        // Add the current time as a parameter so every request starts a new JobInstance;
        // re-running a completed job with identical parameters throws JobInstanceAlreadyCompleteException
        JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
        jobParametersBuilder.addDate("jobDate", new Date());
        // Launch the batch job
        jobLauncher.run(job, jobParametersBuilder.toJobParameters());
    }
}
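Why the `jobDate` parameter? Spring Batch identifies a job instance by the job name plus its identifying parameters (the real repository stores a key derived from them in the job instance table) and refuses to run an instance that has already completed. The in-memory model below illustrates only that rule; it is a simplification with no restart of failed instances and no persistence, and `JobInstanceRegistry`, `launch` and `InstanceAlreadyCompleteException` are hypothetical names, not Spring Batch classes.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical stand-in for Spring Batch's JobInstanceAlreadyCompleteException. */
class InstanceAlreadyCompleteException extends RuntimeException {
    InstanceAlreadyCompleteException(String message) {
        super(message);
    }
}

/** Hypothetical in-memory model of job-instance identity (no restarts, no persistence). */
final class JobInstanceRegistry {
    private final Set<String> completed = new HashSet<>();

    /** Builds an instance key from the job name and its parameters, sorted by name. */
    static String key(String jobName, Map<String, Object> params) {
        return jobName + new TreeMap<>(params);
    }

    /** "Runs" the job once per distinct key and rejects an instance that already completed. */
    void launch(String jobName, Map<String, Object> params) {
        String key = key(jobName, params);
        if (completed.contains(key)) {
            throw new InstanceAlreadyCompleteException("Instance already complete: " + key);
        }
        // The sketch assumes every run succeeds, so the instance is now complete
        completed.add(key);
    }
}
```

Launching `csvJob` twice with the same `jobDate` is rejected, while a new timestamp produces a new key and therefore a new instance, which is exactly what the controller relies on.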
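9. Test
When the application starts, Spring Batch creates its metadata tables, whose names start with batch, in the configured database:
batch_job_execution: one execution (a single run attempt) of a job instance
batch_job_execution_params: the parameters of each job execution; the identifying parameters are what distinguish one job instance from another. Parameters are held as a name-to-value map, and only four types are supported (String, Date, Long, Double)
batch_job_instance: a job instance, i.e. one logical run of a job identified by the job name plus its identifying parameters; every execution belongs to exactly one instance, and an instance can have several executions (for example after a restart)
batch_job_execution_seq: sequence table used to generate job execution IDs (MySQL has no native sequences)
batch_job_seq: sequence table used to generate job instance IDs
batch_step_execution: one execution of a step, with its status and counters such as read, write, commit and skip counts
batch_job_execution_context: the execution context of a job execution, i.e. state persisted during the run so the job can be restarted
batch_step_execution_context: the execution context of a step execution
batch_step_execution_seq: sequence table used to generate step execution IDs
Then call the endpoint, for example from Postman, with a GET request to http://localhost:6666/job/do.
The console shows the log output of the batch job: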
15:41:51.278 INFO 9840 --- [nio-6666-exec-5] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=csvJob]] launched with the following parameters: [{jobDate=1690875711218}]
15:41:51.306 INFO 9840 --- [nio-6666-exec-5] c.m.member.config.JobCompletionListener : Job ID: 1, started at: Tue Aug 01 15:41:51 CST 2023
15:41:51.324 INFO 9840 --- [nio-6666-exec-5] o.s.batch.core.job.SimpleStepHandler : Executing step: [csvStep]
15:41:51.382 INFO 9840 --- [nio-6666-exec-5] o.s.batch.core.step.AbstractStep : Step: [csvStep] executed in 58ms
15:41:51.391 INFO 9840 --- [nio-6666-exec-5] c.m.member.config.JobCompletionListener : Job ID: 1, finished at: Tue Aug 01 15:41:51 CST 2023
15:41:51.398 INFO 9840 --- [nio-6666-exec-5] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=csvJob]] completed with the following parameters: [{jobDate=1690875711218}] and the following status: [COMPLETED] in 95ms
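Finally, refresh the t_user table: the rows from users.csv have been inserted, with the sex column stored as 1 or 2.
That completes a basic Spring Boot integration of Spring Batch.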
From: https://blog.51cto.com/u_13312531/6923367