首页 > 其他分享 >浅析Golang的层级时间轮实现方案

浅析Golang的层级时间轮实现方案

时间:2023-09-05 21:24:57浏览次数:57  
标签:层级 延时 Golang 任务 pq 时间 func 浅析 tickMs

文章目录

  • 时间轮介绍
    • 简单时间轮
    • 层级时间轮
      • kafka中的实现细节
  • 基于go语言的层级时间轮实现

 

一、时间轮介绍

  工作中,我们经常遇到到延时任务这类需求(例如用户开始一个任务,15分钟后给他发送一个通知奖励;用户下单未付款,三分钟后发送一条提醒消息...)。一般情况下,我们使用time.Timer对象完成工作,例如:

       // other code...
        go func() {
        timer := time.NewTimer(30 * time.Second)
        <-timer.C
        //延时执行的业务逻辑
    }()        

  在go语言运行时内置实现中,Timer对象的实现是使用小根堆这个数据结构。小根堆在写入,删除一个元素时,时间复杂度为O(log n)。当服务进程中,有100W个延时任务需要执行时,这100W个延时任务,写入到go运行时内置的Timer对象就是一个比较耗时,低效的操作了。而基于时间轮的任务写入、删除延时任务操作,可以将时间复杂度降低至O(1),从而高效地管理和触发这些延时任务,以满足复杂的调度需求。

  常见的时间轮有两种,分别是简单时间轮层级时间轮。下面参考Kafka的时间轮设计以及实现,分别讨论相关数据结构以及实现。

  (1)简单时间轮

  时间轮(TimeWheel)是一个存储延时任务的环形队列。队列内每个元素称为一个时间格(TimeBucket),可以存放一个任务列表(TimerTaskList),这个任务列表(TimerTaskList)则是一个环形双向链表,它的每一项表示的是延时任务项(TimerTaskEntity),代表真实的延时任务。所以,简单时间轮有大致这样的层级关系 TimeWheel -> m * TimerTaskList/TimeBucket -> n * TimerTaskEntity。(ps.时间轮的环形队列底层数据结构可以用数组来实现;TimerTaskList采用环形双向链表可以在O(1)时间复杂度去插入/删除TimerTaskEntity)

  时间轮的每一个时间格,代表了当前时间轮的基本时间跨度(tickMs)。所以,假设当前时间轮队列长度(wheelSize)为7,时间跨度为1秒,这个时间轮的总时间跨度(interval) = tickMs * wheelSize = 1s * 7 = 7s。(ps.下文时间轮队列长度统一简称队列长度。)

  最后,时间轮还存在一个表盘指针(currentTime),表示当前时间轮所处的时间。currentTime将整个时间轮划分为到期部分未到期部分。(ps. currentTime当前指向的时间格也属于到期部分,表示正在执行的时间格)

  当我们往时间轮写入一个任务时,就如下图所示

  这样,简单时间轮的雏形就出来了,用代码实现就如下所示:

// TimeWheel 时间轮对象
type TimeWheel struct {
    Buckets []Bucket // 时间格队列

    WheelSize int //时间轮格数量
    TickMs    int // 基本时间跨度

    CurrentTime int //表盘指针
    mu          sync.RWMutex
}

// Bucket 时间格
type Bucket struct {
    TaskList   *TimerTaskList
}

// TimerTaskList 任务列表,双向链表。这里直接使用go内置的List对象
type TimerTaskList = list.List

