首页 > 编程语言 >informer中的WorkQueue机制的实现分析与源码解读(3)之限速队列RateLimitingQueue

informer中的WorkQueue机制的实现分析与源码解读(3)之限速队列RateLimitingQueue

时间:2024-08-18 18:58:53浏览次数:13  
标签:RateLimitingQueue 队列 限速 入队 item 算法 源码 failures WorkQueue

概述

前面2篇文章介绍了workqueue中的普通队列FIFO和延时队列。接下来我们分析workqueue中的第三种队列: 限速队列

client-go 的 util/workqueue 包里主要有三个队列,分别是普通队列Queue,延时队列DelayingQueue,限速队列RateLimitingQueue,后一个队列以前一个队列的实现为基础,层层添加新功能。

限速队列本质仍然是一个队列,是在FIFO普通队列和延时队列的基础之上,又扩展了新的功能,可以对加入队列的数据添加一个变化的等待时间,从而起到限制访问速率的作用。

RateLimitingQueue 的实现

限速队列,基于延迟队列和 FIFO 队列接口封装,限速队列接口(RateLimitingInterface)在原有功能上增加了 AddRateLimitedForgetNumRequeues 方法。限速队列的重点不在于 RateLimitingInterface 接口,而在于它提供的 4 种限速算法接口(RateLimiter)。其原理是,限速队列利用延迟队列的特性,延迟某个元素的插入时间,达到限速目的

找到限速队列的源码位置client-go/util/workqueue/rate_limiting_queue.goRateLimiter 数据结构如下:

// RateLimitingInterface is an interface that rate limits items being added to the queue.
type RateLimitingInterface interface {
  // 继承延时队列的功能
	DelayingInterface

	// AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
  // AddRateLimited 在限速器指定等待时间后,把添加元素到workQueue,
	AddRateLimited(item interface{})

	// Forget indicates that an item is finished being retried.  Doesn't matter whether it's for perm failing
	// or for success, we'll stop the rate limiter from tracking it.  This only clears the `rateLimiter`, you
	// still have to call `Done` on the queue.
  // 从failures[item]这个map中删除item的入队次数
	Forget(item interface{})

	// NumRequeues returns back how many times the item was requeued
  // NumRequeues 返回item的入队了多少次数
	NumRequeues(item interface{}) int
}

限速队列接口方法说明如下。

  • When:获取指定元素应该等待的时间。
  • Forget:释放指定元素,清空该元素的重试次数的记录即failures[item]。这个方法仅对指数退避算法和计数器算法有效,令牌桶算法不关心同一个元素是否重新入队requeue
  • NumRequeues:获取指定元素的重试次数,即failures[item]的值。这个方法对指数退避算法和计数器算法有效,令牌桶算法不关心同一个元素是否requeue

AddRateLimited()方法的逻辑

// AddRateLimited AddAfter's the item based on the time when the rate limiter says it's ok
func (q *rateLimitingType) AddRateLimited(item interface{}) {
	q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}

首先 q.rateLimiter.When(item)用限速器计算出一个等待时间duration

再将item与等待时间duration,传递给AddAfter(item,duration)加入到延时队列中去。

限速算法

下面会分别详解 WorkQueue 提供的 3 种限速算法,这 3 种限速算法分别如下。

  • 排队指数算法(ItemExponentialFailureRateLimiter)。
  • 计数器算法(ItemFastSlowRateLimiter)。
  • 令牌桶算法(BucketRateLimiter)。

同时限速队列可以使用多种限速算法的组合,即混合模式(MaxOfRateLimiter)。

计数器限速算法

计数器算法是限速算法中最简单的一种,其原理是:限制一段时间内允许通过的元素数量,例如在 1 分钟内只允许通过 100 个元素,每插入一个元素,计数器自增 1,当计数器数到 100 的阈值且还在限速周期内时,则不允许元素再通过。

但 WorkQueue 的对计数器算法的实现有所不同,workQueue是此基础上进行了扩展与修改,引入了 fastDelayslowDelay

计数器算法提供了 4 个主要字段:failuresfastDelayslowDelaymaxFastAttempts。其中,

  • failures 字段用于统计元素入队的次数,每当 AddRateLimited 方法插入新元素时,会为该字段加 1;
  • fastDelayslowDelay 字段是用于定义 fast、slow 速率的;通常实际使用中fastDelayslowDelay小很多,例如分别为5毫秒与10秒。
  • maxFastAttempts 字段控制返回值使用fastDelay 还是slowDelay,入队次数少于maxFastAttempts,就返回fastDelay,否则就返slowDelay

