首页 > 编程语言 >【算法数据结构专题】「延时队列算法」史上手把手教你针对层级时间轮(TimingWheel)实现延时队列的开发实战落地(下)

【算法数据结构专题】「延时队列算法」史上手把手教你针对层级时间轮(TimingWheel)实现延时队列的开发实战落地(下)

时间:2023-04-08 20:32:39浏览次数:48  
标签:task 队列 interval long int 算法 tasks 延时 public

承接上文

承接上一篇文章【算法数据结构专题】「延时队列算法」史上手把手教你针对层级时间轮(TimingWheel)实现延时队列的开发实战落地(上)】我们基本上对层级时间轮算法的基本原理有了一定的认识,本章节就从落地的角度进行分析和介绍如何通过Java进行实现一个属于我们自己的时间轮服务组件,最后,在告诉大家一下,其实时间轮的技术是来源于生活中的时钟。

时间轮演示结构总览

无序列表时间轮

【无序列表时间轮】主要是由LinkedList链表和启动线程、终止线程实现。

【算法数据结构专题】「延时队列算法」史上手把手教你针对层级时间轮(TimingWheel)实现延时队列的开发实战落地(下)_ide

遍历定时器中所有节点,将剩余时间为 0s 的任务进行过期处理,在执行一个周期。

【算法数据结构专题】「延时队列算法」史上手把手教你针对层级时间轮(TimingWheel)实现延时队列的开发实战落地(下)_ide_02

  • 无序链表:每一个延时任务都存储在该链表当中(无序存储)。
  • 启动线程: 直接在链表后面push ,时间复杂度 O(1)。
  • 终止线程: 直接在链表中删除节点,时间复杂度 O(1) 。

遍历周期:需要遍历链表中所有节点,时间复杂度 O(n),所以伴随着链表中的元素越来越多,速度也会越来越慢!

无序列表时间轮的长度限制了其适用场景,这里对此进行优化。因此引入了有序列表时间轮。

有序列表时间轮

与无序列表时间轮一样,同样使用链表进行实现和设计,但存储的是绝对延时时间点

【算法数据结构专题】「延时队列算法」史上手把手教你针对层级时间轮(TimingWheel)实现延时队列的开发实战落地(下)_ci_03

  • 启动线程有序插入,比较时间按照时间大小有序插入,时间复杂度O(n),主要耗时在插入操作
  • 终止线程链表中查找任务,删除节点,时间复杂度O(n),主要耗时在插入操作

找到执行最后一个过期任务即可,无需遍历整个链表,时间复杂度 O(1),从上面的描述「有序列表定时器」的性能瓶颈在于插入时的任务排序,但是换来的就是缩短了遍历周期。

所以我们如果要提高性,就必须要提升一下插入和删除以及检索的性能,因此引入了「树形有序列表时间轮」在「有序列表定时器」的基础上进行优化,以有序树的形式进行任务存储。

树形有序列表时间轮

  • 启动定时器: 有序插入,比较时间按照时间大小有序插入,时间复杂度 O(logn)
  • 终止定时器: 在链表中查找任务,删除节点,时间复杂度 O(logn)
  • 周期清算: 找到执行最后一个过期任务即可,无需遍历整个链表,时间复杂度 O(1)

层级时间轮

整体流程架构图,如下所示。

【算法数据结构专题】「延时队列算法」史上手把手教你针对层级时间轮(TimingWheel)实现延时队列的开发实战落地(下)_链表_04

对应的原理,在这里就不进行赘述了,之前本人已经有两篇文章对层级式时间轮进行了较为详细的介绍了,有需要的小伙伴,可以直接去前几篇文章去学习,接下来我们进行相关的实现。

时间轮数据模型

时间轮(TimingWheel)是一个存储定时任务的环形队列,数组中的每个元素可以存放一个定时任务列表,其中存放了真正的定时任务,如下图所示。

【算法数据结构专题】「延时队列算法」史上手把手教你针对层级时间轮(TimingWheel)实现延时队列的开发实战落地(下)_ci_05

