首页 > 其他分享 >【Spring】SpringBoot3+SpringBatch5.xの構築

【Spring】SpringBoot3+SpringBatch5.xの構築

时间:2024-02-07 09:04:51浏览次数:32  
标签:Spring 構築 batch springframework private import org new SpringBatch5

■概要

 

 

■POMのXMLの設定

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.example</groupId>
        <artifactId>springboot3-parent-all</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <artifactId>springboot3-batch5</artifactId>
    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <!-- Spring Batch (pulls in spring-batch-core and auto-configuration) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <!-- In-memory DB option for the batch metadata tables -->
        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>com.mysql</groupId>
            <artifactId>mysql-connector-j</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.alibaba/druid-spring-boot-starter -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.2.21</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.baomidou/mybatis-plus-boot-starter -->
        <!-- NOTE(review): 3.5.2 targets Spring Boot 2.x auto-configuration; Spring Boot 3
             generally requires mybatis-plus 3.5.3.1+ (or the spring-boot3-starter artifact).
             Confirm this resolves and auto-configures at runtime. -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.5.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.mybatis.spring.boot/mybatis-spring-boot-starter -->
        <!-- NOTE(review): mybatis-plus-boot-starter already brings MyBatis; having both starters
             can register duplicate SqlSessionFactory auto-configuration — verify both are needed. -->
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>3.0.3</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <!-- Lombok is compile-time only; keep it out of the runnable jar -->
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
pom.xml

■ApplicationのYMLの設定

# Server settings (port, context path)
server:
  port: 8081
  servlet:
    context-path: /batch
# Spring settings
spring:
  jpa:
    open-in-view: true
  # Spring Batch settings
  batch:
    job:
      enabled: true
      # Name of the job (actually the step-name key used here) launched at startup
      name: stepBase00
    # Batch metadata tables
    jdbc:
      # Use "always" on first run to create the batch metadata tables; "never" afterwards.
      initialize-schema: never
      # Target DB dialect (matches the schema scripts shipped in org.springframework.batch.core)
      platform: mysql
  # DataSource settings
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/test?useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=true
    username: root
    password: password
    type: com.alibaba.druid.pool.DruidDataSource
application.yml

■相関データ

[
  {
    "id": 1,
    "field1": "11",
    "field2": "12",
    "field3": "13"
  },
  {
    "id": 2,
    "field1": "21",
    "field2": "22",
    "field3": "23"
  },
  {
    "id": 3,
    "field1": "31",
    "field2": "32",
    "field3": "33"
  }
]
サンプル(Json)
id,field1,field2,field3
1,11,12,13
2,21,22,23
3,31,32,33
4,41,42,43
5,51,52,53
6,61,62,63
サンプル(CSV)

■DB相関

