首页 > 其他分享 >[转]手把手教你如何用golang实现一个timewheel时间轮

[转]手把手教你如何用golang实现一个timewheel时间轮

时间:2023-11-19 23:22:56浏览次数:39  
标签:timewheel task 轮盘 tw golang 任务 时间 手把手 circle

 

转,原文:https://lk668.github.io/2021/04/05/2021-04-05-%E6%89%8B%E6%8A%8A%E6%89%8B%E6%95%99%E4%BD%A0%E5%A6%82%E4%BD%95%E7%94%A8golang%E5%AE%9E%E7%8E%B0%E4%B8%80%E4%B8%AAtimewheel/

--------------------------

 

手把手教你如何用golang实现一个timewheel时间轮

最近工作中有一个定时执行巡检任务的需求,为了让代码实现的更加优雅,选择了TimeWheel时间轮算法,该算法的运用其实是非常的广泛的,在 Netty、Akka、Quartz、ZooKeeper、Kafka 等组件中都存在时间轮的踪影。本文手把手教你如何利用Golang实现一个简单的timewheel时间轮算法。本文涉及到的代码可以在https://github.com/lk668/timewheel查看

1. timewheel简介

timewheel时间轮盘,简单理解就是一个时钟表盘,指针每隔一段时间前进一格,走玩一圈是一个周期。而需要执行的任务就放置在表盘的刻度处,当指针走到该位置时,就执行相应的任务。具体图片如下所示

时间轮是一个环形队列,底层实现就是一个固定长度的数组,数组中的每个元素存储一个双向列表,选择双向列表的原因是在O(1)时间复杂度实现插入和删除操作。而这个双向链表存储的就是要在该位置执行的任务。

举例分析,假设时间轮盘每隔1s前进一格,那么上图中的时间轮盘的周期就是12s,如果此时时间在刻度为0的位置,此时需要添加一个定时任务,需要10s后执行,那么该任务就需要放到刻度10处。当指针到达刻度10时,执行在该位置上,双向链表存储的所有任务。

此时你会有个以为,如果此时我想添加一个20s之后执行的任务,应该怎么添加呢?由于这个任务的延时超过了时间轮盘的周期(12s),所以单个轮盘已经没法满足需求了,此时有如下两个解决方案。

1.1 方案一 多级时间轮

多级时间轮的理念,更贴近了钟表,钟表分为时针、分针、秒针。每过60s,分针前进一格,每过60min,时针前进一格。多级时间轮的理念就是分两级甚至更多级轮盘,当一级轮盘走一圈后,二级轮盘前进一格,二级轮盘走一圈后,三级轮盘前进一格,依次类推~

1.2 方案二 为任务添加一个circle的参数

为每个任务添加circle的参数,例如某个任务需要20s之后执行,那么该任务的circle=1,计算得到的表盘位置是4,也就是该任务需要在表盘走一圈以后,在位置4处执行。相应的,表盘指针每前进一格,该处的任务列表中,所有的任务的circle都-1。如果该处的任务circle==0,那么执行该任务。

本文选取该种方案来做实现。

2. 代码实现

接下来从代码层面来介绍整体的实现过程。

2.1 结构体

2.1.1 首先需要一个TimeWheel的结构体来存储时间轮盘的相关信息

  1. interval: 时间轮盘的精度,也就是时间轮盘每前进一步,所需要的时间
  2. slotNums: 时间轮盘总的齿轮数,interval*slotNums就是时间轮盘走一周所花的时间
  3. currentPos: 时间轮盘指针当前的位置
  4. ticker: 时钟计时器,定时触发。
  5. slots: 利用数组来实现时间轮盘,数组中每个元素是个双向链表,来存储要执行的任务
  6. taskRecords: 针对任务的一个map表,key是任务的key,value是任务对象
  7. isRunning: 时间轮盘是否是running状态,避免重复启动
