首页 > 其他分享 >【Netty】【XXL-JOB】时间轮的原理以及应用分析

【Netty】【XXL-JOB】时间轮的原理以及应用分析

时间:2024-05-03 13:11:26浏览次数:25  
标签:Netty 触发 任务 JOB trigger 时间 timeout 执行 XXL

1  前言

今天晚上看了一本 70 多页的讲解时间轮的 PDF,从是什么为什么以及原理到源码中的应用分析,讲的真好。这节我就按我理解的思路捋一下,记录一下哈。

2  时间轮概述

2.1  时间轮是什么

时间轮是一种高效利用线程资源进行批量化调度的一种调度模型。把大批量的调度任务全部绑定到同一个调度器上,使用这一个调度器来进行所有任务的管理、触发、以及运行。时间轮其实就是一种环形的数据结构,其设计参考了时钟转动的思维,可以想象成时钟,分成很多格子,一个格子代表一段时间。我们这里的时间轮就是由多个时间格组成,比如下图中有8个时间格,每个时间格代表当前时间轮的基本时间跨度 (tickDuration),其中时间轮的时间格的个数是固定的。

图中,有8个时间格(槽),假设每个时间格的单位为100ms,那么整个时间轮走完一圈需要800ms。每100ms指针会沿着顺时针方向移动一个时间单位,这个单位可以代表时间精度,这个单位可以设置,比如以秒为单位,也可以以一小时为单位。

而对于每个时间格里存放的是什么呢?放的就是当前时间格要触发的任务列表,通过指针移动,来获得每个时间格中的任务列表,然后遍历任务列表来执行每个任务,以此循环。

那我们大概能看到时间轮中涉及的几个变量:

(1)格子数,也就是一圈有多少个时间格

(2)格子的耗时,也就是每个时间格代表多少时长,比如1小时1分钟1秒等

(3)轮数,也就是某个任务是第几轮才触发的,比如一轮有60个格子,每个格子表示1分钟,那么1轮就是1小时,放置一个1小时10分钟后触发的任务,那么它的轮数就是1

对于轮数,不一定要有哈,比如一些任务可能很久才要执行,那么轮数会变的非常大的一个数字,也会在任务列表中插入很多当前不需要执行的任务,如果每次都执行上面的逻辑,显然会浪费大量的资源,可以利用时间轮的多层来化解。

涉及到的数据结构:比如一轮中的每个时间格用什么来存放,每个时间格中的任务用什么数据结构来存放呢,我们后续会在源码分析中提到哈。

2.2  时间轮的特点

时间轮是一个高性能,低消耗的数据结构,它适合用非准实时,延迟的短平快任务,例如心跳检测。

比如Netty动辄管理100w+的连接,每一个连接都会有很多超时任务。 比如发送超时、心跳检测间隔等,如果每一个定时任务都启动一个Timer,不仅低效,而且会消耗大量 的资源。

在Netty中的一个典型应用场景是判断某个连接是否idle,如果idle(如客户端由于网络原因导致到服 务器的心跳无法送达),则服务器会主动断开连接,释放资源。 得益于Netty NIO的优异性能,基于Netty开发的服务器可以维持大量的长连接,单台8核16G的云主机 可以同时维持几十万长连接,及时掐掉不活跃的连接就显得尤其重要。

2.3  时间轮的场景

然后我们再看下,为什么要有时间轮或者它的场景是什么呢?

时间轮的模型能够高效管理各种任务: 延时任务、 周期任务、 通知任务。

比如一个大型内容审核平时,在运营设定审核了内容的通过的时间,到了这个时间之后,相关内容自动审核 通过。本是个小的需求,但是考虑到如果需要定时审核的东西很多,这样大量的定时任务带来的一系列问题,海量定时任务管理的场景非常多,在实际项目中,存在大量需要定时或是延时触发的任务,比如电商中,延时需要检查订单是否支付成功,是否配送成功,定时给用户推送提醒等等。

(1) 单定时器方案

描述: 把所有需要定时审核的资源放到redis中,例如sorted set中,需要审核通过的时间作为score值。 后台启动一个定时器,定时轮询sortedSet,当score值小于当前时间,则运行任务审核通过。

问题 这个方案在小批量数据的情况下没有问题, 但是在大批量任务的情况下就会出现问题了,因为每次都要轮询全量的数据,逐个判断是否需要执行, 一旦轮询任务执行比较长,就会出现任务无法按照定时的时间执行的问题。

(2) 多定时器方案

描述:每个需要定时完成的任务都启动一个定时任务,然后等待完成之后销毁

问题:这个方案带来的问题很明显,定时任务比较多的情况下,会启动很多的线程,这样服务器会承受不了之 后崩溃。 基本上不会采取这个方案。

(3)redis的过期通知功能

描述:和方案一类似,针对每一个需要定时审核的任务,设定过期时间,过期时间也就是审核通过的时间,订阅redis的过期事件,当这个事件发生时,执行相应的审核通过任务。

问题:这个方案来说是借用了redis这种中间件来实现我们的功能,这中实际上属于redis的发布订阅功能中的 一部分,针对redis发布订阅功能是不推荐我们在生产环境中做业务操作的,通常redis内部(例如redis集群节点上下线,选举等等来使用),我们业务系统使用它的这个事件会产 生如下两个问题一个是redis发布订阅的不稳定问题,另一个是redid发布订阅的可靠性问题,具体可以参考redis的发布订阅缺陷

