首页 > 编程语言 >xxl-job定时调度任务Java代码分析

xxl-job定时调度任务Java代码分析

时间:2022-12-20 21:12:20浏览次数:64  
标签:Java executorBiz job Table new 服务端 xxl

简介

用xxl-job做后台任务管理, 主要是快速解决定时任务的HA问题, 项目代码量不大, 功能精简, 没有特殊依赖. 因为产品中用到了这个项目, 上午花了点时间研究了一下运行机制. 把看到的记一下.

环境

<!-- http://repo1.maven.org/maven2/com/xuxueli/xxl-job-core/ -->
<dependency>
    <groupId>com.xuxueli</groupId>
    <artifactId>xxl-job-core</artifactId>
    <version>${最新稳定版本}</version>
</dependency>

运行需要 JDK1.8, MySQL5.7

数据库结构

  • 库编码 utf8mb4_unicode_ci
  • Table: xxl_job_group
    任务分组, 组名, 只支持一级分组, address_list 字段支持多个执行端地址, 逗号分隔
  • Table: xxl_job_info
    任务主表, 记录了任务明细, 调度明细以及预警设置
  • Table: xxl_job_log
    任务每次执行的日志
  • Table: xxl_job_log_report
    按日对执行日志进行统计的结果
  • Table: xxl_job_logglue
  • Table: xxl_job_registry
    用于登记任务的执行者, 记录group:分组, key:名称, value:接口地址. 名称是可以重复的, 接口地址会添加到任务分组表中的注册字段
  • Table: xxl_job_user
    简单的登录控制, 与其它表没有关联
  • Table: xxl_job_lock
    单字段表, 用于并发时加锁避免冲突

代码结构

  • 项目用到的都是常见组件, MyBatis, FreeMarker, Bootstrap, 当前版本基于SpringBoot 2.6.7
  • 线上运行的是 xxl-job-admin 模块, 提供执行端注册, 任务发起和日志记录等服务
  • 项目中需要实现 xxl-job-executor, 项目中提供了例子

项目文件结构如下

├───doc
│   ├───db                                               # 初始化的sql
│   └───images
├───xxl-job-admin                                        # 运行的服务端模块, 提供界面和调度
│   └───src
│       ├───main
│       │   ├───java
│       │   │   └───com
│       │   │       └───xxl
│       │   │           └───job
│       │   │               └───admin
│       │   │                   ├───controller
│       │   │                   │   ├───annotation
│       │   │                   │   ├───interceptor
│       │   │                   │   └───resolver
│       │   │                   ├───core
│       │   │                   ├───dao
│       │   │                   └───service
│       │   │                       └───impl
│       │   └───resources
│       │       ├───i18n                                 # 多国化, 简繁英
│       │       ├───mybatis-mapper                       # xml形式的mapper
│       │       ├───static                               # 前端静态文件
│       │       └───templates                            # Freemarker模板
│       └───test
│           └───java
│
├───xxl-job-core                                         # 公用jar包, 模块内部依赖
│   └───src
│       └───main
│           └───java
│
└───xxl-job-executor-samples
    ├───xxl-job-executor-sample-frameless                # 任务执行层示例
    │   └───src
    │       ├───main
    │       │   ├───java
    │       │   └───resources
    │       └───test
    │           └───java
    └───xxl-job-executor-sample-springboot               # 使用SpringBoot的执行层示例
        └───src
            ├───main
            │   ├───java
            │   └───resources
            └───test

运行机制

执行端需要准备以下信息

  • adminAddresses 服务端地址, 例如 http://127.0.0.1:8080/xxl-job-admin

  • accessToken 貌似是服务端的token, 在调用服务端 api/registry, api/registryRemove 等操作时需要验证

  • appname 执行端名称

  • address 执行端地址, 和 ip:port 二选一, 存在则覆盖 ip:port

  • ip 执行端IP

  • port 执行端服务端口

  • 执行端启动后将自己注册到服务端, 等待回调

  • 任务执行通过 XxlJobTrigger.processTrigger() 发起, 准备参数, 并在分组中选择一个地址

  • 根据这个地址取得 ExecutorBiz, 调用 executorBiz.run() 执行任务

  • 服务端: 通过 ExecutorBizClient,

    • 调用XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
    • 其中 accessToken 是服务端的accessToken
  • 执行端: 通过 ExecutorBizImpl.run()

    • 调用 XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());得到XxlJob方法
    • 通过 XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason) 执行

非 Spring 的场景

通过调用 FrameLessXxlJobConfig.getInstance().initXxlJobExecutor() 这个方法将 XxlJobSimpleExecutor 实例化, 并注册到xxl_job服务端