时间轮的最基本逻辑模型,由多个时间格组成,每个时间格代表当前时间轮的基本时间跨度(tickMs),所以我们先来设计和定义开发对应的时间轮的轮盘模型。命名为Roulette类。

轮盘抽象类-Roulette

之所以定义这个抽象类

public abstract class Roulette {
	// 链表数据-主要用于存储每个延时任务节点
    List<TimewheelTask> tasks = null;
    // 游标指针索引
    protected int index;
	// 时间轮轮盘的容量大小,如果是分钟级别,一般就是60个格
    protected int capacity;
	// 时间轮轮盘的层级,如果是一级,它的上级就是二级
    protected Integer level;
    private AtomicInteger num = new AtomicInteger(0);

   // 构造器
    public Roulette(int capacity, Integer level) {
        this.capacity = capacity;
        this.level = level;
        this.tasks = new ArrayList<>(capacity);
        this.index = 0;
    }
    // 获取当前下表的索引对应的时间轮的任务
    public TimewheelTask getTask() {
        return tasks.get(index);
    }
   // init初始化操作机制
    public List<TimewheelTask> init() {
        long interval = MathTool.power((capacity + 1), level);
        long add = 0;
        TimewheelTask delayTask = null;
        for (int i = 0; i < capacity; i++) {
            add += interval;
            if (level == 0) {
                delayTask = new DefaultDelayTask(level);
            } else {
                delayTask = new SplitDelayTask(level);
            }
            //已经转换为最小的时间间隔
            delayTask.setDelay(add, TimeUnitProvider.getTimeUnit());
            tasks.add(delayTask);
        }
        return tasks;
    }

   // 索引下标移动
    public void indexAdd() {
        this.index++;
        if (this.index >= capacity) {
            this.index = 0;
        }
    }
   // 添加对应的任务到对应的队列里面
    public void addTask(TimewheelTask task) {
        tasks.add(task);
    }
	// 给子类提供的方法进行实现对应的任务添加功能
    public abstract void addTask(int interval, MyTask task);
}
时间轮盘的熟悉信息介绍

链表数据-主要用于存储每个延时任务节点。

List<TimewheelTask> tasks = null;

tasks也可以改成双向链表 + 数组的结构:即节点存贮的对象中有指针,组成环形,可以通过数组的下标灵活访问每个节点,类似 LinkedHashMap。

游标指针索引

protected int index;

时间轮轮盘的容量大小,如果是分钟级别,一般就是60个格

protected int capacity;

时间轮轮盘的层级,如果是一级,它的上级就是二级

protected Integer level;

init初始化时间轮轮盘对象模型,主要用于分配分配每一个轮盘上面元素的TimewheelTask,用于延时队列的执行任务线程,已经分配对应的每一个节点的延时时间节点数据。

public List<TimewheelTask> init() {
	   //  那么整个时间轮的总体时间跨度(interval)
        long interval = MathTool.power((capacity + 1), level);
        long add = 0;
        TimewheelTask delayTask = null;
        for (int i = 0; i < capacity; i++) {
            add += interval;
            if (level == 0) {
                delayTask = new ExecuteTimewheelTask(level);
            } else {
                delayTask = new MoveTimewheelTask(level);
            }
            //已经转换为最小的时间间隔
            delayTask.setDelay(add, TimeUnitProvider.getTimeUnit());
            tasks.add(delayTask);
        }
        return tasks;
}
  • 整数a的n次幂:interval,计算跨度,主要是各级别之间属于平方倍数

例如,第一层:20 ,第二层:20^2 ......

//例如 n=7  二进制 0   1                 1          1
    //a的n次幂 = a的2次幂×a的2次幂  ×   a的1次幂×a的1次幂  ×a
    public static long power(long a, int n) {
        int rtn = 1;
        while (n >= 1) {
            if((n & 1) == 1){
                rtn *= a;
            }
            a *= a;
            n = n >> 1;
        }
        return rtn;
    }
TimeUnitProvider工具类

主要用于计算时间单位操作的转换

public class TimeUnitProvider {
    private static TimeUnit unit = TimeUnit.SECONDS;
    public static TimeUnit getTimeUnit() {
        return unit;
    }
}

