首页 > 其他分享 >Go-知识-定时器

Go-知识-定时器

时间:2024-09-21 13:20:32浏览次数:10  
标签:触发 定时器 协程 知识 Timer timer Go Ticker

Go-知识-定时器


Go语言提供了两种定时器,分别为一次性定时器和周期性定时器。

  • 一次性定时器(Timer):定时器只计时一次,计时结束变停止运行。
  • 周期性定时器(Ticker):定时器周期性进行计时,除非主动停止,否则将永久运行。

以下都是 go 1.10~1.13的逻辑,在go 1.14 有非常大的优化。

1. 介绍

Timer是一种单一事件的定时器,即经过指定的时间后触发一个事件,这个事件通过其本身提供的
channel进行通知。Timer只执行一次就结束。
通过timer.NewTimer(d Duration)可以创建一个Timer,参数即等待时间,时间到来后立即触发一个事件。
src/time/sleep.go中定义了Timer的数据结构
在这里插入图片描述

Timer对外仅暴露一个channel,指定的时间到来时就往该channel种写入系统时间,即一个事件。

Ticker是周期性定时器,即周期性地触发一个事件,通过Ticker本身提供的channel将事件传递出去。
Ticker的数据结构与Timer非常类似:
在这里插入图片描述

Ticker对外仅暴露一个channel,当指定的时间到来时就往该channel种写入系统时间,即一个事件。
在创建Ticker时会指定一个时间,作为事件触发的周期,这是Ticker和Timer最主要的区别。

2. Timer使用场景

2.1 设定超时时间

有时候在等待资源的时候,又不希望永久等待,而是希望加个超时时间,如果在指定的时间还未获取到,那么就超时,不在等待了。

func TestWait(t *testing.T) {
	timer := time.NewTimer(1 * time.Second)
	c := make(<-chan string)
	select {
	case <-c:
		t.Log("chan string ")
	case <-timer.C:
		t.Log("time out")
	}
}

因为case <-c 不会触发,所以在1秒后,超时结束
在这里插入图片描述

通过select语句轮询timer.C 和 c 两个channel,如果1s内,c还没有数据写入,那就认为超时了。

2.2 延迟执行某个方法

在应用启动中,一般会初始化很多组件,如果在应用启动后,马上就使用,可能出现组件还未初始化成功,拿到的组件对象是还未准备好的对象。
这个时候,如果能延迟使用,那么就不会出现使用未准备好的组件的情况。

func TestDelay(t *testing.T) {
	timer := time.NewTimer(1 * time.Second)
	select {
	case <-timer.C:
		t.Log("wait 1 s")
	}
	t.Log("do something")
}

select 只有一个case,就是当时间到了,该case满足,如果时间没到,那么select会阻塞。
当触发select 后,可以什么都不做,也可以打印日志,然后跳出select的语句,顺序执行后续逻辑,以此实现延迟等待后执行。
在这里插入图片描述

3. Timer 对外接口

3.1 创建定时器

使用func NewTimer(d Duration) *Timer方法指定一个时间即可创建一个Timer,Timer一经创建便开始计时,不需要额外的启动命令。
实际上,创建Timer意味着把一个计时任务交给系统守护协程,该协程管理着所有的Timer,当Timer的时间到达后,Timer向Channel中发送当前的时间作为事件。

3.2 停止定时器

Timer创建后可以随时停止,停止计时器的方法如下:
func (t *Timer) Stop() bool
其返回值代表定时器有没有超时。

  • true : 定时器超时前停止,后续不会在发送事件
  • false : 定时器超时后停止。

实际上,停止计时器意味着通知系统守护协程移除该定时器。

3.3 重置定时器

已过期的定时器或已停止的定时器可以通过重置动作重新激活,重置方法如下:
func (t *Timer) Reset(d Duration) bool
重置的动作实质上是先停止定时器,在启动,其返回值即停止计时器(Stop()) 的返回值。
需要注意的是,重置定时器虽然可以用于修改还未超时的定时器,但正确的使用方式还是针对已过期的定时器或已被停止的定时器,同时其返回值也不可靠,返回值存在的价值仅仅是与前面的版本兼容。
实际上,重置定时器意味着通知系统守护协程移除该定时器,重新设定时间后,再把定时器交给守护协程。

3.4 After

如果仅仅是向等待指定的时间,没有提前停止定时器的需求,也没有复用该定时器的需求,那么可以使用匿名的定时器。
使用func After(d Duration) <-chan Time方法创建一个定时器,并返回定时器的管道。

func TestAfter(t *testing.T) {
	select {
	case <-time.After(1 * time.Second):
		t.Log("after 1 s")
	}
}

执行后和之前的延迟执行一模一样:
在这里插入图片描述

实际上还是一个定时器,但是代码更加简洁。

3.5 AfterFunc

除了 After调用,返回 channel,进行同步处理,还可以使用 AfterFunc,将需要延迟的操作交给系统协程异步执行。
AfterFunc的定义: func AfterFunc(d Duration, f Func()) *Timer
比如:

func TestAfterFunc(t *testing.T) {
	time.AfterFunc(1*time.Second, func() {
		t.Log("after 1 s")
	})
	t.Log("waiting")
	time.Sleep(2 * time.Second)
	t.Log("done")
}

执行结果如下:
在这里插入图片描述

很明显执行和等待已经不是一个协程了.

4. Timer 的实现原理

4.1 Timer数据结构

4.1.1 Timer

在源码包src/time/sleep.go中定义了其数据结构:
在这里插入图片描述

Timer只有两个成员:

  • C:channel ,上层应用根据此channel接收事件
  • r:runtime定时器,该定时器即系统管理的定时器,对上层应用不可见。

这里按照层次来理解Timer的数据结构,Timer.C是面向Timer用户的,Timer.r是面向底层的定时器实现。

4.1.2 runtimeTimer

