记一次完整的SpringBatch批处理数据流程
需求
从400多行数据的Excel表格中批量读取数据,根据读取的数据再去调用api,拿到关键返回数据后整合写入新Excel文件。
excel表格仅第一列数据手机号为有效数据,需要读取。通过手机号调用api,获取手机号对应的学生信息-学院,班级,姓名,手机号
导入依赖
需要的关键依赖:
<!-- 批处理框架 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<!-- excel工具 -->
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-ooxml</artifactId>
</dependency>
<!-- Http库 -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<!-- Json格式操作 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
流程
1. 创建配置类
SpringBacth整体流程是:
调用定义的Job-->Job按顺序执行关联的步骤Step-->每一步Step中按预设的Reader,Processor,Writer顺序执行
- 在配置类中配置定义Job和Step:
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
import java.util.Map;
import java.util.function.Function;
/**
* 配置类配置Job和Step
*/
@Configuration
@EnableBatchProcessing(modular = true)
public class BatchConfig {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
/**
* 配置Job,决定待执行的步骤
* @param step01
* @return
*/
@Bean
public Job excelJob(@Qualifier("excelStep") Step step01/*,@Qualifier("step02") Step step02*/) {
return jobBuilderFactory.get("excelJob")
.start(step01)
// .next(step02)
.build();
}
/**
* 配置步骤中的reader,processor,writer
* @param excelProcessor
* @return
* @throws IOException
*/
@Bean
public Step excelStep(
@Qualifier("excelProcessor") ItemProcessor<String, Map<String ,String>> excelProcessor) throws IOException {
return stepBuilderFactory.get("excelStep")
.<String, Map<String ,String>>chunk(1000)
.reader(new ExcelItemReader("/Users/wangchenbo/Desktop/ytt.xls"))
.processor(excelProcessor)
.writer(new ExcelChunkWriter("/Users/wangchenbo/Desktop/data.xls"))
.build();
}
}
- 关于Chunk
chunk-oriented processing(基于块的处理)是一种常用的数据处理模式,它允许你以块的形式读取、处理和写入数据。ItemReader 会按照配置的 chunkSize 读取一批数据,然后这一批数据会被一起传递给 ItemProcessor 进行处理,最后会被整体写入,同时在设置chunkSize要指定输入输出的参数类型
2. 配置ItemReader
ItemReader是读取数据的接口,内置提供的常用的实现类有:
- JdbcCursorItemReader ,使用 ResultSet 游标来从数据库逐条读取数据
- FlatFileItemReader , 用于从平面文件(如 CSV、TXT 等)中读取数据
- XmlRecordItemReader , 用于从 XML 文件中读取数据
- DelimitedLineItemReader , 用于从分隔符分隔的文本文件中读取数据
- JdbcPagingItemReader , 用于从数据库中分页读取数据
对于当下读取Excel的需求,选择自定义实现ItemReader接口来定义读取规则。具体的规则通过实现接口的read()方法进行制定。
package ytt;
import org.apache.poi.hssf.usermodel.HSSFWorkbook;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.ss.usermodel.Sheet;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import java.io.FileInputStream;
import java.io.IOException;
public class ExcelItemReader implements ItemReader<String> {
//HSSFWorkbook:是操作Excel2003以前(包括2003)的版本,扩展名是.xls
//XSSFWorkbook:是操作Excel2007后的版本,扩展名是.xlsx
//SXSSFWorkbook:是操作Excel2007后的版本,扩展名是.xlsx
private HSSFWorkbook workbook;
private Sheet sheet;
private int currentRow = 0;
public ExcelItemReader(String filePath) throws IOException {
FileInputStream fis = new FileInputStream(filePath);
this.workbook = new HSSFWorkbook(fis);
this.sheet = this.workbook.getSheetAt(0);
}
@Override
public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
//当行数大于最后一行,关闭工作簿,结束
if (currentRow > sheet.getLastRowNum()) {
closeWorkbook();
return null;
}
//读取一行
Row row = sheet.getRow(currentRow++);
return row.getCell(0).getStringCellValue();
}
private void closeWorkbook() {
try {
if (workbook != null) {
workbook.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
3.配置ItemProcessor
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
/**
* 定义数据处理规则 <br>
* 接收每个来自Reader的数据,处理数据,执行到预设数量的数据后,以chunk为单位writer输出
*/
@Configuration
public class Processor {
/**
* 单例化httpClient,否则在ItemProcessor重复执行期间会重复创建
*/
@Bean
public CloseableHttpClient httpClient() {
return HttpClients.createDefault();
}
@Bean
public ItemProcessor<String, Map<String, String>> excelProcessor(CloseableHttpClient httpClient) {
return phoneNumber -> {
//调用地址
String url = "http://116.62.107.xxx:12010/api/User/Personal";
//创建post请求
HttpPost httpPost = new HttpPost(url);
//设置请求体
String requestBody = "{\"data\":{\"LoginName\":\"" + phoneNumber + "\"}}";
httpPost.setEntity(new StringEntity(requestBody, StandardCharsets.UTF_8));
// 设置请求头
httpPost.setHeader("Authorization", "Bearer /**省略**/");
httpPost.setHeader("Content-Type", "application/json;charset=UTF-8");
httpPost.setHeader("AppVersion", "1.5.8");
httpPost.setHeader("Host", "116.62.107.xxx:12010");
//返回值
HashMap<String, String> map = new HashMap<>();
try (CloseableHttpResponse response = httpClient.execute(httpPost)) {
int statusCode = response.getStatusLine().getStatusCode();
System.out.println("Status code: " + statusCode + phoneNumber);
// 获取响应实体
HttpEntity entity = response.getEntity();
if (entity != null) {
// 打印响应内容
String responseBody = EntityUtils.toString(entity);
//转换JSON筛选字段
JSONObject jsonBody = JSON.parseObject(responseBody);
if (StringUtils.startsWithIgnoreCase(jsonBody.getJSONObject("data").getString("facName"), "2022")) {
map.put("facName", jsonBody.getJSONObject("data").getJSONArray("details").getJSONObject(0)
.getString("facName"));
map.put("class", jsonBody.getJSONObject("data")
.getString("facName"));
map.put("name", jsonBody.getJSONObject("data")
.getString("name"));
map.put("phone", phoneNumber);
}
}
}
return CollectionUtils.isEmpty(map) ? null : map;
};
}
}
4. 配置ItemWriter
这里也选择自定义实现ItemWriter接口作为Writer
import org.apache.poi.hssf.usermodel.HSSFWorkbook;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.ss.usermodel.Sheet;
import org.springframework.batch.item.ItemWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
* writer以chunk块为单位对预设的size大小的chunk进行一次性写入 <br>
* 接收的参数是基本项的集合
*/
public class ExcelChunkWriter implements ItemWriter<Map<String ,String>> {
private HSSFWorkbook workbook;
private Sheet sheet;
FileOutputStream fos;
private int rowNum = 1;
public ExcelChunkWriter(String outPath) throws IOException {
fos = new FileOutputStream(outPath);
this.workbook = new HSSFWorkbook();
this.sheet = workbook.createSheet("已清洗数据");
Row headerRow = sheet.createRow(0);
headerRow.createCell(0).setCellValue("学院");
headerRow.createCell(1).setCellValue("班级");
headerRow.createCell(2).setCellValue("姓名");
headerRow.createCell(3).setCellValue("号码");
}
@Override
public void write(List<? extends Map<String ,String>> items) throws IOException {
for (Map<String ,String> map : items) {
Row row = sheet.createRow(rowNum++);
row.createCell(0).setCellValue(map.get("facName"));
row.createCell(1).setCellValue(map.get("class"));
row.createCell(2).setCellValue(map.get("name"));
row.createCell(3).setCellValue(map.get("phone"));
}
try {
workbook.write(fos);
}finally {
workbook.close();
}
}
}
注意事项
- 实际上查看ItemReader和ItemPorcessor内部实现方法可以看出,每一个read和process方法接收的参数是单个数据项,实际上ItemReader每次处理单个数据之后并不会直接传递给Processor,而是维护了一个内部缓冲区,将读取的数据达到预设chunkSize数量后整体传递给ItemProcesser,同理itemProcessor也是逐个处理数据项,所以在实现ItemReader或者ItemProcessor接口方法时,只需要关注单个数据项的读取规则和处理逻辑即可,这种接口的设计目的是为了方便地处理单个数据项。
- ItemReader和ItemProcessor的实现方法接受的参数实际上是单个数据项,而ItemWriter将以chunk块为单位整体接收,以
List<? extends T> items
类型接收参数,进行统一写入