代码简介:

  • interval:代表着初始化的延时时间数据值,主要用于不同的层次的出发时间数据
  • for (int i = 0; i < capacity; i++) :代表着进行for循环进行添加对应的延时队列任务到集合中
  • add += interval,主要用于添加对应的延时队列的延时数据值!并且分配给当前轮盘得到所有数据节点。

【算法数据结构专题】「延时队列算法」史上手把手教你针对层级时间轮(TimingWheel)实现延时队列的开发实战落地(下)_ide_06

获取当前下标的索引对应的时间轮的任务节点

public TimewheelTask getTask() {
        return tasks.get(index);
}
层级时间轮的Bucket数据桶

在这里我们建立了一个TimewheelBucket类实现了Roulette轮盘模型,从而进行建立对应的我们的层级时间轮的数据模型,并且覆盖了addTask方法。

public class TimewheelBucket extends Roulette {

    public TimewheelBucket(int capacity, Integer level) {
        super(capacity, level);
    }

    public synchronized void addTask(int interval, MyTask task) {
        interval -= 1;
        int curIndex = interval + this.index;
        if (curIndex >= capacity) {
            curIndex = curIndex - capacity;
        }
        tasks.get(curIndex).addTask(task);
    }
}

添加addTask方法,进行获取计算对应的下标,并且此方法add操作才是对外开发调用的,在这里,我们主要实现了根据层级计算出对应的下标进行获取对应的任务执行调度点,将我们外界BizTask,真正的业务操作封装到这个BizTask模型,交由我们的系统框架进行执行。

public synchronized void addTask(int interval, BizTask task) {
        interval -= 1;
        int curIndex = interval + this.index;
        if (curIndex >= capacity) {
            curIndex = curIndex - capacity;
        }
        tasks.get(curIndex).addTask(task);
    }
时间轮轮盘上的任务点

【算法数据结构专题】「延时队列算法」史上手把手教你针对层级时间轮(TimingWheel)实现延时队列的开发实战落地(下)_ci_07

我们针对于时间轮轮盘的任务点进行设计和定义对应的调度执行任务模型。一个调度任务点,可以帮到关系到多个BizTask,也就是用户提交上来的业务任务线程对象,为了方便采用延时队列的延时处理模式,再次实现了Delayed这个接口,对应的实现代码如下所示:

Delayed接口
public interface Delayed extends Comparable<Delayed> {
    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}
TimewheelTask时间轮刻度点
@Getter
public abstract class TimewheelTask implements Delayed {
    private List<BizTask> tasks = new ArrayList<BizTask>();
    private int level;
    private Long delay;
    private long calDelay;
    private TimeUnit calUnit;
    public TimewheelTask(int level) {
        this.level = level;
    }
    public void setDelay(Long delay, TimeUnit unit) {
        this.calDelay=delay;
        this.calUnit=unit;
    }
    public void calDelay() {
        this.delay = TimeUnit.NANOSECONDS.convert(this.calDelay, this.calUnit) + System.nanoTime(); 
    }
    public long getDelay(TimeUnit unit) {
        return this.delay - System.nanoTime();
    }
    public int compareTo(Delayed o) {
        long d = (getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS));
        return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
    }
    public void addTask(BizTask task) {
        synchronized (this) {
            tasks.add(task);
        }
    }
    public void clear() {
        tasks.clear();
    }
    public abstract void run();
}
  • 业务任务集合:private List<BizTask> tasks = new ArrayList<BizTask>();
  • 层级
private int level;
  • 延时时间
private Long delay;
  • 实际用于延时计算的时间,就是底层是统一化所有的延时时间到对应的延时队列
private long calDelay;
  • 实际用于延时计算的时间,就是底层是统一化所有的延时时间到对应的延时队列(用于统一化的时间单位)
private TimeUnit calUnit;
添加对应的业务延时任务到轮盘刻度点
public void addTask(BizTask task) {
        synchronized (this) {
            tasks.add(task);
        }
    }
刻度点的实现类

