1.spring batch--批处理框架
2.结构: Job>Flow>Step>Chunk>read process write
2.1 基本概念:
Spring Batch运行的基本单位是job,一个job负责完成一件批处理任务。一个job可以包含多个flow,一个flow可以包含多个step;
flow控制step的执行顺序;step是job中要执行的单个步骤。
chunk是数据块,需要自行定义多少条数据构成一个chunk(例如chunk(3)表示每读取并处理3条数据后统一写出一次)。
2.2 代码:ipvJob示例(单个ipvFlow,仅含一个ipvStep;原标题中的"arkFlow和dataHighWayFlow并行执行"未出现在下方代码中)
启动类
@SpringBootApplication @EnableBatchProcessing public class Application{ //NOSONAR public static void main(String[] args) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException { SpringApplication.run(Application.class, args); //NOSONAR } }
Controller:
/**
 * REST controller exposing an endpoint under {@code /ipv} that launches the
 * injected {@code ipvJob}.
 */
@RestController
@RequestMapping("/ipv")
public class QueryController {

    /** Service used to launch the job. */
    @Autowired
    private JobLaunchService jobLaunchService;

    /** The job launched by {@link #ipvSpringbatch()}. */
    @Autowired
    private Job ipvJob;

    /**
     * Launches {@code ipvJob} through {@link JobLaunchService#launchJob} and
     * returns whatever result that call produces.
     *
     * <p>NOTE(review): the mapping does not restrict the HTTP method, so any
     * method reaching {@code /ipv/springbatch} triggers a launch - confirm this
     * is intended.
     *
     * @return the result reported by the launch service
     */
    @RequestMapping(value = "springbatch")
    public JobResult ipvSpringbatch() {
        return jobLaunchService.launchJob(ipvJob);
    }
}
Service:
import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.JobParametersBuilder; import org.springframework.batch.core.launch.JobLauncher; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.Calendar; @Slf4j @Service public class JobLaunchService { @Autowired JobLauncher jobLauncher; public JobResult launchJob(Job job) { try { JobParameters jobParameters = new JobParametersBuilder() .addDate("timestamp", Calendar.getInstance().getTime()) .toJobParameters(); JobExecution jobExecution = jobLauncher.run(job, jobParameters); return JobResult.builder() .jobName(job.getName()) .jobId(jobExecution.getJobId()) .jobExitStatus(jobExecution.getExitStatus()) .timestamp(Calendar.getInstance().getTimeInMillis()) .build(); } catch (Exception e) { throw new RuntimeException("launch job exception ", e); //NOSONAR } } }
SpringBatch job config
/**
 * Spring Batch configuration for {@code ipvJob}: one flow ({@code ipvFlow})
 * containing one chunk-oriented step ({@code ipvStep}).
 */
@Configuration
public class JobService {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Defines {@code ipvJob}, which runs {@code ipvFlow}; the flow starts with
     * {@link #ipvStep()} and contains nothing else.
     *
     * @return the configured job
     */
    @Bean
    public Job ipvJob() {
        Flow ipvFlow = new FlowBuilder<SimpleFlow>("ipvFlow")
                .start(ipvStep())
                .build();
        return jobBuilderFactory.get("ipvJob")
                .incrementer(new RunIdIncrementer())
                .start(ipvFlow)
                .end()
                .build();
    }

    /**
     * Defines {@code ipvStep}: reads strings, processes each one and writes them
     * in chunks of three, retrying on any {@link Exception} with a retry limit of 3.
     *
     * <p>The chunk is typed {@code <String, String>} to match the reader
     * ({@code ItemReader<String>}) and the processor
     * ({@code ItemProcessor<String, String>}); an input type of
     * {@code List<String>} does not type-check against them.
     *
     * @return the configured step
     */
    @Bean
    public Step ipvStep() {
        return stepBuilderFactory.get("ipvStep")
                .<String, String>chunk(3)
                .reader(ipvReader())
                .processor(ipvProcessor())
                .writer(ipvWriter())
                .faultTolerant()
                .retryLimit(3)
                .retry(Exception.class)
                .build();
    }

    /**
     * Step-scoped reader over the fixed inputs {@code "test1"} and {@code "test2"}.
     *
     * @return a reader backed by the fixed input list
     */
    @Bean
    @StepScope
    public ItemReader<String> ipvReader() {
        List<String> inputs = Arrays.asList("test1", "test2");
        return new IpvReader(inputs);
    }

    /** @return the processor applied to every item read by {@link #ipvReader()} */
    @Bean
    public ItemProcessor<String, String> ipvProcessor() {
        return new IpvProcessor();
    }

    /** @return the writer that receives each processed chunk */
    @Bean
    public ItemWriter<String> ipvWriter() {
        return new IpvWriter();
    }
}
Reader:
import lombok.extern.slf4j.Slf4j; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.NonTransientResourceException; import org.springframework.batch.item.ParseException; import org.springframework.batch.item.UnexpectedInputException; import java.util.Arrays; import java.util.Iterator; import java.util.List; @Slf4j public class IpvReader implements ItemReader<String> { private final Iterator<String> iterator; public IpvReader(List<String> data) { this.iterator = data.iterator(); } @Override public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException { List<String> reader= Arrays.asList("test1","test2"); if(iterator.hasNext()){ return this.iterator.next(); }else{ return null; } } }
Processor:
import org.springframework.batch.item.ItemProcessor; public class IpvProcessor implements ItemProcessor<String,String> { @Override public String process(String item) throws Exception { //do processor business return item.concat("processor"); } }
Writer:
import org.springframework.batch.item.ItemWriter;

import java.util.List;

/**
 * {@link ItemWriter} that prints each chunk it receives to standard output.
 */
public class IpvWriter implements ItemWriter<String> {

    /**
     * Prints the whole chunk on one line, prefixed with {@code "write:"}.
     *
     * @param items the chunk of processed items
     */
    @Override
    public void write(List<? extends String> items) throws Exception {
        final String line = "write:" + items;
        System.out.println(line);
    }
}
2.3 代码:先执行testStep,然后flow1和flow2并行执行
job:
package springbatch.SequenceSpringBatch.Job; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.job.builder.FlowBuilder; import org.springframework.batch.core.job.flow.Flow; import org.springframework.batch.core.job.flow.support.SimpleFlow; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.task.SimpleAsyncTaskExecutor; import springbatch.SequenceSpringBatch.Entity.InputEntity; import springbatch.SequenceSpringBatch.Entity.OutputEntity; import springbatch.SequenceSpringBatch.Processor.Processor; import springbatch.SequenceSpringBatch.Reader.Reader1; import springbatch.SequenceSpringBatch.Reader.Reader2; import springbatch.SequenceSpringBatch.Reader.Reader3; import springbatch.SequenceSpringBatch.Writer.Writer1; import springbatch.SequenceSpringBatch.Writer.Writer2; import springbatch.SequenceSpringBatch.Writer.Writer3; import javax.annotation.Resource; /** * Create by qf86923 * Date 1/16/2020 5:00 PM * Description:testStep first execute, and flow1 and flows parallel execution */ @Slf4j @Configuration public class SequenceJobConfig { @Resource private JobBuilderFactory jobBuilderFactory; @Resource private StepBuilderFactory stepBuilderFactory; @Bean public Job SequenceJob(){ Flow flow1=new FlowBuilder<SimpleFlow>("flow1") .start(testFlow1Step()) .build(); Flow flow2=new FlowBuilder<SimpleFlow>("flow2") .start(testFlow2Step()) .build(); Flow flow0=new FlowBuilder<SimpleFlow>("flow0") 
.start(flow1) .split(new SimpleAsyncTaskExecutor()) //.split(new SimpleAsyncTaskExecutor()) //创建一个异步执行任务 .add(flow2) .build(); Flow flowComplex=new FlowBuilder<SimpleFlow>("flowcomplex") .start(testStep()) //.start() .next()是顺序执行的 .next(flow0) .build(); return jobBuilderFactory.get("sequenceJob") .incrementer(new RunIdIncrementer()) .start(flowComplex) .end() .build(); } @Bean public Step testFlow1Step(){ return stepBuilderFactory.get("testFlow1Step").<InputEntity, OutputEntity>chunk(3). faultTolerant().skip(Exception.class).skipLimit(1). reader(Reader1()). processor(Processor()). writer(Writer1()). build(); } @Bean public Step testFlow2Step(){ return stepBuilderFactory.get("testFlow2Step").<InputEntity,OutputEntity>chunk(3). faultTolerant().skip(Exception.class).skipLimit(1). reader(Reader2()). processor(Processor()). writer(Writer2()). build(); } @Bean public Step testStep(){ return stepBuilderFactory.get("testStep").<InputEntity,OutputEntity>chunk(3). faultTolerant().skip(Exception.class).skipLimit(1). reader(Reader3()). processor(Processor()). writer(Writer3()). build(); } // @Bean // public Step testStepNext(){ // return stepBuilderFactory.get("testStepNext").<OutputEntity,NextOutputEntity>chunk(3) // .faultTolerant().skip(Exception.class).skipLimit(1). // reader(ReaderNext()). // processor(ProcessorNext()). // writer(WriterNext()). // build(); // } @Bean public ItemReader<InputEntity> Reader1(){ return new Reader1(); } @Bean public ItemReader<InputEntity> Reader2(){ return new Reader2(); } @Bean public ItemReader<InputEntity> Reader3(){ return new Reader3(); } @Bean public ItemProcessor<InputEntity,OutputEntity> Processor(){ return new Processor(); } @Bean public ItemWriter<OutputEntity> Writer1(){ return new Writer1(); } @Bean public ItemWriter<OutputEntity> Writer2(){ return new Writer2(); } @Bean public ItemWriter<OutputEntity> Writer3(){ return new Writer3(); } }View Code
read:
package springbatch.SequenceSpringBatch.Reader; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.NonTransientResourceException; import org.springframework.batch.item.ParseException; import org.springframework.batch.item.UnexpectedInputException; import org.springframework.beans.factory.annotation.Autowired; import springbatch.SequenceSpringBatch.Entity.InputEntity; import springbatch.SequenceSpringBatch.ProcessService.Service1; import java.util.concurrent.atomic.AtomicInteger; /** * Create by qf86923 * Date 1/16/2020 5:23 PM * Description */ @Slf4j public class Reader1 implements ItemReader<InputEntity> { @Autowired private Service1 service1; private static AtomicInteger count = new AtomicInteger(0); @Override public InputEntity read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException { InputEntity inputEntity=service1.getInputEntity(); count.incrementAndGet(); if(inputEntity != null && count.get()<2){ System.out.println("=====Reder1_inputEntity======"+inputEntity); return inputEntity; } count=new AtomicInteger(0); //NOSONAR return null; } }View Code
process:
package springbatch.SequenceSpringBatch.Processor; import org.springframework.batch.item.ItemProcessor; import springbatch.SequenceSpringBatch.Entity.InputEntity; import springbatch.SequenceSpringBatch.Entity.OutputEntity; /** * Create by qf86923 * Date 1/16/2020 5:47 PM * Description */ public class Processor implements ItemProcessor<InputEntity, OutputEntity> { @Override public OutputEntity process(InputEntity item) throws Exception { OutputEntity outputEntity=new OutputEntity(); outputEntity.setName(item.getName()); outputEntity.setGrade(item.getAge()); outputEntity.setSex(item.getAge()>1?"female":"male"); return outputEntity; } }View Code
writer:
package springbatch.SequenceSpringBatch.Writer;

import org.springframework.batch.item.ItemWriter;
import springbatch.SequenceSpringBatch.Entity.OutputEntity;

import java.util.List;

/**
 * Create by qf86923
 * Date 1/16/2020 5:52 PM
 * Description: writer that prints every entity of a chunk to standard output.
 */
public class Writer1 implements ItemWriter<OutputEntity> {

    /**
     * Prints one line per entity containing its name, sex and grade, in list order.
     *
     * @param items the chunk of entities to print
     */
    @Override
    public void write(List<? extends OutputEntity> items) throws Exception {
        items.forEach(entity -> System.out.println(
                "=====Writer1" + entity.getName() + "," + entity.getSex() + "," + entity.getGrade()));
    }
}
标签:return,Spring,batch,springframework,import,org,public From: https://www.cnblogs.com/enhance/p/12204802.html