Elastic-Job是一个分布式调度的解决方案,由当当网开源,它由两个相互独立的子项目Elastic-Job-Lite和ElasticJob-Cloud组成,使用Elastic-Job可以快速实现分布式任务调度。
Elastic-Job的github地址:https://github.com/elasticjob
功能列表: 分布式调度协调 在分布式环境中,任务能够按指定的调度策略执行,并且能够避免同一任务多实例重复执行。 丰富的调度策略: 基于成熟的定时任务作业框架Quartz cron表达式执行定时任务。 弹性扩容缩容 , 当集群中增加某一个实例,它应当也能够被选举并执行任务;当集群减少一个实例时,它所执行的任务能被 转移到别的实例来执行。 失效转移 某实例在任务执行失败后,会被转移到其他实例执行。 错过执行作业重触发 若因某种原因导致作业错过执行,自动记录错过执行的作业,并在上次作业完成后自动触发。 支持并行调度 支持任务分片,任务分片是指将一个任务分为多个小任务项在多个实例同时执行。 作业分片一致性 当任务被分片后,保证同一分片在分布式环境中仅一个执行实例。 支持作业生命周期操作 可以动态对任务进行开启及停止操作。 丰富的作业类型 支持Simple、DataFlow、Script三种作业类型,后续会有详细介绍。 Spring整合以及命名空间支持 对Spring支持良好的整合方式,支持spring自定义命名空间,支持占位符。 运维平台 提供运维界面,可以管理作业和注册中心。
本次是用elasticjob本地简单测试多服务器启动定时任务,这里需要安装Zookeeper,这里就不多聊了
1、编写Zookeeper配置类
package elastic.job.demo.autoconfig; import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @ClassName : ZookeeperAutoConfig * @Description :自动配置类 */ @Configuration @ConditionalOnProperty("elasticjob.zookeeper.server-list") @EnableConfigurationProperties(ZookeeperProperties.class) public class ZookeeperAutoConfig { private final ZookeeperProperties zookeeperProperties; public ZookeeperAutoConfig(ZookeeperProperties zookeeperProperties) { this.zookeeperProperties = zookeeperProperties; } @Bean(initMethod = "init") public CoordinatorRegistryCenter zkCenter(){ String serverList = zookeeperProperties.getServerlist(); String namespace = zookeeperProperties.getNamespace(); ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(serverList, namespace); ZookeeperRegistryCenter zookeeperRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration); return zookeeperRegistryCenter; } }
package elastic.job.demo.autoconfig; import lombok.Getter; import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; /** * @ClassName : ZookeeperProperties * @Description :属性配置类 */ @Getter @Setter @ConfigurationProperties(prefix = "elasticjob.zookeeper") public class ZookeeperProperties { //zookeeper地址列表 private String serverlist; //zookeeper命名空间 private String namespace; }
2、写完配置类后,我们这里写一个注解,用注解的方式进行实现elasticjob的任务,方便后续的开发和简洁性
package elastic.job.demo.autoconfig; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; @Target(ElementType.TYPE)//表示使用在哪:这里是类, @Retention(RetentionPolicy.RUNTIME)//表示运行时进行启动 public @interface ElasticSimpleJob { String jobName() default ""; String cron() default ""; int shardingTotalCount() default 1; boolean overwrite() default false; }
3、用springboot的自动装载功能,进行注解的扫描和自动注册
package elastic.job.demo.autoconfig; import com.dangdang.ddframe.job.api.ElasticJob; import com.dangdang.ddframe.job.api.simple.SimpleJob; import com.dangdang.ddframe.job.config.JobCoreConfiguration; import com.dangdang.ddframe.job.config.JobRootConfiguration; import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration; import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration; import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler; import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter; import elastic.job.demo.job.MySimpleJob; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.util.Map; /** * @ClassName : SimpleJobAutoConfig * @Description : */ @Configuration //@Service @ConditionalOnBean(CoordinatorRegistryCenter.class) @AutoConfigureAfter(ZookeeperAutoConfig.class) public class SimpleJobAutoConfig { @Autowired private CoordinatorRegistryCenter coordinatorRegistryCenter; @Autowired private ApplicationContext applicationContext; //自动注册 @PostConstruct public void initSimpleJob(){ //获取spring的上下文 Map<String, Object> beansWithAnnotation = applicationContext.getBeansWithAnnotation(ElasticSimpleJob.class); for(Map.Entry<String,Object> entry: beansWithAnnotation.entrySet()){ Object instance = entry.getValue(); Class<?>[] interfaces = instance.getClass().getInterfaces(); for (Class<?> superInterface : interfaces) { if(superInterface == SimpleJob.class){ ElasticSimpleJob annotation = instance.getClass().getAnnotation(ElasticSimpleJob.class); String jobName = annotation.jobName(); String cron = annotation.cron(); int shardingTotalCount = annotation.shardingTotalCount(); boolean overwrite =annotation.overwrite(); //注册定时任务 //job 核心配置 JobCoreConfiguration buildJcc = JobCoreConfiguration .newBuilder(jobName, cron, shardingTotalCount) .build(); //job类型配置 SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration( buildJcc, instance.getClass().getCanonicalName() ); // job配置(LiteJobConfiguration) LiteJobConfiguration buildLiteJobConfiguration = LiteJobConfiguration .newBuilder(simpleJobConfiguration) .overwrite(overwrite) .build(); //启动 new SpringJobScheduler((ElasticJob) instance,coordinatorRegistryCenter,buildLiteJobConfiguration).init(); } } } } }
在springboot添加spring.factories用于扫描并自动注册
4、我们这里编写2个任务 MySimpleJob 和 MySimpleJob2,
MySimpleJob,5秒一次,总分片数为4,MySimpleJob2,5秒一次,总分片数为1
package elastic.job.demo.job; import com.dangdang.ddframe.job.api.ShardingContext; import com.dangdang.ddframe.job.api.simple.SimpleJob; import elastic.job.demo.autoconfig.ElasticSimpleJob; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; /** * @ClassName : MySimpleJob * @Description : 我的定时任务 */ @ElasticSimpleJob(jobName = "mySimpleJob", cron = "0/5 * * * * ?", shardingTotalCount = 4, overwrite = true) @Component public class MySimpleJob implements SimpleJob { @Value("${server.port}") private int port; @Override public void execute(ShardingContext shardingContext) { System.out.println("端口:" + port + ",我是任务1分片项:"+shardingContext.getShardingItem()+",总分片数是:"+ shardingContext.getShardingTotalCount()); } }
package elastic.job.demo.job; import com.dangdang.ddframe.job.api.ShardingContext; import com.dangdang.ddframe.job.api.simple.SimpleJob; import elastic.job.demo.autoconfig.ElasticSimpleJob; import org.springframework.stereotype.Component; /** * @author zhoucc * @date 2023-06-26 16:38 */ @ElasticSimpleJob(jobName = "mySimpleJob2", cron = "0/5 * * * * ?", shardingTotalCount = 1, overwrite = true) @Component public class MySimpleJob2 implements SimpleJob { @Override public void execute(ShardingContext shardingContext) { System.out.println("我是任务2分片项:"+shardingContext.getShardingItem()+",总分片数是:"+ shardingContext.getShardingTotalCount()); } }
5、启动效果展示,我这里分别用8081、8082、8083,这3个端口的服务进行演示
(1)启动zookeeper,这里不多讲了,我这里的端口是2181
(2)启动第一个服务8081端口
可以看到我这里4个分片的任务都在8081上进行执行了。
(3)启动第二个服务8082
可以看到8082只运行了0和1.8081运行2和3.
(4)启动第3个服务8083
可以看到8081只运行了2,8082运行0和3,8083运行4
(5)当我们停掉8083后
又回到了 8081只运行了2和3,8083运行0和1
(6)最后我们看一下zookeeper的页面
0,1,2,3这四个分片,2个服务,这里可以观察具体那个分片在哪个服务器上面进行执行。
最后总结一下,个人的见解:elasticjob首先定义分片策略,将服务器注册到zookpeeper上,进行分片分发,若某个服务终止,那将进行重新的分发。
附上源码包:
链接:https://pan.baidu.com/s/1ChB9u18jgu_2IvWXFASjKA
提取码:1234
如果对你有帮助,麻烦给个小赞
标签:springboot,ddframe,job,源码,elasticjob,org,import,com,annotation From: https://www.cnblogs.com/zhouchengchen/p/17507900.html