创建一个Timer实质上是把一个定时任务交给专门的写成进行监控,这个任务的载体便是runtimeTimer。
每创建一个Timer意味着创建了一个runtimeTimer变量,然后把它交给系统进行监控。通过设置runtimeTimer过期后的行为来达到定时的目的。

type runtimeTimer struct {
	tb     uintptr                    // 存储当前定时器的数组地址
	i      int                        // 存储当前定时器的数组下标
	when   int64                      // 当前定时器触发事件
	period int64                      // 当前定时器周期性触发间隔
	f      func(interface{}, uintptr) // 定时器触发时执行的回调函数
	arg    interface{}                // 定时器触发时执行回调函数传递的参数一
	seq    uintptr                    // 定时器触发时执行回调函数传递的参数二
}
  • tb: 系统底层存储runtimeTimer的数组地址
  • i: 当前runtimeTime在tb数组中的下标
  • when: 定时器触发事件的时间
  • period: 定时器周期性触发间隔(对于Timer来说,此值为0)
  • f: 定时器触发时执行的回调函数,回调函数接收两个参数。
  • arg: 定时器触发时执行回调函数的参数一
  • seq: 定时器触发时执行回调函数的参数二(Timer并不使用该参数)

4.2 Timer 实现原理

一个进程中的多个Timer都由底层的写成来管理,这个协程称为系统协程。
runtimeTimer存放在数组中,并看招when字段对所有的runtimeTimer进行堆排序,定时器触发时执行runtimeTimer中的预定义函数f,即完成了一次定时任务。

4.2.1 创建Timer

创建Timer的实现,非常简单:

func NewTimer(d Duration) *Timer {
	c := make(chan Time, 1) // 创建一个channel
	t := &Timer{ // Timer的数据结构
		C: c,
		r: runtimeTimer{
			when: when(d), // 触发事件
			f:    sendTime, // 触发后执行 sendTime 函数
			arg:  c, // 触发后执行sendTime函数时附带的参数
		},
	}
	startTimer(&t.r) // 此处启动定时器,只是把 runtimeTimer放到系统协程的堆中,由系统协程维护
	return t
}

NewTimer函数只是构造了一个Timer,然后把Timer.r 通过startTimer()交给系统协程维护。其中when()方法是计算下一次
定时器触发的绝对时间,即当前时间+NewTimer()的参数d。sendTimer()方法是定时器触发时的动作。

  • when函数
// when是一个辅助函数,用于设置runtimeTimer的“when”字段。
// 它返回未来的持续时间d,单位为纳秒。
// 如果d为负,则忽略。如果返回值小于
// 由于溢出,返回MaxInt64。
func when(d Duration) int64 {
	if d <= 0 {
		return runtimeNano()
	}
	t := runtimeNano() + int64(d)
	if t < 0 {
		t = 1<<63 - 1 // math.MaxInt64
	}
	return t
}
  • sendTimer函数
func sendTime(c interface{}, seq uintptr) {
	//c上的非阻塞时间发送。
	//在NewTimer中使用,它无论如何都不能阻塞(缓冲区)。
	//在NewTicker中使用,在地板上放下发送是
	//当读者落后时的期望行为,
	//因为发送是周期性的。
	select {
	case c.(chan Time) <- Now():
	default:
	}
}

sendTime接收一个channel作为参数,其主要任务是向channel中写入当前时间。
创建Timer时生成的管道含有一个缓冲区(make(chan Time,1)),所以Timer触发时向channel写入事件永远不会阻塞,sendTime写完即退出。
sendTime使用select搭配一个空的default分支,是因为Ticker也复用了sendTime,Ticker触发时也会向channel中写入事件,但无法保证之前的数据已经
被取走,所以使用select并搭配一个空的default分支,确保sendTime不会阻塞,Ticker触发时,如果管道中还有值,则本次不在向管道中写入时间,将本次触发的事件直接丢弃。

  • startTime函数

startTime函数的具体实现在runtime包中,其主要作用是把runtimeTimer写入系统写成的数组中,并启动系统协程(如果系统协程还未开始运行)
在这里插入图片描述

其主要是addTimer(t *timer)函数,timer是runtime包中用于表示time包中runtimeTimer结构的struct
在这里插入图片描述

addTimer(t *timer)函数

func addtimer(t *timer) {
	tb := t.assignBucket() // 获取time桶数组
	lock(&tb.lock) // 加锁
	ok := tb.addtimerLocked(t) // 将 timer加入数据组
	unlock(&tb.lock) // 解锁
	if !ok { // 如果timer加入数组失败,那么触发 panic 
		badTimer()
	}
}

在这里插入图片描述

4.2.2 停止Timer

停止Timer,只是简单地把Timer从系统协程中移除。
在这里插入图片描述

stopTimer即通知系统协程把该Timer移除,即不在监控。
stopTimer也是runtime中的函数
在这里插入图片描述

系统协程只是移除Timer,并不会关闭channel,以避免用户协程读取错误。
Stop 的返回值取决于定时器的状态:

  • 如果Timer已经触发,则Stop返回false
  • 如果Timer还未触发,则Stop返回true

在这里插入图片描述

4.2.3 重置Timer

重置Timer时会先把Timer从系统协程中删除,修改新的时间后重新添加到系统协程中。
重置的实现如下:
在这里插入图片描述

其返回值与Stop保持一致,如果Timer成功停止,则返回true,如果Timer已经触发,则返回false.
在这里插入图片描述

由于新加的Timer时间很可能变化,所以其在系统协程中的位置也会相应地发生变化。
需要注意的是,按照源码注释,Reset应该作用于已经停止的Timer或已经触发的Timer。
按照这个约定,Reset的返回值总是false,仍然保留是为了保持向前兼容,使用老版本Go编写的应用不需要因为Go升级而修改代码。
如果不按照此约定使用Reset,则有可能遇到Reset和Timer触发后同时执行的情况,此时有可能会收到两个事件,从而对应用程序造成一些负面影响。

5. Ticker 使用场景

5.1 简单定时任务

假设需求是每隔1秒就报时一次:

