文章目录
- 时间轮介绍
- 简单时间轮
- 层级时间轮
- kafka中的实现细节
- 基于go语言的层级时间轮实现
一、时间轮介绍
工作中,我们经常遇到到延时任务这类需求(例如用户开始一个任务,15分钟后给他发送一个通知奖励;用户下单未付款,三分钟后发送一条提醒消息...)。一般情况下,我们使用time.Timer对象完成工作,例如:
// other code... go func() { timer := time.NewTimer(30 * time.Second) <-timer.C //延时执行的业务逻辑 }()
在go语言运行时内置实现中,Timer对象的实现是使用小根堆这个数据结构。小根堆在写入,删除一个元素时,时间复杂度为O(log n)。当服务进程中,有100W个延时任务需要执行时,这100W个延时任务,写入到go运行时内置的Timer对象就是一个比较耗时,低效的操作了。而基于时间轮的任务写入、删除延时任务操作,可以将时间复杂度降低至O(1),从而高效地管理和触发这些延时任务,以满足复杂的调度需求。
常见的时间轮有两种,分别是简单时间轮 和 层级时间轮。下面参考Kafka的时间轮设计以及实现,分别讨论相关数据结构以及实现。
(1)简单时间轮
时间轮(TimeWheel)是一个存储延时任务的环形队列。队列内每个元素称为一个时间格(TimeBucket),可以存放一个任务列表(TimerTaskList),这个任务列表(TimerTaskList)则是一个环形双向链表,它的每一项表示的是延时任务项(TimerTaskEntity),代表真实的延时任务。所以,简单时间轮有大致这样的层级关系 TimeWheel -> m * TimerTaskList/TimeBucket -> n * TimerTaskEntity。(ps.时间轮的环形队列底层数据结构可以用数组来实现;TimerTaskList采用环形双向链表可以在O(1)时间复杂度去插入/删除TimerTaskEntity)
时间轮的每一个时间格,代表了当前时间轮的基本时间跨度(tickMs)。所以,假设当前时间轮队列长度(wheelSize)为7,时间跨度为1秒,这个时间轮的总时间跨度(interval) = tickMs * wheelSize = 1s * 7 = 7s。(ps.下文时间轮队列长度统一简称队列长度。)
最后,时间轮还存在一个表盘指针(currentTime),表示当前时间轮所处的时间。currentTime将整个时间轮划分为到期部分和未到期部分。(ps. currentTime当前指向的时间格也属于到期部分,表示正在执行的时间格)
当我们往时间轮写入一个任务时,就如下图所示
这样,简单时间轮的雏形就出来了,用代码实现就如下所示:
// TimeWheel 时间轮对象 type TimeWheel struct { Buckets []Bucket // 时间格队列 WheelSize int //时间轮格数量 TickMs int // 基本时间跨度 CurrentTime int //表盘指针 mu sync.RWMutex } // Bucket 时间格 type Bucket struct { TaskList *TimerTaskList } // TimerTaskList 任务列表,双向链表。这里直接使用go内置的List对象 type TimerTaskList = list.List // TimerTaskEntity 抽象的具体任务 type TimerTaskEntity struct { DelayTime int Task func() }
(1.1)简单时间轮的实现
接下来,就是让这个时间轮跑起来。这里我们使用一个Ticker驱动这个时间轮,这个Ticker的执行周期为时间轮的基本时间跨度。我们在最开始,添加一个1s和3s,9s的延时任务,然后随着时间的推移,分别在2023/09/01 23:52:20,2023/09/01 23:52:22,2023/09/01 23:52:28 执行了三个延时任务。完整代码如下:
1 // TimeWheel 时间轮对象 2 type TimeWheel struct { 3 Buckets []Bucket // 时间格队列 4 5 WheelSize int //时间轮格数量 6 TickMs int // 基本时间跨度 7 8 CurrentTime int //表盘指针 9 mu sync.RWMutex 10 11 ticker *time.Ticker 12 startChan chan bool 13 stopChan chan bool 14 } 15 16 // Bucket 时间格 17 type Bucket struct { 18 TaskList *TimerTaskList 19 } 20 21 // TimerTaskList 任务列表,双向链表。这里直接使用go内置的List对象 22 type TimerTaskList = list.List 23 24 // TimerTaskEntity 抽象的具体任务 25 type TimerTaskEntity struct { 26 DelayTime int // 延时时间 27 Task func() 28 } 29 30 func NewTimeWheel(wheelSize, tickMs int) *TimeWheel { 31 return &TimeWheel{ 32 Buckets: make([]Bucket, wheelSize), 33 WheelSize: wheelSize, 34 TickMs: tickMs, 35 CurrentTime: 0, 36 37 startChan: make(chan bool, 1), 38 stopChan: make(chan bool, 1), 39 } 40 } 41 42 func (t *TimeWheel) Start() { 43 select { 44 case t.startChan <- true: 45 46 default: 47 fmt.Println("timewheel is already running, exit") 48 return 49 } 50 // 启动时间轮的内部定时器 51 t.ticker = time.NewTicker(time.Duration(t.TickMs) * time.Second) 52 go func() { 53 for { 54 select { 55 case <-t.ticker.C: 56 t.handler() 57 case <-t.stopChan: 58 return 59 } 60 } 61 }() 62 } 63 64 func (t *TimeWheel) Stop() { 65 select { 66 case <-t.startChan: 67 t.ticker.Stop() 68 t.stopChan <- true 69 return 70 default: 71 log.Println("timewheel has stopped,exit.") 72 return 73 } 74 } 75 76 func (t *TimeWheel) handler() { 77 t.mu.Lock() 78 defer t.mu.Unlock() 79 80 // 时间轮转动 81 t.CurrentTime++ 82 // 转动一圈后,指针复位 83 if t.CurrentTime%t.WheelSize == 0 { 84 t.CurrentTime = 0 85 } 86 87 //跳过没有任务的时间槽 88 taskList := t.Buckets[t.CurrentTime].TaskList 89 if taskList == nil { 90 return 91 } 92 for e := taskList.Front(); e != nil; e = e.Next() { 93 taskEntity, _ := e.Value.(*TimerTaskEntity) 94 go taskEntity.Task() 95 } 96 // 删除时间槽的定时器链表 97 t.Buckets[t.CurrentTime].TaskList = nil 98 } 99 100 func (t *TimeWheel) AddTimerTaskEntity(entity *TimerTaskEntity) { 101 102 t.mu.Lock() 103 defer t.mu.Unlock() 104 if len(t.startChan) == 0 { 105 log.Println(" timewheel has not been started") 106 return 107 } 108 109 // 找到延时应该放在哪个时间格 110 index := (t.CurrentTime + entity.DelayTime) % t.WheelSize 111 bucket := t.Buckets[index] 112 if bucket.TaskList == nil { 113 bucket.TaskList = list.New() 114 } 115 // 延时任务放入时间格链表 116 bucket.TaskList.PushBack(entity) 117 t.Buckets[index] = bucket 118 }简单时间轮Demo
1 func TestTimeWheel(t *testing.T) { 2 simpleTimeWheel := NewTimeWheel(10, 1) 3 simpleTimeWheel.Start() 4 simpleTimeWheel.AddTimerTaskEntity(&TimerTaskEntity{ 5 Task: func() { 6 log.Println("this is delay 1 s task") 7 }, 8 DelayTime: 1, 9 }) 10 simpleTimeWheel.AddTimerTaskEntity(&TimerTaskEntity{ 11 Task: func() { 12 log.Println("this is delay 3 s task") 13 }, 14 DelayTime: 3, 15 }) 16 simpleTimeWheel.AddTimerTaskEntity(&TimerTaskEntity{ 17 Task: func() { 18 log.Println("this is delay 9 s task") 19 }, 20 DelayTime: 9, 21 }) 22 23 time.Sleep(time.Second * 20) 24 simpleTimeWheel.Stop() 25 26 }测试示例
这里可以看到简单时间轮的局限性:时间轮一旦初始化完成,确定了时间轮的时间格长度后,就不能再添加超过总时间跨度的延时任务了(例如一个tickMs为1s,wheelSize为10的时间轮,假如写入一个15s的延时任务,就会跟5s延时任务有冲突)。此时解决方案之一是扩充wheelSize时间格长度,但是假如我要写入一个100W * tickMs后执行的延时任务。就需要把wheelSize扩充到100W,不仅浪费内存空间,而且整体拉低时间轮的执行效率。
(2)层级时间轮
针对简单时间轮的局限性,就引入层级时间轮的概念。当任务到期时间超过当前时间轮所表示的时间范围后,就尝试添把它加到上层时间轮中。上层时间轮为按需创建,且随着时间的推进,上层时间轮中的延时任务会被降级重新插入到下层时间轮中。(ps.下文中数字越大,层数越高。例如1层时间轮的上层为2层时间轮)
同样,上层时间轮也会有时间格(TimeBucket),任务列表(TimerTaskList),基本时间跨度(tickMs),队列长度(wheelSize),总时间跨度(interval)等这些概念。但是第二层的基本时间跨度(tickMs-wheel2)为第一层的总时间跨度(interval-wheel1)。假如时间轮当前时间轮和上层时间轮的队列长度都是相同的,那么上层时间轮的总时间跨度为:
- interval-wheel2 = tickMs-wheel2 * wheelSize;
- tickMs-wheel2 = interval-wheel1 = tickMs-wheel1 * wheelSize;
举个例子,当前时间轮A的基本时间跨度(tickMs)为1s,A的队列长度(wheelSize)为7。此时假如我们要添加一个15秒后执行的延时任务X。此时时间轮A已经不能满足条件,所以这个任务需要插入到上层时间轮B中。假设B和A的队列长度(wheelSize)相同。时间轮B的tickMs-B = tickMs-A * wheelSize-A = 7s。时间轮B的interval-B = tickMs-B * wheelSize-B = 49s。那么我们任务X将保存在什么地方呢?最终将保存到时间轮B中时间格2中所对应的任务列表(TimerTaskList)中。(队列的第三个元素,该时间格的时间跨度为[14s,21s) )。如下图:
同理,假如我们要添加一个延时50s后执行的任务,就需要引入第三层的时间轮C。C的基本时间跨度 tickMs-C = tickMs-B * wheelSize-B。 (ps.这里可以想象一下手表的时,分,秒针。秒针的基本时间跨度为1s,队列长度为60,总时间跨度为秒针转一圈的时间60s;分针的基本时间跨度为1min = 1s * 60,队列长度为60,总时间跨度为分针转一圈的时间60min;时针的基本时间跨度为1h = 1min * 60,队列长度为12,总时间跨度为时针转一圈的时间12h。)
接下来,就要让我们的层级时间轮运行起来了。随着时间的流逝,时间轮B的表盘指针往前移动,当指向时间格2的时候,会把任务X重新提交到时间轮A的时间格1中(该时间格的时间跨度为[1s,2s) ,因为时间轮B走了两格,所以任务X的剩余时间为15s - 7s * 2 = 1s),这个行为称为时间轮的降级操作。之后再经历1s,任务X真正到期,最终被执行。
这里还有些实现的细节需要注意下。
- 除了第一层时间轮以外,其余高层时间轮的起始时间(startMs)必须是创建此层时间轮时前一层时间轮的表盘指针(currentTime)。
- 每一层时间轮的表盘指针(currentTime)必须为基本时间跨度(tickMs)的整数倍。如果不满足,将执行一个修剪操作,将currentTime修剪为tickMs整数倍,并以此为依据与时间格的到期时间范围对应起来。假设某一时刻的时间为timerMs,那么具体的修剪行为的公式为 currentTime = timerMs - ( timerMs % tickMs )。(ps. currentTime始终为tickMs的整数倍)例如在加入延时任务X后的第9秒这个时刻,时间轮B的currentTime为 9 - ( 9 % 7 ) = 7s。
- 每一层时间轮中都包含一个引用(overflowWheel),它指向跟高一层的时间轮。所以客户端只有创建的第一层时间轮,根据该引用间接持有各个层级时间轮的引用。
(2.1)关于“空推进”问题
我们简单时间轮中,使用了一个ticker来驱动时间轮的运转,在TimeWheel的handler()函数中,可以看到这样一行代码:
func (t *TimeWheel) handler() { // other code ... //跳过没有任务的时间槽 taskList := t.Buckets[t.CurrentTime].TaskList if taskList == nil { return } // other code ... }
假设我们现在有一个tickMs为1s的,wheelSize为1000的时间轮,现在往这个时间轮里面写入两个延时任务。第一个延时任务为200s之后执行,第二个延时任务为850s之后执行。那么在执行第一个延时任务,需要让ticker驱动200次,才能执行到第一个任务,且这200次驱动推进中,前199次为“空推进”。第二个延时任务执行时,又需要649次空推进。这样会无辜消耗机器的性能资源。
(2.2)kafka中如何解决“空推进”
针对上面“空推进”这个问题,Kafka的延时队列使用了JDK(JAVA语言)的一个叫做DelayQueue的队列来协助推进时间轮。DelayQueue如名字所示,是一个延迟队列,具有队列的所有特性。策略如下:
- Kafka把所有任务项(TimerTaskEntity)对应的任务列表(TimerTaskList)都加入到DelayQueue中。每一个任务列表(TimerTaskList)都有一个expiration(到期时间),该时间为 时间轮当前时间(currentTime) + 延时任务的延时时间(TimerTaskEntity.DelayTime)。依据这个时间做一个排序,将expiration(到期时间)最短的任务列表(TimerTaskList)排在DelayQueue的队头,其他任务列表按照超时时间依次入队。这样,DelayQueue队列中各个元素就是按照expiration排好序的任务列表(TimerTaskList)。
- 然后再单独开启一个线程“ExpiredOperationReaper”(过期收割机线程?),通过队头出队的方式来获取DelayQueue队列中到期的任务(获取队头元素的时间复杂度为O(1),获取到队头元素后,会切换新的队头元素)。
- 最后,根据ExpiredOperationReaper获取到任务列表(TimerTaskList),既可以根据它的expiration(到期时间)来推进时间轮时间,又可以对任务列表(TimerTaskList)中的任务项(TimerTaskEntity)做相应操作(降级 or 执行)。 总结下就是,这里使用时间轮(TimeWheel)来完成对 任务项(TimerTaskEntity)插入,删除等操作,使用DelayQueue辅助来完成对时间轮“精准推进”操作。
二、基于go语言的层级时间轮实现
首先,实现延时队列(DelayQueue)。DelayQueue中的priorityQueue使用小根堆实现,配合过期时间作为延迟优先级,保证了队头元素是最早到期的延时任务。Poll函数完成“精准推进”操作。如下所示:
1 type item struct { 2 Value interface{} 3 Priority int64 4 Index int 5 } 6 7 type priorityQueue []*item 8 9 func newPriorityQueue(capacity int) priorityQueue { 10 return make(priorityQueue, 0, capacity) 11 } 12 13 func (pq priorityQueue) Len() int { 14 return len(pq) 15 } 16 17 func (pq priorityQueue) Less(i, j int) bool { 18 return pq[i].Priority < pq[j].Priority 19 } 20 21 func (pq priorityQueue) Swap(i, j int) { 22 pq[i], pq[j] = pq[j], pq[i] 23 pq[i].Index = i 24 pq[j].Index = j 25 } 26 27 func (pq *priorityQueue) Push(x interface{}) { 28 n := len(*pq) 29 c := cap(*pq) 30 if n+1 > c { 31 npq := make(priorityQueue, n, c*2) 32 copy(npq, *pq) 33 *pq = npq 34 } 35 *pq = (*pq)[0 : n+1] 36 item := x.(*item) 37 item.Index = n 38 (*pq)[n] = item 39 } 40 41 func (pq *priorityQueue) Pop() interface{} { 42 n := len(*pq) 43 c := cap(*pq) 44 if n < (c/2) && c > 25 { 45 npq := make(priorityQueue, n, c/2) 46 copy(npq, *pq) 47 *pq = npq 48 } 49 item := (*pq)[n-1] 50 item.Index = -1 51 *pq = (*pq)[0 : n-1] 52 return item 53 } 54 55 func (pq *priorityQueue) PeekAndShift(max int64) (*item, int64) { 56 if pq.Len() == 0 { 57 return nil, 0 58 } 59 60 item := (*pq)[0] 61 if item.Priority > max { 62 return nil, item.Priority - max 63 } 64 heap.Remove(pq, 0) 65 66 return item, 0 67 } 68 69 // DelayQueue 小根堆实现的优先级队列 70 type DelayQueue struct { 71 C chan interface{} 72 73 mu sync.Mutex 74 pq priorityQueue 75 76 sleeping int32 77 wakeupC chan struct{} 78 } 79 80 func NewDelayQueue(size int) *DelayQueue { 81 return &DelayQueue{ 82 C: make(chan interface{}), 83 pq: newPriorityQueue(size), 84 wakeupC: make(chan struct{}), 85 } 86 } 87 88 // Offer 写入一个指定到期时间的元素到当前的延时队列 89 func (dq *DelayQueue) Offer(elem interface{}, expiration int64) { 90 item := &item{ 91 Value: elem, 92 Priority: expiration, 93 } 94 95 dq.mu.Lock() 96 heap.Push(&dq.pq, item) 97 index := item.Index 98 dq.mu.Unlock() 99 100 // 假如新写入的元素是最早到期的元素,则尝试唤醒等待调用Poll函数的调用者 101 if index == 0 { 102 if atomic.CompareAndSwapInt32(&dq.sleeping, 1, 0) { 103 dq.wakeupC <- struct{}{} 104 } 105 } 106 } 107 108 // Poll 无限循环的获取一个元素 109 func (dq *DelayQueue) Poll(exitC chan struct{}, nowF func() int64) { 110 for { 111 // 获取当前时间 112 now := nowF() 113 114 dq.mu.Lock() 115 // 查看并弹出到期元素 116 item, delta := dq.pq.PeekAndShift(now) 117 if item == nil { 118 // 假如没有到期元素,则重置延时队列睡眠状态。同时保证Poll 和 Offer操作原子性 119 atomic.StoreInt32(&dq.sleeping, 1) 120 } 121 dq.mu.Unlock() 122 123 if item == nil { 124 if delta == 0 { 125 // 没有元素到期 126 select { 127 case <-dq.wakeupC: 128 // 等待第一个写入的元素 129 continue 130 case <-exitC: 131 goto exit 132 } 133 } else if delta > 0 { 134 // delta > 0 , 延时队列中最少有一个item在等待处理中 135 select { 136 case <-dq.wakeupC: 137 continue 138 case <-time.After(time.Duration(delta) * time.Second): 139 // 等待队列中"最早"的元素到期 140 if atomic.SwapInt32(&dq.sleeping, 0) == 0 { 141 <-dq.wakeupC 142 } 143 continue 144 case <-exitC: 145 goto exit 146 } 147 } 148 } 149 150 select { 151 case dq.C <- item.Value: //取到到期的元素,通过延时队列C channel 发送到期元素 152 case <-exitC: 153 goto exit 154 } 155 } 156 157 exit: 158 // Reset the states 159 atomic.StoreInt32(&dq.sleeping, 0) 160 }延时队列(DelayQueue)
接下来是实现延时任务对象(TimerTaskEntity),以及时间格对象(Bucket)。如下所示:
1 // TimerTaskEntity 延时任务 2 type TimerTaskEntity struct { 3 DelayTime int64 // 延时时间 4 Task func() 5 6 b unsafe.Pointer // type: *bucket 保存当前延时任务所在的时间格,使用桶指针,可通过原子操作并发更新/读取 7 8 element *list.Element // 延时任务所在的双向链表中的节点元素 9 10 } 11 12 func (t *TimerTaskEntity) getBucket() *Bucket { 13 return (*Bucket)(atomic.LoadPointer(&t.b)) 14 } 15 16 func (t *TimerTaskEntity) setBucket(b *Bucket) { 17 atomic.StorePointer(&t.b, unsafe.Pointer(b)) 18 } 19 20 // Stop 停止延时任务的执行 21 func (t *TimerTaskEntity) Stop() bool { 22 stopped := false 23 for b := t.getBucket(); b != nil; b = t.getBucket() { 24 // 如果时间格尚未过期/执行,则从时间格中删除这个延时任务 25 stopped = b.Remove(t) 26 } 27 return stopped 28 } 29 30 // Bucket 时间格 31 type Bucket struct { 32 expiration int64 // 时间格的到期时间,这个时间是时间格内存储定时任务的到期时间 33 34 mu sync.Mutex 35 TaskList *TimerTaskList 36 } 37 38 // TimerTaskList 任务列表,双向链表。这里直接使用go内置的List对象 39 type TimerTaskList = list.List 40 41 func NewBucket() *Bucket { 42 return &Bucket{ 43 TaskList: list.New(), 44 expiration: -1, 45 } 46 } 47 48 func (b *Bucket) Expiration() int64 { 49 return atomic.LoadInt64(&b.expiration) 50 } 51 52 func (b *Bucket) SetExpiration(expiration int64) bool { 53 return atomic.SwapInt64(&b.expiration, expiration) != expiration 54 } 55 56 func (b *Bucket) Add(t *TimerTaskEntity) { 57 b.mu.Lock() 58 59 e := b.TaskList.PushBack(t) 60 t.setBucket(b) 61 t.element = e 62 63 b.mu.Unlock() 64 } 65 66 func (b *Bucket) remove(t *TimerTaskEntity) bool { 67 // 检查当前延时任务是否属于当前桶 68 if t.getBucket() != b { 69 return false 70 } 71 b.TaskList.Remove(t.element) 72 t.setBucket(nil) 73 t.element = nil 74 return true 75 } 76 77 func (b *Bucket) Remove(t *TimerTaskEntity) bool { 78 b.mu.Lock() 79 defer b.mu.Unlock() 80 return b.remove(t) 81 } 82 83 // Flush 延时任务降级,重新插入到下层时间轮中 84 func (b *Bucket) Flush(reinsert func(*TimerTaskEntity)) { 85 b.mu.Lock() 86 defer b.mu.Unlock() 87 88 for e := b.TaskList.Front(); e != nil; { 89 next := e.Next() 90 t := e.Value.(*TimerTaskEntity) 91 b.remove(t) 92 reinsert(t) 93 e = next 94 } 95 // 当前桶所有延时任务降级完成后,该桶过期时间重置为-1,该桶不再有效 96 b.SetExpiration(-1) 97 }延时任务&时间格
最后,基于任务对象(TimerTaskEntity),时间格对象(Bucket)还有延时队列(DelayQueue)完成对时间轮的实现。其中,一个延时任务的写入,会在写入到时间轮的同时,以过期时间为优先级参照写入到延时队列中。时间轮启动时,会开启两个协程分别执行从延时队列中取出最近一次到期的任务(DelayQueue的Poll函数);和根据获取到的任务的过期时间,去推进时间轮指针转动,以及去触发相应的延时任务(真实执行 or 任务降级)操作。如下:
1 // truncate returns the result of rounding x toward zero to a multiple of m. 2 // If m <= 0, Truncate returns x unchanged. 3 func truncate(x, m int64) int64 { 4 if m <= 0 { 5 return x 6 } 7 return x - x%m 8 } 9 10 func timeToS(t time.Time) int64 { 11 return t.UnixNano() / int64(time.Second) 12 } 13 14 func sToTime(t int64) time.Time { 15 return time.Unix(0, t*int64(time.Second)).UTC() 16 } 17 18 type waitGroupWrapper struct { 19 sync.WaitGroup 20 } 21 22 func (w *waitGroupWrapper) Wrap(cb func()) { 23 w.Add(1) 24 go func() { 25 cb() 26 w.Done() 27 }() 28 }时间相关转换函数
1 type TimingWheel struct { 2 tickMs int64 //基本时间跨度 3 wheelSize int64 // 时间轮队列长度 4 5 interval int64 // 总跨度 6 currentTime int64 //表盘指针 7 buckets []*Bucket // 时间格队列 8 queue *DelayQueue // 延时队列 9 10 // 上层时间轮引用 可以通过Add函数去并发的读写 11 overflowWheel unsafe.Pointer // type: *TimingWheel 12 13 exitC chan struct{} 14 waitGroup waitGroupWrapper 15 } 16 17 func NewTimingWheel(tickMs time.Duration, wheelSize int64) *TimingWheel { 18 tick := int64(tickMs / time.Second) 19 if tickMs <= 0 { 20 panic(errors.New("tickMs must be greater than or equal to 1s")) 21 } 22 23 startMs := timeToS(time.Now().UTC()) 24 return newTimingWheel(tick, wheelSize, startMs, NewDelayQueue(int(wheelSize))) 25 } 26 27 func newTimingWheel(tickMs int64, wheelSize int64, startMs int64, queue *DelayQueue) *TimingWheel { 28 buckets := make([]*Bucket, wheelSize) 29 for i := range buckets { 30 buckets[i] = NewBucket() 31 } 32 33 return &TimingWheel{ 34 tickMs: tickMs, 35 wheelSize: wheelSize, 36 currentTime: truncate(startMs, tickMs), 37 interval: tickMs * wheelSize, 38 buckets: buckets, 39 queue: queue, 40 exitC: make(chan struct{}), 41 } 42 } 43 44 // 增加延时任务至时间轮 45 func (tw *TimingWheel) add(t *TimerTaskEntity) bool { 46 currentTime := atomic.LoadInt64(&tw.currentTime) 47 if t.DelayTime < currentTime+tw.tickMs { 48 return false 49 } else if t.DelayTime < currentTime+tw.interval { 50 // 写入到当前时间轮 51 virtualID := t.DelayTime / tw.tickMs 52 b := tw.buckets[virtualID%tw.wheelSize] 53 b.Add(t) 54 55 // 当前时间格写入到延时队列 ps. 延时队列是小根堆,时间格的过期时间是优先级参照,所以,越早到期的时间格越在队头 56 if b.SetExpiration(t.DelayTime) { 57 tw.queue.Offer(b, b.Expiration()) 58 } 59 return true 60 } else { 61 // 超出当前时间轮最大范畴,写入到上层时间轮 62 overflowWheel := atomic.LoadPointer(&tw.overflowWheel) 63 // 上层时间轮不存在 则创建新的时间轮。新时间轮的ticketMs = 当前时间轮的interval ;上层时间轮的开始时间startMs为当前时间轮表盘指针时间 64 if overflowWheel == nil { 65 atomic.CompareAndSwapPointer(&tw.overflowWheel, nil, unsafe.Pointer(newTimingWheel(tw.interval, tw.wheelSize, currentTime, tw.queue))) 66 overflowWheel = atomic.LoadPointer(&tw.overflowWheel) 67 } 68 return (*TimingWheel)(overflowWheel).add(t) 69 } 70 } 71 72 func (tw *TimingWheel) addOrRun(t *TimerTaskEntity) { 73 if !tw.add(t) { 74 // 如果无法添加,则表示该延时任务执行时间小于当前时间轮表盘指针指向的时间(换句话说,该延时任务已过期),则立即执行 75 go t.Task() 76 } 77 } 78 79 // advanceClock 推进时间轮时钟到指定的过期时间 80 func (tw *TimingWheel) advanceClock(expiration int64) { 81 currentTime := atomic.LoadInt64(&tw.currentTime) 82 if expiration >= currentTime+tw.tickMs { 83 currentTime = truncate(expiration, tw.tickMs) 84 atomic.StoreInt64(&tw.currentTime, currentTime) 85 86 overflowWheel := atomic.LoadPointer(&tw.overflowWheel) 87 if overflowWheel != nil { 88 (*TimingWheel)(overflowWheel).advanceClock(currentTime) 89 } 90 } 91 } 92 93 // Start 时间轮启动 94 // 开启两个协程 95 // - 协程1不断地从延时队列中取出最近一次到期的任务; 96 // - 协程2根据获取到的任务的过期时间,去推进时间轮指针转动,以及去触发相应的延时任务(真实执行 or 任务降级) 97 func (tw *TimingWheel) Start() { 98 tw.waitGroup.Wrap(func() { 99 tw.queue.Poll(tw.exitC, func() int64 { 100 return timeToS(time.Now().UTC()) 101 }) 102 }) 103 104 tw.waitGroup.Wrap(func() { 105 for { 106 select { 107 case elem := <-tw.queue.C: 108 b := elem.(*Bucket) 109 tw.advanceClock(b.Expiration()) 110 b.Flush(tw.addOrRun) 111 case <-tw.exitC: 112 return 113 } 114 } 115 }) 116 } 117 118 func (tw *TimingWheel) Stop() { 119 close(tw.exitC) 120 tw.waitGroup.Wait() 121 } 122 123 func (tw *TimingWheel) AfterFunc(d time.Duration, f func()) *TimerTaskEntity { 124 t := &TimerTaskEntity{ 125 DelayTime: timeToS(time.Now().UTC().Add(d)), 126 Task: f, 127 } 128 tw.addOrRun(t) 129 return t 130 }时间轮相关实现
最后看一下执行结果:
1 func TestTimingWheel_AfterFunc(t *testing.T) { 2 tw := NewTimingWheel(time.Second, 10) 3 tw.Start() 4 log.Printf("启动时间轮 \n") 5 defer tw.Stop() 6 7 durations := []time.Duration{ 8 //1 * time.Second, 9 5 * time.Second, 10 2 * time.Second, 11 //50 * time.Second, 12 1 * time.Minute, 13 } 14 15 for _, d := range durations { 16 dTime := d 17 tw.AfterFunc(d, func() { 18 log.Printf("this is delay %v task \n", dTime) 19 }) 20 21 if dTime == 5*time.Second { 22 tw.AfterFunc(dTime, func() { 23 log.Printf("this is delay %v task2 \n", dTime) 24 }) 25 } 26 27 } 28 29 time.Sleep(time.Second * 100) 30 }测试示例
资料参考:
https://book.douban.com/subject/30437872
http://russellluo.com/2018/10/golang-implementation-of-hierarchical-timing-wheels.html
标签:层级,延时,Golang,任务,pq,时间,func,浅析,tickMs From: https://www.cnblogs.com/liumengchen-boke/p/17671223.html