(4)Hash分层记时轮(分层时间轮)算法

这个东西就是专为大批量定时任务管理而生。比如要支持触发时间是一年的精度为秒级别的时间轮,如果单纯的用一个秒级的时间轮:365*24*60*60 这都三千多万个时间格了,造成大量资源开销。而分层的话,那么可分为四个层次:天级别的时间轮,小时级时间轮,分钟级时间轮,秒级时间轮,他们的时间格数分别为:365,24,60,60;总时间格数只有365+24+60+60 = 509个!

(5)MQ的延时消息

当然 MQ的延时消息也可以实现,但是你要知道比如你发送一个延时消息到MQ,但是当你想取消的时候,就没办法删除队列里的消息了,只能通过增加某个取消标志,当延时消息执行的时候,判断一下取消标志,再决定是否进行后续的操作。

时间轮的本质是一种类似延迟任务队列的实现, 那么它的特点如上所述,适用于对时效性不高的,可快速执行的,大量这样的“小”任务,能够做到高性 能,低消耗。

应用场景大致有:心跳检测(客户端探活)、会话或者请求是否超时、消息延迟推送、业务场景超时取消(订单、退款单等)

时间轮的思想应用范围非常广泛,各种操作系统的定时任务调度,Crontab,还有基于java的通信框架 Netty中也有时间轮的实现, 几乎所有的时间任务调度系统采用的都是时间轮的思想。 至于采用round型的基础时间轮还是采用分层时间轮,看实际需要吧,时间复杂度和实现复杂度的取舍。

3  源码应用

接下来我们就从源码的角度看看如何使用。

3.1  Netty 中的时间轮

Netty 的时间轮主要是在类 HashedWheelTimer 中,我们这里就从它的属性和几个关键方法看起。

3.1.1  HashedWheelTimer 属性

// 真正执行工作的线程
private final Worker worker = new Worker();
private final Thread workerThread;
// 工作线程的状态
public static final int WORKER_STATE_INIT = 0;
public static final int WORKER_STATE_STARTED = 1;
public static final int WORKER_STATE_SHUTDOWN = 2;
// 每个时间格表示的时长
private final long tickDuration;
// 有多少个格子
private final HashedWheelBucket[] wheel;
// 与运算用于计算某个任务应该存放在哪个格子
private final int mask;
// 最多允许多少个等待任务
private final long maxPendingTimeouts;
// 时间轮的启动时间单位是纳秒
private volatile long startTime;
// 启动控制 防止多次启动
private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
// 存放提交的任务比如往时间轮中提交一个任务会先放置在该队列中
private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
// 已经取消的任务
private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();
// 等待执行的任务数的计数器
private final AtomicLong pendingTimeouts = new AtomicLong(0);

我们从一个图大概先了解一下执行过程,先有个全局的认识,然后我们再细看每个方法:

(1)我们实例化好时间轮后,会通过 newTimeout 方法,添加任务到时间轮,这个时候他还不会进入到时间轮,会先进入到 timeouts队列中

(2)当工作线程执行的时候,会先从 timeouts 队列中捞任务,然后计算应该存放在哪个时间槽中

(3)根据计算的槽位,然后将任务放进该槽的链表中

(4)然后取出当前时刻的时间槽中的任务,依次执行。

3.1.2  HashedWheelTimer 实例化

它的实例化方法有多个:

// 空参的实例化
public HashedWheelTimer() {
    this(Executors.defaultThreadFactory());
}
// 带线程工厂的 默认每个时间槽是100毫秒
public HashedWheelTimer(ThreadFactory threadFactory) {
    this(threadFactory, 100, TimeUnit.MILLISECONDS);
}
// 默认一轮有512个时间槽
public HashedWheelTimer(
        ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
    this(threadFactory, tickDuration, unit, 512);
}
// 默认开启内存泄漏检查
public HashedWheelTimer(
        ThreadFactory threadFactory,
        long tickDuration, TimeUnit unit, int ticksPerWheel) {
    this(threadFactory, tickDuration, unit, ticksPerWheel, true);
}
// 默认不限制等待任务数
public HashedWheelTimer(
    ThreadFactory threadFactory,
    long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection) {
    this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, -1);
}
// 最后的落点 都会走到这个实例化
public HashedWheelTimer(
        ThreadFactory threadFactory,
        long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
        long maxPendingTimeouts) {
    if (threadFactory == null) {
        throw new NullPointerException("threadFactory");
    }
    if (unit == null) {
        throw new NullPointerException("unit");
    }
    if (tickDuration <= 0) {
        throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
    }
    if (ticksPerWheel <= 0) {
        throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
    }
    // 先把时间格数组创建出来,所以你时间格越多资源申请的也越多。Normalize ticksPerWheel to power of two and initialize the wheel.
    wheel = createWheel(ticksPerWheel);
    // 这里就是 与运算 方便计算任务所在的时间格子
    mask = wheel.length - 1;
    // 时间都转为纳秒 Convert tickDuration to nanos.
    this.tickDuration = unit.toNanos(tickDuration);
    // 检验参数的合法性 Prevent overflow.
    if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
        throw new IllegalArgumentException(String.format(
                "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
                tickDuration, Long.MAX_VALUE / wheel.length));
    }
    // 初始化工作线程
    workerThread = threadFactory.newThread(worker);
    // 内存泄漏检查的线程
    leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
    // 最大等待的任务数 默认-1不限制
    this.maxPendingTimeouts = maxPendingTimeouts;
    // 判断时间轮的实例化个数 64个 也就是不能创建过多的时间轮 
    if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
        WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
        reportTooManyInstances();
    }
}