1
2
3
4
5
6
7
8
9
10
11
12
13
type TimeWheel struct {
// 时间轮盘的精度
interval time.Duration
// 时间轮盘的齿轮数 interval*slotNums就是时间轮盘转一圈走过的时间
slotNums int
// 时间轮盘当前的位置
currentPos int
ticker *time.Ticker
// 时间轮盘每个位置存储的Task列表
slots []*list.List
taskRecords *sync.Map
isRunning bool
}

2.1.2 需要一个到时间以后执行的Job

1
type Job func(interface{})

2.1.3 需要一个执行任务的结构体

  1. key: 任务的唯一标识,必须是唯一的
  2. interval: 任务间隔多长时间执行
  3. createdTime: 任务的创建时间
  4. pos: 任务在时间轮盘的存储位置,也就是TimeWheel.slots中的存储位置
  5. circle: 任务需要
  6. job: 到时间时,任务需要执行的Job
  7. times: 该任务需要执行的次数,如果需要一直执行,设置成<0的数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 时间轮盘上需要执行的任务
type Task struct {
// 用来标识task对象,是唯一的
key interface{}
// 任务周期
interval time.Duration
// 任务的创建时间
createdTime time.Time
// 任务在轮盘的位置
pos int
// 任务需要在轮盘走多少圈才能执行
circle int
// 任务需要执行的Job,优先级高于TimeWheel中的Job
job Job
// 任务需要执行的次数,如果需要一直循环执行,设置成<0的数
times int
}

2.2 启动时间轮

需要一个函数来启动时间轮盘,该函数需要启动一个线程来执行。启动以后,利用时间定时器,每隔固定时间(TimeWheel.Inteval),时间轮盘前进一格。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Start 启动时间轮盘
func (tw *TimeWheel) Start() {
tw.ticker = time.NewTicker(tw.interval)
go tw.start()
tw.isRunning = true
}

// 启动时间轮盘的内部函数
func (tw *TimeWheel) start() {
for {
select {
case <-tw.ticker.C:
// 通过runTask函数来检查当前需要执行的任务
tw.checkAndRunTask()
}
}
}

2.3 向时间轮添加任务

向时间轮盘添加任务的时候,需要计算任务的pos和circle,也就是在时间轮盘的位置和需要走的圈数。本文提供了两种方式了,一种是基于createdTime来计算,主要用在服务刚刚启动或者首次添加任务的时候使用。一种是基于任务的周期来计算下次需要执行的时间,这种是在任务执行以后,需要重建计算和添加任务的时候使用。第一种是为了避免在服务重启的时候,相同周期的任务被分配在同一个位置执行。从而加大任务执行时候的压力。

在计算完pos和circle以后,添加任务主要包括两部分,一个是在taskRecords这个map中存储一下task对象(主要用处是,在删除的时候,获取任务在timewheel表盘的位置),一个是根据计算的pos将task存储在timewheel对应位置的双向队列中(主要用处是在执行任务的时候,获取task需要执行的job)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// 添加任务的内部函数
// @param task Task Task对象
// @param byInterval bool 生成Task在时间轮盘位置和圈数的方式,true表示利用Task.interval来生成,false表示利用Task.createTime生成
func (tw *TimeWheel) addTask(task *Task, byInterval bool) {
var pos, circle int
if byInterval {
pos, circle = tw.getPosAndCircleByInterval(task.interval)
} else {
pos, circle = tw.getPosAndCircleByCreatedTime(task.createdTime, task.interval, task.key)
}

task.circle = circle
task.pos = pos

element := tw.slots[pos].PushBack(task)
tw.taskRecords.Store(task.key, element)
}

// 该函数通过任务的周期来计算下次执行的位置和圈数
func (tw *TimeWheel) getPosAndCircleByInterval(d time.Duration) (int, int) {
delaySeconds := int(d.Seconds())
intervalSeconds := int(tw.interval.Seconds())
circle := delaySeconds / intervalSeconds / tw.slotNums
pos := (tw.currentPos + delaySeconds/intervalSeconds) % tw.slotNums

// 特殊case,当计算的位置和当前位置重叠时,因为当前位置已经走过了,所以circle需要减一
if pos == tw.currentPos && circle != 0 {
circle--
}
return pos, circle
}

