SpringBatch远程分区demo
*使用框架版本
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.han</groupId>
<artifactId>common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
<version>2.4.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>3.1.5.RELEASE</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
<version>5.1.4.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.3.1</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.13</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.4.0</version>
</dependency>
- springbatch版本过高会报错 list.of
- 原因: java9才开始支持这个形式
- 所有依赖版本与公司项目版本一致
总体架构图
其中:
- Master作为系统的指挥中枢,其主要的作用是将任务分区,例如按照数据表中的id主键进行分区,并将分区信息分发给slave节点,一般并没有具体的Step实现, Master将创建出多个SlaveStep 并将这些Step的执行上下文存储到数据库中
// mater_topic中发送的步骤信息
// 分区1
{
"stepExecutionId": 410,
"stepName": "slave",
"jobExecutionId": 159
}
// 分区2
{
"stepExecutionId": 411,
"stepName": "slave",
"jobExecutionId": 160
}
//context
{
"@class":"java.util.HashMap",
"batch.taskletType":"org.springframework.batch.core.step.item.ChunkOrientedTasklet",
"MyBatisPagingItemReader.read.count":0,
"start":0,
"end":200,
"batch.stepType":"org.springframework.batch.core.step.tasklet.TaskletStep"
}
{
"@class":"java.util.HashMap",
"batch.taskletType":"org.springframework.batch.core.step.item.ChunkOrientedTasklet",
"MyBatisPagingItemReader.read.count":0,
"start":200,
"end":400,
"batch.stepType":"org.springframework.batch.core.step.tasklet.TaskletStep"
}
- Slave节点负责监听来自Master topic中传递的分区信息, 在读取到分区中的数据后, 将按照分区中的消息去数据库中查找对应的步骤的id, 并且获得对应Step的执行上下文, 并根据这些参数, 执行对应的任务
消息队列设置
kafka:
bootstrap-servers: xx:9092
producer:
# 发生错误后,消息重发的次数。
retries: 0
#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。 batch-size: 16384 # 设置生产者内存缓冲区的大小。
buffer-memory: 33554432
# 键的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 值的序列化方式
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
acks: 1
consumer:
group-id: test
# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
auto-commit-interval: 1S
# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
auto-offset-reset: earliest
# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
enable-auto-commit: false
# 键的反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 值的反序列化方式
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
listener:
# 在侦听器容器中运行的线程数。
concurrency: 5
#listner负责ack,每调用一次,就立即commit
ack-mode: manual_immediate
missing-topics-fatal: false
kafka主题设置
public class KafkaConstants {
// master分发任务的使用的topic,slave订阅这个topic
public static final String MASTER_SEND_TOPIC = "MASTER_SEND_TOPIC_1";
// slave给与master返回结果使用的topic
public static final String SLAVE_SEND_TOPIC = "SLAVE_SEND_TOPIC";
// master发送topic的分区数
public static final Integer TOPIC_PARTITION_COUNT = 2;
}
$\bigstar$ Master
$\blacktriangleright$ 发送 Channel
@Bean("outboundChannel")
public DirectChannel outboundChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "outboundChannel")
public MessageHandler outbound() {
KafkaProducerMessageHandler messageHandler = new KafkaProducerMessageHandler(kafkaTemplate);
messageHandler.setTopicExpression(new LiteralExpression(KafkaConstants.MASTER_SEND_TOPIC));
//保证消息分发的时候的轮询
Function<Message<?>, Long> partitionIdFn = (m) -> {
StepExecutionRequest executionRequest = (StepExecutionRequest) m.getPayload();
return executionRequest.getStepExecutionId() % KafkaConstants.TOPIC_PARTITION_COUNT;
};
messageHandler.setPartitionIdExpression(new FunctionExpression<>(partitionIdFn));
messageHandler.setOutputChannel(outboundChannel());
return messageHandler;
}
$\blacktriangleright$ 接收 Channel
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setTrigger(new PeriodicTrigger(100L));
return pollerMetadata;
}
@Bean
public QueueChannel inboundChannel(){
return new QueueChannel();
}
@Bean(name = "Container")
public KafkaMessageListenerContainer<String,Object> listenerContainer(){
ContainerProperties containerProperties=new ContainerProperties(KafkaConstants.SLAVE_SEND_TOPIC);
containerProperties.setMissingTopicsFatal(false);
return new KafkaMessageListenerContainer<>(consumerFactory,containerProperties);
}
@Bean
public KafkaMessageDrivenChannelAdapter<String,Object> jobConsumers(){
KafkaMessageDrivenChannelAdapter<String,Object> adapter=new KafkaMessageDrivenChannelAdapter<>(listenerContainer());
adapter.setOutputChannel(inboundChannel());
adapter.afterPropertiesSet();
return adapter;
}
$\blacktriangleright$ partitioner分区器自定义
@Component
public class MySimplePartitioner implements Partitioner {
@Override
public Map<String, ExecutionContext> partition(int i) {
//根据所传递的i值进行分区,并将id分段存入step执行上下文中
Map<String,ExecutionContext> res= new HashMap<>();
for(int j=0;j<i;j++){
ExecutionContext e =new ExecutionContext();
e.putInt("start",j*200);
e.putInt("end",j*200+200);
res.put("slave_"+j,e);
}
return res;
}
}
$\blacktriangleright$ managerJob与managerStep
@Bean(name = "managerJob")
public Job managerJob(){
return jobBuilderFactory.get("managerJob"+System.currentTimeMillis())
.start(managerStep())
.build();
}
@Bean
public Step managerStep(){
return stepBuilderFactory.get("master")
.partitioner("slave",mySimplePartitioner)
.gridSize(KafkaConstants.TOPIC_PARTITION_COUNT)
.outputChannel(outboundChannel())
.inputChannel(inboundChannel())
.build();
}
$\bigstar$ Slave端
$\blacktriangleright$ 接收消息队列
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setTrigger(new PeriodicTrigger(100L));
return pollerMetadata;
}
@Bean
public QueueChannel inboundChannel(){
return new QueueChannel();
}
@Bean(name = "Container")
public KafkaMessageListenerContainer<String,Object> listenerContainer(){
ContainerProperties containerProperties=new ContainerProperties(KafkaConstants.MASTER_SEND_TOPIC);
containerProperties.setMissingTopicsFatal(false);
return new KafkaMessageListenerContainer<>(consumerFactory,containerProperties);
}
@Bean
public KafkaMessageDrivenChannelAdapter<String,Object> jobConsumers(){
KafkaMessageDrivenChannelAdapter<String,Object> adapter=new KafkaMessageDrivenChannelAdapter<>(listenerContainer());
adapter.setOutputChannel(inboundChannel());
adapter.afterPropertiesSet();
return adapter;
}
$\blacktriangleright$ 回复通道
@Bean("slaveOutboundChannel")
public DirectChannel outboundChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "slaveOutboundChannel")
public MessageHandler outbound() {
KafkaProducerMessageHandler messageHandler = new KafkaProducerMessageHandler(template);
messageHandler.setTopicExpression(new LiteralExpression(KafkaConstants.SLAVE_SEND_TOPIC));
messageHandler.setOutputChannel(outboundChannel());
return messageHandler;
}
$\blacktriangleright$ ItemWriter 负责获取数据 数据库或者文件
@Bean
@StepScope
//从执行上下文中获取分区的id范围
public MyBatisPagingItemReader<TestMetaData> i17ItemReader(@Value("#{StepExecutionContext['start']}")Integer start,
@Value("#{StepExecutionContext['end']}")Integer end){
MyBatisPagingItemReader<TestMetaData> reader=new MyBatisPagingItemReader<>();
reader.setSqlSessionFactory(sqlSessionFactory);
reader.setQueryId("org.han.dao.mapper.MetaDataMapper.pageQuery");
String sql = "select * from test_meta_data imi where id between " +start+" and "+end;
Map<String,Object> queryParameter = new HashMap<>();
queryParameter.put("sql",sql);
reader.setParameterValues(queryParameter);
reader.setPageSize(50);
return reader;
}
其中 reader 会根据 size 默认传入 _skiprows, _pagesize 来构造分页逻辑
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.han.dao.mapper.MetaDataMapper">
<select id="pageQuery" parameterType="java.util.Map" resultType="org.han.dao.dataobject.TestMetaData">
${sql} limit ${_skiprows},${_pagesize}
</select>
</mapper>
$\blacktriangleright$ Workstep 负责执行具体的任务
@Bean(name = "slave")
public Step step(){
return stepBuilderFactory.get("slave")
.inputChannel(inboundChannel())
.outputChannel(outboundChannel())
.<TestMetaData,TestMetaData>chunk(10)
.reader(i17ItemReader(null,null))
.writer(slaveItemWriter())
.build();
}
总结
- master通过分区器将数据库中的数据按照容量和slave节点的数量分区成多个分区,并将workstep生成并分配好.通过kafka的通道将workid传送给worker,
- woker在拉取消息后通过 stepid在数据库中获得对用的work分区数据 接下来slave进行正常的读取与操作,并将结果返回给master
- master进行聚合获得结果