计数器算法核心实现的代码示例如下:

// ItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry after that
type ItemFastSlowRateLimiter struct {
	failuresLock sync.Mutex
	failures     map[interface{}]int

	maxFastAttempts int
	fastDelay       time.Duration
	slowDelay       time.Duration
}
func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

  // 记录item的入队次数做计数,每执行一次when()就加1
	r.failures[item] = r.failures[item] + 1
  
  // item的入队次数如果小于maxFastAttempts,就返回 fastDelay
	if r.failures[item] <= r.maxFastAttempts {
		return r.fastDelay
	}
	// 如果重试入队的次数大于了 maxFastAttempts,就返回slowDelay
	// slowDelay的值,通常远大于fastDelay。
	// 通俗的讲,就是如果前几次重试,我把延时设置小一点,如果重试了maxFastAttempts都还失败了,那对不起,我只有把延时设置为一个比较大的值即fastDelay
	return r.slowDelay
}

// 返回item的计数次数
func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	return r.failures[item]
}

// 将item从计数的map中删除
func (r *ItemFastSlowRateLimiter) Forget(item interface{}) {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	delete(r.failures, item)
}

假设 fastDelay 是 5 * time.Millisecond,slowDelay 是 10 * time.SecondmaxFastAttempts 是 3。在一个限速周期内通过 AddRateLimited 方法插入 4 个相同的元素,那么前 3 个元素使用 fastDelay 定义的 fast 速率,当触发 maxFastAttempts 字段时,第 4 个元素使用 slowDelay 定义的 slow 速率。

我尝试通俗的翻译翻译: 就是说如果前3次(maxFastAttempts)入队,我计数器算法可以把延时设置小一点为5毫秒(fastDelay)。但如果你都重试了3次(maxFastAttempts)还失败了,那对不起,我只有把你的延时设置为一个比较大的值10秒(fastDelay)。 避免你一直在那里重试,占用系统的资源。如果用伪代码表示就是:

If failures[item] >=3 
   return fastDelay 
else 
   return slowDelay

指数退避限速算法

有些文章中,翻译为"排队指数算法",个人表示无法理解。因为算法英文名为ItemExponentialFailureRateLimiter, Exponential翻译为"指数的", Failure说明失败重试有关,在函数when()的实现中用了单词"backoff",所以我觉得算法翻译为"指数退避限速器"更简单明白。

如果结合kubernete的业务逻辑更好理解ItemExponentialFailureRateLimiter算法。这个算法处理的场景例如:假设一个pod待创建,首先加入工作队列,但某种原因导致k8s创建失败,就需要将该pod重新加入队列进行重试,那么加入队列前要设置一个等待时间,这里就可以选择用指数退避算法,表示因为Pod创建失败而重新入队,很明显,重试次数越多,等待的延时就越长,这个时长可以用math.pow(2,n),也就是指数增长来设置这个延时,但这个延时又不能无限大,所以要给延时封顶,也就是maxDelay的值。

接下来我们具体分析算法实现的源码,ItemExponentialFailureRateLimiter源码位于client-go/util/workqueue/default_rate_limiters.go

ItemExponentialFailureRateLimiter结构体的定义,failures一个用于记录处理(可能和Item业务处理失败有关)入队次数。baseDelay,maxDelay表示用于计算等待时间(或延时)的基础时间

// ItemExponentialFailureRateLimiter does a simple baseDelay*2^<num-failures> limit
// dealing with max failures and expiration are up to the caller
type ItemExponentialFailureRateLimiter struct {
	failuresLock sync.Mutex
	// 记录元素入队的次数
	failures     map[interface{}]int
	// 元素的基本延时,如果元素不重新入队,这个元素的延时就等baseDelay
	baseDelay time.Duration
	// 延时的最大值
	maxDelay  time.Duration
}


指数退避限速器的实现

When()函数从代码的实现看,利用了平时工作中最最常用"指数退避" 算法: 下一次重试等待时间是上次的等待时间的2次方。即简单的描述: 退避时间backoff=min(baseDelay*2^,maxDelay)

函数中使用一个名叫failures的map,记录每个item入队的次数计数。key就为item,value是入队的次数。所以ItemExponentialFailureRateLimiter,是针对同一个item多次入队,多次入队的原因通常是业务处理失败了(例如,一个名为one的pod,需要被创建加入了限速队列,由于创建失败后又被重新多次加入限速队列进行重试),所以map命名也failures也是这个道理。