// 该函数用任务的创建时间来计算下次执行的位置和圈数
func (tw *TimeWheel) getPosAndCircleByCreatedTime(createdTime time.Time, d time.Duration, key interface{}) (int, int) {

passedTime := time.Since(createdTime)
passedSeconds := int(passedTime.Seconds())
delaySeconds := int(d.Seconds())
intervalSeconds := int(tw.interval.Seconds())

circle := delaySeconds / intervalSeconds / tw.slotNums
pos := (tw.currentPos + (delaySeconds-(passedSeconds%delaySeconds))/intervalSeconds) % tw.slotNums

// 特殊case,当计算的位置和当前位置重叠时,因为当前位置已经走过了,所以circle需要减一
if pos == tw.currentPos && circle != 0 {
circle--
}

return pos, circle
}

2.4 从时间轮删除任务

删除任务需要两部分,一部分是从taskRecords中删除任务记录,一部分是从时间轮盘的双向链表中删除任务记录。

1
2
3
4
5
6
7
8
9
10
// 删除任务的内部函数
func (tw *TimeWheel) removeTask(task *Task) {
// 从map结构中删除
val, _ := tw.taskRecords.Load(task.key)
tw.taskRecords.Delete(task.key)

// 通过TimeWheel.slots获取任务的
currentList := tw.slots[task.pos]
currentList.Remove(val.(*list.Element))
}

2.5 到期执行任务

时间轮盘的currentPos记录当前的位置,当时间轮盘前进一格的时候,就会检查当前位置的双向链表,检查他们的circle是否为0,如果为0就执行任务,如果不为零,circle–。执行完成后,将任务从双向链表中删除。
之后更新Task的times参数,如果times<0说明需要一直循环执行,计算下一次的执行位置,重新将任务添加到对应新位置的双向队列。如果times==0,表示不需要再执行了,就不需要再执行任务,将任务从taskRecords中删除即可。如果times>0,更新times–,然后计算下一次的执行位置,重新将任务添加到新位置的双端队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// 检查该轮盘点位上的Task,看哪个需要执行
func (tw *TimeWheel) checkAndRunTask() {

// 获取该轮盘位置的双向链表
currentList := tw.slots[tw.currentPos]

if currentList != nil {
for item := currentList.Front(); item != nil; {
task := item.Value.(*Task)
// 如果圈数>0,表示还没到执行时间,更新圈数
if task.circle > 0 {
task.circle--
item = item.Next()
continue
}

// 执行任务时,Task.job是第一优先级,然后是TimeWheel.job
if task.job != nil {
go task.job(task.key)
} else {
fmt.Println(fmt.Sprintf("The task %d don't have job to run", task.key))
}

// 执行完成以后,将该任务从时间轮盘删除
next := item.Next()
tw.taskRecords.Delete(task.key)
currentList.Remove(item)

item = next

// 重新添加任务到时间轮盘,用Task.interval来获取下一次执行的轮盘位置
if task.times != 0 {
if task.times < 0 {
tw.addTask(task, true)
} else {
task.times--
tw.addTask(task, true)
}

} else {
// 将任务从taskRecords中删除
tw.taskRecords.Delete(task.key)
}
}
}

// 轮盘前进一步
if tw.currentPos == tw.slotNums-1 {
tw.currentPos = 0
} else {
tw.currentPos++
}
}

3. 总结

以上是实现一个时间轮盘所需要的必要操作,更多的优化请参考我的github源代码https://github.com/lk668/timewheel。源代码丰富了以下操作:

  1. 添加函数来关闭timewheel
  2. 添加函数来获取timewheel的单例模式,从而保证timewheel的唯一性
  3. 添加检查timewheel是否running的函数
  4. 为TimeWheel结构体也添加了job参数,但是其优先级低于task,主要用于大部分task需要执行相同任务的情况,此时就没必要为每个task设置job。只需要为TimeWheel结构体设置一个job即可