因为对应的任务可能会需要将下游的业务任务进行升级或者降级,所以我们会针对于执行任务点分为,执行任务刻度点和跃迁任务刻度点两种类型。

  • 执行任务延时队列刻度点
public class ExecuteTimewheelTask extends TimewheelTask {
    public ExecuteTimewheelTask(int level) {
        super(level);
    }
    //到时间执行所有的任务
    public void run() {
        List<BizTask> tasks = getTasks();
        if (CollectionUtils.isNotEmpty(tasks)) {
            tasks.forEach(task -> ThreadPool.submit(task));
        }
    }
}

再次我们就定义执行这些任务的线程池为:

private static ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 100, 3, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(10000),
            new MyThreadFactory("executor"), new ThreadPoolExecutor.CallerRunsPolicy());
  • 跃迁任务延时队列刻度点
public class MoveTimewheelTask extends TimewheelTask {
    public MoveTimewheelTask(int level) {
        super(level);
    }
    //跃迁到其他轮盘,将对应的任务
    public void run() {
        List<BizTask> tasks = getTasks();
        if (CollectionUtils.isNotEmpty(tasks)) {
            tasks.forEach(task -> {
                long delay = task.getDelay();
                TimerWheel.adddTask(task,delay, TimeUnitProvider.getTimeUnit());
            });
        }
    }
}

致辞整个时间轮轮盘的数据模型就定义的差不多了,接下来我们需要定义运行在时间轮盘上面的任务模型,BizTask基础模型。

BizTask基础模型
public abstract class BizTask implements Runnable {
     protected long interval;
     protected int index;
     protected long executeTime;
     public BizTask(long interval, TimeUnit unit, int index) {
          this.interval  = interval;
          this.index = index;
          this.executeTime= TimeUnitProvider.getTimeUnit().convert(interval,unit)+TimeUnitProvider.getTimeUnit().convert(System.nanoTime(),TimeUnit.NANOSECONDS);
     }
     public long getDelay() {
          return this.executeTime - TimeUnitProvider.getTimeUnit().convert(System.nanoTime(), TimeUnit.NANOSECONDS);
     }
}

主要针对于任务执行,需要交给线程池去执行,故此,实现了Runnable接口。

  • protected long interval;:跨度操作
  • protected int index;:索引下表,在整个队列里面的下表处理
  • protected long executeTime;:对应的执行时间

其中最重要的便是获取延时时间的操作,主要提供给框架的Delayed接口进行判断是否到执行时间了。

public long getDelay() {
          return this.executeTime - TimeUnitProvider.getTimeUnit().convert(System.nanoTime(), TimeUnit.NANOSECONDS);
     }
层级时间轮的门面TimerWheel

最后我们要进行定义和设计开发对应的整体的时间轮层级模型。

【算法数据结构专题】「延时队列算法」史上手把手教你针对层级时间轮(TimingWheel)实现延时队列的开发实战落地(下)_ide_08

public class TimerWheel {

    private static Map<Integer, TimewheelBucket> cache = new ConcurrentHashMap<>();
    //一个轮表示三十秒
    private static int interval = 30;
    private static wheelThread wheelThread;

    public static void adddTask(BizTask task, Long time, TimeUnit unit) {
        if(task == null){
            return;
        }
        long intervalTime = TimeUnitProvider.getTimeUnit().convert(time, unit);
        if(intervalTime < 1){
            ThreadPool.submit(task);
            return;
        }
        Integer[] wheel = getWheel(intervalTime,interval);
        TimewheelBucket taskList = cache.get(wheel[0]);
        if (taskList != null) {
            taskList.addTask(wheel[1], task);
        } else {
            synchronized (cache) {
                if (cache.get(wheel[0]) == null) {
                    taskList = new TimewheelBucket(interval-1, wheel[0]);
                    wheelThread.add(taskList.init());
                    cache.putIfAbsent(wheel[0],taskList);
                }
            }
            taskList.addTask(wheel[1], task);
        }
    }
    static{
        interval = 30;
        wheelThread = new wheelThread();
        wheelThread.setDaemon(false);
        wheelThread.start();
    }
    private static Integer[] getWheel(long intervalTime,long baseInterval) {
        //转换后的延时时间
        if (intervalTime < baseInterval) {
            return new Integer[]{0, Integer.valueOf(String.valueOf((intervalTime % 30)))};
        } else {
            return getWheel(intervalTime,baseInterval,baseInterval, 1);
        }
    }