func TestTicker(t *testing.T) {
    // 创建一个一秒的Ticker
	ticker := time.NewTicker(1 * time.Second)
	// 使用defer关闭Ticker
	defer ticker.Stop()
	// 收到事件就打印时间 (根据之前Timer的实现,res := <-ticker.C , res 就是 Time 类型的时间戳 )
	for range ticker.C {
		t.Log("tick 1 s : " + time.Now().String())
	}
}

在这里插入图片描述

如果不主动停止,那么永远不会结束。

5.2 定时刷新缓存

假设需求是定时将内存中的数据写到磁盘中,或者内存中数据满了,需要落盘:

func TestFlush(t *testing.T) {
	// 创建一个每1秒刷新一次的Ticker
	ticker := time.NewTicker(1 * time.Second)
	// 使用 defer 关闭
	defer ticker.Stop()
	// 模拟内存数据
	mem := 0
	for {
		select {
		// 时间到,不管如何,必须刷新
		case <-ticker.C:
			t.Log("flush 1 s : " + time.Now().String())
		default:
			// 内存满了,需要刷新
			if mem > 15 {
				t.Logf("flush mem : %d", mem)
				mem = 0
			}
		}
		// 每次for循环设置随机值
		mem += rand.Intn(10)
		// 每次循环等待300毫秒
		time.Sleep(300 * time.Millisecond)
	}
}

在这里插入图片描述

6. Ticker 对外接口

6.1 创建定时器

使用NewTicker函数就能创建一个Ticker

// NewTicker返回一个新的Ticker,其中包含一个将发送
// 由duration参数指定的时间段。
// 它调整间隔或降低滴答声,以弥补接收器速度慢的问题。
// 持续时间d必须大于零;否则,NewTicker将会恐慌。
// 停止自动收报机以释放相关资源。
func NewTicker(d Duration) *Ticker {
	if d <= 0 {
		panic(errors.New("non-positive interval for NewTicker"))
	}
	c := make(chan Time, 1)
	t := &Ticker{
		C: c,
		r: runtimeTimer{
			when:   when(d),
			// Timer 的 period 是 0
			period: int64(d),
			f:      sendTime,
			arg:    c,
		},
	}
	startTimer(&t.r)
	return t
}

NewTimer和NewTicker非常类似,在初始化runtimeTimer的时候,Timer没有设置period,Ticker则设置period等于d.

6.2 停止定时器

使用定时器对外暴露的Stop方法就可以停止Ticker
在这里插入图片描述

其实不管是 Timer还是Ticker的startTimer和stopTimer,最终都是由runtime/time.go 中的timer的startTimer和stopTimer实现的
在这里插入图片描述

在这里插入图片描述

需要注意的是,该方法会停止计时,意味着不会向定时器的channel中写入事件,但是channel并不会给关闭,channel在使用完后,声明周期结束后会自动释放。

Ticker 在使用完后务必要释放,否则会产生资源泄露,进而会持续消耗CPU资源,最后会把CPU资源消耗完。

6.3 Tick

在有些长江下,启动一个Ticker后,该Ticker永远不会停止,比如定时轮询任务,此时可以使用一个简单的Tick函数来获取定时器的channel。
在这里插入图片描述

注释上面说的很明白,因为该函数内部实际上还是创建了一个Ticker,但是并没有返回,只是返回了channel,因为没有Ticker对象,所以没法调用Stop。

在for循环中,使用Ticker的时候,一定三思
在Timer中,很容易写出如下代码:

for {
	select {
	case <-time.After(1 * time.Second):
		t.Log("flush 1 s : " + time.Now().String())
	}
} ```

使用Timer这样写当然没错,因为Timer在触发事件后,就会从数组中移除。
但是当把Timer换成Ticker,那么就出现了资源泄露

for {
	select {
	case <-time.Tick(1 * time.Second):
		t.Log("flush 1 s : " + time.Now().String())
	}
}```

上面的代码出现了资源泄露,因为 Tick 会创建Ticker,并且因为使用Tick直接获取的Ticker.C,所以没有手段去Stop。
随着for的执行,最终会导致越来越多的Ticker耗尽CPU资源。

7. Ticker 实现原理

实际上Ticker与Timer几乎完全相同,数据结构和内部实现机制都相同,唯一不同的是创建方式。
在创建Timer时,不指定时间触发周期,时间触发后Timer自动销毁。而在创建Ticker时会指定一个事件触发周期,事件按照这个周期触发,如果不显式停止,则定时器永不停止。

7.1 数据结构

7.1.1 Ticker

Ticker的数据结构与Timer的数据结构除名字不同外,其他完全一样。
源码包src/time/tick.go中定义了数据结构
在这里插入图片描述

Ticker只有两个成员:

  • C: channel,上层应用根据此channel接收事件。
  • r: runtimeTimer定时器,该定时器即系统管理的定时器,对上层应用不可见。

按照层次来理解Ticker的数据结构,Ticker.C 是面向Ticker 用户的,Ticker.r 是面向底层的定时器的。

7.1.2 runtimeTimer

runtimeTimer和Timer的一样,创建一个Timer实质上是把一个定时任务交给专门的写成进行监控,这个任务的载体便是runtimeTimer。
每创建一个Timer意味着创建了一个runtimeTimer变量,然后把它交给系统进行监控。通过设置runtimeTimer过期后的行为来达到定时的目的。

type runtimeTimer struct {
	tb     uintptr                    // 存储当前定时器的数组地址
	i      int                        // 存储当前定时器的数组下标
	when   int64                      // 当前定时器触发事件
	period int64                      // 当前定时器周期性触发间隔
	f      func(interface{}, uintptr) // 定时器触发时执行的回调函数
	arg    interface{}                // 定时器触发时执行回调函数传递的参数一
	seq    uintptr                    // 定时器触发时执行回调函数传递的参数二
}
  • tb: 系统底层存储runtimeTimer的数组地址
  • i: 当前runtimeTime在tb数组中的下标
  • when: 定时器触发事件的时间
  • period: 定时器周期性触发间隔(对于Timer来说,此值为0)
  • f: 定时器触发时执行的回调函数,回调函数接收两个参数。
  • arg: 定时器触发时执行回调函数的参数一
  • seq: 定时器触发时执行回调函数的参数二(Timer并不使用该参数)