func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()
  
  // 记录上一次的计数,如果第一次进入的元素exp为0
	exp := r.failures[item]
	// item对应的计数加1
	r.failures[item] = r.failures[item] + 1

	// The backoff is capped such that 'calculated' value never overflows.
	// backoff翻译为"退避",表示退避算法的计数,backoff的值等于baseDelay*2^<num-failures>
	backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
	// 这里对backoff的最大值做了一个限制,backoff最大为maxDelay,默认maxDelay的值为1000s
	if backoff > math.MaxInt64 {
		return r.maxDelay
	}
  // 如果计算后的值calculated,大于1000,则返回maxDelay
	calculated := time.Duration(backoff)
	if calculated > r.maxDelay {
		return r.maxDelay
	}

	return calculated
}

// 返回失败的次数。
func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	return r.failures[item]
}

// 从failures的map中删除item
func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	delete(r.failures, item)
}

workQueue在实际使用中经常使用一个的默认限速器名为DefaultControllerRateLimiter

// DefaultControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue.  It has
// both overall and per-item rate limiting.  The overall is a token bucket and the per-item is exponential
func DefaultControllerRateLimiter() RateLimiter {
	return NewMaxOfRateLimiter(
    // 指定了使用指数退避限速器,baseDelay=5毫秒,maxDelay=1000秒
		NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
		// 10 qps, 100 bucket size.  This is only for retry speed and its only the overall factor (not per item)
		&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
	)
}

指数退避算法的举例说明

为了更深入了解,这里举例说明:假设一个名称为one的pod被AddRateLimited(one)入队,限速器使用的是指数退避限速器,参数baseDelay=5毫秒,maxDelay=1000秒。当pod第一次进入队后,根据算法计算过程:

exp=0
r.failures[one]=0+1=1
backoff=baseDelay * math.Pow(2,0)=0.005 * 1=0.005

之后业务调用get()处理one的pod,由于不明原因处理失败,AddRateLimited(one)重新第二次入队,

exp=r.failures[one]=1
r.failures[one]=r.failures[one]+1=1+1=2
backoff=baseDelay * math.Pow(2,1)=0.005*2=0.01

业务调用get()处理one的pod,由于不明原因处理失败,AddRateLimited(one)重新第三次入队,

exp=r.failures[one]=2
r.failures[one]=r.failures[one]+1=2+1=3
backoff=baseDelay+math.Pow(2,2)=0.005*4=0.02

按照计算规律,依次进行对重新入队次数n与exp、r.failures[one]、Backoff的计算关系如下

nExpr.failures[one]Backoff(秒)backoff(毫秒)
1010.005 * math.Pow(2,0)=0.005 * 1=0.0055
2120.005 * math.Pow(2,1)=0.005 * 2=0.0110
3230.005 * math.Pow(2,2)=0.005 * 4=0.0220
4340.005 * math.Pow(2,3)=0.005 * 8=0.0440
5450.005 * math.Pow(2,4)=0.005 * 16=0.0880
6560.005 * math.Pow(2,5)=0.005 * 32=0.16160
7670.005 * math.Pow(2,6)=0.005 * 64=0.32320
8780.005 * math.Pow(2,7)=0.005 * 128=0.64640
9890.005 * math.Pow(2,8)=0.005 * 256=1.281280
109100.005 * math.Pow(2,9)=0.005 * 512=2.562560
1110110.005 * math.Pow(2,10)=0.005 * 1024=5.125120
1211120.005 * math.Pow(2,10)=0.005 * 2048=10.2410240

将结果使用折线图表示:

在这里插入图片描述

指数退避算法的实战验证

找到client-go源码client-go/util/workqueue/default_rate_limiters.go

在第98行下面,插入打印信息:

fmt.Println(“backoff=”,time.Duration(backoff))

在这里插入图片描述

编写一段测试代码,创建一个限速队列,并且使用指数退避算法ItemExponentialFailureRateLimiter作为限速器。将同一个item入队10次,观察打印出的backoff数值。

func main() {

	// 创建一个指数退避算法的限速器
	rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second)
	// 创建一个限速队列,使用上面的指数退避算法的限速器
	queue := workqueue.NewRateLimitingQueue(rateLimiter)
	// 创建一个key,重复入队10次
	for i := 0; i < 11; i++ {
		fmt.Printf("Adding %d, ", i)
		queue.AddRateLimited("one")
	}
	fmt.Println()
}

