Spring Batch remote partitioning

A Spring Batch remote partitioning demo

Framework versions used

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>
		<dependency>
			<groupId>org.han</groupId>
			<artifactId>common</artifactId>
			<version>1.0-SNAPSHOT</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-batch</artifactId>
			<version>2.4.2</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-integration</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-kafka</artifactId>
			<version>3.1.5.RELEASE</version>
			<exclusions>
				<exclusion>
					<groupId>org.slf4j</groupId>
					<artifactId>slf4j-log4j12</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-core</artifactId>
			<version>5.1.4.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.batch</groupId>
			<artifactId>spring-batch-integration</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>com.baomidou</groupId>
			<artifactId>mybatis-plus-boot-starter</artifactId>
			<version>3.5.3.1</version>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<version>1.18.22</version>
		</dependency>
		<dependency>
			<groupId>io.springfox</groupId>
			<artifactId>springfox-swagger2</artifactId>
			<version>2.9.2</version>
		</dependency>
		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>1.2.83</version>
		</dependency>
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<version>8.0.13</version>
		</dependency>
		<dependency>
			<groupId>cn.hutool</groupId>
			<artifactId>hutool-all</artifactId>
			<version>5.4.0</version>
		</dependency>
  • Too high a Spring Batch version fails with a `List.of` error
    • Reason: `List.of` was only introduced in Java 9 (see the snippet below)
  • All dependency versions match the company project's versions
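
For reference, a minimal illustration of that incompatibility (the class and variable names below are illustrative only):

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ListOfDemo {
    public static void main(String[] args) {
        List<String> java9 = List.of("a", "b");                                     // Java 9+ factory method
        List<String> java8 = Collections.unmodifiableList(Arrays.asList("a", "b")); // Java 8 equivalent
        System.out.println(java9.equals(java8));                                    // prints true
    }
}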

Overall architecture

[Architecture diagram]

In this architecture:

  • The Master is the system's coordinator. Its main job is to partition the work, for example by the id primary key of a database table, and distribute the partition information to the slave nodes; it generally has no concrete Step implementation of its own. The Master creates the multiple slave StepExecutions and persists their execution contexts to the database.
// Step info messages sent on the master topic
// partition 1
{
    "stepExecutionId": 410,
    "stepName": "slave",
    "jobExecutionId": 159
}
// partition 2
{
    "stepExecutionId": 411,
    "stepName": "slave",
    "jobExecutionId": 160
}
// the execution contexts persisted for the two partitions
{
    "@class":"java.util.HashMap",
    "batch.taskletType":"org.springframework.batch.core.step.item.ChunkOrientedTasklet",
    "MyBatisPagingItemReader.read.count":0,
    "start":0,
    "end":200,
    "batch.stepType":"org.springframework.batch.core.step.tasklet.TaskletStep"
}
{
    "@class":"java.util.HashMap",
    "batch.taskletType":"org.springframework.batch.core.step.item.ChunkOrientedTasklet",
    "MyBatisPagingItemReader.read.count":0,
    "start":200,
    "end":400,
    "batch.stepType":"org.springframework.batch.core.step.tasklet.TaskletStep"
}
  • A Slave node listens for the partition messages delivered on the Master's topic. When it receives one, it uses the message to look up the corresponding step execution id in the database, loads that Step's execution context, and executes the task with those parameters (a simplified sketch of this lookup follows).
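
That lookup is performed by spring-batch-integration's StepExecutionRequestHandler. Conceptually it behaves like the following sketch (illustrative only, not the framework's actual code; jobExplorer and stepLocator are assumed to be injected):

import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.step.StepLocator;
import org.springframework.batch.integration.partition.StepExecutionRequest;

public class WorkerHandlerSketch {

    private JobExplorer jobExplorer; // reads step executions back from the batch metadata tables
    private StepLocator stepLocator; // resolves step beans by name

    public StepExecution handle(StepExecutionRequest request) throws Exception {
        // e.g. jobExecutionId=159, stepExecutionId=410 -> context {"start":0,"end":200}
        StepExecution stepExecution = jobExplorer.getStepExecution(
                request.getJobExecutionId(), request.getStepExecutionId());
        Step step = stepLocator.getStep(request.getStepName()); // the "slave" step bean
        step.execute(stepExecution);  // runs the worker step with the stored id range
        return stepExecution;         // the outcome is sent back on the reply topic
    }
}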

Message queue configuration

  kafka:
    bootstrap-servers: xx:9092
    producer:
      # Number of retries after a send error.
      retries: 0
      # When several records are headed for the same partition, the producer batches them together. This sets the maximum size of a batch, in bytes.
      batch-size: 16384
      # Size of the producer's memory buffer.
      buffer-memory: 33554432
      # Key serializer.
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Value serializer.
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      # acks=0: the producer does not wait for any broker response before considering a write successful.
      # acks=1: the producer gets a success response as soon as the partition leader has received the message.
      # acks=all: the producer gets a success response only after all replicating nodes have received the message.
      acks: 1
    consumer:
      group-id: test
      # Auto-commit interval. In Spring Boot 2.x this is a Duration and must use a format such as 1S, 1M, 2H, 5D.
      auto-commit-interval: 1S
      # What the consumer does when a partition has no committed offset or the offset is invalid:
      # latest (default): start reading from the newest records (those produced after the consumer started)
      # earliest: start reading from the beginning of the partition
      auto-offset-reset: earliest
      # Whether to auto-commit offsets (default true). Set it to false and commit manually to avoid duplicates and data loss.
      enable-auto-commit: false
      # Key deserializer.
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Value deserializer.
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
    listener:
      # Number of threads running in the listener container.
      concurrency: 5
      # The listener is responsible for acks; each record is committed immediately after processing.
      ack-mode: manual_immediate
      missing-topics-fatal: false

Kafka topic configuration

public class KafkaConstants {
    // Topic the master uses to distribute work; slaves subscribe to it
    public static final String MASTER_SEND_TOPIC = "MASTER_SEND_TOPIC_1";
    // Topic the slaves use to send results back to the master
    public static final String SLAVE_SEND_TOPIC = "SLAVE_SEND_TOPIC";

    // Number of partitions on the master's topic
    public static final Integer TOPIC_PARTITION_COUNT = 2;
}
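
Because the master routes messages by stepExecutionId % TOPIC_PARTITION_COUNT (see the outbound channel below), MASTER_SEND_TOPIC must actually have that many partitions. One way to guarantee this is to declare the topics as beans for Spring Kafka's KafkaAdmin (a sketch; replication factor 1 is an assumption for a single-broker dev setup):

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TopicConfig {

    @Bean
    public NewTopic masterSendTopic() {
        // Partition count must match the value used for routing
        return new NewTopic(KafkaConstants.MASTER_SEND_TOPIC, KafkaConstants.TOPIC_PARTITION_COUNT, (short) 1);
    }

    @Bean
    public NewTopic slaveSendTopic() {
        // One partition is enough for the reply topic in this demo
        return new NewTopic(KafkaConstants.SLAVE_SEND_TOPIC, 1, (short) 1);
    }
}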

$\bigstar$ Master

$\blacktriangleright$ Outbound channel

    @Bean("outboundChannel")
    public DirectChannel outboundChannel() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "outboundChannel")
    public MessageHandler outbound() {

        KafkaProducerMessageHandler messageHandler = new KafkaProducerMessageHandler(kafkaTemplate);
        messageHandler.setTopicExpression(new LiteralExpression(KafkaConstants.MASTER_SEND_TOPIC));

        // Route by stepExecutionId modulo the partition count so requests round-robin across the topic partitions
        Function<Message<?>, Long> partitionIdFn = (m) -> {
            StepExecutionRequest executionRequest = (StepExecutionRequest) m.getPayload();
            return executionRequest.getStepExecutionId() % KafkaConstants.TOPIC_PARTITION_COUNT;
        };
        messageHandler.setPartitionIdExpression(new FunctionExpression<>(partitionIdFn));
        messageHandler.setOutputChannel(outboundChannel());
        return messageHandler;
    }

$\blacktriangleright$ Inbound channel


    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata defaultPoller() {
        PollerMetadata pollerMetadata = new PollerMetadata();
        pollerMetadata.setTrigger(new PeriodicTrigger(100L));
        return pollerMetadata;
    }

    @Bean
    public QueueChannel inboundChannel(){
        return new QueueChannel();
    }

    @Bean(name = "Container")
    public KafkaMessageListenerContainer<String, Object> listenerContainer() {
        ContainerProperties containerProperties = new ContainerProperties(KafkaConstants.SLAVE_SEND_TOPIC);
        containerProperties.setMissingTopicsFatal(false);
        return new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
    }

    @Bean
    public KafkaMessageDrivenChannelAdapter<String,Object> jobConsumers(){
        KafkaMessageDrivenChannelAdapter<String,Object> adapter=new KafkaMessageDrivenChannelAdapter<>(listenerContainer());
        adapter.setOutputChannel(inboundChannel());
        adapter.afterPropertiesSet();
        return adapter;
    }
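
The consumerFactory injected above is never shown in the post; with Spring Boot it is presumably the auto-configured ConsumerFactory<String, Object> built from the YAML. If it has to be declared explicitly, a sketch could look like this (trusting all packages is a demo-level shortcut so JsonDeserializer can rebuild the Spring Batch payload types from their type headers):

    @Bean
    public ConsumerFactory<String, Object> consumerFactory(KafkaProperties kafkaProperties) {
        Map<String, Object> props = kafkaProperties.buildConsumerProperties();
        // Without this, JsonDeserializer refuses to instantiate classes outside its default trusted packages
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        return new DefaultKafkaConsumerFactory<>(props);
    }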

$\blacktriangleright$ Custom Partitioner

@Component
public class MySimplePartitioner implements Partitioner {

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        // Split the ids into gridSize fixed ranges of 200 and store each range in a step execution context
        Map<String, ExecutionContext> res = new HashMap<>();
        for (int j = 0; j < gridSize; j++) {
            ExecutionContext e = new ExecutionContext();
            e.putInt("start", j * 200);
            e.putInt("end", j * 200 + 200);
            res.put("slave_" + j, e);
        }
        return res;
    }
}
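
The fixed stride of 200 assumes ids start at 0 and are contiguous; note also that with the reader's inclusive BETWEEN query, adjacent ranges such as [0,200] and [200,400] share a boundary row. A common variant (a sketch; it assumes a JdbcTemplate is available and reuses the test_meta_data table from the reader below) derives non-overlapping bounds from the table's actual id range:

import java.util.HashMap;
import java.util.Map;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

@Component
public class ColumnRangePartitioner implements Partitioner {

    @Autowired
    private JdbcTemplate jdbcTemplate; // assumes a DataSource is configured and the table is non-empty

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Integer min = jdbcTemplate.queryForObject("select min(id) from test_meta_data", Integer.class);
        Integer max = jdbcTemplate.queryForObject("select max(id) from test_meta_data", Integer.class);
        int targetSize = (max - min) / gridSize + 1; // rows per partition

        Map<String, ExecutionContext> result = new HashMap<>();
        int start = min;
        int end = start + targetSize - 1;
        for (int i = 0; i < gridSize; i++) {
            ExecutionContext context = new ExecutionContext();
            context.putInt("start", start);
            context.putInt("end", Math.min(end, max)); // inclusive bounds, matching "id between start and end"
            result.put("slave_" + i, context);
            start += targetSize;
            end += targetSize;
        }
        return result;
    }
}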

$\blacktriangleright$ managerJob and managerStep

    @Bean(name = "managerJob")
    public Job managerJob() {
        // The timestamp suffix forces a fresh job name (and thus a new JobInstance) on every startup
        return jobBuilderFactory.get("managerJob" + System.currentTimeMillis())
                .start(managerStep())
                .build();
    }
    @Bean
    public Step managerStep(){
        return stepBuilderFactory.get("master")
                .partitioner("slave",mySimplePartitioner)
                .gridSize(KafkaConstants.TOPIC_PARTITION_COUNT)
                .outputChannel(outboundChannel())
                .inputChannel(inboundChannel())
                .build();
    }
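
For the .outputChannel()/.inputChannel() calls above to compile, stepBuilderFactory here is presumably the remote-partitioning step builder factory from spring-batch-integration (RemotePartitioningManagerStepBuilderFactory in this version line), not the plain StepBuilderFactory. Also, instead of suffixing the job name with a timestamp, the conventional way to get a new JobInstance per run is a stable name plus unique JobParameters (a sketch; the ManagerJobRunner class and the "ts" parameter name are arbitrary):

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

@Component
public class ManagerJobRunner {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    @Qualifier("managerJob")
    private Job managerJob;

    public void launch() throws Exception {
        // A unique parameter value yields a fresh JobInstance on every run
        JobParameters params = new JobParametersBuilder()
                .addLong("ts", System.currentTimeMillis())
                .toJobParameters();
        jobLauncher.run(managerJob, params);
    }
}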

$\bigstar$ Slave side

$\blacktriangleright$ Inbound channel (listening on the master topic)

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata defaultPoller() {
        PollerMetadata pollerMetadata = new PollerMetadata();
        pollerMetadata.setTrigger(new PeriodicTrigger(100L));
        return pollerMetadata;
    }

    @Bean
    public QueueChannel inboundChannel(){
        return new QueueChannel();
    }


    @Bean(name = "Container")
    public KafkaMessageListenerContainer<String, Object> listenerContainer() {
        ContainerProperties containerProperties = new ContainerProperties(KafkaConstants.MASTER_SEND_TOPIC);
        containerProperties.setMissingTopicsFatal(false);
        return new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
    }


    @Bean
    public KafkaMessageDrivenChannelAdapter<String,Object> jobConsumers(){
        KafkaMessageDrivenChannelAdapter<String,Object> adapter=new KafkaMessageDrivenChannelAdapter<>(listenerContainer());
        adapter.setOutputChannel(inboundChannel());
        adapter.afterPropertiesSet();
        return adapter;
    }

$\blacktriangleright$ Reply channel

    @Bean("slaveOutboundChannel")
    public DirectChannel outboundChannel() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "slaveOutboundChannel")
    public MessageHandler outbound() {

        KafkaProducerMessageHandler messageHandler = new KafkaProducerMessageHandler(template);
        messageHandler.setTopicExpression(new LiteralExpression(KafkaConstants.SLAVE_SEND_TOPIC));
        messageHandler.setOutputChannel(outboundChannel());
        return messageHandler;
    }

$\blacktriangleright$ ItemReader: fetches the data (from the database or a file)

    @Bean
    @StepScope
    // Pull this partition's id range from the step execution context (late binding)
    public MyBatisPagingItemReader<TestMetaData> i17ItemReader(@Value("#{stepExecutionContext['start']}") Integer start,
                                                               @Value("#{stepExecutionContext['end']}") Integer end) {
        MyBatisPagingItemReader<TestMetaData> reader = new MyBatisPagingItemReader<>();
        reader.setSqlSessionFactory(sqlSessionFactory);
        reader.setQueryId("org.han.dao.mapper.MetaDataMapper.pageQuery");
        // Demo-only: concatenating the SQL string is vulnerable to injection; see the parameterized variant below
        String sql = "select * from test_meta_data imi where id between " + start + " and " + end;
        Map<String, Object> queryParameter = new HashMap<>();
        queryParameter.put("sql", sql);
        reader.setParameterValues(queryParameter);
        reader.setPageSize(50);
        return reader;
    }

Based on the configured page size, the reader automatically passes _skiprows and _pagesize to the statement to build the paging logic:

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.han.dao.mapper.MetaDataMapper">
    <select id="pageQuery" parameterType="java.util.Map" resultType="org.han.dao.dataobject.TestMetaData">
        ${sql} limit ${_skiprows},${_pagesize}
    </select>
</mapper>
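
Because the interpolated ${sql} string is open to SQL injection, a safer variant passes the bounds as plain parameters and keeps the statement in the mapper (a sketch; the pageQueryByRange statement id is hypothetical and would read "select * from test_meta_data where id between #{start} and #{end} limit ${_skiprows}, ${_pagesize}"):

    @Bean
    @StepScope
    public MyBatisPagingItemReader<TestMetaData> safeItemReader(@Value("#{stepExecutionContext['start']}") Integer start,
                                                                @Value("#{stepExecutionContext['end']}") Integer end) {
        MyBatisPagingItemReader<TestMetaData> reader = new MyBatisPagingItemReader<>();
        reader.setSqlSessionFactory(sqlSessionFactory);
        reader.setQueryId("org.han.dao.mapper.MetaDataMapper.pageQueryByRange"); // hypothetical statement id
        Map<String, Object> params = new HashMap<>();
        params.put("start", start); // bound with #{start} in the mapper, never concatenated
        params.put("end", end);
        reader.setParameterValues(params);
        reader.setPageSize(50);
        return reader;
    }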

$\blacktriangleright$ Worker step: executes the actual task

    @Bean(name = "slave")
    public Step step(){
        return stepBuilderFactory.get("slave")
                .inputChannel(inboundChannel())
                .outputChannel(outboundChannel())
                .<TestMetaData,TestMetaData>chunk(10)
                .reader(i17ItemReader(null,null))
                .writer(slaveItemWriter())
                .build();
    }
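
The slaveItemWriter() referenced above is not shown in the original post. A minimal stand-in (purely hypothetical; a real job would persist or forward the items, e.g. with a MyBatisBatchItemWriter) could be:

    @Bean
    public ItemWriter<TestMetaData> slaveItemWriter() {
        // Hypothetical placeholder: just print each item in the chunk
        return items -> items.forEach(item -> System.out.println("writing " + item));
    }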

Summary

  • The master's partitioner splits the table's data into partitions based on the data volume and the number of slave nodes, creates the worker step executions, and sends the step execution ids to the workers over the Kafka channel.
  • After pulling a message, a worker looks up its partition's data range in the database by step execution id, then performs the normal read and write, and returns the result to the master.
  • The master aggregates the replies into the final result.