Spring 场景

  • @Configuration 中, 将 XxlJobSpringExecutor 作为一个 @Bean 添加到 Spring context
  • XxlJobSpringExecutor 是 XxlJobExecutor 的子类并实现了 SmartInitializingSingleton 接口的 afterSingletonsInstantiated()方法
  • afterSingletonsInstantiated()方法中
    • 调用 initJobHandlerMethodRepository(), 在这个方法中, 找到所有@XxlJob注解的方法
    • 通过 registJobHandler(), 将@XxlJob方法添加到private static ConcurrentMap<String, IJobHandler> jobHandlerRepository
    • 调用 XxlJobExecutor.start(), 将自己注册到 xxl_job 服务端

远程调用服务

xxl_job 并未使用Spring的服务机制, 而是内部实现了一个侦听指定IP+端口的服务. 这个实现对应的类是 EmbedServer, 服务基于 Netty, 核心代码是

// start server
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel channel) throws Exception {
                channel.pipeline()
                        .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
                        .addLast(new HttpServerCodec())
                        .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL
                        .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
            }
        })
        .childOption(ChannelOption.SO_KEEPALIVE, true);

这行代码注册了内部的XxlJob方法

.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool)

处理远程请求时, 在下面的代码中, 通过executorBiz.run(triggerParam)调用XxlJob方法

private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
    //...
    // services mapping
    try {
        switch (uri) {
            case "/beat":
                return executorBiz.beat();
            case "/idleBeat":
                IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
                return executorBiz.idleBeat(idleBeatParam);
            case "/run":
                TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
                return executorBiz.run(triggerParam);
            case "/kill":
                KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
                return executorBiz.kill(killParam);
            case "/log":
                LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
                return executorBiz.log(logParam);
            default:
                return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
        }
    } catch (Exception e) {
    //...
}

锁机制

通过select ... for update实现的, 这个表并没有放到 MyBatis, 在 JobScheduleHelper 中, 通过

preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();

得到锁, 在方法末尾释放

// close PreparedStatement
if (null != preparedStatement) {
    try {
        preparedStatement.close();
    } catch (SQLException e) {
        if (!scheduleThreadToStop) {
            logger.error(e.getMessage(), e);
        }
    }
}

标签:Java,executorBiz,job,Table,new,服务端,xxl
From: https://www.cnblogs.com/milton/p/16994113.html

相关文章

  • java中的接口
    本文主要讲述java的接口,以其相关细节。老韩接口的介绍:老韩接口的细节:老韩继承类vs接口实现老韩接口的多态性 1)接口的多态性示意图,如下图所......
  • java命令--jmap命令使用
    jdk安装后会自带一些小工具,jmap命令(JavaMemoryMap)是其中之一。主要用于打印指定Java进程(或核心文件、远程调试服务器)的共享对象内存映射或堆内存细节。jmap命令可以获......
  • java8-时间相关代码整理
    获取昨天起止时刻,就是0点和23点59分59秒LocalDateTime.now().minus(Period.ofDays(1)).with(LocalTime.MIN)LocalDateTime.now().minus(Period.ofDays(1)).with(LocalTi......
  • Java学习笔记1
    1.注释​ 注释是对代码的解释和说明文字。Java中的注释分为三种:单行注释://这是单行注释文字多行注释:/*这是多行注释文字这是多行注释文字这是多行注释文字......
  • JAVA循环结构
    什么是循环:重复的去执行某一件事情while(条件){//循环操作}例如:打印50份卷子1、确定循环内容以及循环条件循环内容:打印卷子循环......
  • 两道面试题,带你解析Java类加载机制
    通过两道面试题,带你深入学习Java类加载机制。简单易懂,深入浅出!博主个人独立站点开通啦!欢迎点击访问:​​https://shuyi.tech​​在许多Java面试中,我......
  • 构建一个应用程序,用于在基于内存的数据库中存储 POJO(普通旧 Java 对象)
    本指南将引导您完成构建应用程序的过程,该应用程序使用SpringDataJPA在关系数据库中存储和检索数据。您将构建什么您将构建一个应用程序,用于在基于内存的数据库中存储PO......
  • Java实现基本MySQL连接 - 数据的基本操作
    importjava.sql.*;publicclassMain{//MySQL8.0以下版本-JDBC驱动名及数据库URL//staticfinalStringJDBC_DRIVER="com.mysql.jdbc.Driver";......
  • 配置jndi服务,javax.naming.NamingException的四种情况
    1.当jndi服务没有启动,或者jndi服务的属性没有设置正确,抛出如下异常:javax.naming.CommunicationException:Can'tfindSerialContextProvider...2.如果InitialContextcla......
  • 用Java EE技术实现产品售后服务系统(论文+PPT+源码)
    目录摘要1Abstract11引言52.1系统需求分析72.2可行性分析82.3本系统采用的关键技术93系统概要设计124系统详细设计144.1后台数据库设计144.2系统E-R图184.3.2数......