7.2 Ticker实现原理

7.2.1 创建Ticker

创建Ticker的实现非常简单:

func NewTicker(d Duration) *Ticker {
	if d <= 0 {
		panic(errors.New("non-positive interval for NewTicker"))
	}
	c := make(chan Time, 1)
	t := &Ticker{
		C: c,
		r: runtimeTimer{
			when:   when(d),
			period: int64(d), // 这个在Timer中是没有的
			f:      sendTime,
			arg:    c,
		},
	}
	startTimer(&t.r)
	return t
}

Ticker 和Timer的重要区别就是提供了period参数,据此决定Timer是一次性的,还是周期性的。
NewTicker只是构造了一个Ticker,然后把Ticker通过startTimer交给系统协程维护。
其中period为事件触发的周期,sendTime函数是定时器触发时的动作。

  • sendTimer函数
func sendTime(c interface{}, seq uintptr) {
	//c上的非阻塞时间发送。
	//在NewTimer中使用,它无论如何都不能阻塞(缓冲区)。
	//在NewTicker中使用,在地板上放下发送是
	//当读者落后时的期望行为,
	//因为发送是周期性的。
	select {
	case c.(chan Time) <- Now():
	default:
	}
}

sendTime接收一个channel作为参数,其主要任务是向channel中写入当前时间。
创建Timer时生成的管道含有一个缓冲区(make(chan Time,1)),所以Timer触发时向channel写入事件永远不会阻塞,sendTime写完即退出。
sendTime使用select搭配一个空的default分支,是因为Ticker也复用了sendTime,Ticker触发时也会向channel中写入事件,但无法保证之前的数据已经
被取走,所以使用select并搭配一个空的default分支,确保sendTime不会阻塞,Ticker触发时,如果管道中还有值,则本次不在向管道中写入时间,将本次触发的事件直接丢弃。

  • startTime函数

startTime函数的具体实现在runtime包中,其主要作用是把runtimeTimer写入系统写成的数组中,并启动系统协程(如果系统协程还未开始运行)
在这里插入图片描述

其主要是addTimer(t *timer)函数,timer是runtime包中用于表示time包中runtimeTimer结构的struct
在这里插入图片描述

addTimer(t *timer)函数

func addtimer(t *timer) {
	tb := t.assignBucket() // 获取time桶数组
	lock(&tb.lock) // 加锁
	ok := tb.addtimerLocked(t) // 将 timer加入数据组
	unlock(&tb.lock) // 解锁
	if !ok { // 如果timer加入数组失败,那么触发 panic 
		badTimer()
	}
}

在这里插入图片描述

7.2.2 停止Ticker

停止Ticker时,只是简单地把Ticker从系统协程中移除。
在这里插入图片描述

stopTimer即通知系统协程把该Ticker移除,即不在监控。
stopTimer也是runtime中的函数
在这里插入图片描述

系统协程只是移除Ticker,并不会关闭channel,以避免用户协程读取错误。
与Timer不同的是,Ticker停止时没有返回值,即不需要关注返回值,实际上返回值也没有什么用途。
在这里插入图片描述

Ticker 没有重置接口,即Ticker创建后不能通过重置修改周期。

需要格外注意的是,Ticker用完后必须主动停止,否则会产生资源泄露,持续消耗CPU资源。

8. runtimeTimer 原理(go 1.11)

NewTimer和NewTicker都会在底层创建一个runtimeTimer,runtime包负责管理runtimeTimer,保证定时器按照约定的时间触发。

  • Go 1.10 之前: 所有的runtimeTimer 保存在一个全局的堆中
  • G0 1.10 ~ 1.13 : runtimeTimer被拆分到多个全局的堆中,减少了多个系统协程的锁等待时间
  • Go 1.14+: runtimeTimer保存在每个处理器P中,消除了专门的系统协程,减少了系统协程上下文切换。

8.1 定时器存储

8.1.1 timer 的数据结构

Timer和Ticker的数据结构除名字外,其他完全一样,二者都含有一个runtimeTimer类型的成员,这就是系统协程所维护的对象。
runtimeTimer类型是time包的名字,在runtime包中,这个类型叫做timer.

// 包装时间知道这个结构的布局。
// 如果此结构更改,请调整/time/sleep.go:/runtimeTimer。
// 对于GOOS=nacl,包syscall知道这个结构的布局。
// 如果此结构更改,请调整/syscall/net_nacl.go:/runtimeTimer。
type timer struct {
	tb *timersBucket // the bucket the timer lives in // 当前定时器寄存于系统timer堆的地址
	i  int           // heap index // 当前定时器寄存于系统timer堆的下标
	//定时器在何时唤醒,然后在何时+时段唤醒。。。(仅限大于0的时段)
	//每次在计时器goroutine中调用f(arg,now)时,f必须
	//一个行为良好的函数,而不是块。
	when   int64                      // 当前定时器下次触发时间
	period int64                      // 当前定时器周期性触发间隔(如果是Timer,间隔为0,表示不重复触发)
	f      func(interface{}, uintptr) // 定时器触发时执行的函数
	arg    interface{}                // 定时器触发时执行的参数一
	seq    uintptr                    // 定时器触发时执行的参数二(该参数只在网络收发场景下使用)
}

其中timersBucket便是系统协程存储timer的容器,里面有一个切片来存储timer,而i便是timer所在切片的下标。

8.1.2 timersBucket 的数据结构

