Spring Batch

Date: 2022-12-20 18:45:42
Tags: return, Spring, batch, springframework, import, org, public

1. Spring Batch: a batch processing framework

2. Structure: Job > Flow > Step > Chunk > read, process, write

  2.1 Basic concepts:

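    The basic unit of execution in Spring Batch is a job, and each job performs one batch-processing task. A job can contain multiple flows, and a flow can contain multiple steps.

    A flow controls the order in which steps execute; a step is a single stage of work within a job.

    A chunk is a block of data: you define how many items make up one chunk, and that many items are read and processed before being written together in a single transaction.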
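The chunk idea can be illustrated without Spring at all. The following is a minimal plain-Java sketch, not Spring Batch code: the class name ChunkSketch and the method run are made up for illustration, and transactions are left out. It reads up to chunkSize items, processes them, and hands each group to the "writer" in one call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical illustration of chunk-oriented processing; not part of Spring Batch.
class ChunkSketch {

    /** Returns the list of chunks that would be passed to the writer, one entry per write call. */
    static <I, O> List<List<O>> run(Iterator<I> reader, Function<I, O> processor, int chunkSize) {
        List<List<O>> written = new ArrayList<>();
        while (reader.hasNext()) {
            // 1. Read up to chunkSize items.
            List<I> items = new ArrayList<>();
            while (items.size() < chunkSize && reader.hasNext()) {
                items.add(reader.next());
            }
            // 2. Process every item of the chunk.
            List<O> outputs = new ArrayList<>();
            for (I item : items) {
                outputs.add(processor.apply(item));
            }
            // 3. Write the whole chunk in one call.
            written.add(outputs);
        }
        return written;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "b", "c", "d", "e");
        // Five items with a chunk size of 3 give one full chunk and one partial chunk.
        System.out.println(run(input.iterator(), s -> s + "!", 3));
    }
}
```

With five items and a chunk size of 3, the writer is called twice: once with three items and once with the remaining two.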

 

                                        

 

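  2.2 Code: a job with a single flow, ipvFlow (the original title refers to arkFlow and dataHighWayFlow running in parallel, which the code below does not define)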

Application class:

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableBatchProcessing
public class Application { //NOSONAR

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);  //NOSONAR
    }

}

Controller:

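import org.springframework.batch.core.Job;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/ipv")
public class QueryController {
    @Autowired
    private JobLaunchService jobLaunchService;
    @Autowired
    private Job ipvJob;

    // Launches ipvJob on each request to /ipv/springbatch and returns a summary of the run.
    @RequestMapping("/springbatch")
    public JobResult ipvSpringbatch() {
        return jobLaunchService.launchJob(ipvJob);
    }
}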

 

 

Service:

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Calendar;

@Slf4j
@Service
public class JobLaunchService {
    @Autowired
    JobLauncher jobLauncher;

    public JobResult launchJob(Job job) {
        try {
            JobParameters jobParameters = new JobParametersBuilder()
                    .addDate("timestamp", Calendar.getInstance().getTime())
                    .toJobParameters();
            JobExecution jobExecution = jobLauncher.run(job, jobParameters);
            return JobResult.builder()
                    .jobName(job.getName())
                    .jobId(jobExecution.getJobId())
                    .jobExitStatus(jobExecution.getExitStatus())
                    .timestamp(Calendar.getInstance().getTimeInMillis())
                    .build();
        } catch (Exception e) {
            throw new RuntimeException("launch job exception ", e); //NOSONAR
        }
    }
}
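The JobResult type used by the controller and the service is not shown in the article. The following is only a guess at its shape, inferred from the builder calls above (jobName, jobId, jobExitStatus, timestamp). It is written as a hand-rolled builder so that it has no dependencies; the original most likely relies on Lombok, and its jobExitStatus field would hold Spring Batch's ExitStatus rather than the String assumed here.

```java
// Hypothetical reconstruction of JobResult; field types are assumptions.
final class JobResult {
    private final String jobName;
    private final Long jobId;
    private final String jobExitStatus; // assumption: String stands in for Spring Batch's ExitStatus
    private final long timestamp;

    private JobResult(Builder b) {
        this.jobName = b.jobName;
        this.jobId = b.jobId;
        this.jobExitStatus = b.jobExitStatus;
        this.timestamp = b.timestamp;
    }

    static Builder builder() {
        return new Builder();
    }

    // Public getters so that a JSON serializer can expose the fields.
    public String getJobName() { return jobName; }
    public Long getJobId() { return jobId; }
    public String getJobExitStatus() { return jobExitStatus; }
    public long getTimestamp() { return timestamp; }

    static final class Builder {
        private String jobName;
        private Long jobId;
        private String jobExitStatus;
        private long timestamp;

        Builder jobName(String value) { this.jobName = value; return this; }
        Builder jobId(Long value) { this.jobId = value; return this; }
        Builder jobExitStatus(String value) { this.jobExitStatus = value; return this; }
        Builder timestamp(long value) { this.timestamp = value; return this; }

        JobResult build() { return new JobResult(this); }
    }
}
```

If Lombok is on the classpath, the same shape could presumably be obtained by annotating a plain class with fields only.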

 

Spring Batch job config:

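import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.job.flow.support.SimpleFlow;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Arrays;
import java.util.List;

// Uses the Spring Batch 4.x builder factories (deprecated as of Spring Batch 5.0).
@Configuration
public class JobService {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job ipvJob(){
        // A flow with a single step; the job consists of this one flow.
        Flow ipvFlow=new FlowBuilder<SimpleFlow>("ipvFlow")
                .start(ipvStep()).build();
        return jobBuilderFactory.get("ipvJob")
                .incrementer(new RunIdIncrementer())
                .start(ipvFlow)
                .end()
                .build();
    }

    @Bean
    public Step ipvStep(){
        // Input and output item types are both String; three items per chunk.
        return stepBuilderFactory.get("ipvStep").<String,String>chunk(3).
                reader(ipvReader()).
                processor(ipvProcessor()).
                writer(ipvWriter()).
                faultTolerant().retryLimit(3).retry(Exception.class).
                build();
    }

    // Step scope gives every step execution a fresh reader, and so a fresh iterator.
    @Bean
    @StepScope
    public ItemReader<String> ipvReader(){
        List<String> inputs= Arrays.asList("test1","test2");
        return new IpvReader(inputs);
    }
    @Bean
    public ItemProcessor<String,String> ipvProcessor(){
        return new IpvProcessor();
    }

    @Bean
    public ItemWriter<String> ipvWriter(){
        return new IpvWriter();
    }
}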
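The faultTolerant().retryLimit(3).retry(Exception.class) call in ipvStep asks Spring Batch to try a failing item again before giving up. The following plain-Java sketch shows the general idea only and is not how Spring Batch implements it: RetrySketch and callWithRetry are hypothetical names, and the sketch assumes the limit counts total attempts, including the first one.

```java
import java.util.concurrent.Callable;

// Hypothetical illustration of a bounded retry; not part of Spring Batch.
class RetrySketch {

    /**
     * Calls the action until it succeeds or maxAttempts attempts have failed.
     * Assumption: maxAttempts counts the first attempt too, and values below 1 mean "try once".
     */
    static <T> T callWithRetry(Callable<T> action, int maxAttempts) throws Exception {
        Exception last = null;
        int attempts = Math.max(1, maxAttempts);
        for (int attempt = 1; attempt <= attempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e; // remember the failure and try again if attempts remain
            }
        }
        throw last; // every attempt failed: surface the most recent exception
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Fails on the first two calls, succeeds on the third.
        String result = callWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("transient failure");
            }
            return "ok";
        }, 3);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Retrying on Exception.class, as the step above does, treats every failure as transient; in practice a narrower exception type is usually a safer choice.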

 

Reader:

import org.springframework.batch.item.ItemReader;

import java.util.Iterator;
import java.util.List;

public class IpvReader implements ItemReader<String> {

    private final Iterator<String> iterator;

    public IpvReader(List<String> data) {
        this.iterator = data.iterator();
    }

    // Returns the next item, or null to signal that the input is exhausted.
    @Override
    public String read() {
        return iterator.hasNext() ? iterator.next() : null;
    }

}

 

Processor:

import org.springframework.batch.item.ItemProcessor;

public class IpvProcessor implements ItemProcessor<String,String> {
    @Override
    public String process(String item) throws Exception {
        //do processor business
        return item.concat("processor");
    }
}

 

Writer:

import org.springframework.batch.item.ItemWriter;

import java.util.List;

public class IpvWriter implements ItemWriter<String> {

    @Override
    public void write(List<? extends String> items) throws Exception {
        System.out.println("write:"+items);
    }
}

 

 

 

  2.3 Code: testStep executes first, then flow1 and flow2 execute in parallel

   job:

package springbatch.SequenceSpringBatch.Job;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.job.flow.support.SimpleFlow;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import springbatch.SequenceSpringBatch.Entity.InputEntity;
import springbatch.SequenceSpringBatch.Entity.OutputEntity;
import springbatch.SequenceSpringBatch.Processor.Processor;
import springbatch.SequenceSpringBatch.Reader.Reader1;
import springbatch.SequenceSpringBatch.Reader.Reader2;
import springbatch.SequenceSpringBatch.Reader.Reader3;
import springbatch.SequenceSpringBatch.Writer.Writer1;
import springbatch.SequenceSpringBatch.Writer.Writer2;
import springbatch.SequenceSpringBatch.Writer.Writer3;

import javax.annotation.Resource;

/**
 * Create by qf86923
 * Date 1/16/2020 5:00 PM
 * Description: testStep executes first, then flow1 and flow2 execute in parallel
 */
@Slf4j
@Configuration
public class SequenceJobConfig {
    @Resource
    private JobBuilderFactory jobBuilderFactory;

    @Resource
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job SequenceJob(){
        Flow flow1=new FlowBuilder<SimpleFlow>("flow1")
                .start(testFlow1Step())
                .build();
        Flow flow2=new FlowBuilder<SimpleFlow>("flow2")
                .start(testFlow2Step())
                .build();
        Flow flow0=new FlowBuilder<SimpleFlow>("flow0")
                .start(flow1)
                .split(new SimpleAsyncTaskExecutor()) // run the flows of this split concurrently on an async task executor
                .add(flow2)
                .build();
        Flow flowComplex=new FlowBuilder<SimpleFlow>("flowcomplex")
                .start(testStep())   // .start() and .next() execute sequentially
                .next(flow0)
                .build();
        return jobBuilderFactory.get("sequenceJob")
                .incrementer(new RunIdIncrementer())
                .start(flowComplex)
                .end()
                .build();
    }

    @Bean
    public Step testFlow1Step(){
        return stepBuilderFactory.get("testFlow1Step").<InputEntity, OutputEntity>chunk(3).
                faultTolerant().skip(Exception.class).skipLimit(1).
                reader(Reader1()).
                processor(Processor()).
                writer(Writer1()).
                build();
    }

    @Bean
    public Step testFlow2Step(){
        return stepBuilderFactory.get("testFlow2Step").<InputEntity,OutputEntity>chunk(3).
                faultTolerant().skip(Exception.class).skipLimit(1).
                reader(Reader2()).
                processor(Processor()).
                writer(Writer2()).
                build();
    }

    @Bean
    public Step testStep(){
        return stepBuilderFactory.get("testStep").<InputEntity,OutputEntity>chunk(3).
                faultTolerant().skip(Exception.class).skipLimit(1).
                reader(Reader3()).
                processor(Processor()).
                writer(Writer3()).
                build();
    }

//    @Bean
//    public Step testStepNext(){
//        return stepBuilderFactory.get("testStepNext").<OutputEntity,NextOutputEntity>chunk(3)
//                .faultTolerant().skip(Exception.class).skipLimit(1).
//                reader(ReaderNext()).
//                processor(ProcessorNext()).
//                writer(WriterNext()).
//                build();
//    }

    @Bean
    public ItemReader<InputEntity> Reader1(){
        return new Reader1();
    }

    @Bean
    public ItemReader<InputEntity> Reader2(){
        return new Reader2();
    }

    @Bean
    public ItemReader<InputEntity> Reader3(){
        return new Reader3();
    }

    @Bean
    public ItemProcessor<InputEntity,OutputEntity> Processor(){
        return new Processor();
    }

    @Bean
    public ItemWriter<OutputEntity> Writer1(){
        return new Writer1();
    }

    @Bean
    public ItemWriter<OutputEntity> Writer2(){
        return new Writer2();
    }

    @Bean
    public ItemWriter<OutputEntity> Writer3(){
        return new Writer3();
    }
}
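The execution order that SequenceJobConfig sets up (testStep first, then flow1 and flow2 side by side) can be pictured with plain Java concurrency. This is only an analogy under stated assumptions, not Spring Batch code: SplitSketch is a hypothetical name, each step is reduced to appending its name to a log, and a fixed pool of two threads stands in for SimpleAsyncTaskExecutor.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical analogy for "sequential step, then a split of two flows".
class SplitSketch {

    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>(); // thread-safe record of what ran
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            // Sequential part: runs to completion before the split starts.
            log.add("testStep");

            // Split: both flows are submitted at once and may run in either order.
            CompletableFuture<Void> flow1 =
                    CompletableFuture.runAsync(() -> log.add("testFlow1Step"), executor);
            CompletableFuture<Void> flow2 =
                    CompletableFuture.runAsync(() -> log.add("testFlow2Step"), executor);

            // The split is finished only when every branch has finished.
            CompletableFuture.allOf(flow1, flow2).join();
            log.add("end");
        } finally {
            executor.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        // First entry is always testStep and last is always end; the middle two may swap.
        System.out.println(run());
    }
}
```

The relative order of the two middle entries is not deterministic, which matches the point of a split: the parallel flows should not depend on each other's results.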

 

   reader:

package springbatch.SequenceSpringBatch.Reader;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.beans.factory.annotation.Autowired;
import springbatch.SequenceSpringBatch.Entity.InputEntity;
import springbatch.SequenceSpringBatch.ProcessService.Service1;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * Create by qf86923
 * Date 1/16/2020 5:23 PM
 * Description
 */
@Slf4j
public class Reader1 implements ItemReader<InputEntity> {

    @Autowired
    private Service1 service1;

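    // Shared by all Reader1 instances; counts read() calls within the current run.
    private static final AtomicInteger count = new AtomicInteger(0);

    @Override
    public InputEntity read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        InputEntity inputEntity=service1.getInputEntity();
        // Only the first call of a run returns an item, so each run reads exactly one entity.
        if(count.incrementAndGet()<2 && inputEntity != null){
            System.out.println("=====Reader1_inputEntity======"+inputEntity);
            return inputEntity;
        }
        count.set(0);  // reset so that the next job run can read again
        return null;   // null tells Spring Batch that the input is exhausted
    }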
}

 

  processor:

package springbatch.SequenceSpringBatch.Processor;

import org.springframework.batch.item.ItemProcessor;
import springbatch.SequenceSpringBatch.Entity.InputEntity;
import springbatch.SequenceSpringBatch.Entity.OutputEntity;

/**
 * Create by qf86923
 * Date 1/16/2020 5:47 PM
 * Description
 */
public class Processor implements ItemProcessor<InputEntity, OutputEntity> {

    @Override
    public OutputEntity process(InputEntity item) throws Exception {
        OutputEntity outputEntity=new OutputEntity();
        outputEntity.setName(item.getName());
        outputEntity.setGrade(item.getAge());
        outputEntity.setSex(item.getAge()>1?"female":"male");
        return outputEntity;
    }
}

 

  writer:

package springbatch.SequenceSpringBatch.Writer;

import org.springframework.batch.item.ItemWriter;
import springbatch.SequenceSpringBatch.Entity.OutputEntity;

import java.util.List;

/**
 * Create by qf86923
 * Date 1/16/2020 5:52 PM
 * Description
 */
public class Writer1 implements ItemWriter<OutputEntity> {
    @Override
    public void write(List<? extends OutputEntity> items) throws Exception {
        for(OutputEntity outputEntity:items){
            System.out.println("=====Writer1"+outputEntity.getName()+","+outputEntity.getSex()+","+outputEntity.getGrade());
        }
    }
}

 

 

 

 

 

 

 

      

From: https://www.cnblogs.com/enhance/p/12204802.html