3.1.3  HashedWheelTimer 启动

它的启动有两个入口:

(1)直接调用 HashedWheelTimer 的 start 方法

(2)newTimeout 也就是添加任务的时候,会调用 start 方法启动时间轮

那我们这里直接看它的 start 方法:

public void start() {
    // 实例化后的默认的状态是0 表示初始化
    switch (WORKER_STATE_UPDATER.get(this)) {
        // 如果是初始化,则通过 CAS 启动工作现场
        case WORKER_STATE_INIT:
            if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                workerThread.start();
            }
            break;
        // 如果已经启动 直接跳出
        case WORKER_STATE_STARTED:
            break;
        // 如果已经停止了,则抛个异常
        case WORKER_STATE_SHUTDOWN:
            throw new IllegalStateException("cannot be started once stopped");
        default:
            throw new Error("Invalid WorkerState");
    }
    // 当工作现场启动的时候,会设置 startTime 这里是保证工作线程绝对启动吧 Wait until the startTime is initialized by the worker.
    while (startTime == 0) {
        try {
            startTimeInitialized.await();
        } catch (InterruptedException ignore) {
            // Ignore - it will be ready very soon.
        }
    }
}

3.1.4  newTimeout 添加任务

public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    if (unit == null) {
        throw new NullPointerException("unit");
    }
    // 统计任务个数
    long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
    // 判断最大任务数量是否超过限制
    if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
        pendingTimeouts.decrementAndGet();
        throw new RejectedExecutionException("Number of pending timeouts ("
            + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
            + "timeouts (" + maxPendingTimeouts + ")");
    }
    // 如果时间轮没有启动,则通过start方法进行启动
    start();
    // Add the timeout to the timeout queue which will be processed on the next tick.
    // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
    // 计算任务的延迟时间,通过当前的时间+当前任务执行的延迟时间-时间轮启动的时间 也就是在多少纳秒值的时候要启动
    long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
    // 如果为负数,那么说明超过了long的最大值 Guard against overflow.
    if (delay > 0 && deadline < 0) {
        deadline = Long.MAX_VALUE;
    }
    // 创建一个Timeout任务,理 论上来说,这个任务应该要加入到时间轮的时间格子中,但是这里并不是先添加到时间格,而是先   
    // 加入到一个阻塞队列,然后等到时间轮执行到下一个格子时,再从队列中取出最多100000个任务添加到指定的 时间格(槽)中。
    HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
    // 加到队列中
    timeouts.add(timeout);
    return timeout;
}

3.1.5  Worker 执行任务

Worker 类是 HashedWheelTimer 的内部类,我们看看它的执行过程:

private final class Worker implements Runnable {
    // 工作线程停止了,还没有执行的任务
    private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
    // 当前到几个时间格了
    private long tick;
    @Override
    public void run() {
        // 当前的纳秒值 Initialize the startTime.
        startTime = System.nanoTime();
        // 这个还真不知道是干啥的 什么时候能等于 0 呢?
        if (startTime == 0) {
            // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
            startTime = 1;
        }
        // 工作线程启动了,其他线程可以不用等着了 唤醒被阻塞的start()方法 Notify the other threads waiting for the initialization at start().
        startTimeInitialized.countDown();
        do {
            // 返回每tick一次的时间间隔 也就是当前要执行的时间格的纳秒值 它是一个差值 也就是距离 startTime的差值 而我们添加任务的时候也是计算的每个任务距离 startTime 的差值
            // 那也就是这里的 deadLine 大于等于任务的 deadLine 的时候,这个任务就应该执行

            final long deadline = waitForNextTick();
            if (deadline > 0) {
                // 计算并获取时间格
                int idx = (int) (tick & mask);
                processCancelledTasks();
                HashedWheelBucket bucket =
                        wheel[idx];
                // 从等待队列里捞任务
                transferTimeoutsToBuckets();
                // 执行任务
                bucket.expireTimeouts(deadline);
                // 下一个时间格++
                tick++;
            }
        } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
        // 清空每个时间格 Fill the unprocessedTimeouts so we can return them from stop() method.
        for (HashedWheelBucket bucket: wheel) {
            bucket.clearTimeouts(unprocessedTimeouts);
        }
        // 取出等待队列中还没来得及执行的任务 放到未执行的集合中
        for (;;) {
            HashedWheelTimeout timeout = timeouts.poll();
            if (timeout == null) {
                break;
            }
            if (!timeout.isCancelled()) {
                unprocessedTimeouts.add(timeout);
            }
        }
        // 处理被取消的任务
        processCancelledTasks();
    }
}

3.1.5.1  waitForNextTick 指针跳动

这个方法的主要作用就是返回下一个指针指向的时间间隔,然后进行sleep操作。