//go:notinheap
type timersBucket struct {
	lock         mutex
	gp           *g       // 处理堆中事件的协程
	created      bool     // 时间处理协程是否已创建,默认为false,添加收个定时器是置为true
	sleeping     bool     // 事件处理协程(gp)是否在睡眠,(如果t中有定时器,那么还未到达触发的时间,gp会睡眠)
	rescheduling bool     // 事件处理协程(gp)是否已暂停,(如果t中定时器均已删除,那么gp会暂停)
	sleepUntil   int64    // 时间处理协程睡眠事件
	waitnote     note     // 时间处理协程睡眠事件(据此唤醒协程)
	t            []*timer // 定时器切片
}

Bucket是存储timer的桶。

  • lock: 互斥锁,在timer增加和删除时需要加锁,防止并发
  • gp: 事件处理协程,就是系统协程,这个协程在首次创建Timer或Ticker时生成
  • created: 状态值,表示系统协程是否创建
  • sleeping: 系统协程是否已暂停
  • sleepUntil: 系统协程睡眠到指定的时间(如果有新的定时任务则可能会提前唤醒)
  • waitnote: 提前唤醒时使用的通知
  • t: 保存timer的切片,当调用NewTimer或NewTicker时,便会有新的timer存储到切片中

系统协程在首次创建定时器时创建,定时器存储在切片中,系统写成负责计时并维护这个切片。

8.1.3 Ticker & timer & timersBucket 关系

假设创建了1个Timer,2个Ticker,关系如下
在这里插入图片描述

用户创建Timer或者Ticker时会生成一个timer,这个timer指向timersBucket,timersBucket记录timer的指针。

8.1.4 timersBucket 数组

通过timersBucket的数据结构可以看到,系统写成负责计时并维护其中的多个timer,一个timersBucket由一个特定的系统协程来维护。
当系统重的定时器非常多时,一个系统协程的处理能力可能跟不上,所以Go在实现时实际上提供了多个timerBucket,也就是有多个系统协程来处理定时器。
最理想的情况是应该预留GOMAXPROCS个timersBucket,以便充分使用CPU资源,但需要根据实际环境动态分配。为了实现简单,Go在实现时预留了64个timersBucket,可以满足绝大部分场景。
在这里插入图片描述

在addTimer时调用的
在这里插入图片描述

当协程创建定时器时,使用协程所属的ProcessId%64来计算定时器存入的timersBucket。
在上面的关系中,当三个协程创建定时器时,定时器的分布可能如下:
在这里插入图片描述

一般情况下,同一个Process的协程创建的定时器分布于同一个timersBucket中,只有当GOMAXPROCS大于64时才会出现多个Process分布于同一个timersBucket中的情况。

8.2 定时器运行机制

8.2.1 创建定时器

创建Timer或Ticker实际上分为两步:

  • 创建一个channel
  • 创建一个timer并启动(这里的timer是指runtime包中的timer,不是Timer)

在这里插入图片描述

在这里插入图片描述

不管是 Timer还是Ticker都是先创建channel,channel都是带有一个缓冲区的。
接着创建timer,调用startTimer启动。
startTimer在runtime包中实现,通过go:link关联
在这里插入图片描述

addTimer的实现如下:

func addtimer(t *timer) {
	tb := t.assignBucket()     // 分配 timersBucket ,从64个中选择一个
	lock(&tb.lock)             // 加锁
	ok := tb.addtimerLocked(t) // 加入切片
	unlock(&tb.lock)           // 解锁
	if !ok {                   // 加入失败
		badTimer() // 错误处理
	}
}

首先,每个timer都必须归属于某个timersBucket,所以第一步是先选择一个timersBucket,选择的算法很简单,将当前协程所属的Process ID 与 timersBucket数据长度求模,结果就是timersBucket数组的下标。
在这里插入图片描述

其次,每个timer都必须加入timersBucket,timersBucket数据结构中的切片t保存着timer的指针,新创建的timer也需要加入这个切片。
保存timer的切片是一个按timer触发事件排序的小顶堆,所以新timer插入的过程中会触发堆调整,堆顶的timer最快被触发。

// 在堆中添加一个计时器,并在新计时器为时启动或启动timerproc
// 比其他任何人都早。
// 计时器已锁定。
// 返回是否一切正常:如果数据结构损坏,则返回false
// 由于用户级别的竞争。
func (tb *timersBucket) addtimerLocked(t *timer) bool {
	//何时决不能为负数;否则timerproc将溢出
	//在其增量计算期间,并且永远不会使其他运行时计时器过期。
	if t.when < 0 {
		t.when = 1<<63 - 1
	}
	t.i = len(tb.t)              // 先把定时器插入堆尾
	tb.t = append(tb.t, t)       // 保存定时器
	if !siftupTimer(tb.t, t.i) { // 在堆中插入数据,触发重新排序
		return false
	}
	if t.i == 0 { // 堆排序后,如果新插入的定时器跑到了堆顶,需要唤醒系统协程来处理
		// siftup moved to top: new earliest deadline.
		if tb.sleeping && tb.sleepUntil > t.when { // 系统协程在睡眠(切片中有数据,未到时间),唤醒系统协程来处理新加入的定时器
			tb.sleeping = false
			notewakeup(&tb.waitnote)
		}
		if tb.rescheduling { // 系统协程已暂停(切片中没有数据),唤醒系统协程来处理新加入的定时器
			tb.rescheduling = false
			goready(tb.gp, 0)
		}
		if !tb.created { // 如果是系统协程收个定时器,则启动协程处理堆中的定时器
			tb.created = true
			go timerproc(tb) // 系统协程就是这里创建的
		}
	}
	return true
}
  • 如果timer的时间是负值,那么就会被修改为很大的值来保证后续定时算法的正确性
  • 系统协程是在首次添加timer时创建的,并不是一直存在
  • 新加入timer后,如果新的timer跑到了堆顶,则意味着新的timer需要立即处理,那么会唤醒系统协程

小顶堆排序