    private static Integer[] getWheel(long intervalTime,long baseInterval,long interval, int p) {
         long nextInterval = baseInterval * interval;
        if (intervalTime < nextInterval) {
            return new Integer[]{p, Integer.valueOf(String.valueOf(intervalTime / interval))};
        } else {
            return getWheel(intervalTime,baseInterval,nextInterval, (p+1));
        }
    }

    static class wheelThread extends Thread {
        DelayQueue<TimewheelTask> queue = new DelayQueue<TimewheelTask>();

        public DelayQueue<TimewheelTask> getQueue() {
            return queue;
        }

        public void add(List<TimewheelTask> tasks) {
            if (CollectionUtils.isNotEmpty(tasks)) {
                tasks.forEach(task -> add(task));
            }
        }

        public void add(TimewheelTask task) {
            task.calDelay();
            queue.add(task);
        }

        @Override
        public void run() {
            while (true) {
                try {
                    TimewheelTask task = queue.take();
                    int p = task.getLevel();
                    long nextInterval = MathTool.power(interval, Integer.valueOf(String.valueOf(MathTool.power(2, p))));
                    TimewheelBucket timewheelBucket = cache.get(p);
                    synchronized (timewheelBucket) {
                        timewheelBucket.indexAdd();
                        task.run();
                        task.clear();
                    }
                    task.setDelay(nextInterval, TimeUnitProvider.getTimeUnit());
                    task.calDelay();
                    queue.add(task);
                } catch (InterruptedException e) {

                }
            }
        }
    }
}
TimerWheel的模型定义
private static Map<Integer, TimewheelBucket> cache = new ConcurrentHashMap<>();

一个轮表示30秒的整体跨度。

private static int interval = 30;

创建整体驱动的执行线程

