首页 > 其他分享 >SpringBatch

SpringBatch

时间:2023-07-30 23:14:22浏览次数:42  
标签:org batch springframework new import SpringBatch public

SpringBatch

配置数据库自动生成

Spring:
    batch:
        jdbc:
            initialize-schema: never # always表示会进行初始化
        job:
            enabled: false # true 程序启动的时候会进行调用job false则不会调用

Partition

@Slf4j
public class PagePartitioner implements Partitioner {
    @Resource
    private TestMetaDataDaoService dataDaoService;
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        //ExecutionContext Map类型 在分区器中添加key value 在其他函数中可以时候用StepExecutionContext取出

        Map<String, ExecutionContext> result = new HashMap<>();

        for(int j = 0; j < gridSize; j++) {
            //从其他函数传递
            ExecutionContext e = new ExecutionContext();
            e.putInt("start", j * 5);
            e.putInt("end",(j+1)*5);
            //不要忘记添加到Map中
            result.put("worker"+j,e);
        }


        return result;
    }
}

分区器负责将输入的数据分成多个partition以供worker使用
将分区数据添加到步骤上下文中

MyBatisPagingItemReader

    @Bean
    @StepScope //延迟加载 在调用的时候才会区生成bean
        public MyBatisPagingItemReader<TestMetadata> myItemReader(@Value("#{StepExecutionContext['start']}")Integer start,
    @Value("#{StepExecutionContext['end']}")Integer end){
        // 从上下文中取出,分区器中存入的数据
        Map<String,Object> parameters = new HashMap<>();
        parameters.put("start",start);
        parameters.put("end",end);
        return new MyBatisPagingItemReaderBuilder<TestMetadata>().sqlSessionFactory(sqlSessionFactory)

                .queryId("com.han.springbatchremotekafka.generate.SqlDoMapper.queryPage")
                .parameterValues(parameters)

                .pageSize(5)
                .build();
    }

Sql

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.han.springbatchremotekafka.generate.SqlDoMapper">
    <select id="queryPage" parameterType="java.util.Map"
            resultType="com.han.springbatchremotekafka.generate.TestMetadata">
        select *
        from
             test_metadata
        where id between #{start} and #{end}-1
        order by id asc
        limit #{_pagesize} OFFSET #{_skiprows}
        <!-- 分页器会默认传递这两个值所以要加上 -->
    </select>
</mapper>

partitionHandler

执行一些调度工作 并决定分区的个数

    @Bean
    public PartitionHandler myPartitionHandler(){
        TaskExecutorPartitionHandler handler=new TaskExecutorPartitionHandler();
        handler.setStep(partitionedStep());
        handler.setTaskExecutor(new SimpleAsyncTaskExecutor());
        handler.setGridSize(4);
        return handler;
    }

全部代码

package com.han.springbatchremotekafka.config;

import com.han.springbatchremotekafka.config.partitioner.PagePartitioner;
import com.han.springbatchremotekafka.generate.TestMetadata;
import com.han.springbatchremotekafka.generate.TestMetadataNew;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.batch.MyBatisPagingItemReader;
import org.mybatis.spring.batch.builder.MyBatisBatchItemWriterBuilder;
import org.mybatis.spring.batch.builder.MyBatisPagingItemReaderBuilder;
import org.springframework.batch.core.*;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.batch.item.*;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.scheduling.annotation.Scheduled;
import javax.annotation.Resource;
import java.util.*;

/**
 * @author 韩文硕
 * @version 1.0
 * @date 2023/7/30 13:27
 * @usage
 */
@Configuration
@Slf4j
public class SpringBatchConfig {

    @Resource
    private JobBuilderFactory jobFactory;
    @Resource
    private StepBuilderFactory stepFactory;

    @Resource
    private SqlSessionFactory sqlSessionFactory;

    @Resource
    private JobLauncher jobLauncher;


    @Bean
    public PagePartitioner partitioner(){
        return  new PagePartitioner();
    }

    @Bean("job")
    public Job newJob(){
        return jobFactory.get("job_test_1"+ System.currentTimeMillis())
                .start(maseterStep())
                .build();
    }



    @Bean
    public PartitionHandler myPartitionHandler(){
        TaskExecutorPartitionHandler handler=new TaskExecutorPartitionHandler();
        handler.setStep(partitionedStep());
        handler.setTaskExecutor(new SimpleAsyncTaskExecutor());
        handler.setGridSize(4);
        return handler;
    }