大家可以想象一下,一个钟表上秒与秒之间是有时间间隔的,那么waitForNextTick就是根据当前时间 计算出跳动到下个时间的时间间隔,然后进行sleep,然后再返回当前时间距离时间轮启动时间的时间间隔(时间差)。

private long waitForNextTick() {
    // tick表示到了第几个时间格 tickDuration表示每个时间格的跨度,所以deadline返回的是下一次时间轮指针跳动的时间
    long deadline = tickDuration * (tick + 1);
    for (;;) {
        // 计算当前时间距离启动时间的时间间隔
        final long currentTime = System.nanoTime() - startTime;
        // 通过下一次指针跳动的延迟时间距离当前时间的差额,这个作为sleep时间使用 
        long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
        // sleepTimeMs小于零表示走到了下一个时间槽位置
        if (sleepTimeMs <= 0) {
            if (currentTime == Long.MIN_VALUE) {
                return -Long.MAX_VALUE;
            } else {
                return currentTime;
            }
        }
        // Check if we run on windows, as if thats the case we will need
        // to round the sleepTime as workaround for a bug that only affect
        // the JVM if it runs on windows.
        //
        // See https://github.com/netty/netty/issues/356
        if (PlatformDependent.isWindows()) {
            sleepTimeMs = sleepTimeMs / 10 * 10;
        }
        // 进入到这里进行sleep,表示当前时间距离下一次tick时间还有一段距离,需要sleep
        try {
            Thread.sleep(sleepTimeMs);
        } catch (InterruptedException ignored) {
            if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
                return Long.MIN_VALUE;
            }
        }
    }
}

3.1.5.2  transferTimeoutsToBuckets 捞队列中的任务

转移任务到时间轮中,前面我们讲过,任务添加进来时,是先放入到阻塞队列。而在现在这个方法中,就是把阻塞队列中的数据转移到时间轮的指定位置。

在这个转移方法中,写死了一个循环,每次都只转移10万个任务。然后根据HashedWheelTimeout的deadline延迟时间计算出时间轮需要运行多少次才能运行当前的任 务,如果当前的任务延迟时间大于时间轮跑一圈所需要的时间,那么就计算需要跑几圈才能到这个任务运行。最后计算出该任务在时间轮中的槽位,添加到时间轮的链表中。

private void transferTimeoutsToBuckets() {
    // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
    // adds new timeouts in a loop.
    // 循环100000次,也就是每次转移10w个任务
    for (int i = 0; i < 100000; i++) {
        // 从阻塞队列中获得具体的任务
        HashedWheelTimeout timeout = timeouts.poll();
        if (timeout == null) {
            // all processed
            break;
        }
        if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
            // Was cancelled in the meantime.
            continue;
        }
        // 计算tick次数,deadline表示当前任务的延迟时间, tickDuration表示时间槽的间隔,两者相除就可以计算当前任务需要tick几次才能被执行
        long calculated = timeout.deadline / tickDuration;
        // 计算剩余的轮数, 只有 timer 走够轮数, 并且到达了 task 所在的 slot, task 才会过期.(被执行)
        timeout.remainingRounds = (calculated - tick) / wheel.length;
        // 如果任务在 timeouts队列里面放久了, 以至于已经过了执行时间, 这个时候就使用当前tick, 也就是放到当前 bucket, 此方法调用完后就会被执行
        final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
        // 算出任务应该插入的 wheel 的 slot, stopIndex = tick 次数 & mask, mask = wheel.length - 1
        int stopIndex = (int) (ticks & mask);
        // 把timeout任务插入到指定的bucket链中。
        HashedWheelBucket bucket = wheel[stopIndex];
        bucket.addTimeout(timeout);
    }
}

我们再小看一下 Bucket 添加任务的方法:

private static final class HashedWheelBucket {
    // Used for the linked-list datastructure
    private HashedWheelTimeout head;
    private HashedWheelTimeout tail;
    /**
     * Add {@link HashedWheelTimeout} to this bucket.
     * 典型的链表结构 插入哈
     */
    public void addTimeout(HashedWheelTimeout timeout) {
        assert timeout.bucket == null;
        timeout.bucket = this;
        if (head == null) {
            head = tail = timeout;
        } else {
            tail.next = timeout;
            timeout.prev = tail;
            tail = timeout;
        }
    }
}

3.1.5.3  expireTimeouts 运行时间轮中的任务

当指针跳动到某一个时间槽中时,会就触发这个槽中的任务的执行。该功能是通过expireTimeouts来实现,这个方法的主要作用是: 过期并执行格子中到期的任务。也就是当tick进入到指定格子时,worker线程 会调用这个方法。

HashedWheelBucket是一个链表,所以我们需要从head节点往下进行遍历。如果链表没有遍历到链表 尾部那么就继续往下遍历。

获取的timeout节点节点,如果剩余轮数remainingRounds大于0,那么就说明要到下一圈才能运行, 所以将剩余轮数减一;

如果当前剩余轮数小于等于零了,那么就将当前节点从bucket链表中移除,并判断一下当前的时间是否 大于timeout的延迟时间,如果是则调用timeout的expire执行任务。

因为要执行某个时间槽的任务,所以这里调用的是 bucket 的方法哈:

public void expireTimeouts(long deadline) {
    HashedWheelTimeout timeout = head;
    // process all timeouts
    // 遍历当前时间槽中的所有任务
    while (timeout != null) {
        HashedWheelTimeout next = timeout.next;
        // 轮数小于等于0 说明当前轮要执行
        if (timeout.remainingRounds <= 0) {
            // 取出当前的任务
            next = remove(timeout);
            // 小于当前的时间间隔了 执行
            if (timeout.deadline <= deadline) {
                timeout.expire();
            } else {
                // 按理不可能会走到这里的 The timeout was placed into a wrong slot. This should never happen.
                throw new IllegalStateException(String.format(
                        "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
            }
        } else if (timeout.isCancelled()) {
            // 如果已经取消了 移除当前返回下一个
            next = remove(timeout);
        } else {
            // 因为当前的槽位已经过了,说明已经走了一圈了,把轮数减一
            timeout.remainingRounds --;
        }
        timeout = next;
    }
}

3.2  XXL-JOB 中的时间轮

3.2.1  XXL-JOB 介绍

XXL JOB 是一个轻量级分布式任务调度平台,主打特点是平台化,易部署,开发迅速、学习简单、轻量 级、易扩展,代码仍在持续更新中。目前 XXL-JOB 任务执行已经摒弃 Quartz 框架,目前 通过时间轮方式来管理任务触发任务。

调度中心: 任务调度控制台,平台自身并不承担业务逻辑,只是负责任务的统一管理和调度执行, 并且提供任务管理平台

执行器: 负责接收“调度中心”的调度并执行,可直接部署执行器,也可以将执行器集成到现有业务 项目中。 通过将任务的调度控制和任务的执行解耦,业务使用只需要关注业务逻辑的开发。

XXL-JOB 主要提供了任务的动态配置管理、任务监控和统计报表以及调度日志几大功能模块,支 持多种运行模式和路由策略,可基于对应执行器机器集群数量进行简单分片数据处理。

3.2.2  XXL-JOB 特性

(1)、简单:支持通过 Web页面对任务进行 CRUD 操作, 操作简单 ,一分钟上手;

(2)、动态:支持 动态修改任务状态、启动 / 停止任务,以及终止运行中任务,即时生效 ;

(3)、调度中心HA(中心式):调度采用 中心式设计,调度中心自研调度组件并 证调度中心HA; 支持集群部署,可保

(4)、执行器HA(分布式):任务分布式执行,任务"执行器"支持集群部署,可保证任务执行HA;

(5)、注册中心: 执行器会周期性自动注册任务, 调度中心将会自动发现注册的任务并触发执行。也支 持手动录入执行器地址;

(6)、弹性扩容缩容:一旦有新执行器机器上线或者下线,下次调度时将会重新分配任务;

(7)、路由策略:执行器集群部署时提供丰富的路由策略,包括: 第一个、最后一个、轮询、随机、 一致性 HASH 、最不经常使用、最近最久未使用、故障转移、忙碌转移 等;

(8)、故障转移:任务路由策略选择 故障转移 情况下,如果执行器集群中某一台机器故障,将会自动 Failover切换到一台正常的执行器发送调度请求。

(9)、阻塞处理策略:调度过于密集执行器来不及处理时的处理策略,策略包括: 单机串行(默 认)、丢弃后续调度、覆盖之前调度 ;

(10)、任务超时控制:支持 自定义任务超时时间 ,任务运行超时将会主动中断任务;

(11)、任务失败重试:支持 自定义任务失败重试次数 ,当任务失败时将会按照预设的失败重试次数 主动进行重试;其中分片任务支持分片粒度的失败重试;

(12)、任务失败警告:默认提供邮件方式失败告警,同时预留扩展接口,可方便的扩展短信、钉钉 等告警方式;

(13)、分片广播任务:执行器集群部署时,任务路由策略选择 分片广播情况下,一次任务调度将会 广播触发集群中所有执行器执行一次任务,可根据分片参数开发分片任务;

(14)、动态分片:分片广播任务以执行器为维度进行分片,支持动态扩容执行器集群从而动态增加 分片数量,协同进行业务处理;在进行大数据量业务操作时可显著提升任务处理能力和速度。

(15)、事件触发:除了 Cron方式和 任务依赖方式触发任务执行之外,支持基于事件的触发任务方 式。调度中心提供触发任务单次执行的API服务,可根据业务事件灵活触发

3.2.3  时间轮-任务执行

XXL-JOB 时间轮实现方式比较简单,就是一个 Map 结构数据,key值0-60,value是任务ID列表 Map<Integer, List> ringData 。

XXL-JOB 任务执行中启动了两个线程:

线程 scheduleThread 运行中不断的从任务表中查询 查询近 5000 毫秒(5秒)中要执行的任务,如 果当前时间大于任务接下来要执行的时间则立即执行,否则将任务执行时间除以 1000 变为秒之后再与 60 求余添加到时间轮中。

线程 ringThread 运行中不断根据当前时间求余从 时间轮 ringData 中获取任务列表,取出任务之 后执行任务。

我们从 JobScheduleHelper 这个类的 start 看起。

public void start (){
 
    // 启动调度线程,这些线程是用来取数据的 schedule thread
    scheduleThread = new Thread( new Runnable() {
    @Override
    public void run () {
    try { // 不知道为啥要休眠 4-5 秒 时间,然后再启动
        TimeUnit. MILLISECONDS .sleep( 5000 - System. currentTimeMillis ()% 1000 ) ;
    } catch (InterruptedException e) {
        if (! scheduleThreadToStop ) {
            logger .error(e.getMessage() , e) ;
        }
    }
    logger .info( ">>>>>>>>> init xxl-job admin scheduler success." ) ;
 
     // 这里是预读数量 pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
    int preReadCount = (XxlJobAdminConfig. getAdminConfig ().getTriggerPoolFastMax() + XxlJobAdminConfig. getAdminConfig ().getTriggerPoolSlowMax()) * 20 ;
 
    while (! scheduleThreadToStop ) {
    // 扫描任务 Scan Job
    long start = System. currentTimeMillis () ;
    Connection conn = null;
    Boolean connAutoCommit = null;
    PreparedStatement preparedStatement = null
    boolean preReadSuc = true;
    try {
        conn = XxlJobAdminConfig. getAdminConfig ().getDataSource().getConnection() ;
          connAutoCommit = conn.getAutoCommit() ;
          conn.setAutoCommit( false ) ;
          // 采用 select for update ,是排它锁。说白了 xxl-job 用一张数据库表来当分布式锁了,确保多个 xxl-job admin 节点下,依旧只能同时执行一个调度线程任务
        preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" ) ;
          preparedStatement.execute() ;
 
          // tx start
 
          // 1 、预读数据 pre read
          long nowTime = System. currentTimeMillis () ;
          // -- 从数据库中读取截止到五秒后未执行的 job ,并且读取 preReadCount=6000 条
          List<XxlJobInfo> scheduleList = XxlJobAdminConfig. getAdminConfig ().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS , preReadCount) ;
          if (scheduleList!= null && scheduleList.size()> 0 ) {
              // 2 、 push 压进 时间轮 push time-ring
              for (XxlJobInfo jobInfo: scheduleList) {
 
                  // time-ring jump
                    if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS ) {
                        // 当前时间 大于 (任务的下一次触发时间 + PRE_READ_MS ( 5s )) , 可能是查询太久了,然后下面的代码刷新了任务下次执行时间,导致超过五秒,所以就需要特殊处理
                        // 2.1 、 trigger-expire > 5s : pass && make next-trigger-time
                        logger .warn( ">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId()) ;
                        // 1 、匹配过期失效的策略: DO_NOTHING= 过期啥也不干,废弃; FIRE_ONCE_NOW= 过期立即触发一次 misfire match
                        MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum. match (jobInfo.getMisfireStrategy() , MisfireStrategyEnum. DO_NOTHING ) ;
                      if (MisfireStrategyEnum. FIRE_ONCE_NOW == misfireStrategyEnum) {
                            // FIRE_ONCE_NOW 》 trigger
                              JobTriggerPoolHelper. trigger (jobInfo.getId() , TriggerTypeEnum. MISFIRE , - 1 , null, null, null ) ;
                              logger .debug( ">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() ) ;
                        }
                        // 2 、刷新上一次触发 和 下一次待触发时间 fresh next
                         refreshNextValidTime(jobInfo , new Date()) ;
                    } else if (nowTime > jobInfo.getTriggerNextTime()) {
                        // 当前时间 大于 任务的下一次触发时间 并且是没有过期的
                    // 2.2 、 trigger-expire < 5s : direct-trigger && make next-trigger-time
                        // 1 、直接触发任务执行器 trigger
                        JobTriggerPoolHelper. trigger (jobInfo.getId() , TriggerTypeEnum. CRON , - 1 , null, null, null ) ;
                        logger .debug( ">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() ) ;
                        // 2 、刷新上一次触发 和 下一次待触发时间 fresh next
                        refreshNextValidTime(jobInfo , new Date()) ;
 
                        // 如果下一次触发在五秒内,直接放进时间轮里面待调度 next-trigger-time in 5s, pre-read again
                        if (jobInfo.getTriggerStatus()== 1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
                              // 1 、求当前任务下一次触发时间所处一分钟的第 N 秒 make ring second
                              int ringSecond = ( int )((jobInfo.getTriggerNextTime()/ 1000 )% 60 ) ;
                              // 2 、将当前任务 ID 和 ringSecond 放进时间轮里面 push time ring
                              pushTimeRing(ringSecond , jobInfo.getId()) ;
                              // 3 、刷新上一次触发 和 下一次待触发时间 fresh next
                              refreshNextValidTime(jobInfo , new Date(jobInfo.getTriggerNextTime())) ;
                        }
 
                    } else {
                      // 当前时间 小于 下一次触发时间
                        // 2.3 、 trigger-pre-read : time-ring trigger && make next-trigger-time
                        // 1 、求当前任务下一次触发时间所处一分钟的第 N 秒 make ring second
                        int ringSecond = ( int )((jobInfo.getTriggerNextTime()/ 1000 )% 60 ) ;
                        // 2 、将当前任务 ID 和 ringSecond 放进时间轮里面 push time ring
                        pushTimeRing(ringSecond , jobInfo.getId()) ;
                        // 3 、刷新上一次触发 和 下一次待触发时间 fresh next
                        refreshNextValidTime(jobInfo , new Date(jobInfo.getTriggerNextTime())) ;
                    }
              }
 
              // 3 、更新数据库执行器信息,如 trigger_last_time 、 trigger_next_time update trigger info
              for (XxlJobInfo jobInfo: scheduleList) {
                    XxlJobAdminConfig. getAdminConfig ().getXxlJobInfoDao().scheduleUpdate(jobInfo) ;
              }
 
          } else {
            preReadSuc = false;
          }
          // tx stop
    } catch (Exception e) {
          if (! scheduleThreadToStop ) {
              logger .error( ">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}" , e) ;
          }
    } finally {
          // 提交事务,释放数据库 select for update 的锁 commit
        .......................省略.............    
    }
    long cost = System. currentTimeMillis ()-start ;
 
     // 如果执行太快了,就稍微 sleep 等待一下 Wait seconds, align second
    if (cost < 1000 ) { // scan-overtime, not wait
        try {
            // pre-read period: success > scan each second; fail > skip this period;
            TimeUnit. MILLISECONDS .sleep((preReadSuc? 1000 : PRE_READ_MS ) - System. currentTimeMillis ()% 1000 ) ;
        } catch (InterruptedException e) {
            if (! scheduleThreadToStop ) {
                logger .error(e.getMessage() , e) ;
            }
        }
    }) ;
    scheduleThread .setDaemon( true ) ;
    scheduleThread .setName( "xxl-job, admin JobScheduleHelper#scheduleThread" ) ;
    scheduleThread .start() ;
 
 
     // 时间轮线程,用于取出每秒的数据,然后处理 ring thread
    ringThread = new Thread( new Runnable() {
        @Override
        public void run () {
            while (! ringThreadToStop ) {
                   // align second
                   try {
                       TimeUnit. MILLISECONDS .sleep( 1000 - System. currentTimeMillis () % 1000 ) ;
                   } catch (InterruptedException e) {
                    if (! ringThreadToStop ) {
                        logger .error(e.getMessage() , e) ;
                    }
                }
                   try {
                       // second data
                       List<Integer> ringItemData = new ArrayList<>() ;
                       // 获取当前所处的一分钟第几秒,然后 for 两次,第二次是为了重跑前面一个刻度没有被执行的的 job list ,避免前面的刻度遗漏了
                    int nowSecond = Calendar. getInstance ().get(Calendar. SECOND ) ; // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
                    for ( int i = 0 ; i < 2 ; i++) {
                        List<Integer> tmpData = ringData .remove( (nowSecond+ 60 -i)% 60 ) ;
                        if (tmpData != null ) {
                            ringItemData.addAll(tmpData) ;
                        }
                       }
 
                       // ring trigger
                       logger .debug( ">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays. asList (ringItemData) ) ;
                       if (ringItemData.size() > 0 ) {
                           // do trigger
                              for ( int jobId: ringItemData) {
                                  // 执行触发器 do trigger
                                  JobTriggerPoolHelper. trigger (jobId , TriggerTypeEnum. CRON , - 1 , null, null, null ) ;
                              }
                              // 清除当前刻度列表的数据 clear
                              ringItemData.clear() ;
                       }
                   } catch (Exception e) {
                         if (! ringThreadToStop ) {
                              logger .error( ">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}" , e) ;
                       }
                   }
               }
            logger .info( ">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop" ) ;
        }
    }) ;
    ringThread .setDaemon( true ) ;
    ringThread .setName( "xxl-job, admin JobScheduleHelper#ringThread" ) ;
    ringThread .start() ;
}

总结下来就是:

(1)scheduleThread-取待执行任务数据入时间轮
-- 第一步:用select for update 数据库作为分布式锁加锁,避免多个xxl-job admin调度器节点同时执行
-- 第二步:预读数据,从数据库中读取当前截止到五秒后内会执行的job信息,并且读取分页大小为preReadCount=6000条数据
----  preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
-- 第三步:将当前时间与下次调度时间对比,有如下三种情况
****  当前时间 大于 (任务的下一次触发时间 + PRE_READ_MS(5s)):可能是查询太久了,然后下面的代码刷新了任务下次执行时间,导致超过五秒,所以就需要特殊处理
--------  1、匹配过期失效的策略:DO_NOTHING=过期啥也不干,废弃;FIRE_ONCE_NOW=过期立即触发一次
--------  2、刷新上一次触发 和 下一次待触发时间
****  当前时间 大于 任务的下一次触发时间 并且是没有过期的:
--------  1、直接触发任务执行器
--------  2、刷新上一次触发 和 下一次待触发时间
--------  3、如果下一次触发在五秒内,直接放进时间轮里面待调度
----------------  1、求当前任务下一次触发时间所处一分钟的第N秒
----------------  2、将当前任务ID和ringSecond放进时间轮里面
----------------  3、刷新上一次触发 和 下一次待触发时间
****  当前时间 小于 下一次触发时间:
--------  1、求当前任务下一次触发时间所处一分钟的第N秒
--------  2、将当前任务ID和ringSecond放进时间轮里面
--------  3、刷新上一次触发 和 下一次待触发时间
-- 第四步:更新数据库执行器信息,如trigger_last_time、trigger_next_time
-- 第五步:提交数据库事务,释放数据库select for update排它锁
 
(2)ringThread-根据时间轮执行job任务
首先时间轮数据格式为:Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>()
-- 第一步:获取当前所处的一分钟第几秒,然后for两次,第二次是为了重跑前面一个刻度没有被执行的的job list,避免前面的刻度遗漏了
-- 第二步:执行触发器
-- 第三步:清除当前刻度列表的数据
**** 执行的过程中还会选择对应的策略,如下:
-------- 阻塞策略:串行、废弃后面、覆盖前面
-------- 路由策略:取第一个、取最后一个、最小分发、一致性hash、快速失败、LFU最不常用、LRU最近最少使用、随机、轮询

另外有个小细节,执行任务其实就是往线程池中,放置任务,如下:

// 执行任务
public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) {
    helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
}
// 往线程池中放
public void addTrigger(final int jobId,
                       final TriggerTypeEnum triggerType,
                       final int failRetryCount,
                       final String executorShardingParam,
                       final String executorParam,
                       final String addressList) {

    // choose thread pool  看这里有两个线程池供选择 一个快的 一个慢的
    ThreadPoolExecutor triggerPool_ = fastTriggerPool;
    AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
    if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min
        triggerPool_ = slowTriggerPool;
    }

    // trigger
    triggerPool_.execute(new Runnable() {
        @Override
        public void run() {
            ...
        }
    });
}

可以看到有两个线程池供选择,也就是会根据当前任务ID的超时次数,来选择快慢线程池,学到了。

4  小结

好啦,关于时间轮的认识就到这里了,有理解不对的地方欢迎指正哈。

标签:Netty,触发,任务,JOB,trigger,时间,timeout,执行,XXL
From: https://www.cnblogs.com/kukuxjx/p/18170762

相关文章

  • AVEVA MARINE C# 程序执行MarJobLauncher工作
    手工执行的话一般如此操作,例如分离零件等操作今天利用c#介绍下AM如何用代码执行提取零件gen文件的过程引用如下的库文件封装的类库其中appname/shortname/appExecutable这些去下面的文件去查找C:\AVEVA\Marine\OH12.1.SP4\MarJobs.xml publicstaticclassMarJobEx......
  • ElasticJob-面试题-高频题
    ElasticJob1ElasticJob的失效转移-故障转移-机制是怎样的?答案:当任务执行失败或者节点宕机时,ElasticJob具备故障转移和重试的能力,能够自动进行故障恢复,确保任务的稳定运行。底层原理是怎么样的?底层实现原理就是:Elasticjob的故障恢复机制是通过分布式协调服务-zookeeper和任务节点......
  • xxl-job
    部署拉取镜像dockerpullxuxueli/xxl-job-admin:2.4.1docker-composeversion:'3'services:xxl-job-admin:image:xuxueli/xxl-job-admin:2.4.1container_name:xxl-job-adminrestart:alwaysports:-8087:8080environment:......
  • Mysql启动报错:Job for mysqld.service failed because the control process exited wi
      该方法会删除mysql数据,慎用centos7上使用yum安装mysql后,启动报错[root@localhost~]#systemctlstartmysqldJobformysqld.servicefailedbecausethecontrolprocessexitedwitherrorcode.See"systemctlstatusmysqld.service"and"journalctl-xe"for......
  • RocketMQLog:WARN No appenders could be found for logger (io.netty.channel.nio.Ni
    springBoot集成rocketMq启动的时候报RocketMQLog:WARNNoappenderscouldbefoundforlogger(io.netty.channel.nio.NioEventLoop). RocketMQLog:WARNPleaseinitializetheloggersystemproperly. 原因是pom中的rocket的依赖版本太高了。<dependency><groupI......
  • springboot的netty代码实操
    参考:https://www.cnblogs.com/mc-74120/p/13622008.htmlpom文件<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId></dependency>启动类@EnableFeignClients@EnableDiscoveryClient@EnableSchedu......
  • xxl-job源码解析
    简介:XXL-JOB是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。Features:1、简单:支持通过Web页面对任务进行CRUD操作,操作简单,一分钟上手;2、动态:支持动态修改任务状态、启动/停止任务,以及终止运行......
  • k8s快速部署xxl-job
    1.初始化数据库wgethttps://raw.githubusercontent.com/xuxueli/xxl-job/2.1.2/doc/db/tables_xxl_job.sqlmysql-uroot-psource/root/tables_xxl_job.sql;CREATEUSER't1_zdbl_xxl_job'@'%'IDENTIFIEDBY'OYP!z5%0O2lALdLi';GRANTALL......
  • 测试Netty高并发工具
    测试Netty应用程序的高并发性能工具JMeterJMeter:ApacheJMeter是一个功能强大的用于性能测试的工具,可以模拟大量用户对Netty服务器的并发请求。你可以创建各种测试计划来模拟不同负载条件下的性能表现。wrkwrk:wrk是一个现代的HTTP基准测试工具,它可以轻松地对Netty服务器进......
  • 分布式任务调度平台XXL-JOB:调度日志打印时区问题
    系列文章目录文章目录系列文章目录前言前言前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站,这篇文章男女通用,看懂了就去分享给你的码吧。Quartz作为开源作业调度中的佼佼者,是作业调度的首选。但是集群环境中Q......