// TimerTaskEntity 抽象的具体任务
type TimerTaskEntity struct {
    DelayTime int
    Task      func()
}

  (1.1)简单时间轮的实现

  接下来,就是让这个时间轮跑起来。这里我们使用一个Ticker驱动这个时间轮,这个Ticker的执行周期为时间轮的基本时间跨度。我们在最开始,添加一个1s和3s,9s的延时任务,然后随着时间的推移,分别在2023/09/01 23:52:20,2023/09/01 23:52:22,2023/09/01 23:52:28 执行了三个延时任务。完整代码如下:

  1 // TimeWheel 时间轮对象
  2 type TimeWheel struct {
  3     Buckets []Bucket // 时间格队列
  4 
  5     WheelSize int //时间轮格数量
  6     TickMs    int // 基本时间跨度
  7 
  8     CurrentTime int //表盘指针
  9     mu          sync.RWMutex
 10 
 11     ticker    *time.Ticker
 12     startChan chan bool
 13     stopChan  chan bool
 14 }
 15 
 16 // Bucket 时间格
 17 type Bucket struct {
 18     TaskList *TimerTaskList
 19 }
 20 
 21 // TimerTaskList 任务列表,双向链表。这里直接使用go内置的List对象
 22 type TimerTaskList = list.List
 23 
 24 // TimerTaskEntity 抽象的具体任务
 25 type TimerTaskEntity struct {
 26     DelayTime int // 延时时间
 27     Task      func()
 28 }
 29 
 30 func NewTimeWheel(wheelSize, tickMs int) *TimeWheel {
 31     return &TimeWheel{
 32         Buckets:     make([]Bucket, wheelSize),
 33         WheelSize:   wheelSize,
 34         TickMs:      tickMs,
 35         CurrentTime: 0,
 36 
 37         startChan: make(chan bool, 1),
 38         stopChan:  make(chan bool, 1),
 39     }
 40 }
 41 
 42 func (t *TimeWheel) Start() {
 43     select {
 44     case t.startChan <- true:
 45 
 46     default:
 47         fmt.Println("timewheel is already running, exit")
 48         return
 49     }
 50     // 启动时间轮的内部定时器
 51     t.ticker = time.NewTicker(time.Duration(t.TickMs) * time.Second)
 52     go func() {
 53         for {
 54             select {
 55             case <-t.ticker.C:
 56                 t.handler()
 57             case <-t.stopChan:
 58                 return
 59             }
 60         }
 61     }()
 62 }
 63 
 64 func (t *TimeWheel) Stop() {
 65     select {
 66     case <-t.startChan:
 67         t.ticker.Stop()
 68         t.stopChan <- true
 69         return
 70     default:
 71         log.Println("timewheel has stopped,exit.")
 72         return
 73     }
 74 }
 75 
 76 func (t *TimeWheel) handler() {
 77     t.mu.Lock()
 78     defer t.mu.Unlock()
 79 
 80     // 时间轮转动
 81     t.CurrentTime++
 82     // 转动一圈后,指针复位
 83     if t.CurrentTime%t.WheelSize == 0 {
 84         t.CurrentTime = 0
 85     }
 86 
 87     //跳过没有任务的时间槽
 88     taskList := t.Buckets[t.CurrentTime].TaskList
 89     if taskList == nil {
 90         return
 91     }
 92     for e := taskList.Front(); e != nil; e = e.Next() {
 93         taskEntity, _ := e.Value.(*TimerTaskEntity)
 94         go taskEntity.Task()
 95     }
 96     // 删除时间槽的定时器链表
 97     t.Buckets[t.CurrentTime].TaskList = nil
 98 }
 99 
