首页 > 编程语言 >分布式任务elasticjob整合springboot本地多服务启动(附源码)

分布式任务elasticjob整合springboot本地多服务启动(附源码)

时间:2023-06-27 10:14:26浏览次数:52  
标签:springboot ddframe job 源码 elasticjob org import com annotation

Elastic-Job是一个分布式调度的解决方案,由当当网开源,它由两个相互独立的子项目Elastic-Job-Lite和ElasticJob-Cloud组成,使用Elastic-Job可以快速实现分布式任务调度。

Elastic-Job的github地址:https://github.com/elasticjob

功能列表: 分布式调度协调 在分布式环境中,任务能够按指定的调度策略执行,并且能够避免同一任务多实例重复执行。 丰富的调度策略: 基于成熟的定时任务作业框架Quartz cron表达式执行定时任务。 弹性扩容缩容 , 当集群中增加某一个实例,它应当也能够被选举并执行任务;当集群减少一个实例时,它所执行的任务能被 转移到别的实例来执行。 失效转移 某实例在任务执行失败后,会被转移到其他实例执行。 错过执行作业重触发 若因某种原因导致作业错过执行,自动记录错过执行的作业,并在上次作业完成后自动触发。 支持并行调度 支持任务分片,任务分片是指将一个任务分为多个小任务项在多个实例同时执行。 作业分片一致性 当任务被分片后,保证同一分片在分布式环境中仅一个执行实例。 支持作业生命周期操作 可以动态对任务进行开启及停止操作。 丰富的作业类型 支持Simple、DataFlow、Script三种作业类型,后续会有详细介绍。 Spring整合以及命名空间支持 对Spring支持良好的整合方式,支持spring自定义命名空间,支持占位符。 运维平台 提供运维界面,可以管理作业和注册中心。

本次是用elasticjob本地简单测试多服务器启动定时任务,这里需要安装Zookeeper,这里就不多聊了

1、编写Zookeeper配置类

package elastic.job.demo.autoconfig;

import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @ClassName : ZookeeperAutoConfig
 * @Description :自动配置类
 */
@Configuration
@ConditionalOnProperty("elasticjob.zookeeper.server-list")
@EnableConfigurationProperties(ZookeeperProperties.class)
public class ZookeeperAutoConfig {

    private final ZookeeperProperties zookeeperProperties;

    public ZookeeperAutoConfig(ZookeeperProperties zookeeperProperties) {
        this.zookeeperProperties = zookeeperProperties;
    }

    @Bean(initMethod = "init")
    public CoordinatorRegistryCenter zkCenter(){
        String serverList = zookeeperProperties.getServerlist();
        String namespace = zookeeperProperties.getNamespace();

        ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(serverList, namespace);
        ZookeeperRegistryCenter zookeeperRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
        return zookeeperRegistryCenter;
    }
}
package elastic.job.demo.autoconfig;

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;

/**
 * @ClassName : ZookeeperProperties
 * @Description :属性配置类
 */
@Getter
@Setter
@ConfigurationProperties(prefix = "elasticjob.zookeeper")
public class ZookeeperProperties {
    //zookeeper地址列表
    private String serverlist;
    //zookeeper命名空间
    private String namespace;

}

2、写完配置类后,我们这里写一个注解,用注解的方式进行实现elasticjob的任务,方便后续的开发和简洁性

package elastic.job.demo.autoconfig;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.TYPE)//表示使用在哪:这里是类,
@Retention(RetentionPolicy.RUNTIME)//表示运行时进行启动
public @interface ElasticSimpleJob {
    String jobName() default "";
    String cron() default "";
    int shardingTotalCount() default 1;
    boolean overwrite() default  false;
}

3、用springboot的自动装载功能,进行注解的扫描和自动注册

package elastic.job.demo.autoconfig;

import com.dangdang.ddframe.job.api.ElasticJob;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.JobRootConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import elastic.job.demo.job.MySimpleJob;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.Map;

/**
 * @ClassName : SimpleJobAutoConfig
 * @Description :
 */
