首页 > 其他分享 >记一次完整的SpringBatch批处理数据流程

记一次完整的SpringBatch批处理数据流程

时间:2024-08-19 13:54:16浏览次数:23  
标签:批处理 流程 batch springframework org apache import SpringBatch public

记一次完整的SpringBatch批处理数据流程

需求

从400多行数据的Excel表格中批量读取数据,根据读取的数据再去调用api,拿到关键返回数据后整合写入新Excel文件。

excel表格仅第一列数据手机号为有效数据,需要读取。通过手机号调用api,获取手机号对应的学生信息-学院,班级,姓名,手机号

导入依赖

需要的关键依赖:

		<!-- 批处理框架 -->
		<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>

		<!-- excel工具 -->
		<dependency>
            <groupId>org.apache.poi</groupId>
            <artifactId>poi-ooxml</artifactId>
        </dependency>

		<!-- Http库 -->
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
        </dependency>
		
		<!-- Json格式操作 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
        </dependency>

流程

1. 创建配置类

SpringBacth整体流程是:
调用定义的Job-->Job按顺序执行关联的步骤Step-->每一步Step中按预设的Reader,Processor,Writer顺序执行

  • 在配置类中配置定义Job和Step:
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;
import java.util.Map;
import java.util.function.Function;

/**
 * 配置类配置Job和Step
 */
@Configuration
@EnableBatchProcessing(modular = true)
public class BatchConfig {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * 配置Job,决定待执行的步骤
     * @param step01
     * @return
     */
    @Bean
    public Job excelJob(@Qualifier("excelStep") Step step01/*,@Qualifier("step02") Step step02*/) {
        return jobBuilderFactory.get("excelJob")
                .start(step01)
//                .next(step02)
                .build();
    }

    /**
     * 配置步骤中的reader,processor,writer
     * @param excelProcessor
     * @return
     * @throws IOException
     */
    @Bean
    public Step excelStep(
            @Qualifier("excelProcessor") ItemProcessor<String, Map<String ,String>> excelProcessor) throws IOException {
        return stepBuilderFactory.get("excelStep")
                .<String, Map<String ,String>>chunk(1000)
                .reader(new ExcelItemReader("/Users/wangchenbo/Desktop/ytt.xls"))
                .processor(excelProcessor)
                .writer(new ExcelChunkWriter("/Users/wangchenbo/Desktop/data.xls"))
                .build();
    }


}

  • 关于Chunk
    chunk-oriented processing(基于块的处理)是一种常用的数据处理模式,它允许你以块的形式读取、处理和写入数据。ItemReader 会按照配置的 chunkSize 读取一批数据,然后这一批数据会被一起传递给 ItemProcessor 进行处理,最后会被整体写入,同时在设置chunkSize要指定输入输出的参数类型

2. 配置ItemReader

ItemReader是读取数据的接口,内置提供的常用的实现类有:

  • JdbcCursorItemReader ,使用 ResultSet 游标来从数据库逐条读取数据
  • FlatFileItemReader , 用于从平面文件(如 CSV、TXT 等)中读取数据
  • XmlRecordItemReader , 用于从 XML 文件中读取数据
  • DelimitedLineItemReader , 用于从分隔符分隔的文本文件中读取数据
  • JdbcPagingItemReader , 用于从数据库中分页读取数据

对于当下读取Excel的需求,选择自定义实现ItemReader接口来定义读取规则。具体的规则通过实现接口的read()方法进行制定。

package ytt;

import org.apache.poi.hssf.usermodel.HSSFWorkbook;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.ss.usermodel.Sheet;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;

import java.io.FileInputStream;
import java.io.IOException;

public class ExcelItemReader implements ItemReader<String> {
	//HSSFWorkbook:是操作Excel2003以前(包括2003)的版本,扩展名是.xls
    //XSSFWorkbook:是操作Excel2007后的版本,扩展名是.xlsx
    //SXSSFWorkbook:是操作Excel2007后的版本,扩展名是.xlsx
    private HSSFWorkbook workbook;
    private Sheet sheet;
    private int currentRow = 0;

    public ExcelItemReader(String filePath) throws IOException {
        FileInputStream fis = new FileInputStream(filePath);
        this.workbook = new HSSFWorkbook(fis);
        this.sheet = this.workbook.getSheetAt(0);
    }