-- Sample table and seed data used by the DB reader example (DBItemReaderConfig).
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for TBL_TEST
-- ----------------------------
DROP TABLE IF EXISTS `TBL_TEST`;
CREATE TABLE `TBL_TEST` (
  `id` int NOT NULL COMMENT '编号',
  `field1` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '字段1',
  `field2` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '字段2',
  `field3` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '字段3',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

-- ----------------------------
-- Records of TBL_TEST (six sample rows, ids 1..6)
-- ----------------------------
BEGIN;
INSERT INTO `TBL_TEST` (`id`, `field1`, `field2`, `field3`) VALUES (1, '11', '12', '13');
INSERT INTO `TBL_TEST` (`id`, `field1`, `field2`, `field3`) VALUES (2, '21', '22', '23');
INSERT INTO `TBL_TEST` (`id`, `field1`, `field2`, `field3`) VALUES (3, '31', '32', '33');
INSERT INTO `TBL_TEST` (`id`, `field1`, `field2`, `field3`) VALUES (4, '41', '42', '43');
INSERT INTO `TBL_TEST` (`id`, `field1`, `field2`, `field3`) VALUES (5, '51', '52', '53');
INSERT INTO `TBL_TEST` (`id`, `field1`, `field2`, `field3`) VALUES (6, '61', '62', '63');
COMMIT;

SET FOREIGN_KEY_CHECKS = 1;
TBL_TEST.sql

■相関Object

▶︎サンプルObject

package org.example.dto;

import com.baomidou.mybatisplus.extension.activerecord.Model;
import lombok.Data;
import lombok.EqualsAndHashCode;

import java.io.Serializable;

@Data
@EqualsAndHashCode(callSuper=true)
public class SampleVo extends Model<SampleVo> implements Serializable {

    // Fixed: Serializable class had no serialVersionUID; without it the implicit
    // UID changes on recompile and breaks deserialization of stored instances.
    private static final long serialVersionUID = 1L;

    /** Primary key of TBL_TEST. */
    private Integer id;

    // Declared as Object so both FieldSet.readString(...) (CSV reader) and
    // ResultSet.getObject(...) (JDBC reader) can populate them without casts.
    private Object field1;
    private Object field2;
    private Object field3;

    /**
     * MyBatis-Plus ActiveRecord hook: returns the primary-key value.
     */
    @Override
    public Serializable pkVal() {
        return this.id;
    }
}
SampleVo.java

▶︎インプットObject

package org.example.dto;

import com.baomidou.mybatisplus.extension.activerecord.Model;
import lombok.Data;
import lombok.EqualsAndHashCode;

import java.io.Serializable;
import java.util.Date;

@Data
@EqualsAndHashCode(callSuper=true)
public class SampleIn extends Model<SampleIn> implements Serializable {

    // Fixed: Serializable class had no serialVersionUID; the implicit UID would
    // change on recompile and break deserialization of stored instances.
    private static final long serialVersionUID = 1L;

    private Integer id;
    private String userName;
    // NOTE(review): an Integer "address" looks suspicious — confirm it is not
    // meant to be a String.
    private Integer address;
    // NOTE(review): java.util.Date is legacy; java.time would be preferable, but
    // changing the type would alter the Lombok-generated accessors callers use.
    private Date birthday;
}
SampleIn.java

▶︎アウトプットObject

package org.example.dto;

import com.baomidou.mybatisplus.extension.activerecord.Model;
import lombok.Data;
import lombok.EqualsAndHashCode;

import java.io.Serializable;
import java.util.Date;

@Data
@EqualsAndHashCode(callSuper=true)
public class SampleOut extends Model<SampleOut> implements Serializable {

    // Fixed: Serializable class had no serialVersionUID; the implicit UID would
    // change on recompile and break deserialization of stored instances.
    private static final long serialVersionUID = 1L;

    private Integer id;
    private String userName;
    // NOTE(review): an Integer "address" looks suspicious — confirm it is not
    // meant to be a String (same field exists on SampleIn).
    private Integer address;
    private Date birthday;
    // Timestamp added on output; java.util.Date kept to preserve the accessors.
    private Date nowDate;
}
SampleOut.java

■ソース(ベース)

package org.example.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

/**
 * Minimal Spring Batch 5 (Spring Boot 3) job: one tasklet step.
 * Execution order: jobBase00 -> (stepBase00)
 */
@Configuration
public class SimpleBatchBaseConfig {
    // slf4j logger for this configuration
    Logger logger = LoggerFactory.getLogger(this.getClass());

    // Repository backing the batch metadata tables
    @Autowired
    private JobRepository jobRepository;

    // Transaction manager driving the tasklet step
    @Autowired
    private PlatformTransactionManager transactionManager;

    /**
     * Entry job bean. Builds the single tasklet step and wires it into the job.
     *
     * @return the configured "jobBase00" job
     */
    @Bean("jobBase00")
    public Job jobBase(){
        // Tasklet as a lambda; FINISHED ends the step after a single invocation.
        Tasklet logic = (contribution, chunkContext) -> {
            System.out.println("My Base00 job step!");
            logger.info("My Base00 job step!.......");
            return RepeatStatus.FINISHED;
        };
        Step singleStep = new StepBuilder("stepBase00", jobRepository)
                .tasklet(logic, transactionManager)
                .allowStartIfComplete(true) // allow re-running even after COMPLETED
                .build();
        return new JobBuilder("jobBase00", jobRepository)
                .start(singleStep)
                .build();
    }
}
SimpleBatchBaseConfig.java

▶︎同期的に実行するバッチ(順次実行)

package org.example.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

/**
 * Sequential ("flow") batch configuration.
 * Execution order: flowJob -> (flow -> (flowStep01 -> flowStep02)) -> (flowStep03)
 */
@Configuration
public class FlowBatchConfig {
    Logger logger = LoggerFactory.getLogger(this.getClass());

    private JobRepository jobRepository;
    private PlatformTransactionManager transactionManager;

    public FlowBatchConfig(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        this.jobRepository = jobRepository;
        this.transactionManager = transactionManager;
    }

    /** Job: runs the two-step flow first, then flowStep03. */
    @Bean
    public Job flowJob(){
        return new JobBuilder("flowJob", jobRepository)
                .start(flow())
                .next(flowStep03())
                .end()
                .build();
    }

    /** Flow grouping flowStep01 and flowStep02 in sequence. */
    private Flow flow(){
        return new FlowBuilder<Flow>("flowname")
                .start(flowStep01())
                .next(flowStep02())
                .build();
    }

    private Step flowStep01(){
        return loggingStep("flowStep01", "flowStep01()......");
    }

    private Step flowStep02(){
        return loggingStep("flowStep02", "flowStep02()......");
    }

    private Step flowStep03(){
        return loggingStep("flowStep03", "flowStep03()......");
    }

    /** Builds a restartable tasklet step that logs one message and finishes. */
    private Step loggingStep(String stepName, String message){
        Tasklet body = (contribution, chunkContext) -> {
            logger.info(message);
            return RepeatStatus.FINISHED;
        };
        return new StepBuilder(stepName, jobRepository)
                .tasklet(body, transactionManager)
                .allowStartIfComplete(true)
                .build();
    }

}
FlowBatchConfig.java

▶︎非同期的に実行するバッチ(並列実行)

package org.example.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;

/**
 * Parallel ("split") batch configuration: two flows run concurrently.
 * Execution order:
 * ① splitJob -> (splitFlow1 -> (splitStep1 -> splitStep2))
 * ② splitJob -> (splitFlow2 -> (splitStep3))
 */
@Configuration
public class SplitBatchConfig {
    @Autowired
    private JobRepository jobRepository;
    @Autowired
    private PlatformTransactionManager transactionManager;
    Logger logger = LoggerFactory.getLogger(this.getClass());

    /** Job: splitFlow1 and splitFlow2 execute on separate async threads. */
    @Bean
    public Job splitJob(){
        return new JobBuilder("splitJob", jobRepository)
                .start(splitFlow1())
                .split(new SimpleAsyncTaskExecutor()).add(splitFlow2())
                .end().build();
    }

    /** First flow: two steps in sequence. */
    private Flow splitFlow1(){
        return new FlowBuilder<Flow>("splitFlow1")
                .start(splitStep1())
                .next(splitStep2())
                .build();
    }

    /** Second flow: a single step. */
    private Flow splitFlow2(){
        return new FlowBuilder<Flow>("splitFlow2")
                .start(splitStep3())
                .build();
    }

    private Step splitStep1(){
        return logStep("splitStep1", "split step(1) .......");
    }

    private Step splitStep2(){
        return logStep("splitStep2", "split step(2) .......");
    }

    private Step splitStep3(){
        return logStep("splitStep3", "split step(3) .......");
    }

    /** Builds a restartable tasklet step that logs one message and finishes. */
    private Step logStep(String stepName, String message){
        return new StepBuilder(stepName, jobRepository)
                .tasklet((contribution, chunkContext) -> {
                    logger.info(message);
                    return RepeatStatus.FINISHED;
                }, transactionManager)
                .allowStartIfComplete(true)
                .build();
    }
}
SplitBatchConfig.java

▶︎親→子→親ー子で実行のバッチ

package org.example.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.JobStepBuilder;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

/**
 * Nested jobs: each child job is wrapped as a JobStep of the parent job.
 * Execution order: parentJob -> (childJobStep01 -> (childJob01 -> (childStep01)))
 *                            -> (childJobStep02 -> (childJob02 -> (childStep02)))
 */
@Configuration
public class NestedBatchConfig {

    @Autowired
    private JobRepository jobRepository;
    @Autowired
    private PlatformTransactionManager transactionManager;
    @Autowired
    private JobLauncher jobLauncher;
    Logger logger = LoggerFactory.getLogger(this.getClass());

    /** Parent job: runs the two child-job steps in sequence. */
    @Bean
    public Job parentJob(){
        return new JobBuilder("parentJob",jobRepository)
                .start(childJobStep01())
                .next(childJobStep02())
                .build();
    }
    /** Wraps childJob01 as a step; needs a launcher to start the inner job. */
    private Step childJobStep01(){
        return new JobStepBuilder(new StepBuilder("childJobStep01",jobRepository))
                .job(childJob01())
                .launcher(jobLauncher)
                .allowStartIfComplete(true)
                .build();
    }
    /** Wraps childJob02 as a step; needs a launcher to start the inner job. */
    private Step childJobStep02(){
        return new JobStepBuilder(new StepBuilder("childJobStep02",jobRepository))
                .job(childJob02())
                .launcher(jobLauncher)
                .allowStartIfComplete(true)
                .build();
    }
    private Job childJob01(){
        return new JobBuilder("childJob01",jobRepository)
                .start(childStep01())
                .build();
    }
    private Job childJob02(){
        // Fixed: the job name was "childJob01", duplicating the first child job's
        // name in the batch metadata; job names must be unique.
        return new JobBuilder("childJob02",jobRepository)
                .start(childStep02())
                .build();
    }
    private Step childStep01(){
        return new StepBuilder("childStep01",jobRepository)
                .tasklet((contribution, chunkContext) -> {
                    logger.info("-------child step 01--------");
                    return RepeatStatus.FINISHED;
                },transactionManager)
                .allowStartIfComplete(true)
                .build();
    }
    private Step childStep02(){
        return new StepBuilder("childStep02",jobRepository)
                .tasklet((contribution, chunkContext) -> {
                    logger.info("-------child step 02--------");
                    return RepeatStatus.FINISHED;
                },transactionManager)
                .allowStartIfComplete(true)
                .build();
    }
}
NestedBatchConfig.java

■サンプル(ベース)

package org.example.batch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.*;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/**
 * Chunk-oriented reader/processor/writer demo with listeners on each phase.
 * Reads four fixed strings, passes them through, and logs them in chunks of 3.
 */
@Configuration
public class SimpleReaderWriterBaseConfig {
    @Autowired
    private JobRepository jobRepository;
    @Autowired
    private PlatformTransactionManager transactionManager;
    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    /** Job wrapping the single chunk step. */
    @Bean
    public Job buildJob(){
        return new JobBuilder("simpleBatch00Job",jobRepository)
                .start(buildStep())
                .build();
    }
    /** Chunk step: 3 items per chunk, with read/process/write listeners. */
    private Step buildStep(){
        return new StepBuilder("simpleBatch00Step",jobRepository)
                .<String,String>chunk(3,transactionManager)
                .reader(itemReader()).listener(itemReadListener())
                .processor(itemProcessor()).listener(itemProcessListener())
                .writer(itemWriter()).listener(itemWriteListener())
                .allowStartIfComplete(true)
                .build();
    }
    /**
     * Reader over a fixed in-memory list; returns null to signal end of input.
     * NOTE(review): the iterator is created once when the step is built, so
     * re-running the step in the same application context reads nothing —
     * confirm this is acceptable for a demo.
     */
    private ItemReader<String> itemReader(){
        List<String> strings = Arrays.asList("存款", "余额", "资金","冻结");
        Iterator<String> iterator = strings.iterator();
        return () -> iterator.hasNext() ? iterator.next() : null;
    }
    /** Logs before/after each read and on read errors. */
    private ItemReadListener<String> itemReadListener(){
        return new ItemReadListener<>() {
            @Override
            public void beforeRead() {
                logger.info("---------------itemReadListener().beforeRead()--------------------");
            }
            @Override
            public void afterRead(String item) {
                logger.info("---------------itemReadListener().afterRead("+item+")--------------------");
            }
            @Override
            public void onReadError(Exception ex) {
                logger.info("---------------itemReadListener().onReadError("+ex.getMessage()+")--------------------");
            }
        };
    }
    /** Pass-through processor that logs each item. */
    private ItemProcessor<String,String> itemProcessor(){
        return item -> {
            logger.info("---------------itemProcessor()---Input Data:"+item+"-----------------");
            return item;
        };
    }
    /** Logs before/after each process call and on process errors. */
    private ItemProcessListener<String,String> itemProcessListener(){
        return new ItemProcessListener<String, String>() {
            @Override
            public void beforeProcess(String item) {
                logger.info("---------------itemProcessListener().beforeProcess("+item+")--------------------");
            }
            @Override
            public void afterProcess(String item, String result) {
                logger.info("---------------itemProcessListener().afterProcess("+item+","+result+")--------------------");
            }
            @Override
            public void onProcessError(String item, Exception ex) {
                logger.info("---------------itemProcessListener().onProcessError("+item+","+ex.getMessage()+")--------------------");
            }
        };
    }
    /**
     * Writer that logs each item in the chunk.
     * Fixed: the original instantiated an anonymous ItemStreamWriter but assigned
     * it to an ItemWriter reference without implementing any stream callbacks —
     * a plain ItemWriter is the accurate type.
     */
    private ItemWriter<String> itemWriter(){
        return chunk -> {
            if(!chunk.isEmpty()){
                for(String str:chunk.getItems()){
                    logger.info("---------------itemWriter()---"+str+"-----------------");
                }
            }
        };
    }
    /** Logs before/after each chunk write and on write errors. */
    private ItemWriteListener<String> itemWriteListener(){
        return new ItemWriteListener<String>() {
            @Override
            public void beforeWrite(Chunk<? extends String> items) {
                logger.info("---------------itemWriteListener().beforeWrite("+items.getItems().toString()+")--------------------");
            }
            @Override
            public void afterWrite(Chunk<? extends String> items) {
                logger.info("---------------itemWriteListener().afterWrite("+items.getItems().toString()+")--------------------");
            }
            @Override
            public void onWriteError(Exception ex, Chunk<? extends String> items) {
                logger.info("---------------itemWriteListener().onWriteError("+items.getItems().toString()+","+ex.getMessage()+")--------------------");
            }
        };
    }
}
SimpleReaderWriterBaseConfig.java

▶︎CSVファイルから読む

package org.example.batch;

import org.example.dto.SampleVo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.transaction.PlatformTransactionManager;

/**
 * Reads SampleVo records from a CSV file (datafile/sample.csv) on the classpath
 * and prints each one via a chunk-oriented step.
 */
@Configuration
public class CSVFileItemReaderConfig {

    Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private JobRepository jobRepository;
    @Autowired
    private PlatformTransactionManager transactionManager;

    /** Job wrapping the single CSV-reading step. */
    @Bean
    public Job fileItemReaderJob(){
        return new JobBuilder("fileItemReaderJob",jobRepository)
                .start(fileItemReaderStep())
                .build();
    }
    /**
     * Chunk step: reads 3 items per chunk and prints each one.
     * Fixed: removed the explicit ".processor(null)" call — the processor is
     * optional in Spring Batch and passing null added nothing.
     */
    private Step fileItemReaderStep(){
        return new StepBuilder("fileItemReaderStep",jobRepository)
                .<SampleVo,SampleVo>chunk(3,transactionManager)
                //.reader(itemReader())
                .reader(csvItemReader())
                .writer(list->list.forEach(System.out::println))
                .allowStartIfComplete(true)
                .build();
    }
    /**
     * Alternative reader (kept for reference, not currently wired in): maps the
     * columns explicitly through a FieldSetMapper lambda.
     */
    private ItemReader<SampleVo> itemReader(){
        FlatFileItemReader<SampleVo> reader = new FlatFileItemReader<>();
        // CSV data file on the classpath
        reader.setResource(new ClassPathResource("datafile/sample.csv"));
        // Skip the header line
        reader.setLinesToSkip(1);

        // Tokenizer: split each line and name the columns
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames("id","field1","field2","field3");

        // CSV line -> SampleVo mapping
        DefaultLineMapper<SampleVo> mapper = new DefaultLineMapper<>();
        mapper.setLineTokenizer(tokenizer);

        mapper.setFieldSetMapper(fieldSet -> {
            SampleVo vo = new SampleVo();
            vo.setId(fieldSet.readInt("id"));
            // Fixed: removed redundant (Object) casts — a String is already an Object.
            vo.setField1(fieldSet.readString("field1"));
            vo.setField2(fieldSet.readString("field2"));
            vo.setField3(fieldSet.readString("field3"));
            return vo;
        });
        reader.setLineMapper(mapper);
        return reader;
    }

    /** Reader wired into the step: BeanWrapper-based field mapping by column name. */
    private FlatFileItemReader<SampleVo> csvItemReader(){
        FlatFileItemReader<SampleVo> csvReader = new FlatFileItemReader<>();
        csvReader.setResource(new ClassPathResource("datafile/sample.csv"));
        csvReader.setLinesToSkip(1);
        csvReader.setLineMapper(new DefaultLineMapper<SampleVo>(){{
            setLineTokenizer(new DelimitedLineTokenizer(){{
                setNames(new String[]{"id","field1","field2","field3"});
            }});
            setFieldSetMapper(new BeanWrapperFieldSetMapper<SampleVo>(){{
                setTargetType(SampleVo.class);
            }});
        }});
        return csvReader;
    }
}
CSVFileItemReaderConfig.java

▶︎JSONファイルから読む

package org.example.batch;

import org.example.dto.SampleVo;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.json.JacksonJsonObjectReader;
import org.springframework.batch.item.json.JsonItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.transaction.PlatformTransactionManager;

/**
 * Reads SampleVo records from a JSON array file (datafile/sample.json) on the
 * classpath and prints each one via a chunk-oriented step.
 */
@Configuration
public class JsonFileItemReaderConfig {
    @Autowired
    private JobRepository jobRepository;
    @Autowired
    private PlatformTransactionManager transactionManager;

    /** Job wrapping the single JSON-reading step. */
    @Bean
    public Job jsonFileItemReaderJob(){
        return new JobBuilder("jsonFileItemReaderJob", jobRepository)
                .start(jsonFileItemReaderStep())
                .build();
    }

    /** Chunk step: reads 2 items per chunk and prints each one. */
    private Step jsonFileItemReaderStep(){
        return new StepBuilder("jsonFileItemReaderStep", jobRepository)
                .<SampleVo, SampleVo>chunk(2, transactionManager)
                .reader(jsonItemReader())
                .writer(items -> items.forEach(System.out::println))
                .build();
    }

    /** Jackson-backed JSON reader over datafile/sample.json. */
    private ItemReader<SampleVo> jsonItemReader(){
        JsonItemReader<SampleVo> jsonReader = new JsonItemReader<>(
                new ClassPathResource("datafile/sample.json"),
                new JacksonJsonObjectReader<>(SampleVo.class));
        jsonReader.setName("testDataJsonItemReader");
        return jsonReader;
    }

}
JsonFileItemReaderConfig.java

▶︎DBから読む

package org.example.batch;

import org.example.dto.SampleVo;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

/**
 * Batch configuration that reads rows from a database table with a paging
 * JDBC reader and writes them to standard output, two items per chunk.
 *
 * @Author: Mr Orange
 * @description: DB -> stdout sample job (Spring Batch 5 / Java 17)
 * @date: 2024-01-28 2:43 PM
 */
@Configuration
public class DBItemReaderConfig {

    @Autowired
    private JobRepository jobRepository;

    @Autowired
    private PlatformTransactionManager transactionManager;

    @Autowired
    private DataSource dataSource;

    /**
     * Job consisting of the single DB-reading step.
     *
     * @return the configured job bean
     * @throws Exception if the reader fails validation during construction
     */
    @Bean
    public Job dataSourceItemReaderJob() throws Exception{
        return new JobBuilder("dataSourceItemReaderJob", jobRepository)
                .start(dataSourceItemReaderStep())
                .build();
    }

    /**
     * Chunk-oriented step. The chunk size of 2 is the number of items the
     * processor/writer handle per transaction.
     */
    private Step dataSourceItemReaderStep() throws Exception{
        return new StepBuilder("dataSourceItemReaderStep", jobRepository)
                .allowStartIfComplete(true)
                .<SampleVo, SampleVo>chunk(2, transactionManager)
                .reader(itemReader())
                .writer(list -> list.forEach(System.out::println))
                .build();
    }

    /**
     * Builds a paging JDBC reader equivalent to:
     * {@code SELECT id, field1, field2, field3 FROM TBL_TEST ORDER BY id ASC}.
     *
     * <p>Configured with a plain builder-style sequence instead of an anonymous
     * subclass with an instance initializer: the anonymous form creates an
     * extra class holding a hidden reference to the enclosing configuration
     * and obscures the configuration order.</p>
     *
     * @return an initialized {@link JdbcPagingItemReader} over TBL_TEST
     * @throws Exception from {@code afterPropertiesSet()} if the configuration is invalid
     */
    private ItemReader<SampleVo> itemReader() throws Exception{
        JdbcPagingItemReader<SampleVo> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(dataSource);
        // JDBC fetch-size hint: rows fetched per driver round trip.
        // NOTE(review): per the original comment this hint is ignored by H2.
        reader.setFetchSize(3);
        // Number of items retrieved per page query.
        reader.setPageSize(2);

        MySqlPagingQueryProvider provider = new MySqlPagingQueryProvider();
        provider.setSelectClause("id, field1, field2, field3"); // table columns
        provider.setFromClause("TBL_TEST"); // must match the table name (including case)

        // Paging readers require a deterministic sort key to page reliably.
        Map<String, Order> sortKeys = new HashMap<>();
        sortKeys.put("id", Order.ASCENDING); // sort ascending by id
        provider.setSortKeys(sortKeys);
        reader.setQueryProvider(provider);

        // Map each result-set row to a SampleVo.
        reader.setRowMapper((resultSet, rowNum) -> {
            SampleVo vo = new SampleVo();
            vo.setId(resultSet.getInt(1));
            vo.setField1(resultSet.getObject(2));
            vo.setField2(resultSet.getObject(3));
            vo.setField3(resultSet.getObject(4));
            return vo;
        });

        // Initializes the internal NamedParameterJdbcTemplate and validates the setup.
        reader.afterPropertiesSet();
        return reader;
    }

}
DBItemReaderConfig.java

 

■参考

https://blog.csdn.net/justlpf/article/details/129320362

https://blog.csdn.net/youanyyou/article/details/132466350

https://www.baeldung.com/introduction-to-spring-batch

https://kagamihoge.hatenablog.com/entry/2023/03/24/192846

https://chenchenchen.blog.csdn.net/article/details/104764027

https://blog.csdn.net/langfeiyes/article/details/128906822

 

标签:Spring,構築,batch,springframework,private,import,org,new,SpringBatch5
From: https://www.cnblogs.com/lnsylt/p/17981192

相关文章

  • SpringBoot 优雅实现超大文件上传,通用方案
    文件上传是一个老生常谈的话题了,在文件相对比较小的情况下,可以直接把文件转化为字节流上传到服务器,但在文件比较大的情况下,用普通的方式进行上传,这可不是一个好的办法,毕竟很少有人会忍受,当文件上传到一半中断后,继续上传却只能重头开始上传,这种让人不爽的体验。那有没有比较好的上传......
  • Springboot和Vue(2或者3都行)实现Twitter授权登录,并获取用户公开信息-OAuth1.0。
    第一步先申请twitter开发者账号,创建App,我这里没有创建app,当时好像是默认有一个app,twitter官方说,创建一个app需要先删除一个app,我是没有充钱的,不知道充钱和免费使用接口的是不是一样的。第二步在生成CustomerKey以及CustomeSecret,我之后会用到这两个,这写密钥一生成永久有效,除非......
  • 【Spring】- 自动注入注解
    【@Autowired】冷知识:@AutowiredprivateMovieCatalog[]movieCatalogs;//根据类型注入全部的bean对象数组@AutowiredprivateSet<MovieCatalog>movieCatalogs;//根据类型注入全部的bean对象集合@AutowiredprivateMap<String,MovieCatalog>movieCatalogs;//根据类型注......
  • spring boot controller设置返回json
    在SpringBoot中,Controller通常会返回JSON格式的数据,这得益于SpringBoot的自动配置能力以及内嵌的Jackson库。以下是如何设置Controller返回JSON数据的基本步骤:添加依赖:首先,确保你的项目中包含了SpringBoot的WebStarter依赖,它已经包括了Jackson库,用于处理JSON序列化。<dependen......
  • spring boot 引入 log.info("[消息服务]初始化成功"); log 爆红
    首先在idea中下载lombok插件下载完就好了一个小辣椒logo的lombok其次导入日志库的问题:确保项目的依赖中包含正确的日志库。在SpringBoot项目中,常用的是SLF4J,您可以在pom.xml(如果是Maven项目)或build.gradle(如果是Gradle项目)中添加以下依赖:xml<!--Maven--><dependencie......
  • Java微服务SpringCloud+Uniapp+Vue3+Element Plus开源BizSpring商城
    产品介绍BizSpring电商平台概述BizSpring电商平台,是基于最新SpringCloud微服务架构开发的多语言电商平台,使用领先的Vue3.0+ElementPlus+uniapp技术开发的移动全端业务、实现了多平台同步构建及建设的解决方案。应用发布基于Uni-app,实现跨多个平台(H5、公众号、头条、抖音......
  • SpringBoot集成Flink-CDC 采集PostgreSQL变更数据发布到Kafka
    (之前写了一个flink-cdc同步数据的博客,发布在某N,最近代码开源了,直接复制过来了,懒得重新写了,将就着看下吧)最近做的一个项目,使用的是pg数据库,公司没有成熟的DCD组件,为了实现数据变更消息发布的功能,我使用SpringBoot集成Flink-CDC采集PostgreSQL变更数据发布到Kafka。 一、业务......
  • SpringBoot使用Validation框架手动校验对象是否符合规则
      在springboot项目中引入<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-validation</artifactId></dependency> 伪代码importlombok.Data;import......
  • springboot集成easypoi导出多sheet页
    pom文件<dependency> <groupId>cn.afterturn</groupId> <artifactId>easypoi-base</artifactId> <version>4.1.0</version></dependency>导出模板:编辑后端代码示例:/***导出加油卡进便利店大额审批列表*@throwsIOException......
  • org.springframework.beans.factory.NoSuchBeanDefinitionException: No bean named '
    开发遇到一个问题:org.springframework.beans.factory.NoSuchBeanDefinitionException:Nobeannamed'ckhSynCardNumToMbhkJob'available这个报错可能是因为:1.spring的xml配置文件Bean中的id和getBean的id不一致2.是否是忘记加注解了,3.启动类包扫描路径是否正确经过测试发......