测试结果:

在这里插入图片描述

可以从结果可以对比知道,和上面我们推理验证的是相吻合的。

指数退避算法的示意图

为了更形象化的描述,限速队列是如何使用指数退避算法的执行过程,这里再举例说明。

假设有4个item值为one和2个item值为two的对象,依次连续加入限速算法为指数退避算法的限速队列。

在这里插入图片描述

令牌桶限速算法

令牌桶算法是通过 Go 语言的第三方库 golang.org/x/time/rate 实现的。

令牌桶算法描述: 该算法的内部实现了一个存放 token(令牌)的“桶”,初始时“桶”是空的,token 会以固定速率往“桶”里填充,直到将其填满为止,多余的 token 会被丢弃。每个元素都会从令牌桶得到一个 token,只有得到 token 的元素才允许通过(accept),而没有得到 token 的元素处于等待状态。令牌桶算法通过控制发放 token 来达到限速目的。令牌桶算法原理如图 5-12 所示。

img

接下来我们看看client-go中如何使用令牌桶算法的。找到源码位置:client-go/util/workqueue/default_rate_limiters.go

BucketRateLimiter结构体定义如下。结构体中包含了 "golang.org/x/time/rate"中的限速器,说明client-go并没有重写一个令牌桶限速算法,而是直接使用的go官方包rate.go中的令牌限速器

// BucketRateLimiter adapts a standard bucket to the workqueue ratelimiter API
type BucketRateLimiter struct {
  // rate.go中的令牌限速器
	*rate.Limiter
}

When()直接使用了rate.go中的Limiter.Reserve().Delay()计算出的延时。


// When()直接使用了rate.go中的Limiter.Reserve().Delay()计算出的延时
func (r *BucketRateLimiter) When(item interface{}) time.Duration {
	return r.Limiter.Reserve().Delay()
}

// 令牌桶限速器中的Forget()没什么意义
func (r *BucketRateLimiter) Forget(item interface{}) {
}

// 令牌桶限速器中的 NumRequeues()一直返回0。
// 说明令牌桶限速器不关心同一个元素是否requeue(重新入队)
func (r *BucketRateLimiter) NumRequeues(item interface{}) int {
	return 0
}

本文仅仅介绍client-go中的令牌桶限速器使用的。关于令牌桶算法的详细实现不是简单几句能描述清楚的,所以令牌桶算法的实现机制会面专门写一篇文章介绍。

混合模式

混合模式是将多种限速算法混合使用,即多种限速算法同时生效。例如,同时使用指数退避算法和令牌桶算法。

接下来我们从client-go v1.12.0的examples中里的一行代码说起。代码如下:

	// create the workqueue
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

workqueue.NewTypedRateLimitingQueue()函数的作用是初始化一个限速队列,参数应该是一个限速器.这里使用的限速器是DefaultControllerRateLimiter(),找到源码:

// DefaultControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue.  It has
// both overall and per-item rate limiting.  The overall is a token bucket and the per-item is exponential
func DefaultControllerRateLimiter() RateLimiter {
	return NewMaxOfRateLimiter(
	  // 指数退避限速器,注意baseDelay=5毫秒,maxDelay=1000秒
		NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
		// 10 qps, 100 bucket size.  This is only for retry speed and its only the overall factor (not per item)
		// 同时也用了令牌桶限速器,注意qps=10, burst=100
		&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
	)
}

说明DefaultControllerRateLimiter()同时使用了两个限速算法ItemExponentialFailureRateLimiterBucketRateLimiter

继续剖析,发现MaxOfRateLimiter是一个实现了RateLimiter的结构体,这个结构体包括了一个限速器列表[]RateLimiter。当然MaxOfRateLimiter也会实现限速队列的接When(),Forget(),NumRequeues()三个方法。最重要的是When()方法,从下面代码我们可以得知,When()函数的逻辑是遍历多个限速器,看哪个限速器限的”越狠“,就使用谁返回的duration值。


// MaxOfRateLimiter calls every RateLimiter and returns the worst case response
// When used with a token bucket limiter, the burst could be apparently exceeded in cases where particular items
// were separately delayed a longer time.
type MaxOfRateLimiter struct {
	limiters []RateLimiter
}

func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter {
	return &MaxOfRateLimiter{limiters: limiters}
}