func siftupTimer(t []*timer, i int) bool { // 入参是数组和数组尾部
	if i >= len(t) {
		return false
	} // 数组越界
	when := t[i].when // 获取数组尾部元素的触发事件
	tmp := t[i]       // 存储到临时变量中
	for i > 0 {       // 只要没到堆顶,那么就一直运行
		p := (i - 1) / 4       // parent // 使用 4 叉堆,所以 (i-1)/4 得到父节点
		if when >= t[p].when { // 如果 子节点的触发事件大于等于 父节点,那么结束,满足小顶堆的规则
			break // 满足小顶堆的规则,结束
		}
		t[i] = t[p] // 否则先将父节点 写到 子节点位置上,(子节点在临时变量中有,不会丢失)
		t[i].i = i  // 将 timersBucket中的index 设置为子节点的index
		i = p       // 将指针移动到父节点,进行下一轮循环(因为使用数组保存树,所以当index=0时,就到达了根节点)
	}
	if tmp != t[i] { // 判断是否发生了并发,理论上 tmp 就是 t[i]
		t[i] = tmp // 如果并发导致数据不一致,那么强行设置
		t[i].i = i
	}
	return true
}

这是用数组存储的一个小顶堆的维护逻辑。

创建系统协程
当第一次addTimer的时候,会触发创建系统协程:

// Timerproc运行时间驱动的事件。
// 它一直休眠到tb堆中的下一个事件。
// 如果addtimer插入一个新的较早事件,它会提前唤醒timerproc。
func timerproc(tb *timersBucket) {
	tb.gp = getg() // 获取协程
	for {
		lock(&tb.lock)      // 加锁
		tb.sleeping = false // 设置标志
		now := nanotime()   // 获取当前时间
		delta := int64(-1)
		for {
			if len(tb.t) == 0 { // 切片数组为空,堆中没有等待的元素
				delta = -1 // 不创建系统协程,结束
				break
			}
			t := tb.t[0]         // 拿到小顶堆堆顶的元素
			delta = t.when - now // 计算需要等待的时间
			if delta > 0 {       // 如果还未到时间,结束,delta 大于0表示时间未到,小于0表示已过时执行
				break
			}
			ok := true
			if t.period > 0 { // Ticker 的 period 才会大于0, Timer 的 period 等于0
				// leave in heap but adjust next time to fire
				t.when += t.period * (1 + -delta/t.period)
				// delta小于0,表示本次是过时执行,那么下次执行时间需要加上delta,比如delta=1s,那么下次执行时间 when(next) = when(this) + period + delta
				// 将上述式子进行变形 when(next) = when(this) + period (1 + delta/period)
				// 因为已知delta <= 0 ,所以 when(next)=when(this) + period * (1 + -delta/period) => when += period * (1 + -delta/period)
				if !siftdownTimer(tb.t, 0) { // 将根节点值进行了增加,那么从 小堆顶 进行平衡
					ok = false
				}
			} else { // period = 0 表示是 Timer,触发一次后,需要从小顶堆中删除
				// remove from heap
				last := len(tb.t) - 1 // 切片长度缩小
				if last > 0 {         // 小顶堆不为空
					tb.t[0] = tb.t[last] // 将最大值设置到根节点,然后执行 根节点值增加,然后从 小堆顶进行平衡
					tb.t[0].i = 0
				}
				tb.t[last] = nil   // 将最后一个元素置空,删除,显示的置空,用于 gc
				tb.t = tb.t[:last] // 将最后一个元素删除
				if last > 0 {      // 如果 小顶堆不为空,那么从根节点进行平衡
					if !siftdownTimer(tb.t, 0) {
						ok = false
					}
				}
				t.i = -1 // mark as removed // 标记该 timer 已经被删除了,无法通过 index 在 timersBucket中索引到了
			}
			f := t.f         // 拿到 触发后的 func
			arg := t.arg     // 获取第一个参数,channel , Timer.C or Ticker.C
			seq := t.seq     // 获取第二个参数,只有网络才会用到
			unlock(&tb.lock) // 解锁
			if !ok {         // 如果小顶堆平衡失败,那么返回错误信息
				badTimer()
			}
			if raceenabled {
				raceacquire(unsafe.Pointer(t))
			}
			f(arg, seq)    // 执行预设的 func ,也就是 sendTime , 也就是将 now 写入 Timer.C 或 Ticker.C 触发事件
			lock(&tb.lock) // 加锁,防止多次触发,分两次加锁解锁,是为了防止执行 sendTime 的时候,太费时间,导致协程无法进行其他操作
		}
		if delta < 0 || faketime > 0 { // 小顶堆中没有元素了,系统协程需要暂停
			// No timers left - put goroutine to sleep.
			tb.rescheduling = true // 设置暂停标志
			goparkunlock(&tb.lock, waitReasonTimerGoroutineIdle, traceEvGoBlock, 1)
			continue
		}
		// At least one timer pending. Sleep until then.
		tb.sleeping = true               // 没有待触发的事件,设置睡眠标志
		tb.sleepUntil = now + delta      // 计算系统协程睡眠时间
		noteclear(&tb.waitnote)          // 清除系统协程的唤醒通知
		unlock(&tb.lock)                 // 解锁
		notetsleepg(&tb.waitnote, delta) // 系统协程睡眠,没有待触发的事件
	}
}

唤醒系统协程(睡眠)
当有定时器需要触发的时候,会唤醒系统协程,触发事件:

func notewakeup(n *note) {
	var v uintptr
	for { // 加锁,乐观锁,自旋
		v = atomic.Loaduintptr(&n.key)
		if atomic.Casuintptr(&n.key, v, locked) {
			break
		}
	}

	// Successfully set waitm to locked.
	// What was it before?
	switch {
	case v == 0:
		// Nothing was waiting. Done.
	case v == locked:
		// Two notewakeups! Not allowed.
		throw("notewakeup - double wakeup")
	default:
		// Must be the waiting m. Wake it up.
		semawakeup((*m)(unsafe.Pointer(v))) // 唤醒,唤醒后,进入 timerproc 的 for 循环,再次触发事件, 当小顶堆没有触发的定时器时,协程会在for最后一步sleep,唤醒后继续执行
	}
}

