首页 > 其他分享 >Springboot+ElasticJob-Lite实现集群任务调度

Springboot+ElasticJob-Lite实现集群任务调度

时间:2023-04-05 20:00:54浏览次数:45  
标签:ElasticJob org elasticjob 分片 Lite import 任务调度 Springboot

前言

ElasticJob-Lite是集群环境下应用(比如SpringCloud微服务)任务调度的解决方案。

集群部署的时候,一个定时任务会有多个进程执行,如果不进行任何处理,会导致任务触发的时候每个进程重复执行一次。

解决办法有两种:一种是加锁,保证同时只有一个进程执行任务,比如用分布式锁,或者用任务调度框架Quartz,但是这种方案有个缺陷,当任务负载比较高的时候,单个进程处理压力比较大;另一种方式是分片,将任务分片到参与的多个进程中,每次执行多个进程一起分摊,解决了单进程负载过高问题,还能提高扩展性,ElasticJob-Lite实现的是这种。

关于ElasticJob-Lite其他丰富功能参考 概念 & 功能

我们假设有一个需求,需要周期性的的对N个用户进行相业务处理,来看看如果使用ElasticJob-Lite,完整例子见 GitHub - fruitbasket-litchi-elasticjob-lite

代码和配置

参考 ShardingSphere - ElasticJob-Lite Spring Boot Starter 配置

引入POM依赖,关键是elasticjob-lite-spring-boot-starter

<properties>
    <java.version>1.8</java.version>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <spring-boot-dependencies.version>2.3.12.RELEASE</spring-boot-dependencies.version>
    <elasticjob-lite-spring-boot-starter.version>3.0.0</elasticjob-lite-spring-boot-starter.version>
    <lombok.version>1.18.22</lombok.version>
</properties>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring-boot-dependencies.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <dependency>
        <groupId>org.apache.shardingsphere.elasticjob</groupId>
        <artifactId>elasticjob-lite-spring-boot-starter</artifactId>
        <version>${elasticjob-lite-spring-boot-starter.version}</version>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>${lombok.version}</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

先创建个模拟的数据源,用户ID对分片总数取模结果跟分片值进行等值比较,返回跟分片值对应的部分用户ID。

import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
 * 假装这是MySQL或者Redis等数据源
 */
@Component
public class MockDataSource {

    private static final int[] USER_IDS = IntStream.rangeClosed(1, 50).toArray();

    /**
     * 对用户Id取模,过滤得到跟分片ID匹配的结果
     *
     * @param shardingTotalCount 分片总数
     * @param shardingItem       分片值
     * @return 用户Id列表
     */
    public List<Integer> getBy(int shardingTotalCount, int shardingItem) {
        return Arrays.stream(USER_IDS)
                .filter(userId -> userId % shardingTotalCount == shardingItem).boxed().collect(Collectors.toList());
    }
}

创建一个处理处理调度任务的类UserDataflowJob,实现DataflowJob接口中两个方法fetchData()processData()

fetchData()根据分片值(shardingItem)来获取任务相关用户ID,在processData()中执行真正的业务逻辑。

import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.dataflow.job.DataflowJob;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;

@Slf4j
@Component
public class UserDataflowJob implements DataflowJob<Integer> {

    @Autowired
    private MockDataSource dataSource;

    @Override
    public List<Integer> fetchData(final ShardingContext shardingContext) {
        return dataSource.getBy(shardingContext.getShardingTotalCount(), shardingContext.getShardingItem());
    }

    @Override
    public void processData(final ShardingContext shardingContext, final List<Integer> data) {
        log.info("分片总数={},当前分片值={},分片匹配的用户ID={}"
                , shardingContext.getShardingTotalCount()
                , shardingContext.getShardingItem()
                , data);
        // 执行业务逻辑
    }
}

编辑配置文件(application.yml)。指定N个注册中心Zookeeper的客户端访问地址(serverLists);指定命名空间(elasticjob-lite),程序启动后会在Zookeeper根节点(/)下创建命名空间指定的节点,所有elasticjob相关的数据都在这个命名空间节点下。

jobs分别指定N个调度任务。比如任务名称(dataflowJob)、任务处理类的全限定名(elasticJobClass)、执行周期表达式(cron)、分片总数(shardingTotalCount)、是否覆盖注册中心的任务配置(overwrite)等等。详细配置信息参考 ShardingSphere - ElasticJob-Lite 配置手册

这里指定分片总数为5,那分片值就在0~4之间。

elasticjob:
  regCenter:
    serverLists: zookeeperhost1:2181,zookeeperhost2:2181,zookeeperhost3:2181,zookeeperhost4:2181
    namespace: elasticjob-lite
  jobs:
    userDataflowJob:
      elasticJobClass: cn.fruitbasket.litchi.elasticjob.lite.UserDataflowJob
      cron: 0/5 * * * * ?
      shardingTotalCount: 5
      overwrite: true