// when 判断那个限速器限速的"越狠"就用那个限速器的值
func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
	ret := time.Duration(0)
	for _, limiter := range r.limiters {
    // 依次用各种限速器的when()获取限速的延时值
		curr := limiter.When(item)
		// 谁限速的延时也大就用谁的
		if curr > ret {
			ret = curr
		}
	}

	return ret
}
  • 混合模式的示意图

在这里插入图片描述

总结

  1. workQueue中的限速队列,可以使用的算法有三种:计数器限速算法、指数指数退避限速算法、令牌桶限速算法,同时还支持多种算法组合的混合模式。
  2. 指数退避限速算法、计数器限速算法关心同一个对象多次入队的情况(requeue),重新入队的次数使用一个failures[item]记录,简单的说requeue次数越多,延时越大。令牌桶限速算法,不会关心同一个元素的是否重新入队。

参考文档

博客:《controller-runtime-client-go-rate-limiting》

博客:《k8s_clientgo_workqueue》

书籍:《Kubernetes 源码剖析之 WorkQueue 队列》

标签:RateLimitingQueue,队列,限速,入队,item,算法,源码,failures,WorkQueue
From: https://blog.csdn.net/xsw164711368/article/details/141303451

相关文章

  • 基于C#+SQL Server的餐饮信息管理系统设计与实现 毕业论文+任务书+问卷+数据库设计文
    !!!有需要的小伙伴可以通过文章末尾名片咨询我哦!!! ......
  • 基于微信小程序的外卖点餐系统的设计与实现22 毕业论文+开题报告+答辩PPT+论文检测查
    !!!有需要的小伙伴可以通过文章末尾名片咨询我哦!!! ......
  • [开题报告]FLASK框架远程医疗信息系统c4np0(源码+论文)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容研究背景随着信息技术的飞速发展,远程医疗作为一种新型医疗服务模式,正逐步改变着传统医疗服务的格局。在全球化背景下,医疗资源分布不均、偏远地区医......
  • [开题报告]FLASK框架长株潭旅游舆情系统e48wf(源码+论文)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容研究背景长株潭地区作为湖南省的经济与文化中心,拥有丰富的自然风光和深厚的人文底蕴,吸引了大量游客前来观光旅游。然而,随着旅游业的蓬勃发展,旅游舆......
  • [开题报告]FLASK框架自习室管理系统1g708(源码+论文)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容研究背景在当今高等教育体系中,自习室作为学生日常学习与自我提升的关键场所,其管理效能直接影响到学生的学习效果与满意度。随着学生人数的不断增加......
  • [开题报告]FLASK框架自助料理网上订餐系统p2933(源码+论文)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容研究背景随着互联网的飞速发展和人们生活节奏的加快,线上订餐服务已成为现代都市人不可或缺的生活方式之一。自助料理作为一种集个性化、健康、便捷......
  • 【源码+论文】springboot高校学科竞赛平台
    系统包含:源码+论文所用技术:SpringBoot+Vue+SSM+Mybatis+Mysql获取资料请私聊我目录1系统概述 11.1研究背景 11.2研究目的 11.3系统设计思想 12相关技术 22.1MYSQL数据库 22.2B/S结构 32.3SpringBoot框架简介43系统分析 43.1可行性分析 43.1.1技术......
  • 【源码+论文】springboot基于智能推荐的卫生健康系统
    系统包含:源码+论文所用技术:SpringBoot+Vue+SSM+Mybatis+Mysql获取资料请私聊我目录1系统概述 11.1研究背景 11.2研究目的 11.3系统设计思想 12相关技术 22.1MYSQL数据库 22.2B/S结构 32.3SpringBoot框架简介43系统分析 43.1可行性分析 43.1.1技术......
  • 【源码+论文】springboot知识管理系统
    系统包含:源码+论文所用技术:SpringBoot+Vue+SSM+Mybatis+Mysql获取资料请私聊我目录1系统概述 11.1研究背景 11.2研究目的 11.3系统设计思想 12相关技术 32.1MYSQL数据库 32.2B/S结构 32.3SpringBoot框架简介42.4VUE框架 43系统分析 53.1可行性分......
  • 呆头鹅矩阵系统:短视频运营的创新之选——下载-官网-源码
    在短视频占据互联网重要地位的当下,呆头鹅矩阵系统以其创新的功能和可靠的保障,成为企业打开知名度、实现精准获客的强大工具。呆头鹅矩阵系统为用户提供了高效的短视频矩阵管理方案。它如同一个强大的指挥中心,将多个短视频平台的账号集中管理,无需繁琐地在多个手机上切换登录账......