唤醒,唤醒后,进入 timerproc 的 for 循环,再次触发事件, 当小顶堆没有触发的定时器时,协程会在for最后一步sleep,唤醒后继续执行.

唤醒系统协程(暂停)
当小顶堆中没有元素,系统协程会进入暂停状态,等待addTimer

func goready(gp *g, traceskip int) {
	systemstack(func() {
		ready(gp, traceskip, true)
	})
}

睡眠(Sleep):time.Sleep(duration) 函数会使当前的 goroutine 暂停指定的时间。
在这段时间内,goroutine 不会执行任何操作,也不会消耗 CPU 资源。
这个函数通常用于模拟 I/O 操作,或者在测试中插入人为的延迟。
暂停(Yield):runtime.Gosched() 函数会使当前的 goroutine 让出 CPU,让其他 goroutine 有机会执行。
这个函数通常用于在一个长时间运行的 goroutine 中,插入一些 “断点”,以避免阻塞其他 goroutine 的执行。
需要注意的是,runtime.Gosched() 并不保证当前 goroutine 会立即停止执行,也不保证其他 goroutine 会立即开始执行。

8.2.2 删除定时器

当Timer执行结束或Ticker调用Stop时会触发定时器的删除操作。从timersBucket中删除定时器是添加定时器的你过程,即堆中元素删除后,触发小顶堆平衡。
不管是Timer还是Ticker的删除操作,最终都会执行runtime中的 stopTimer

//go:linkname stopTimer time.stopTimer
func stopTimer(t *timer) bool {
	return deltimer(t)
}

接下来看看deltimer的逻辑:

// Do not need to update the timerproc: if it wakes up early, no big deal.
func deltimer(t *timer) bool {
	if t.tb == nil { // 用户自己创建的 Timer 或者 Ticker 是不可用的
		// t.tb can be nil if the user created a timer
		// directly, without invoking startTimer e.g
		//    time.Ticker{C: c}
		// In this case, return early without any deletion.
		// See Issue 21874.
		return false
	}

	tb := t.tb

	lock(&tb.lock)                      // 加锁
	removed, ok := tb.deltimerLocked(t) // 移除元素
	unlock(&tb.lock)                    // 解锁
	if !ok {
		badTimer()
	}
	return removed
}

得益于在 timerproc中加了两次锁,删除小顶堆元素,不需要马上通知timerproc。
因为在计算时间的时候,是加锁的,中间执行是不加锁的,后面设置系统协程状态也是加锁的。
所以删除元素,要么发生在系统协程睡眠或暂停的时候,要么发生在 sendTime 的时候,不管那种,都不会影响触发的正确性。
因为可能删除小顶堆中任意位置的元素,所以需要从该节点出发,向上和向下平衡
在这里插入图片描述

8.3 资源泄露

对于不使用的Ticker需要显示的Stop,否则会产生资源泄露问题。

  • 首先,创建Ticker 的协程并不负责计时,只负责从Ticker的管道中获取事件
  • 其次,系统写成只负责定时器计时,向管道中发送事件,并不关心上层协程如何处理事件。

如果创建了Ticker,则系统协程将持续监控该Ticker的timer,定期触发事件。如果Ticker不在使用且没有Stop,那么系统协程的负担会越来越重,持续消耗CPU资源。

9. 性能优化 (go 1.14+)

上面的runtimeTimer的原理仅适用于Go 1.10 ~ 1.13 ,尽管定时器的性能已经能满足绝大多数场景,但在一些高度依赖定时器的业务场景中,
往往需要创建海量的定时器,这些场景中需要定时器能更精确、占用系统资源更少。
Go 1.14 中对定时器又做了一次大的性能优化,主要围绕如何管理runtimeTimer进行,包含如何存储runtimeTimer,如何检测以确保定时器能准时触发。

9.1 消除了timersBucket

在前面的版本中,NewTimer和NewTicker创建的runtimeTimer会存储到全局的timersBucket桶中,最多拥有64个timersBucket桶,如果GOMAXPROCS的值不超过64,timersBucket桶的数量等于GOMAXPROCS。
每个timersBucket桶中均包含一个堆用于保存runtimerTimer,此外每个timersBucket桶对应一个专门的协程(timerproc)来监控runtimeTimer
在这里插入图片描述

在Go 1.14中取消了timersBucket桶,直接把保存runtimeTimer的堆放到了处理器P中。
在处理器P的数据结构中,除了包含协程的队列,还直接包含了runtimeTimer。
在这里插入图片描述

消除timersBucket桶的同时,也不再需要timerproc来监控定时器了。

9.2 消除了timeproc

在Go 1.14之前的版本中,timerproc实际上专门监控定时器的协程执行体。
在Go 1.14的设计中取消了timerproc,因为runtimeTimer不在存储在timerBucket桶中,而是转移到每个处理器P中。
在这里插入图片描述

Go 1.14 中做的优化主要是为了取消timerproc,不在依赖timerproc来监控定时器,而是希望提供一种更搞笑的监控方式。

9.3 更少的锁竞争

在Go 1.14 之前,runtimeTimer存储在timersBucket桶中,runtimeTimer的添加、删除均需要加锁。
在处理器P的数据结构中,仍然有一把锁(timersLock)来限制timers的并发访问。实际上在Go 1.14 中,runtimer的添加、删除也需要加锁。
当协程发生系统调用时,当前的工作线程将释放持有的处理器P,当前工作线程专注于处理系统调用(被阻塞),然后启动一个新的工作线程来继续消费当前处理器P中的协程。当新的工作线程启动时,需要寻找空闲的处理器P,这是需要加锁的。
当程序中拥有大量的定时器时,在Go 1.14之前,每个timerproc处理完一个定时器都会休眠,即触发系统调用,从而释放处理器P,启动新的工作线程,多个新的工作线程在获取空闲处理器P时会争抢互斥锁。
从Go 1.14开始,处理器不在由timerproc处理,而是在每次协程调度时检查定时器是否需要触发,在协程调度时捎带检查定时器。
相较于之前的timerproc,定时器被关注的更加频繁,而且不会因为协程触发系统调用而产生新的工作线程,所以定时器触发的会更准时。