    @Override
    public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
		//当行数大于最后一行,关闭工作簿,结束
        if (currentRow > sheet.getLastRowNum()) {
            closeWorkbook();
            return null;
        }
		//读取一行
        Row row = sheet.getRow(currentRow++);
        return row.getCell(0).getStringCellValue();
    }

    private void closeWorkbook() {
        try {
            if (workbook != null) {
                workbook.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

3.配置ItemProcessor


import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/**
 * 定义数据处理规则 <br>
 * 接收每个来自Reader的数据,处理数据,执行到预设数量的数据后,以chunk为单位writer输出
 */
@Configuration
public class Processor {

    /**
     * 单例化httpClient,否则在ItemProcessor重复执行期间会重复创建
     */
    @Bean
    public CloseableHttpClient httpClient() {
        return HttpClients.createDefault();
    }

    @Bean
    public ItemProcessor<String, Map<String, String>> excelProcessor(CloseableHttpClient httpClient) {
        return phoneNumber -> {

            //调用地址
            String url = "http://116.62.107.xxx:12010/api/User/Personal";
            //创建post请求
            HttpPost httpPost = new HttpPost(url);
            //设置请求体
            String requestBody = "{\"data\":{\"LoginName\":\"" + phoneNumber + "\"}}";
            httpPost.setEntity(new StringEntity(requestBody, StandardCharsets.UTF_8));
            // 设置请求头
            httpPost.setHeader("Authorization", "Bearer /**省略**/");
            httpPost.setHeader("Content-Type", "application/json;charset=UTF-8");
            httpPost.setHeader("AppVersion", "1.5.8");
            httpPost.setHeader("Host", "116.62.107.xxx:12010");

            //返回值
            HashMap<String, String> map = new HashMap<>();

            try (CloseableHttpResponse response = httpClient.execute(httpPost)) {
                int statusCode = response.getStatusLine().getStatusCode();
                System.out.println("Status code: " + statusCode + phoneNumber);

                // 获取响应实体
                HttpEntity entity = response.getEntity();
                if (entity != null) {
                    // 打印响应内容
                    String responseBody = EntityUtils.toString(entity);
                    //转换JSON筛选字段
                    JSONObject jsonBody = JSON.parseObject(responseBody);
                    if (StringUtils.startsWithIgnoreCase(jsonBody.getJSONObject("data").getString("facName"), "2022")) {
                        map.put("facName", jsonBody.getJSONObject("data").getJSONArray("details").getJSONObject(0)
                                .getString("facName"));
                        map.put("class", jsonBody.getJSONObject("data")
                                .getString("facName"));
                        map.put("name", jsonBody.getJSONObject("data")
                                .getString("name"));
                        map.put("phone", phoneNumber);
                    }
                }
            }

            return CollectionUtils.isEmpty(map) ? null : map;
        };
    }
}

4. 配置ItemWriter

这里也选择自定义实现ItemWriter接口作为Writer

import org.apache.poi.hssf.usermodel.HSSFWorkbook;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.ss.usermodel.Sheet;
import org.springframework.batch.item.ItemWriter;

import java.io.FileOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
 * writer以chunk块为单位对预设的size大小的chunk进行一次性写入 <br>
 * 接收的参数是基本项的集合
 */
public class ExcelChunkWriter implements ItemWriter<Map<String ,String>> {
    private HSSFWorkbook workbook;
    private Sheet sheet;
    FileOutputStream fos;
    private int rowNum = 1;

    public ExcelChunkWriter(String outPath) throws IOException {
        fos = new FileOutputStream(outPath);
        this.workbook = new HSSFWorkbook();
        this.sheet = workbook.createSheet("已清洗数据");
        Row headerRow = sheet.createRow(0);
        headerRow.createCell(0).setCellValue("学院");
        headerRow.createCell(1).setCellValue("班级");
        headerRow.createCell(2).setCellValue("姓名");
        headerRow.createCell(3).setCellValue("号码");
    }

    @Override
    public void write(List<? extends Map<String ,String>> items) throws IOException {
        for (Map<String ,String> map : items) {
            Row row = sheet.createRow(rowNum++);
            row.createCell(0).setCellValue(map.get("facName"));
            row.createCell(1).setCellValue(map.get("class"));
            row.createCell(2).setCellValue(map.get("name"));
            row.createCell(3).setCellValue(map.get("phone"));
        }

        try {
            workbook.write(fos);
        }finally {
            workbook.close();
        }
    }
}

注意事项

  • 实际上查看ItemReader和ItemPorcessor内部实现方法可以看出,每一个read和process方法接收的参数是单个数据项,实际上ItemReader每次处理单个数据之后并不会直接传递给Processor,而是维护了一个内部缓冲区,将读取的数据达到预设chunkSize数量后整体传递给ItemProcesser,同理itemProcessor也是逐个处理数据项,所以在实现ItemReader或者ItemProcessor接口方法时,只需要关注单个数据项的读取规则和处理逻辑即可,这种接口的设计目的是为了方便地处理单个数据项。
  • ItemReader和ItemProcessor的实现方法接受的参数实际上是单个数据项,而ItemWriter将以chunk块为单位整体接收,以List<? extends T> items类型接收参数,进行统一写入

标签:批处理,流程,batch,springframework,org,apache,import,SpringBatch,public
From: https://www.cnblogs.com/chuimber/p/18367167

相关文章

  • 软件开发流程
    遵从一套成熟的产品研发过程体系,才能做出质量较好的产品。因此,如果出现项目较多的情况,应该合理地安排基线和定制之前的里程碑,让基线产品能够尽量多地收集用户的通用型需求,为定制项目进度实现技术支撑,减少定制项目中大量更改代码、需要新增模块情况发生,以下是开发流程从软件......
  • 只有AI在组织形态和流程上创新了,变革才真正开始
    Google前CEO埃里克·施密特近期在斯坦福CS323课堂上的访谈中提到:只有AI在组织形态和流程上创新后,才算真正的变革开始。完整的视频:https://www.bilibili.com/video/BV1Fy411v7jt/下面是这部分翻译:电力是一种通用技术。通用技术有一个特点,它们本身就是一种重要的创新,但......
  • 只有AI在组织形态和流程上创新了,变革才真正开始
    Google前CEO埃里克·施密特近期在斯坦福CS323课堂上的访谈中提到:只有AI在组织形态和流程上创新后,才算真正的变革开始。完整的视频:https://www.bilibili.com/video/BV1Fy411v7jt/下面是这部分翻译:电力是一种通用技术。通用技术有一个特点,它们本身就是一种重要的创新,但......
  • 只有AI在组织形态和流程上创新了,变革才真正开始
    Google前CEO埃里克·施密特近期在斯坦福CS323课堂上的访谈中提到:只有AI在组织形态和流程上创新后,才算真正的变革开始。完整的视频:https://www.bilibili.com/video/BV1Fy411v7jt/下面是这部分翻译:电力是一种通用技术。通用技术有一个特点,它们本身就是一种重要的创新,但......
  • Elsa V3学习之调起其他流程
    在Elsa中,还能通过DispatchWorkflow节点来执行其他已发布的流程。DispatchWorkflowDispatchWorkflow可以选择任一以及发布的工作流程,这里我们选择最初的HelloWord的流程Workflow1。通过HTTPEndpoint节点触发。触发链接为https://localhost:5001/api/workflows/Dispatch请求......
  • vue3 - 详细实现内网使用离线百度地图功能,在vue3中无需网络离线使用百度地图相关功能,
    效果图在vue3、nuxt3项目开发中,完成内网离线使用百度地图详细教程,让vue3网站无需网络就能加载百度地图及相关功能,完整的百度地图离线使用及地图瓦片的下载教程、更新教程等,vue3百度地图内网离线使用显示地图及各种功能,无论js/ts语法都可以使用,详解百度地图离线加载机制及整......
  • ansible 流程控制
    目录ansibleif流程控制变量控制结构过滤器注释Jinja2api示例test语句ansiblewhen流程控制简单示例多条件示例ansibleif流程控制大部分的Ansible任务,需要对用户的输入内容或任务的运行结果进行判断,这中间体现了流程控制的作用像ansible的模板文件,以.j2结尾的都是Jinja2......
  • Android usb广播 ACTION_USB_DEVICE_ATTACHED流程源码分析
    整体流程图大概意思就是UsbHostManager启动监控线程,monitorUsbHostBus会调用usb_host_run函数(使用inotify来监听USB设备的插拔)不停的读取bus总线,读取到以后,当1、设备插入:发送广播ACTION_USB_DEVICE_ATTACHED2、设备拔出:发送广播ACTION_USB_DEVICE_DETACHED本篇只分析插入......