首页 > 其他分享 >基于XXL-JOB实现分布式任务调度的实现

基于XXL-JOB实现分布式任务调度的实现

时间:2023-01-30 11:08:53浏览次数:63  
标签:job Value private JOB XXL 分片 xxlJobSpringExecutor 任务调度 xxl


背景

笔者以前在电商公司,我们需要在8月18号做大促活动,我们会提前一天给所有的用户推送活动信息,且需要根据用户画像生成不同的推送内容。

当时我们总共有80万用户左右。

经测试,通过Spring Task和分布式锁,单台机器同时开启5个线程,执行时间需要27个小时左右,即便开10个线程,需要14个小时左右,显然执行时间过长。

解决方案

当时个推服务部署节点有3台,在每年大促期间可动态扩容,其余的机器资源没有充分利用起来。

要想短时间内完成推送,那么就得想办法让每台机器各自分一部分用户数据去执行,这样效率可提高原来的N倍。

那么就需要分布式任务去执行,核心思想如下图:

基于XXL-JOB实现分布式任务调度的实现_spring

经过调研现有的开源的分布式任务调度框架,决定在elastic-job和xxl-job中选一个

​Elastic Job​​是当当网开源一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成;定位为轻量级无中心化解决方案,使用 jar 包的形式提供分布式任务的协调服务。支持分布式调度协调、弹性扩容缩容、失效转移、错过执行作业重触发、并行调度、自诊断和修复等等功能特性。

​XXL-Job官网​​是大众点评发布的分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。

更倾向于选择XXL-JOB:

  1. 轻量级,支持通过Web页面对任务进行动态CRUD操作,操作简单
  2. 只依赖数据库作为集群注册中心,接入开发简单,不需要ZK
  3. 高可用、解耦、高性能、监控报警、分片、重试、故障转移
  4. 团队持续开发,社区活跃
  5. 支持后台直接查看每个任务执行实时日志

具体实现

在项目中集成xxl-job客户端

<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.2.0</version>
</dependency>

在配置文件中配置xxl-job信息

xxl:
job:
accessToken:
admin:
addresses: http://xxl部署IP地址:8080/xxl-job-admin
executor:
appname: vm-service
address:
ip:
port: 9989
logretentiondays: 30

新增XxlJobConfig.java

package com.itheima.config;

import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* xxl-job config
*
* @author xuxueli 2017-04-28
*/
@Configuration
@Slf4j
public class XxlJobConfig {


@Value("${xxl.job.admin.addresses}")
private String adminAddresses;

@Value("${xxl.job.accessToken}")
private String accessToken;

@Value("${xxl.job.executor.appname}")
private String appname;

@Value("${xxl.job.executor.address}")
private String address;

@Value("${xxl.job.executor.ip}")
private String ip;

@Value("${xxl.job.executor.port}")
private int port;

// @Value("${xxl.job.executor.logpath}")
// private String logPath;

@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;


@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
log.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
//xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

return xxlJobSpringExecutor;
}

}

在xxl-job中新增执行器

注册方式选自动注册,这样方便动态扩容

基于XXL-JOB实现分布式任务调度的实现_推送_02

创建任务

路由策略选择分片广播

基于XXL-JOB实现分布式任务调度的实现_java_03

代码部分

在任务代码获取推送用户时,根据当前的分片及分片总数对用户ID取余,这样我们就可以在每个分片节点,获取不一样的数据。id值越连续,分片则越均匀。

ShardingUtil.ShardingVO shardingVo = ShardingUtil.getShardingVo();
int numbers = shardingVo.getTotal(); //分片总数
int index = shardingVo.getIndex(); //当前分片索引

假设分片总数为3,当前节点获取到的分片索引为0,那么查询推送用户SQL如下:

SELECT user_id FROM `user_info` WHERE MOD(user_id,3)=0

注意:我们在实际代码中,分片总数和当前分片索引是以参数的形式传给查询的SQL语句的。

如上,即可完成分布式任务。

总结

在某些定时任务需要处理大量数据的情况下,我们可以通过引入分布式任务框架xxl-job,充分利用机器资源,将需要处理的数据均匀的分配到不同的机器上去执行,提高任务执行效率。

标签:job,Value,private,JOB,XXL,分片,xxlJobSpringExecutor,任务调度,xxl
From: https://blog.51cto.com/u_8238263/6026115

相关文章

  • Quartz.Net 官方教程(Jobs 和 Trigger)
    根据官网说明类型概述IScheduler调度类核心接口IJob独立实现业务逻辑需要继承的任务接口IJobDetail给任务接口定义实例的任务说明类接口ITrigger触......
  • 技术汇总:第九章:任务调度SpringTask
    什么是任务调度在企业级应用中,经常会制定一些“计划任务”,即在某个时间点做某件事情,核心是以时间为关注点,即在一个特定的时间点,系统执行指定的一个操作。常见的任务调度框......
  • XXL-JOB调度算法备忘
    文章目录一.时间对齐二.scheduleThread调度线程三.ringThread时间轮(算法)线程原理源码实现本章介绍init()最后一个步骤,初始化调度线程。另外第六步的JobLogReportHelp......
  • springboot项目集成xxl-job
    一、xxl-job简介xxl-job是一个开源的分布式定时任务框架,它可以与其他微服务组件一起构成微服务集群。它的调度中心(xxl-job)和执行器(自己的springboot项目中有@XxlJob("......
  • Kubernetes CronJob
    CronJobCronJob用于执行常规的计划操作(如备份、报告生成等)。格式*****分时日月周创建一个Job[root@master01job]#kubectlcreate-fcronjob.yamlapiVer......
  • JobsUtility导致CPU过高问题
    UnityPlayer.dll新版加入了Jobs系统,以前是用户使用lib_burst也就是JobsUtility直接使用容易出现CPU过高问题Unity系统使用多个线程,线程数=CPU核心-1,代码不好容易......
  • Jenkins 编译Android apk 流水线 - 打工人日志 - jobcher
    Jenkins编译Androidapk,上传apk包,生成下载二维码,并推送钉钉安装Android环境#这里使用的是openjdk1.8.0版本,有需要的话需要到java官网上进行下载对应的JDK版本。$yumin......
  • strapi系列-如何创建一个定时任务-Cron Jobs
    Cron是什么?Cron有多种用途。CronJobs用于安排服务器上的任务运行。它们最常用于自动化系统管理或维护。然而,它们也与Web应用程序的构建相关。Web应用程序可能需......
  • 关于Jenkins中view的创建以及通过与正则的方式将job-Item添加到指定View中
    在Jenkins中,我们可以通过创建View来分类的管理Job/Item,View在页面上方就像菜单栏一样,里面包含了自己这一类的所有Job/Item默认会有All的View,点击它右边的+可以创建一个自......
  • mapreduce基础JOB操作
    packagecagy.mapreduce.wordcount;importjava.io.IOException;importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.m......