    @Bean
    public Step maseterStep(){
        return stepFactory.get("master_step")
                .partitioner(partitionedStep().getName(),partitioner())
                .partitionHandler(myPartitionHandler())
                .build();

    }

    @Bean
    public Step partitionedStep() {
        return stepFactory.get("partitionStep")
                .<TestMetadata,TestMetadataNew>chunk(10)
                .reader(myItemReader(null,null))
                .processor(myItemProcessor())
                .writer(databaseWriter())
                .build();
    }

    @Bean
    @StepScope
    public ItemWriter<TestMetadata> writer(){
        return new ItemWriter<TestMetadata>() {
            @Override
            public void write(List<? extends TestMetadata> list) throws Exception {
                for (TestMetadata item:list
                     ) {
                    log.info(item.toString());
                }
            }

        };
    }



    @Bean
    @StepScope
    public ItemWriter<TestMetadataNew> databaseWriter(){
        return new MyBatisBatchItemWriterBuilder<TestMetadataNew>()
                .sqlSessionFactory(sqlSessionFactory)
                .statementId("com.han.springbatchremotekafka.generate.TestMetadataNewDao.insert")
                .build();
    }



    @Bean
    @StepScope
    public ItemProcessor<TestMetadata, TestMetadataNew> myItemProcessor(){
        return new ItemProcessor<TestMetadata, TestMetadataNew>(){

            @Override
            public TestMetadataNew process(TestMetadata o) throws Exception {
                TestMetadataNew testMetadata = new TestMetadataNew();
                BeanUtils.copyProperties(o,testMetadata);
                testMetadata.setSex(o.getId()%2==0?"male":"female");

                return testMetadata;
            }
        };
    }




    @Bean
    @StepScope
        public MyBatisPagingItemReader<TestMetadata> myItemReader(@Value("#{StepExecutionContext['start']}")Integer start,
    @Value("#{StepExecutionContext['end']}")Integer end){
        Map<String,Object> parameters = new HashMap<>();
        parameters.put("start",start);
        parameters.put("end",end);
        return new MyBatisPagingItemReaderBuilder<TestMetadata>().sqlSessionFactory(sqlSessionFactory)

                .queryId("com.han.springbatchremotekafka.generate.SqlDoMapper.queryPage")
                .parameterValues(parameters)

                .pageSize(5)
                .build();
    }

    @Scheduled(cron= "0 * * * * *")
    public void executeBatch() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
        JobParameters jobParameter = new JobParametersBuilder()
                .addString("name",System.currentTimeMillis()+"")
                .toJobParameters();
        jobLauncher.run(newJob(),jobParameter);
    }


}

标签:org,batch,springframework,new,import,SpringBatch,public
From: https://www.cnblogs.com/AIxuexiH/p/17592283.html

相关文章

  • SpringBatch从入门到实战(一):简介和环境搭建
    一:简介SpringBatch是一个轻量级的批处理框架,适合处理大批量的数据(如百万级别)。功能就是从一个地方读数据写到另一个地方去。一般都是系统之间不能直接访问同一个数据库,需要通过文件来交换数据。二:从文件中读然后写到数据库这代码谁都会写,那么为什么还要使用框架?try(BufferedReader......
  • SpringBatch之实际操作
    目录1SpringBatch操作1.1SpringBatch介绍1.2依赖配置相关1.2.1pom.xml1.2.2mysql依赖库表1.2.3启动配置1.2.4数据库配置1.3示例Demo1.3.1简单执行1.3.2报错1.4流程控制1.4.1多步骤任务1.4.2Flow用法1.4.3并发执行1.4.4任务决策1.4.5任务嵌套1.5数据操作1.5.1......
  • 批量单元框架SpringBatch简介
    SpringBatch简介SpringBatch是一个轻量级的、完善的批处理框架,作为Spring体系中的一员,它拥有灵活、方便、生产可用的特点。在应对高效处理大量信息、定时处理大量数据等......
  • SpringBatch 框架
    SpringBatch是一个轻量级的、完善的批处理框架,旨在帮助企业建立健壮、高效的批处理应用。它是一个批处理应用框架,不是调度框架,需要配合调度框架来完成批处理工作。这里......