SpringBatch
配置数据库自动生成
spring:
  batch:
    jdbc:
      # "always" creates the Spring Batch metadata tables on startup; "never" assumes they exist
      initialize-schema: never
    job:
      # true: run all configured jobs automatically at application startup; false: launch manually
      enabled: false
Partition
/**
 * Splits the work into {@code gridSize} partitions, each covering a contiguous
 * half-open id range [start, end) of {@link #RANGE_SIZE} ids. The bounds are
 * stored in each partition's {@link ExecutionContext} under the keys "start"
 * and "end", so step-scoped components can late-bind them via
 * {@code #{stepExecutionContext['start']}} / {@code #{stepExecutionContext['end']}}.
 */
@Slf4j
public class PagePartitioner implements Partitioner {

    /** Number of ids assigned to each worker partition. */
    private static final int RANGE_SIZE = 5;

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> partitions = new HashMap<>();
        for (int i = 0; i < gridSize; i++) {
            ExecutionContext context = new ExecutionContext();
            context.putInt("start", i * RANGE_SIZE);
            context.putInt("end", (i + 1) * RANGE_SIZE);
            // The map key only names the partition; the context carries its bounds.
            partitions.put("worker" + i, context);
        }
        return partitions;
    }
}
分区器负责将输入的数据分成多个partition以供worker使用
将分区数据添加到步骤上下文中
MyBatisPagingItemReader
@Bean
@StepScope // step-scoped proxy: the real reader is created per step execution so late binding works
public MyBatisPagingItemReader<TestMetadata> myItemReader(
        // FIX: the SpEL context variable is "stepExecutionContext" (lower-case first
        // letter) — "#{StepExecutionContext[...]}" does not resolve in Spring Batch.
        @Value("#{stepExecutionContext['start']}") Integer start,
        @Value("#{stepExecutionContext['end']}") Integer end) {
    // Pull the range bounds that the partitioner stored in the step's ExecutionContext.
    Map<String, Object> parameters = new HashMap<>();
    parameters.put("start", start);
    parameters.put("end", end);
    return new MyBatisPagingItemReaderBuilder<TestMetadata>()
            .sqlSessionFactory(sqlSessionFactory)
            .queryId("com.han.springbatchremotekafka.generate.SqlDoMapper.queryPage")
            .parameterValues(parameters)
            .pageSize(5)
            .build();
}
Sql
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.han.springbatchremotekafka.generate.SqlDoMapper">
<select id="queryPage" parameterType="java.util.Map"
resultType="com.han.springbatchremotekafka.generate.TestMetadata">
select *
from
test_metadata
where id between #{start} and #{end}-1
order by id asc
limit #{_pagesize} OFFSET #{_skiprows}
<!-- MyBatisPagingItemReader passes _pagesize and _skiprows automatically on every page, so the query must reference them for paging to work -->
</select>
</mapper>
partitionHandler
执行一些调度工作 并决定分区的个数
/**
 * Decides how partitions are executed: 4 partitions, each worker step
 * running on its own thread via SimpleAsyncTaskExecutor.
 */
@Bean
public PartitionHandler myPartitionHandler() {
    final TaskExecutorPartitionHandler partitionHandler = new TaskExecutorPartitionHandler();
    partitionHandler.setGridSize(4);
    partitionHandler.setTaskExecutor(new SimpleAsyncTaskExecutor());
    partitionHandler.setStep(partitionedStep());
    return partitionHandler;
}
全部代码
package com.han.springbatchremotekafka.config;
import com.han.springbatchremotekafka.config.partitioner.PagePartitioner;
import com.han.springbatchremotekafka.generate.TestMetadata;
import com.han.springbatchremotekafka.generate.TestMetadataNew;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.batch.MyBatisPagingItemReader;
import org.mybatis.spring.batch.builder.MyBatisBatchItemWriterBuilder;
import org.mybatis.spring.batch.builder.MyBatisPagingItemReaderBuilder;
import org.springframework.batch.core.*;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.batch.item.*;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.scheduling.annotation.Scheduled;
import javax.annotation.Resource;
import java.util.*;
/**
* @author 韩文硕
* @version 1.0
* @date 2023/7/30 13:27
* @usage
*/
@Configuration
@Slf4j
public class SpringBatchConfig {
@Resource
private JobBuilderFactory jobFactory;
@Resource
private StepBuilderFactory stepFactory;
@Resource
private SqlSessionFactory sqlSessionFactory;
@Resource
private JobLauncher jobLauncher;
@Bean
public PagePartitioner partitioner(){
return new PagePartitioner();
}
@Bean("job")
public Job newJob(){
return jobFactory.get("job_test_1"+ System.currentTimeMillis())
.start(maseterStep())
.build();
}
@Bean
public PartitionHandler myPartitionHandler(){
TaskExecutorPartitionHandler handler=new TaskExecutorPartitionHandler();
handler.setStep(partitionedStep());
handler.setTaskExecutor(new SimpleAsyncTaskExecutor());
handler.setGridSize(4);
return handler;
}
@Bean
public Step maseterStep(){
return stepFactory.get("master_step")
.partitioner(partitionedStep().getName(),partitioner())
.partitionHandler(myPartitionHandler())
.build();
}
@Bean
public Step partitionedStep() {
return stepFactory.get("partitionStep")
.<TestMetadata,TestMetadataNew>chunk(10)
.reader(myItemReader(null,null))
.processor(myItemProcessor())
.writer(databaseWriter())
.build();
}
@Bean
@StepScope
public ItemWriter<TestMetadata> writer(){
return new ItemWriter<TestMetadata>() {
@Override
public void write(List<? extends TestMetadata> list) throws Exception {
for (TestMetadata item:list
) {
log.info(item.toString());
}
}
};
}
@Bean
@StepScope
public ItemWriter<TestMetadataNew> databaseWriter(){
return new MyBatisBatchItemWriterBuilder<TestMetadataNew>()
.sqlSessionFactory(sqlSessionFactory)
.statementId("com.han.springbatchremotekafka.generate.TestMetadataNewDao.insert")
.build();
}
@Bean
@StepScope
public ItemProcessor<TestMetadata, TestMetadataNew> myItemProcessor(){
return new ItemProcessor<TestMetadata, TestMetadataNew>(){
@Override
public TestMetadataNew process(TestMetadata o) throws Exception {
TestMetadataNew testMetadata = new TestMetadataNew();
BeanUtils.copyProperties(o,testMetadata);
testMetadata.setSex(o.getId()%2==0?"male":"female");
return testMetadata;
}
};
}
@Bean
@StepScope
public MyBatisPagingItemReader<TestMetadata> myItemReader(@Value("#{StepExecutionContext['start']}")Integer start,
@Value("#{StepExecutionContext['end']}")Integer end){
Map<String,Object> parameters = new HashMap<>();
parameters.put("start",start);
parameters.put("end",end);
return new MyBatisPagingItemReaderBuilder<TestMetadata>().sqlSessionFactory(sqlSessionFactory)
.queryId("com.han.springbatchremotekafka.generate.SqlDoMapper.queryPage")
.parameterValues(parameters)
.pageSize(5)
.build();
}
@Scheduled(cron= "0 * * * * *")
public void executeBatch() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
JobParameters jobParameter = new JobParametersBuilder()
.addString("name",System.currentTimeMillis()+"")
.toJobParameters();
jobLauncher.run(newJob(),jobParameter);
}
}
标签:org,batch,springframework,new,import,SpringBatch,public
From: https://www.cnblogs.com/AIxuexiH/p/17592283.html