测试

当只有一个参与任务调度的进程时,我我们可以看到所有分片都分给了这个进程。分片总数为5,每次触发分配对应数量的线程并行执行,每个线程负责一片。

image-20211202175244406

当我们启动两个参与任务调度进程,可以通过日志看到在下次触发时,分片到了两个进程中。

image-20211202175702928

image-20211202175726336

参考

ShardingSphere - ElasticJob-Lite

ShardingSphere - ElasticJob-Lite Spring Boot Starter 配置

ShardingSphere - ElasticJob-Lite 配置手册

标签:ElasticJob,org,elasticjob,分片,Lite,import,任务调度,Springboot
From: https://www.cnblogs.com/shuiyao3/p/17290727.html

相关文章

  • Springboot+Mysql 图书管理系统【源码+sql】
    java项目学生图书管理系统(源码+数据库文件)技术框架:java+springboot+mysql后端框架:SpringBoot、SpringMVC、MyBatisPlus前端界面:Thymeleaf、BootStrap、jQuery系统共分为三种用户系统主要功能:系统设计三个角色,学生端,管理员端,系统管理员端1.普通用户书籍查询、书籍借阅......
  • 基于SpringBoot+Vue+ElementUI的在线考试系统(可做毕设)
    项目简介青云是一套麻雀虽小但五脏俱全的在线考试系统。采用了目前主流的技术栈SpringBoot+Vue+ElementUI,并进行了前后端分离。对于事务和锁都有应用,非常适合学习练手。项目演示项目演示地址:http://xuezhabiji.com:5000账号:admin密码:admin代码获取:github:https://github.com......
  • 在Linux部署ElasticJob-Lite-UI运维控制台
    前言ElasticJob-Lite-UI是任务的分布式调度解决方案(ElasticJob-Lite)搭配的可视化运维控制台。运维控制台和ElasticJob-Lite并无直接关系,是通过读取作业注册中心(Zookeeper)数据展现作业状态,或更新注册中心数据修改全局配置。它具有这些功能:登录安全控制;注册中心、事件追踪数据源管......
  • Springboot 系列 (29) - Springboot+HBase 大数据存储(七)| Springboot 项目通过 Phoeni
    Phoenix是HBase的开源SQL皮肤,通过Phoenix可以使用标准JDBCAPI代替HBase客户端API来创建表,插入数据和查询HBase数据。Phoenix会把SQL编译成一系列的Hbase的scan操作,然后把scan结果生成标准的JDBC结果集,其底层由于使用了Hbase的API,协处理器,过滤器。Pho......
  • springboot +vue2.x实现音乐网站
    1pom文件<?xmlversion="1.0"encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache......
  • 性能环境之Jenkins+Maven自动化部署SpringBoot压测环境(Docker篇)
    前言在上文性能环境之Jenkins+Maven自动化部署SpringBoot压测环境(实战篇)中我们介绍了常规部署流程,本文将在上文的基础上扩展Jenkins+Maven+Docker自动化部署我们的压测环境。关于DockerDocker在这里有什么用?Docker,是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到......
  • SpringBoot 配置类解析
    作者:LiWanghongSpringBoot作为Java领域非常流行的开源框架,集成了大量常用的第三方库配置,SpringBoot应用中这些第三方库几乎可以是零配置的开箱即用,大部分的SpringBoot应用都只需要非常少量的配置代码,开发者能够更加专注于业务逻辑。SpringBoot上手快,但是如果你的项目中业务场......
  • Spring——springboot启动源码分析
    摘要主要介绍的有关于Spring的Spring的事务注解原理和实战(SpringFramework)一、什么是事务的传播?简单的理解就是多个事务方法相互调用时,事务如何在这些方法间传播。:举个栗子,方法A是一个事务的方法,方法A执行过程中调用了方法B,那么方法B有无事务以及方法B对事务的要求不同都会对......
  • SpringBoot——注解@SpringBootConfiguration源码分析
    摘要SpringMVC是一个MVC开源框架,用来代替Struts。它是Spring项目里面的一个重要组成部分,能与SpringIOC容器紧密结合,以及拥有松耦合、方便配置、代码分离等特点,让JAVA程序员开发WEB项目变得更加容易。SpringMVC的异常处理?1.web.xml中异常处理通常为了给用户提供良好......
  • 动力节点王鹤SpringBoot3笔记——jdk新特性
    一、JDK关注的新特性1.1搭建学习环境JDK:JDK19OpenJDK:https://jdk.java.net/19/LibericaJDK:​​https://bell-sw.com/pages/downloads/​​,是一个OpenJDK发行版,为云原生,容器特别优化。Maven:构建和依赖管理,版本选择3.6以上配置本地仓库和阿里云镜像IDEA2022.3.1Ulti......