参考

标签:timewheel,task,轮盘,tw,golang,任务,时间,手把手,circle
From: https://www.cnblogs.com/oxspirt/p/17842958.html

相关文章

  • Centos7.8 go1.18.3指定版本安装 完美运行Golang安装
    安装背景说明:公司已经有3台服务器安装了go1.18.3版本,这次又买了一台新服务器,由于线上推上去的功能,可能会导致用户出现异常,要做压力测试,所以要重建go环境。遇到问题:昨天下午自己下载了安装包,也是go1.18.3解析到了/usr/local目录也配置了环境/etc/profile 就是识别不到go ......
  • 使用golang来解密m3u8视频播放列表里面的ts文件
    如果我有一个1.2G的mp4格式的电影,想要放在网站上进行播放,直接用video标签,src属性设置为视频的地址就可以了!这种观看体验,究竟怎么样,可以自己去尝试下。。。结果是令人崩溃的。。。加载巨慢,无法选择性观看自己想要看的部分,反正就是哪哪儿都不爽。 那么为了解决这个问题,现在有很......
  • CodeWhisperer--手把手教你使用一个十分强大的工具
    AmazonCodeWhisperer是一款能够帮助我们智能生成代码的工具。经过数十亿行代码的训练,可以根据提示和现有代码实时生成从片段到完整功能的代码建议。类似Cursor和GithubCopilot编码工具。目前,CodeWhisperer兼容Python、Java和JavaScript,支持各种IDE,包括JetBrains、Vis......
  • 手把手教你yolov5训练自己的数据集(代码+教程)
    在这篇博文中,我们对YOLOv5模型进行微调,用于自定义目标检测的训练和推理。目录引言:YOLOv5是什么?YOLOv5提供的模型YOLOv5提供的功能使用YOLOv5进行自定义目标检测训练自定义训练的方法自定义训练代码准备数据集克隆YOLOv5存储库训练小模型(yolov5s)训练YOLOv5中型模型冻结层训练中型Y......
  • golang channel
    ”不要以共享内存的方式来通信,相反,要通过通信来共享内存“golang的一个思想,不整文的,整点武的,具体来看channel怎么做的有一个很关键的golangMPG模型再单独分析,这篇先只分析channel定义//runtim/chan.gotypehchanstruct{ qcountuint//通道里的元素......
  • golang map
    golang的map使用的是hashmap基本结构下面截取自源码,已翻译//runtime/map.go:117//gomap定义,hashmap缩写typehmapstruct{ countint//map里文件数 flagsuint8//map当前是否在写入,一般为hashWriting=4(写入中)或0(空闲) Buint8......
  • Golang把文件写到excel
    最近有个需求是把看广告的日志转成excelpackagemainimport( "bufio" "encoding/json" "flag" "fmt" "github.com/xuri/excelize/v2" "os" "time")//Ad广告typeAdstruct{ OpenIdstring`json:&quo......
  • 使用golang对服务器简单监控
    packagemainimport( "fmt" "github.com/shirou/gopsutil/cpu" "github.com/shirou/gopsutil/disk" "github.com/shirou/gopsutil/host" "github.com/shirou/gopsutil/load" "github.com/shirou/gopsutil/me......
  • 从一道题来看看golang中的slice作为参数时的现象
    1、题目最近看群友在群里问一道关于golang中slice的题,题目如下:packagemainimport"fmt"funcmain(){ k:=[]int{1,2,3,4} k=append(k,5,6) fmt.Printf("k-->value:%v,add:%p,cap:%d\n",k,k,cap(k)) ap(k) fmt.Printf("k-->value......
  • windows系统使用终端和goland编辑器打包golang程序方法
    上一篇文章说了,windows系统,如何使用goland编辑器打包exe和linux程序,这篇文章再补充一下,使用终端和goland编辑器打包的对比情况。这里的终端可以是,cmd、WindowsPowerShell、MINGw64这里,我使用goland编辑器里面的Terminal,也就是WindowsPowerShelll来操作1、goland编辑器打......