9.4 更少的上下文切换

在Go 1.14 之前,timerproc也是夹杂在系统其他的协程中被调度的,假设将timerproc标记为GT
在这里插入图片描述

由于timerproc夹杂在其他的协程中,当协程较多时,难以保证timerproc能被及时调度。假如程序中每1微妙就需要触发一个定时器,而timerproc每2微妙才被调度一次,那么定时器将产生1微妙的误差,从而不准时了。
在Go 1.14 中,由于每次调度协程时都会检查处理器,所以当有定时器需要触发时,先处理定时器,在调度协程,相当于每个协程都兼任了之前的timerproc的工作,但不会触发系统调用。
在这里插入图片描述

由于在设计上取消了timerproc,也避免了频繁的调度timerproc时产生的上下文切换,从而在一定程度上节省了系统资源。

9.5 优化效果

在github上有优化代码的性能测试结果
https://github.com/golang/go/commit/6becb033341602f2df9d7c55cc23e64b925bbee2
在这里插入图片描述

可以看到在多个涉及定时器的场景中,性能均有了较大程度的优化。

相关的好文:
https://studygolang.com/articles/26529
https://github.com/golang/go/commit/76f4fd8a5251b4f63ea14a3c1e2fe2e78eb74f81
https://xiaorui.cc/archives/6483
https://www.pengrl.com/p/20021/

标签:触发,定时器,协程,知识,Timer,timer,Go,Ticker
From: https://blog.csdn.net/a18792721831/article/details/142415290

相关文章

  • Golang在线客服系统源码:基于Gin框架,Websocket即时通讯企业网站客服聊天源码,包括后台管
    唯一客服系统是一款基于Golang的Gin框架构建的在线客服解决方案,支持独立部署,确保数据的私密性和安全性。它具备自适应的响应式设计,能够完美适配PC端、移动端以及APP内嵌等多种场景。客服端提供PC后台管理功能,实现实时消息接收和交流。此外,为了满足移动端回复需求,系统还利用uniapp......
  • 【开题报告】基于django+vue校园外卖配送管理平台小程序(论文+程序)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容研究背景随着移动互联网技术的飞速发展,校园生活日益数字化、便捷化。校园外卖服务作为校园生活的重要组成部分,已成为学生们日常饮食选择的重要渠道......
  • 无人机集群路径规划:麻雀搜索算法(Sparrow Search Algorithm, SSA)​求解无人机集群路
     一、单个无人机路径规划模型介绍无人机三维路径规划是指在三维空间中为无人机规划一条合理的飞行路径,使其能够安全、高效地完成任务。路径规划是无人机自主飞行的关键技术之一,它可以通过算法和模型来确定无人机的航迹,以避开障碍物、优化飞行时间和节省能量消耗。二、无人......
  • 无人机集群路径规划:​北方苍鹰优化算法(Northern Goshawk Optimization,NGO)​求解无人机
     一、单个无人机路径规划模型介绍无人机三维路径规划是指在三维空间中为无人机规划一条合理的飞行路径,使其能够安全、高效地完成任务。路径规划是无人机自主飞行的关键技术之一,它可以通过算法和模型来确定无人机的航迹,以避开障碍物、优化飞行时间和节省能量消耗。二、无人......
  • 银行项目测试,基础业务知识,一文全掌握!
    1、银行体系银行:指依法成立经营货币和信用业务的金融机构。中国银行体系:中央银行:中国人民银行。制定和实施货币政策、进行宏观调控;发行人民币、管理人民币流通;管理国库;持有、管理、经营国家外汇储备、黄金储备。是国家机关,领导银行的银行。银行业金融机构:政策性银行、......
  • 谷歌收录查询工具,Google收录查询工具操作教程
    谷歌收录查询工具是帮助网站所有者或SEO专业人士了解网站在谷歌搜索引擎中收录情况的重要工具。以下是一些常用的Google收录查询工具及其操作教程:一、GoogleSearchConsole(谷歌搜索控制台)功能概述:GoogleSearchConsole是一个免费的服务,它帮助网站所有者监控和管理他们的网站......
  • 如何避开学习和研究机器人方向无价值的知识节约时间
    往昔这是一篇十年前就想写,但是一直没有实力和勇气落笔的文字。如今简约授之以鱼,不如授之以渔。啰嗦机器人方向如何简单判定这个知识是否有价值。只谈一个方向,就是这个知识点是“死”还是“活”?什么是“死”?有标准答案的知识可以看作是一种“死”知识。智能时代,知识......
  • C语言定时器编程深入研究
    定时器是操作系统提供的用于计时的功能之一,常用于控制程序中的延时操作或周期性任务。本篇文章将详细介绍如何使用C语言处理定时器,包括基本的定时器设置方法、自定义定时器处理函数以及一些高级主题。1.引言定时器是操作系统向进程提供的一个用于计时的功能。在C语言中,......
  • 嵌入式硬件基础知识
    一、嵌入式系统的定义与特点定义:嵌入式系统是一种专为特定应用而设计的计算机系统,它通常被嵌入到更大的设备或系统中,作为这些设备或系统的一部分来执行预设的任务。与通用计算机相比,嵌入式系统具有更高的专用性、更低的功耗、更小的体积以及更强的实时性等特点。特点:专用性:......
  • RAG+Agent人工智能平台:RAGflow实现GraphRAG知识库问答,打造极致多模态问答与AI编排流体
    RAG+Agent人工智能平台:RAGflow实现GraphRAG知识库问答,打造极致多模态问答与AI编排流体验1.RAGflow简介最近更新:2024-09-13增加知识库问答搜索模式。2024-09-09在Agent中加入医疗问诊模板。2024-08-22支持用RAG技术实现从自然语言到SQL语句的转换。2024-08-02......