@Configuration
//@Service
@ConditionalOnBean(CoordinatorRegistryCenter.class)
@AutoConfigureAfter(ZookeeperAutoConfig.class)
public class SimpleJobAutoConfig {

    @Autowired
    private CoordinatorRegistryCenter coordinatorRegistryCenter;

    @Autowired
    private ApplicationContext applicationContext;

    //自动注册
    @PostConstruct
    public void initSimpleJob(){
        //获取spring的上下文
        Map<String, Object> beansWithAnnotation = applicationContext.getBeansWithAnnotation(ElasticSimpleJob.class);
        for(Map.Entry<String,Object> entry: beansWithAnnotation.entrySet()){
            Object instance = entry.getValue();
            Class<?>[] interfaces = instance.getClass().getInterfaces();
            for (Class<?> superInterface : interfaces) {
                if(superInterface == SimpleJob.class){
                    ElasticSimpleJob annotation = instance.getClass().getAnnotation(ElasticSimpleJob.class);
                    String jobName = annotation.jobName();
                    String cron = annotation.cron();
                    int shardingTotalCount = annotation.shardingTotalCount();
                    boolean overwrite =annotation.overwrite();
                    //注册定时任务
                    //job 核心配置
                    JobCoreConfiguration buildJcc = JobCoreConfiguration
                            .newBuilder(jobName, cron, shardingTotalCount)
                            .build();
                    //job类型配置
                    SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(
                            buildJcc, instance.getClass().getCanonicalName()
                    );
                    // job配置(LiteJobConfiguration)
                    LiteJobConfiguration buildLiteJobConfiguration = LiteJobConfiguration
                            .newBuilder(simpleJobConfiguration)
                            .overwrite(overwrite)
                            .build();

                    //启动
                    new SpringJobScheduler((ElasticJob) instance,coordinatorRegistryCenter,buildLiteJobConfiguration).init();

                }
            }
        }
    }
}

在springboot添加spring.factories用于扫描并自动注册

 4、我们这里编写2个任务 MySimpleJob 和 MySimpleJob2,

MySimpleJob,5秒一次,总分片数为4,MySimpleJob2,5秒一次,总分片数为1

package elastic.job.demo.job;

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import elastic.job.demo.autoconfig.ElasticSimpleJob;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * @ClassName : MySimpleJob
 * @Description : 我的定时任务
 */
@ElasticSimpleJob(jobName = "mySimpleJob",
        cron = "0/5 * * * * ?",
        shardingTotalCount = 4,
        overwrite = true)
@Component
public class MySimpleJob implements SimpleJob {

    @Value("${server.port}")
    private int port;

    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.println("端口:" + port  + ",我是任务1分片项:"+shardingContext.getShardingItem()+",总分片数是:"+
                shardingContext.getShardingTotalCount());
    }
}
package elastic.job.demo.job;

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import elastic.job.demo.autoconfig.ElasticSimpleJob;
import org.springframework.stereotype.Component;

/**
 * @author zhoucc
 * @date 2023-06-26 16:38
 */
@ElasticSimpleJob(jobName = "mySimpleJob2",
        cron = "0/5 * * * * ?",
        shardingTotalCount = 1,
        overwrite = true)
@Component
public class MySimpleJob2 implements SimpleJob {

    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.println("我是任务2分片项:"+shardingContext.getShardingItem()+",总分片数是:"+
                shardingContext.getShardingTotalCount());
    }
}

5、启动效果展示,我这里分别用8081、8082、8083,这3个端口的服务进行演示

(1)启动zookeeper,这里不多讲了,我这里的端口是2181

 (2)启动第一个服务8081端口

可以看到我这里4个分片的任务都在8081上进行执行了。

(3)启动第二个服务8082

 

 可以看到8082只运行了0和1.8081运行2和3.

(4)启动第3个服务8083

 

 

  可以看到8081只运行了2,8082运行0和3,8083运行4