100 func (t *TimeWheel) AddTimerTaskEntity(entity *TimerTaskEntity) {
101 
102     t.mu.Lock()
103     defer t.mu.Unlock()
104     if len(t.startChan) == 0 {
105         log.Println(" timewheel has not been started")
106         return
107     }
108 
109     // 找到延时应该放在哪个时间格
110     index := (t.CurrentTime + entity.DelayTime) % t.WheelSize
111     bucket := t.Buckets[index]
112     if bucket.TaskList == nil {
113         bucket.TaskList = list.New()
114     }
115     // 延时任务放入时间格链表
116     bucket.TaskList.PushBack(entity)
117     t.Buckets[index] = bucket
118 }
简单时间轮Demo
 1 func TestTimeWheel(t *testing.T) {
 2     simpleTimeWheel := NewTimeWheel(10, 1)
 3     simpleTimeWheel.Start()
 4     simpleTimeWheel.AddTimerTaskEntity(&TimerTaskEntity{
 5         Task: func() {
 6             log.Println("this is delay 1 s task")
 7         },
 8         DelayTime: 1,
 9     })
10     simpleTimeWheel.AddTimerTaskEntity(&TimerTaskEntity{
11         Task: func() {
12             log.Println("this is delay 3 s task")
13         },
14         DelayTime: 3,
15     })
16     simpleTimeWheel.AddTimerTaskEntity(&TimerTaskEntity{
17         Task: func() {
18             log.Println("this is delay 9 s task")
19         },
20         DelayTime: 9,
21     })
22 
23     time.Sleep(time.Second * 20)
24     simpleTimeWheel.Stop()
25 
26 }
测试示例

   这里可以看到简单时间轮的局限性:时间轮一旦初始化完成,确定了时间轮的时间格长度后,就不能再添加超过总时间跨度的延时任务了(例如一个tickMs为1s,wheelSize为10的时间轮,假如写入一个15s的延时任务,就会跟5s延时任务有冲突)。此时解决方案之一是扩充wheelSize时间格长度,但是假如我要写入一个100W * tickMs后执行的延时任务。就需要把wheelSize扩充到100W,不仅浪费内存空间,而且整体拉低时间轮的执行效率。

  (2)层级时间轮

  针对简单时间轮的局限性,就引入层级时间轮的概念。当任务到期时间超过当前时间轮所表示的时间范围后,就尝试添把它加到上层时间轮中。上层时间轮为按需创建,且随着时间的推进,上层时间轮中的延时任务会被降级重新插入到下层时间轮中。ps.下文中数字越大,层数越高。例如1层时间轮的上层为2层时间轮)

  同样,上层时间轮也会有时间格(TimeBucket),任务列表(TimerTaskList),基本时间跨度(tickMs),队列长度(wheelSize),总时间跨度(interval)这些概念。但是第二层的基本时间跨度(tickMs-wheel2)第一层的总时间跨度(interval-wheel1)。假如时间轮当前时间轮和上层时间轮的队列长度都是相同的,那么上层时间轮总时间跨度为:

  •   interval-wheel2 = tickMs-wheel2 * wheelSize;
  • tickMs-wheel2 = interval-wheel1 = tickMs-wheel1 * wheelSize;

  举个例子,当前时间轮A基本时间跨度(tickMs)为1s,A的队列长度(wheelSize)为7。此时假如我们要添加一个15秒后执行的延时任务X。此时时间轮A已经不能满足条件,所以这个任务需要插入到上层时间轮B中。假设B和A的队列长度(wheelSize)相同。时间轮B的tickMs-B = tickMs-A * wheelSize-A = 7s。时间轮B的interval-B = tickMs-B * wheelSize-B = 49s。那么我们任务X将保存在什么地方呢?最终将保存到时间轮B中时间格2中所对应的任务列表(TimerTaskList)中。(队列的第三个元素,该时间格的时间跨度为[14s,21s) )。如下图:

  同理,假如我们要添加一个延时50s后执行的任务,就需要引入第三层的时间轮C。C的基本时间跨度 tickMs-C = tickMs-B * wheelSize-B。 (ps.这里可以想象一下手表的时,分,秒针。秒针的基本时间跨度为1s,队列长度为60,总时间跨度为秒针转一圈的时间60s;分针的基本时间跨度为1min = 1s * 60,队列长度为60,总时间跨度为分针转一圈的时间60min;时针的基本时间跨度为1h = 1min * 60,队列长度为12,总时间跨度为时针转一圈的时间12h。)

  接下来,就要让我们的层级时间轮运行起来了。随着时间的流逝,时间轮B表盘指针往前移动,当指向时间格2的时候,会把任务X重新提交到时间轮A的时间格1中(该时间格的时间跨度为[1s,2s) ,因为时间轮B走了两格,所以任务X的剩余时间为15s - 7s * 2 = 1s),这个行为称为时间轮的降级操作。之后再经历1s,任务X真正到期,最终被执行。

  这里还有些实现的细节需要注意下。

  • 除了第一层时间轮以外,其余高层时间轮的起始时间(startMs)必须是创建此层时间轮时前一层时间轮的表盘指针(currentTime)。
  • 每一层时间轮的表盘指针(currentTime)必须为基本时间跨度(tickMs)的整数倍。如果不满足,将执行一个修剪操作,将currentTime修剪为tickMs整数倍,并以此为依据与时间格的到期时间范围对应起来。假设某一时刻的时间为timerMs,那么具体的修剪行为的公式为 currentTime = timerMs - ( timerMs % tickMs )。(ps. currentTime始终为tickMs的整数倍)例如在加入延时任务X后的第9秒这个时刻,时间轮B的currentTime为 9 - ( 9 % 7 ) = 7s。
  • 每一层时间轮中都包含一个引用(overflowWheel),它指向跟高一层的时间轮。所以客户端只有创建的第一层时间轮,根据该引用间接持有各个层级时间轮的引用。

  (2.1)关于“空推进”问题

  我们简单时间轮中,使用了一个ticker来驱动时间轮的运转,在TimeWheel的handler()函数中,可以看到这样一行代码:

func (t *TimeWheel) handler() {
        // other code ...
      

        //跳过没有任务的时间槽
    taskList := t.Buckets[t.CurrentTime].TaskList
    if taskList == nil {
        return
    }    
        
        // other code ...  
}    

  假设我们现在有一个tickMs为1s的,wheelSize为1000的时间轮,现在往这个时间轮里面写入两个延时任务。第一个延时任务为200s之后执行,第二个延时任务为850s之后执行。那么在执行第一个延时任务,需要让ticker驱动200次,才能执行到第一个任务,且这200次驱动推进中,前199次为“空推进”。第二个延时任务执行时,又需要649次空推进。这样会无辜消耗机器的性能资源。  

  (2.2)kafka中如何解决“空推进”

  针对上面“空推进”这个问题,Kafka的延时队列使用了JDK(JAVA语言)的一个叫做DelayQueue的队列来协助推进时间轮。DelayQueue如名字所示,是一个延迟队列,具有队列的所有特性。策略如下:

  • Kafka把所有任务项(TimerTaskEntity)对应的任务列表(TimerTaskList)都加入到DelayQueue中。每一个任务列表(TimerTaskList)都有一个expiration(到期时间),该时间为 时间轮当前时间(currentTime) + 延时任务的延时时间(TimerTaskEntity.DelayTime)。依据这个时间做一个排序,将expiration(到期时间)最短任务列表(TimerTaskList)排在DelayQueue的队头,其他任务列表按照超时时间依次入队。这样,DelayQueue队列中各个元素就是按照expiration排好序的任务列表(TimerTaskList)。
  • 然后再单独开启一个线程“ExpiredOperationReaper”(过期收割机线程?),通过队头出队的方式来获取DelayQueue队列中到期的任务(获取队头元素的时间复杂度为O(1),获取到队头元素后,会切换新的队头元素)。
  • 最后,根据ExpiredOperationReaper获取到任务列表(TimerTaskList),既可以根据它的expiration(到期时间来推进时间轮时间,又可以对任务列表(TimerTaskList)中的任务项(TimerTaskEntity)做相应操作(降级 or 执行)。 总结下就是,这里使用时间轮(TimeWheel)来完成对 任务项(TimerTaskEntity)插入,删除等操作,使用DelayQueue辅助来完成对时间轮“精准推进”操作。  

 

二、基于go语言的层级时间轮实现

  首先,实现延时队列(DelayQueue)。DelayQueue中的priorityQueue使用小根堆实现,配合过期时间作为延迟优先级,保证了队头元素是最早到期的延时任务。Poll函数完成“精准推进”操作。如下所示:

  1 type item struct {
  2     Value    interface{}
  3     Priority int64
  4     Index    int
  5 }
  6 
  7 type priorityQueue []*item
  8 
  9 func newPriorityQueue(capacity int) priorityQueue {
 10     return make(priorityQueue, 0, capacity)
 11 }
 12 
 13 func (pq priorityQueue) Len() int {
 14     return len(pq)
 15 }
 16 
 17 func (pq priorityQueue) Less(i, j int) bool {
 18     return pq[i].Priority < pq[j].Priority
 19 }
 20 
 21 func (pq priorityQueue) Swap(i, j int) {
 22     pq[i], pq[j] = pq[j], pq[i]
 23     pq[i].Index = i
 24     pq[j].Index = j
 25 }
 26 
 27 func (pq *priorityQueue) Push(x interface{}) {
 28     n := len(*pq)
 29     c := cap(*pq)
 30     if n+1 > c {
 31         npq := make(priorityQueue, n, c*2)
 32         copy(npq, *pq)
 33         *pq = npq
 34     }
 35     *pq = (*pq)[0 : n+1]
 36     item := x.(*item)
 37     item.Index = n
 38     (*pq)[n] = item
 39 }
 40 
 41 func (pq *priorityQueue) Pop() interface{} {
 42     n := len(*pq)
 43     c := cap(*pq)
 44     if n < (c/2) && c > 25 {
 45         npq := make(priorityQueue, n, c/2)
 46         copy(npq, *pq)
 47         *pq = npq
 48     }
 49     item := (*pq)[n-1]
 50     item.Index = -1
 51     *pq = (*pq)[0 : n-1]
 52     return item
 53 }
 54 
 55 func (pq *priorityQueue) PeekAndShift(max int64) (*item, int64) {
 56     if pq.Len() == 0 {
 57         return nil, 0
 58     }
 59 
 60     item := (*pq)[0]
 61     if item.Priority > max {
 62         return nil, item.Priority - max
 63     }
 64     heap.Remove(pq, 0)
 65 
 66     return item, 0
 67 }
 68 
 69 // DelayQueue 小根堆实现的优先级队列
 70 type DelayQueue struct {
 71     C chan interface{}
 72 
 73     mu sync.Mutex
 74     pq priorityQueue
 75 
 76     sleeping int32
 77     wakeupC  chan struct{}
 78 }
 79 
 80 func NewDelayQueue(size int) *DelayQueue {
 81     return &DelayQueue{
 82         C:       make(chan interface{}),
 83         pq:      newPriorityQueue(size),
 84         wakeupC: make(chan struct{}),
 85     }
 86 }
 87 
 88 // Offer 写入一个指定到期时间的元素到当前的延时队列
 89 func (dq *DelayQueue) Offer(elem interface{}, expiration int64) {
 90     item := &item{
 91         Value:    elem,
 92         Priority: expiration,
 93     }
 94 
 95     dq.mu.Lock()
 96     heap.Push(&dq.pq, item)
 97     index := item.Index
 98     dq.mu.Unlock()
 99 
100     // 假如新写入的元素是最早到期的元素,则尝试唤醒等待调用Poll函数的调用者
101     if index == 0 {
102         if atomic.CompareAndSwapInt32(&dq.sleeping, 1, 0) {
103             dq.wakeupC <- struct{}{}
104         }
105     }
106 }
107 
108 // Poll 无限循环的获取一个元素
109 func (dq *DelayQueue) Poll(exitC chan struct{}, nowF func() int64) {
110     for {
111         // 获取当前时间
112         now := nowF()
113 
114         dq.mu.Lock()
115         // 查看并弹出到期元素
116         item, delta := dq.pq.PeekAndShift(now)
117         if item == nil {
118             // 假如没有到期元素,则重置延时队列睡眠状态。同时保证Poll 和 Offer操作原子性
119             atomic.StoreInt32(&dq.sleeping, 1)
120         }
121         dq.mu.Unlock()
122 
123         if item == nil {
124             if delta == 0 {
125                 // 没有元素到期
126                 select {
127                 case <-dq.wakeupC:
128                     // 等待第一个写入的元素
129                     continue
130                 case <-exitC:
131                     goto exit
132                 }
133             } else if delta > 0 {
134                 // delta > 0 , 延时队列中最少有一个item在等待处理中
135                 select {
136                 case <-dq.wakeupC:
137                     continue
138                 case <-time.After(time.Duration(delta) * time.Second):
139                     // 等待队列中"最早"的元素到期
140                     if atomic.SwapInt32(&dq.sleeping, 0) == 0 {
141                         <-dq.wakeupC
142                     }
143                     continue
144                 case <-exitC:
145                     goto exit
146                 }
147             }
148         }
149 
150         select {
151         case dq.C <- item.Value: //取到到期的元素,通过延时队列C channel 发送到期元素
152         case <-exitC:
153             goto exit
154         }
155     }
156 
157 exit:
158     // Reset the states
159     atomic.StoreInt32(&dq.sleeping, 0)
160 }
延时队列(DelayQueue)

  接下来是实现延时任务对象(TimerTaskEntity),以及时间格对象(Bucket)。如下所示:

 1 // TimerTaskEntity 延时任务
 2 type TimerTaskEntity struct {
 3     DelayTime int64 // 延时时间
 4     Task      func()
 5 
 6     b unsafe.Pointer // type: *bucket  保存当前延时任务所在的时间格,使用桶指针,可通过原子操作并发更新/读取
 7 
 8     element *list.Element // 延时任务所在的双向链表中的节点元素
 9 
10 }
11 
12 func (t *TimerTaskEntity) getBucket() *Bucket {
13     return (*Bucket)(atomic.LoadPointer(&t.b))
14 }
15 
16 func (t *TimerTaskEntity) setBucket(b *Bucket) {
17     atomic.StorePointer(&t.b, unsafe.Pointer(b))
18 }
19 
20 // Stop 停止延时任务的执行
21 func (t *TimerTaskEntity) Stop() bool {
22     stopped := false
23     for b := t.getBucket(); b != nil; b = t.getBucket() {
24         // 如果时间格尚未过期/执行,则从时间格中删除这个延时任务
25         stopped = b.Remove(t)
26     }
27     return stopped
28 }
29 
30 // Bucket 时间格
31 type Bucket struct {
32     expiration int64 // 时间格的到期时间,这个时间是时间格内存储定时任务的到期时间
33 
34     mu       sync.Mutex
35     TaskList *TimerTaskList
36 }
37 
38 // TimerTaskList 任务列表,双向链表。这里直接使用go内置的List对象
39 type TimerTaskList = list.List
40 
41 func NewBucket() *Bucket {
42     return &Bucket{
43         TaskList:   list.New(),
44         expiration: -1,
45     }
46 }
47 
48 func (b *Bucket) Expiration() int64 {
49     return atomic.LoadInt64(&b.expiration)
50 }
51 
52 func (b *Bucket) SetExpiration(expiration int64) bool {
53     return atomic.SwapInt64(&b.expiration, expiration) != expiration
54 }
55 
56 func (b *Bucket) Add(t *TimerTaskEntity) {
57     b.mu.Lock()
58 
59     e := b.TaskList.PushBack(t)
60     t.setBucket(b)
61     t.element = e
62 
63     b.mu.Unlock()
64 }
65 
66 func (b *Bucket) remove(t *TimerTaskEntity) bool {
67     // 检查当前延时任务是否属于当前桶
68     if t.getBucket() != b {
69         return false
70     }
71     b.TaskList.Remove(t.element)
72     t.setBucket(nil)
73     t.element = nil
74     return true
75 }
76 
77 func (b *Bucket) Remove(t *TimerTaskEntity) bool {
78     b.mu.Lock()
79     defer b.mu.Unlock()
80     return b.remove(t)
81 }
82 
83 // Flush 延时任务降级,重新插入到下层时间轮中
84 func (b *Bucket) Flush(reinsert func(*TimerTaskEntity)) {
85     b.mu.Lock()
86     defer b.mu.Unlock()
87 
88     for e := b.TaskList.Front(); e != nil; {
89         next := e.Next()
90         t := e.Value.(*TimerTaskEntity)
91         b.remove(t)
92         reinsert(t)
93         e = next
94     }
95     // 当前桶所有延时任务降级完成后,该桶过期时间重置为-1,该桶不再有效
96     b.SetExpiration(-1)
97 }
延时任务&时间格

  最后,基于任务对象(TimerTaskEntity),时间格对象(Bucket)还有延时队列(DelayQueue)完成对时间轮的实现。其中,一个延时任务的写入,会在写入到时间轮的同时,以过期时间为优先级参照写入到延时队列中。时间轮启动时,会开启两个协程分别执行从延时队列中取出最近一次到期的任务(DelayQueue的Poll函数);和根据获取到的任务的过期时间,去推进时间轮指针转动,以及去触发相应的延时任务(真实执行 or 任务降级)操作。如下:

 1 // truncate returns the result of rounding x toward zero to a multiple of m.
 2 // If m <= 0, Truncate returns x unchanged.
 3 func truncate(x, m int64) int64 {
 4     if m <= 0 {
 5         return x
 6     }
 7     return x - x%m
 8 }
 9 
10 func timeToS(t time.Time) int64 {
11     return t.UnixNano() / int64(time.Second)
12 }
13 
14 func sToTime(t int64) time.Time {
15     return time.Unix(0, t*int64(time.Second)).UTC()
16 }
17 
18 type waitGroupWrapper struct {
19     sync.WaitGroup
20 }
21 
22 func (w *waitGroupWrapper) Wrap(cb func()) {
23     w.Add(1)
24     go func() {
25         cb()
26         w.Done()
27     }()
28 }
时间相关转换函数
  1 type TimingWheel struct {
  2     tickMs    int64 //基本时间跨度
  3     wheelSize int64 // 时间轮队列长度
  4 
  5     interval    int64       // 总跨度
  6     currentTime int64       //表盘指针
  7     buckets     []*Bucket   // 时间格队列
  8     queue       *DelayQueue // 延时队列
  9 
 10     // 上层时间轮引用 可以通过Add函数去并发的读写
 11     overflowWheel unsafe.Pointer // type: *TimingWheel
 12 
 13     exitC     chan struct{}
 14     waitGroup waitGroupWrapper
 15 }
 16 
 17 func NewTimingWheel(tickMs time.Duration, wheelSize int64) *TimingWheel {
 18     tick := int64(tickMs / time.Second)
 19     if tickMs <= 0 {
 20         panic(errors.New("tickMs must be greater than or equal to 1s"))
 21     }
 22 
 23     startMs := timeToS(time.Now().UTC())
 24     return newTimingWheel(tick, wheelSize, startMs, NewDelayQueue(int(wheelSize)))
 25 }
 26 
 27 func newTimingWheel(tickMs int64, wheelSize int64, startMs int64, queue *DelayQueue) *TimingWheel {
 28     buckets := make([]*Bucket, wheelSize)
 29     for i := range buckets {
 30         buckets[i] = NewBucket()
 31     }
 32 
 33     return &TimingWheel{
 34         tickMs:      tickMs,
 35         wheelSize:   wheelSize,
 36         currentTime: truncate(startMs, tickMs),
 37         interval:    tickMs * wheelSize,
 38         buckets:     buckets,
 39         queue:       queue,
 40         exitC:       make(chan struct{}),
 41     }
 42 }
 43 
 44 // 增加延时任务至时间轮
 45 func (tw *TimingWheel) add(t *TimerTaskEntity) bool {
 46     currentTime := atomic.LoadInt64(&tw.currentTime)
 47     if t.DelayTime < currentTime+tw.tickMs {
 48         return false
 49     } else if t.DelayTime < currentTime+tw.interval {
 50         // 写入到当前时间轮
 51         virtualID := t.DelayTime / tw.tickMs
 52         b := tw.buckets[virtualID%tw.wheelSize]
 53         b.Add(t)
 54 
 55         // 当前时间格写入到延时队列 ps. 延时队列是小根堆,时间格的过期时间是优先级参照,所以,越早到期的时间格越在队头
 56         if b.SetExpiration(t.DelayTime) {
 57             tw.queue.Offer(b, b.Expiration())
 58         }
 59         return true
 60     } else {
 61         // 超出当前时间轮最大范畴,写入到上层时间轮
 62         overflowWheel := atomic.LoadPointer(&tw.overflowWheel)
 63         // 上层时间轮不存在 则创建新的时间轮。新时间轮的ticketMs = 当前时间轮的interval ;上层时间轮的开始时间startMs为当前时间轮表盘指针时间
 64         if overflowWheel == nil {
 65             atomic.CompareAndSwapPointer(&tw.overflowWheel, nil, unsafe.Pointer(newTimingWheel(tw.interval, tw.wheelSize, currentTime, tw.queue)))
 66             overflowWheel = atomic.LoadPointer(&tw.overflowWheel)
 67         }
 68         return (*TimingWheel)(overflowWheel).add(t)
 69     }
 70 }
 71 
 72 func (tw *TimingWheel) addOrRun(t *TimerTaskEntity) {
 73     if !tw.add(t) {
 74         // 如果无法添加,则表示该延时任务执行时间小于当前时间轮表盘指针指向的时间(换句话说,该延时任务已过期),则立即执行
 75         go t.Task()
 76     }
 77 }
 78 
 79 // advanceClock 推进时间轮时钟到指定的过期时间
 80 func (tw *TimingWheel) advanceClock(expiration int64) {
 81     currentTime := atomic.LoadInt64(&tw.currentTime)
 82     if expiration >= currentTime+tw.tickMs {
 83         currentTime = truncate(expiration, tw.tickMs)
 84         atomic.StoreInt64(&tw.currentTime, currentTime)
 85 
 86         overflowWheel := atomic.LoadPointer(&tw.overflowWheel)
 87         if overflowWheel != nil {
 88             (*TimingWheel)(overflowWheel).advanceClock(currentTime)
 89         }
 90     }
 91 }
 92 
 93 // Start 时间轮启动
 94 // 开启两个协程
 95 //   - 协程1不断地从延时队列中取出最近一次到期的任务;
 96 //   - 协程2根据获取到的任务的过期时间,去推进时间轮指针转动,以及去触发相应的延时任务(真实执行 or 任务降级)
 97 func (tw *TimingWheel) Start() {
 98     tw.waitGroup.Wrap(func() {
 99         tw.queue.Poll(tw.exitC, func() int64 {
100             return timeToS(time.Now().UTC())
101         })
102     })
103 
104     tw.waitGroup.Wrap(func() {
105         for {
106             select {
107             case elem := <-tw.queue.C:
108                 b := elem.(*Bucket)
109                 tw.advanceClock(b.Expiration())
110                 b.Flush(tw.addOrRun)
111             case <-tw.exitC:
112                 return
113             }
114         }
115     })
116 }
117 
118 func (tw *TimingWheel) Stop() {
119     close(tw.exitC)
120     tw.waitGroup.Wait()
121 }
122 
123 func (tw *TimingWheel) AfterFunc(d time.Duration, f func()) *TimerTaskEntity {
124     t := &TimerTaskEntity{
125         DelayTime: timeToS(time.Now().UTC().Add(d)),
126         Task:      f,
127     }
128     tw.addOrRun(t)
129     return t
130 }
时间轮相关实现

  最后看一下执行结果:

 1 func TestTimingWheel_AfterFunc(t *testing.T) {
 2     tw := NewTimingWheel(time.Second, 10)
 3     tw.Start()
 4     log.Printf("启动时间轮 \n")
 5     defer tw.Stop()
 6 
 7     durations := []time.Duration{
 8         //1 * time.Second,
 9         5 * time.Second,
10         2 * time.Second,
11         //50 * time.Second,
12         1 * time.Minute,
13     }
14 
15     for _, d := range durations {
16         dTime := d
17         tw.AfterFunc(d, func() {
18             log.Printf("this is delay %v task \n", dTime)
19         })
20 
21         if dTime == 5*time.Second {
22             tw.AfterFunc(dTime, func() {
23                 log.Printf("this is delay %v task2 \n", dTime)
24             })
25         }
26 
27     }
28 
29     time.Sleep(time.Second * 100)
30 }
测试示例

   

   资料参考:

  https://book.douban.com/subject/30437872

  http://russellluo.com/2018/10/golang-implementation-of-hierarchical-timing-wheels.html

  

        

标签:层级,延时,Golang,任务,pq,时间,func,浅析,tickMs
From: https://www.cnblogs.com/liumengchen-boke/p/17671223.html

相关文章

  • golang锁浅谈
    在Go语言中,有以下几种常用的锁类型:互斥锁(Mutex)互斥锁是最常用的一种锁机制,用于保护共享资源在并发访问时的互斥操作。常见的用法如下:varmutexsync.Mutex​//通过Lock()和Unlock()方法保护共享资源的临界区mutex.Lock()//执行对共享资源的操作mutex.Unlock()对于syn......
  • Golang匿名函数浅谈
    Go匿名函数(闭包)在Go中,匿名函数(也称为闭包)可以捕获外部变量。Go的闭包是指一个函数值(函数变量)包含了对其外部作用域中变量的引用。匿名函数可以访问和修改其外部作用域中的变量。它可以捕获外部变量的值,并在函数体中使用这些变量。下面是一个示例,展示了如何在匿名函数中捕......
  • golang接口用法浅谈
    类型接口Go不是面向对象的语言,在go里通过不同的结构体实现同一组公共接口这种组合的形式实现多态,类似C++的类和虚函数定义类型接口(InterfaceDefinition):使用type关键字定义接口,指定接口的方法签名。方法签名由方法的名称、参数列表和返回值组成,但不包含方法体。接口......
  • Golang Gorm 一对多查询 preload预加载
    预加载示例GORM允许使用 Preload通过多个SQL中来直接加载关系,例如:typeUserstruct{gorm.ModelUsernamestringOrders[]Order}typeOrderstruct{gorm.ModelUserIDuintPricefloat64}//查找user时预加载相关Orderdb.Preload("Orders").Fin......
  • 海域可视化监管:浅析海域动态远程视频智能监管平台的构建方案
    一、方案背景随着科技的不断进步,智慧海域管理平台已经成为海洋领域监管的一种重要工具。相比传统的视频监控方式,智慧海域管理平台通过建设近岸海域视频监控网、海洋环境监测网和海上目标探测网络等,可实现海洋管理的数字化转型。传统的监控方式往往需要大量人力物力,而智慧海域管理平......
  • golang编译go build -ldflags "-s -w"的 解释
    gobuild-ldflags"-s-w" 是一个Go语言的构建命令,其中使用了 -ldflags 参数来传递一些额外的链接器标志。这个命令中,-ldflags"-s-w" 传递了两个标志:-s:该标志会禁止生成可执行文件中的符号表信息,这样在执行文件时就不会暴露源代码中的函数名、变量名等符号信息。这有......
  • golang base64解码
    解码过程1.使用标准库的base64.StdEncoding.DecodeString 最开始是印象标准库有一个base64.StdEncoding.DecodeString方法可以解码,就直接使用了这个方法packagemainimport("encoding/base64""fmt")funcmain(){encrypt:="Cf1WA2nBMo3H9G2UPhlLBBVB......
  • golang realize数据库简介
    存储与数据库简介一个提供了读写,控制类接口,能够安全有效的把数据持久化的软件,就可以成为存储系统。-存储系统概览存储系统特点性能敏感既简单又复杂容易受硬件影响存储器层级结构单机存储栈RAID技术单块大容量磁盘的价格>多块小容量的磁盘单块磁盘的写入性能<多块磁盘的并发写入......
  • 浅析常用的Python Web的几大框架
    在各种语言平台中,python涌现的web框架恐怕是最多的,是一个百花齐放的世界,各种micro-framework、framework不可胜数;猜想原因应该是在python中构造框架十分简单,使得轮子不断被发明。所 以在Python社区总有关于Python框架孰优孰劣的话题。下面就给大家介绍一下python的几大框架: Djan......
  • jetbrains GoLang设置编写proto文件的实时模板
    具体步骤1. 首先,先创建一个模板组,我这里创建为"proto"。2.下面这张图是我的模板组中的内容3.具体实时模板缩写:enum描述:enumname{}模板文字:enum$name${$END$}适用于:协议缓存区中的Other缩写:import描述:import"";模板文字:import"$file$";$END$适用......