private static wheelThread wheelThread;

 static{
        interval = 30;
        wheelThread = new wheelThread();
        wheelThread.setDaemon(false);
        wheelThread.start();
}

    static class wheelThread extends Thread {
        DelayQueue<TimewheelTask> queue = new DelayQueue<TimewheelTask>();
        public DelayQueue<TimewheelTask> getQueue() {
            return queue;
        }
        public void add(List<TimewheelTask> tasks) {
            if (CollectionUtils.isNotEmpty(tasks)) {
                tasks.forEach(task -> add(task));
            }
        }
        public void add(TimewheelTask task) {
            task.calDelay();
            queue.add(task);
        }
        @Override
        public void run() {
            while (true) {
                try {
                    TimewheelTask task = queue.take();
                    int p = task.getLevel();
                    long nextInterval = MathTool.power(interval, Integer.valueOf(String.valueOf(MathTool.power(2, p))));
                    TimewheelBucket timewheelBucket = cache.get(p);
                    synchronized (timewheelBucket) {
                        timewheelBucket.indexAdd();
                        task.run();
                        task.clear();
                    }
                    task.setDelay(nextInterval, TimeUnitProvider.getTimeUnit());
                    task.calDelay();
                    queue.add(task);
                } catch (InterruptedException e) {

                }
            }
   }
获取对应的时间轮轮盘模型体系
private static Integer[] getWheel(long intervalTime,long baseInterval) {
        //转换后的延时时间
        if (intervalTime < baseInterval) {
            return new Integer[]{0, Integer.valueOf(String.valueOf((intervalTime % 30)))};
        } else {
            return getWheel(intervalTime,baseInterval,baseInterval, 1);
        }
    }

    private static Integer[] getWheel(long intervalTime,long baseInterval,long interval, int p) {
         long nextInterval = baseInterval * interval;
        if (intervalTime < nextInterval) {
            return new Integer[]{p, Integer.valueOf(String.valueOf(intervalTime / interval))};
        } else {
            return getWheel(intervalTime,baseInterval,nextInterval, (p+1));
        }
    }

到这里相信大家,基本上应该是了解了如何去实现对应的时间轮盘的技术实现过程,有兴趣希望整个完整源码的,可以联系我哦。谢谢大家!

标签:task,队列,interval,long,int,算法,tasks,延时,public
From: https://blog.51cto.com/alex4dream/6177884

相关文章

  • HJ67_24点游戏算法_多维递归_DFS(深度优先搜索)
    思路:多维递归,深度有限遍历加减乘除四种情况。知识点:1、多维递归不能对传递的变量进行修改,否则无法回溯。应该传递一个新地址的变量,如代码所示,传递切片的列表,不修改列表  2、搜索遗漏。两括号比如((9-4)-1)*6选取任意一个数作为第一个运算数与24运算,不能找出所有24点的计算......
  • Tarjan 算法学习笔记
    (绝大部分都是贺的,来自OI-WIKI和洛谷题解,自己抄一遍印象深刻一点,部分代码未编译,不保证正确性,但大体是对的)一、DFS生成树注意可能有多棵,因为图可能不联通。树边(treeedge):示意图中以黑色边表示,每次搜索找到一个还没有访问过的结点的时候就形成了一条树边。反祖边(backedge):......
  • Python 进阶指南(编程轻松进阶):十三、性能测量和大 O 算法分析
    原文:http://inventwithpython.com/beyond/chapter13.html对于大多数小程序来说,性能并不那么重要。我们可能会花一个小时编写一个脚本来自动执行一个只需要几秒钟就能运行的任务。即使需要更长的时间,当我们端着一杯咖啡回到办公桌时,这个项目也可能已经完成了。有时候花时间学......
  • Chapter2 K-近邻算法案例1
    案例2:使用K-近邻算法实现手写数字系统1.案例要求编写一个程序,应用K-近邻算法,实现手写数字系统。通过画图生成一个32*32的数字图像,再将图像转化为代表数字的0-1文本文件。之后往程序输入代表数字的0-1文本文件,程序便可以输出相应的数字。2.案例的执行流程示例......
  • 【生产调度】基于和声搜索算法实现并行机器调度附matlab代码
    ✅作者简介:热爱科研的Matlab仿真开发者,修心和技术同步精进,matlab项目合作可私信。......
  • 算法C#
    #region二分查找法publicstaticintBinarySertch(int[]arr,intstartIndex,intendIndex,intresult){if(startIndex>endIndex){return-1;}intmidIndex=(end......
  • 数组模拟环形队列的思路及代码
    JAVA实现数组模拟环形队列的思路及代码前言在对Java实现数组模拟队列零了解的情况下,建议先去阅读《JAVA实现数组模拟单向队列的思路及代码》一文,可以辅助理解本文核心思想。一、环形数组队列实现:让数组达到复用的效果,即:当我们从数组队列中取出了数据,那取出数据后后这个空间可......
  • 数组模拟单向队列的思路及代码
    JAVA实现数组模拟单向队列的思路及代码一、什么是队列?队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。进行插入操作的端称为队尾,进行删除操作的端称为队头。队列中没有元素时,称......
  • 09、OpenFoam中的PISO,SIMPLE和PIMPLE算法
    隐式:PISO半隐式:SIMPLE组合式:PIMPLE(PISO+SIMPLE)PISO算法PISO算法是一种常用于求解不可压缩流体流动问题的数值方法,它在OpenFOAM中被广泛应用。PISO算法的全称为PressureImplicitwithSplittingofOperators,即利用算子分裂的方法进行隐式求解压力和速度。PISO算法主要分为......
  • Chapter2 K-近邻算法案例
    案例1:使用K-近邻算法分类爱情片和动作片1.案例要求创建一个应用,应用K-近邻算法,将样本分到以下三种类别。1.不喜欢的人2.魅力一般的人3.极具魅力的人2.案例的执行流程示例:在约会网站上使用k-近邻算法(1)收集数据:提供文本文件......