(5)当我们停掉8083后

 

 又回到了 8081只运行了2和3,8083运行0和1

 (6)最后我们看一下zookeeper的页面

0,1,2,3这四个分片,2个服务,这里可以观察具体那个分片在哪个服务器上面进行执行。

最后总结一下,个人的见解:elasticjob首先定义分片策略,将服务器注册到zookpeeper上,进行分片分发,若某个服务终止,那将进行重新的分发。

附上源码包:

 链接:https://pan.baidu.com/s/1ChB9u18jgu_2IvWXFASjKA

提取码:1234

如果对你有帮助,麻烦给个小赞

标签:springboot,ddframe,job,源码,elasticjob,org,import,com,annotation
From: https://www.cnblogs.com/zhouchengchen/p/17507900.html

相关文章

  • SpringBoot假死,十万火急,怎么救火?
    文章很长,且持续更新,建议收藏起来,慢慢读!疯狂创客圈总目录博客园版为您奉上珍贵的学习资源:免费赠送:《尼恩Java面试宝典》持续更新+史上最全+面试必备2000页+面试必备+大厂必备+涨薪必备免费赠送:《尼恩技术圣经+高并发系列PDF》,帮你实现技术自由,完成职业升级,薪......
  • springboot学习-1
    最终目录结构:pom.xml:<?xmlversion="1.0"encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://mav......
  • python源码结构
    在Python中,源文件通常以.py为扩展名,并且可以按照一定的结构进行组织。下面是一个典型的Python源文件的结构示例:1#-*-coding:utf-8-*-23"""模块的文档字符串"""45#导入语句6importmodule17frommodule2importfunc1,func28frommodule3import*......
  • 基于JAVA的springboot班级综合测评管理系统,附源码+数据库+论文+PPT,适合课程设计、毕业
    1、项目介绍随着互联网技术的高速发展,人们生活的各方面都受到互联网技术的影响。现在人们可以通过互联网技术就能实现不出家门就可以通过网络进行系统管理,交易等,而且过程简单、快捷。同样的,在人们的工作生活中,也就需要互联网技术来方便人们的日常工作生活,实现工作办公的自动化处......
  • SpringBoot事件机制
    1、是什么?SpringBoot事件机制是指SpringBoot中的开发人员可以通过编写自定义事件来对应用程序进行事件处理。我们可以创建自己的事件类,并在应用程序中注册这些事件,当事件被触发时,可以对其进行处理。在SpringBoot中,事件可以是任意类型的,可以是基于Spring的事件,也可以是自定义的事......
  • 称重系统,过磅软件,地磅程序,c#源码
    称重系统,过磅软件,地磅程序,c#源码原创文章,转载请说明出处,资料来源:http://imgcs.cn/5c/636690132752.html......
  • SpringBoot04
    1.Springboot和Mybatis的整合1.1.使用注解的方式整合MyBatis引入相关的依赖<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId></dependency><!--mybatis起步依赖--><dependency>......
  • transformCreateStmt 函数源码分析
    函数transformCreateStmt功能在执行器阶段对createtable命令做一些处理:列属性处理条件限制处理likeClause处理如果需要,给表名加上当前schema的名字等等。。。函数签名List*transformCreateStmt(CreateStmt*stmt,constchar*queryString)输入参数CreateSt......
  • SpringBoot自动配置原理
    SpringBoot自动配置自动配置是SpringBoot的核心因素,SpringBoot在整合每一种第三方技术时,都离不开自动配置。但在了解自动配置之前,Spring容器如何进行对bean的加载以及加载控制也是一个非常重要的前提知识。1.bean的加载方式1.1方式一:配置文件+<bean/>标签最初级的bean的加载......
  • springboot 跨域设置
      写文章 SpringBoot项目解决跨域的几种方案小满只想睡觉一直快乐!​关注她  在用SpringBoot开发后端服务时,我们一般是提供接口给前端使用,但前端通过浏览器调我们接口时,浏览器会有个同源策略的限制,即协议,域名,端口任一不一样时都会......