概述
前面2篇文章介绍了workqueue中的普通队列FIFO和延时队列。接下来我们分析workqueue中的第三种队列: 限速队列
client-go 的 util/workqueue 包里主要有三个队列,分别是普通队列Queue,延时队列DelayingQueue,限速队列RateLimitingQueue,后一个队列以前一个队列的实现为基础,层层添加新功能。
限速队列本质仍然是一个队列,是在FIFO普通队列和延时队列的基础之上,又扩展了新的功能,可以对加入队列的数据添加一个变化的等待时间,从而起到限制访问速率的作用。
RateLimitingQueue 的实现
限速队列,基于延迟队列和 FIFO 队列接口封装,限速队列接口(RateLimitingInterface)在原有功能上增加了 AddRateLimited
、Forget
、NumRequeues
方法。限速队列的重点不在于 RateLimitingInterface
接口,而在于它提供的 4 种限速算法接口(RateLimiter)。其原理是,限速队列利用延迟队列的特性,延迟某个元素的插入时间,达到限速目的
。
找到限速队列的源码位置client-go/util/workqueue/rate_limiting_queue.go
,RateLimiter
数据结构如下:
// RateLimitingInterface is an interface that rate limits items being added to the queue.
type RateLimitingInterface interface {
// 继承延时队列的功能
DelayingInterface
// AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
// AddRateLimited 在限速器指定等待时间后,把添加元素到workQueue,
AddRateLimited(item interface{})
// Forget indicates that an item is finished being retried. Doesn't matter whether it's for perm failing
// or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you
// still have to call `Done` on the queue.
// 从failures[item]这个map中删除item的入队次数
Forget(item interface{})
// NumRequeues returns back how many times the item was requeued
// NumRequeues 返回item的入队了多少次数
NumRequeues(item interface{}) int
}
限速队列接口方法说明如下。
- When:获取指定元素应该等待的时间。
- Forget:释放指定元素,清空该元素的重试次数的记录即failures[item]。这个方法仅对指数退避算法和计数器算法有效,令牌桶算法不关心同一个元素是否重新入队requeue
- NumRequeues:获取指定元素的重试次数,即failures[item]的值。这个方法对指数退避算法和计数器算法有效,令牌桶算法不关心同一个元素是否requeue
AddRateLimited()方法
的逻辑
// AddRateLimited AddAfter's the item based on the time when the rate limiter says it's ok
func (q *rateLimitingType) AddRateLimited(item interface{}) {
q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}
首先 q.rateLimiter.When(item)
用限速器计算出一个等待时间duration
再将item与等待时间duration,传递给AddAfter(item,duration)加入到延时队列中去。
限速算法
下面会分别详解 WorkQueue 提供的 3 种限速算法,这 3 种限速算法分别如下。
- 排队指数算法(ItemExponentialFailureRateLimiter)。
- 计数器算法(ItemFastSlowRateLimiter)。
- 令牌桶算法(BucketRateLimiter)。
同时限速队列可以使用多种限速算法的组合,即混合模式(MaxOfRateLimiter)。
计数器限速算法
计数器算法是限速算法中最简单的一种,其原理是:限制一段时间内允许通过的元素数量,例如在 1 分钟内只允许通过 100 个元素,每插入一个元素,计数器自增 1,当计数器数到 100 的阈值且还在限速周期内时,则不允许元素再通过。
但 WorkQueue 的对计数器算法的实现有所不同,workQueue是此基础上进行了扩展与修改,引入了 fastDelay
和 slowDelay
。
计数器算法提供了 4 个主要字段:failures
、fastDelay
、slowDelay
及 maxFastAttempts
。其中,
failures
字段用于统计元素入队的次数,每当 AddRateLimited 方法插入新元素时,会为该字段加 1;fastDelay
和slowDelay
字段是用于定义 fast、slow 速率的;通常实际使用中fastDelay
比slowDelay
小很多,例如分别为5毫秒与10秒。maxFastAttempts
字段控制返回值使用fastDelay
还是slowDelay
,入队次数少于maxFastAttempts
,就返回fastDelay
,否则就返slowDelay
。
计数器算法核心实现的代码示例如下:
// ItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry after that
type ItemFastSlowRateLimiter struct {
failuresLock sync.Mutex
failures map[interface{}]int
maxFastAttempts int
fastDelay time.Duration
slowDelay time.Duration
}
func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
// 记录item的入队次数做计数,每执行一次when()就加1
r.failures[item] = r.failures[item] + 1
// item的入队次数如果小于maxFastAttempts,就返回 fastDelay
if r.failures[item] <= r.maxFastAttempts {
return r.fastDelay
}
// 如果重试入队的次数大于了 maxFastAttempts,就返回slowDelay
// slowDelay的值,通常远大于fastDelay。
// 通俗的讲,就是如果前几次重试,我把延时设置小一点,如果重试了maxFastAttempts都还失败了,那对不起,我只有把延时设置为一个比较大的值即fastDelay
return r.slowDelay
}
// 返回item的计数次数
func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
return r.failures[item]
}
// 将item从计数的map中删除
func (r *ItemFastSlowRateLimiter) Forget(item interface{}) {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
delete(r.failures, item)
}
假设 fastDelay 是 5 * time.Millisecond
,slowDelay 是 10 * time.Second
,maxFastAttempts
是 3。在一个限速周期内通过 AddRateLimited 方法插入 4 个相同的元素,那么前 3 个元素使用 fastDelay
定义的 fast 速率,当触发 maxFastAttempts 字段时,第 4 个元素使用 slowDelay
定义的 slow 速率。
我尝试通俗的翻译翻译
: 就是说如果前3次(maxFastAttempts)入队,我计数器算法可以把延时设置小一点为5毫秒(fastDelay)。但如果你都重试了3次(maxFastAttempts)还失败了,那对不起,我只有把你的延时设置为一个比较大的值10秒(fastDelay)。 避免你一直在那里重试,占用系统的资源。如果用伪代码表示就是:
If failures[item] >=3
return fastDelay
else
return slowDelay
指数退避限速算法
有些文章中,翻译为"排队指数算法",个人表示无法理解。因为算法英文名为ItemExponentialFailureRateLimiter
, Exponential翻译为"指数的", Failure说明失败重试有关,在函数when()的实现中用了单词"backoff",所以我觉得算法翻译为"指数退避限速器"更简单明白。
如果结合kubernete的业务逻辑更好理解ItemExponentialFailureRateLimiter算法。这个算法处理的场景例如:假设一个pod待创建,首先加入工作队列,但某种原因导致k8s创建失败,就需要将该pod重新加入队列进行重试,那么加入队列前要设置一个等待时间,这里就可以选择用指数退避算法,表示因为Pod创建失败而重新入队,很明显,重试次数越多,等待的延时就越长,这个时长可以用math.pow(2,n),也就是指数增长来设置这个延时,但这个延时又不能无限大,所以要给延时封顶,也就是maxDelay的值。
接下来我们具体分析算法实现的源码,ItemExponentialFailureRateLimiter
源码位于client-go/util/workqueue/default_rate_limiters.go
ItemExponentialFailureRateLimiter
结构体的定义,failures一个用于记录处理(可能和Item业务处理失败有关)入队次数。baseDelay,maxDelay表示用于计算等待时间(或延时)的基础时间
// ItemExponentialFailureRateLimiter does a simple baseDelay*2^<num-failures> limit
// dealing with max failures and expiration are up to the caller
type ItemExponentialFailureRateLimiter struct {
failuresLock sync.Mutex
// 记录元素入队的次数
failures map[interface{}]int
// 元素的基本延时,如果元素不重新入队,这个元素的延时就等baseDelay
baseDelay time.Duration
// 延时的最大值
maxDelay time.Duration
}
指数退避限速器的实现
When()函数从代码的实现看,利用了平时工作中最最常用"指数退避" 算法: 下一次重试等待时间是上次的等待时间的2次方。即简单的描述: 退避时间backoff=min(baseDelay*2^,maxDelay)
函数中使用一个名叫failures的map,记录每个item入队的次数计数。key就为item,value是入队的次数。所以ItemExponentialFailureRateLimiter
,是针对同一个item多次入队,多次入队的原因通常是业务处理失败了(例如,一个名为one
的pod,需要被创建加入了限速队列,由于创建失败后又被重新多次加入限速队列进行重试),所以map命名也failures也是这个道理。
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
// 记录上一次的计数,如果第一次进入的元素exp为0
exp := r.failures[item]
// item对应的计数加1
r.failures[item] = r.failures[item] + 1
// The backoff is capped such that 'calculated' value never overflows.
// backoff翻译为"退避",表示退避算法的计数,backoff的值等于baseDelay*2^<num-failures>
backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
// 这里对backoff的最大值做了一个限制,backoff最大为maxDelay,默认maxDelay的值为1000s
if backoff > math.MaxInt64 {
return r.maxDelay
}
// 如果计算后的值calculated,大于1000,则返回maxDelay
calculated := time.Duration(backoff)
if calculated > r.maxDelay {
return r.maxDelay
}
return calculated
}
// 返回失败的次数。
func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
return r.failures[item]
}
// 从failures的map中删除item
func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
delete(r.failures, item)
}
workQueue在实际使用中经常使用一个的默认限速器名为DefaultControllerRateLimiter
// DefaultControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue. It has
// both overall and per-item rate limiting. The overall is a token bucket and the per-item is exponential
func DefaultControllerRateLimiter() RateLimiter {
return NewMaxOfRateLimiter(
// 指定了使用指数退避限速器,baseDelay=5毫秒,maxDelay=1000秒
NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
}
指数退避算法的举例说明
为了更深入了解,这里举例说明:假设一个名称为one
的pod被AddRateLimited(one)入队,限速器使用的是指数退避限速器,参数baseDelay=5毫秒,maxDelay=1000秒。当pod第一次进入队后,根据算法计算过程:
exp=0
r.failures[one]=0+1=1
backoff=baseDelay * math.Pow(2,0)=0.005 * 1=0.005
之后业务调用get()处理one
的pod,由于不明原因处理失败,AddRateLimited(one)重新第二次入队,
exp=r.failures[one]=1
r.failures[one]=r.failures[one]+1=1+1=2
backoff=baseDelay * math.Pow(2,1)=0.005*2=0.01
业务调用get()处理one
的pod,由于不明原因处理失败,AddRateLimited(one)重新第三次入队,
exp=r.failures[one]=2
r.failures[one]=r.failures[one]+1=2+1=3
backoff=baseDelay+math.Pow(2,2)=0.005*4=0.02
按照计算规律,依次进行对重新入队次数n与exp、r.failures[one]、Backoff的计算关系如下
n | Exp | r.failures[one] | Backoff(秒) | backoff(毫秒) |
---|---|---|---|---|
1 | 0 | 1 | 0.005 * math.Pow(2,0)=0.005 * 1=0.005 | 5 |
2 | 1 | 2 | 0.005 * math.Pow(2,1)=0.005 * 2=0.01 | 10 |
3 | 2 | 3 | 0.005 * math.Pow(2,2)=0.005 * 4=0.02 | 20 |
4 | 3 | 4 | 0.005 * math.Pow(2,3)=0.005 * 8=0.04 | 40 |
5 | 4 | 5 | 0.005 * math.Pow(2,4)=0.005 * 16=0.08 | 80 |
6 | 5 | 6 | 0.005 * math.Pow(2,5)=0.005 * 32=0.16 | 160 |
7 | 6 | 7 | 0.005 * math.Pow(2,6)=0.005 * 64=0.32 | 320 |
8 | 7 | 8 | 0.005 * math.Pow(2,7)=0.005 * 128=0.64 | 640 |
9 | 8 | 9 | 0.005 * math.Pow(2,8)=0.005 * 256=1.28 | 1280 |
10 | 9 | 10 | 0.005 * math.Pow(2,9)=0.005 * 512=2.56 | 2560 |
11 | 10 | 11 | 0.005 * math.Pow(2,10)=0.005 * 1024=5.12 | 5120 |
12 | 11 | 12 | 0.005 * math.Pow(2,10)=0.005 * 2048=10.24 | 10240 |
将结果使用折线图表示:
指数退避算法的实战验证
找到client-go源码client-go/util/workqueue/default_rate_limiters.go
在第98行下面,插入打印信息:
fmt.Println(“backoff=”,time.Duration(backoff))
编写一段测试代码,创建一个限速队列,并且使用指数退避算法ItemExponentialFailureRateLimiter
作为限速器。将同一个item入队10次,观察打印出的backoff数值。
func main() {
// 创建一个指数退避算法的限速器
rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second)
// 创建一个限速队列,使用上面的指数退避算法的限速器
queue := workqueue.NewRateLimitingQueue(rateLimiter)
// 创建一个key,重复入队10次
for i := 0; i < 11; i++ {
fmt.Printf("Adding %d, ", i)
queue.AddRateLimited("one")
}
fmt.Println()
}
测试结果:
可以从结果可以对比知道,和上面我们推理验证的是相吻合的。
指数退避算法的示意图
为了更形象化的描述,限速队列是如何使用指数退避算法的执行过程,这里再举例说明。
假设有4个item值为one
和2个item值为two
的对象,依次连续加入限速算法为指数退避算法
的限速队列。
令牌桶限速算法
令牌桶算法是通过 Go 语言的第三方库 golang.org/x/time/rate
实现的。
令牌桶算法描述: 该算法的内部实现了一个存放 token(令牌)的“桶”,初始时“桶”是空的,token 会以固定速率往“桶”里填充,直到将其填满为止,多余的 token 会被丢弃。每个元素都会从令牌桶得到一个 token,只有得到 token 的元素才允许通过(accept),而没有得到 token 的元素处于等待状态。令牌桶算法通过控制发放 token 来达到限速目的。令牌桶算法原理如图 5-12 所示。
接下来我们看看client-go中如何使用令牌桶算法的。找到源码位置:client-go/util/workqueue/default_rate_limiters.go
BucketRateLimiter
结构体定义如下。结构体中包含了 "golang.org/x/time/rate"
中的限速器,说明client-go并没有重写一个令牌桶限速算法,而是直接使用的go官方包rate.go中的令牌限速器
// BucketRateLimiter adapts a standard bucket to the workqueue ratelimiter API
type BucketRateLimiter struct {
// rate.go中的令牌限速器
*rate.Limiter
}
When()直接使用了rate.go中的Limiter.Reserve().Delay()计算出的延时。
// When()直接使用了rate.go中的Limiter.Reserve().Delay()计算出的延时
func (r *BucketRateLimiter) When(item interface{}) time.Duration {
return r.Limiter.Reserve().Delay()
}
// 令牌桶限速器中的Forget()没什么意义
func (r *BucketRateLimiter) Forget(item interface{}) {
}
// 令牌桶限速器中的 NumRequeues()一直返回0。
// 说明令牌桶限速器不关心同一个元素是否requeue(重新入队)
func (r *BucketRateLimiter) NumRequeues(item interface{}) int {
return 0
}
本文仅仅介绍client-go中的令牌桶限速器使用的。关于令牌桶算法的详细实现不是简单几句能描述清楚的,所以令牌桶算法的实现机制会面专门写一篇文章介绍。
混合模式
混合模式是将多种限速算法混合使用,即多种限速算法同时生效。例如,同时使用指数退避算法和令牌桶算法。
接下来我们从client-go v1.12.0的examples中里的一行代码说起。代码如下:
// create the workqueue
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
workqueue.NewTypedRateLimitingQueue()
函数的作用是初始化一个限速队列,参数应该是一个限速器.这里使用的限速器是DefaultControllerRateLimiter()
,找到源码:
// DefaultControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue. It has
// both overall and per-item rate limiting. The overall is a token bucket and the per-item is exponential
func DefaultControllerRateLimiter() RateLimiter {
return NewMaxOfRateLimiter(
// 指数退避限速器,注意baseDelay=5毫秒,maxDelay=1000秒
NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
// 同时也用了令牌桶限速器,注意qps=10, burst=100
&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
}
说明DefaultControllerRateLimiter()
同时使用了两个限速算法ItemExponentialFailureRateLimiter
和BucketRateLimiter
继续剖析,发现MaxOfRateLimiter
是一个实现了RateLimiter的结构体,这个结构体包括了一个限速器列表[]RateLimiter。当然MaxOfRateLimiter
也会实现限速队列的接When(),Forget(),NumRequeues()三个方法。最重要的是When()方法,从下面代码我们可以得知,When()函数的逻辑是遍历多个限速器,看哪个限速器限的”越狠“,就使用谁返回的duration值。
// MaxOfRateLimiter calls every RateLimiter and returns the worst case response
// When used with a token bucket limiter, the burst could be apparently exceeded in cases where particular items
// were separately delayed a longer time.
type MaxOfRateLimiter struct {
limiters []RateLimiter
}
func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter {
return &MaxOfRateLimiter{limiters: limiters}
}
// when 判断那个限速器限速的"越狠"就用那个限速器的值
func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
ret := time.Duration(0)
for _, limiter := range r.limiters {
// 依次用各种限速器的when()获取限速的延时值
curr := limiter.When(item)
// 谁限速的延时也大就用谁的
if curr > ret {
ret = curr
}
}
return ret
}
- 混合模式的示意图
总结
- workQueue中的限速队列,可以使用的算法有三种:计数器限速算法、指数指数退避限速算法、令牌桶限速算法,同时还支持多种算法组合的混合模式。
- 指数退避限速算法、计数器限速算法关心同一个对象多次入队的情况(requeue),重新入队的次数使用一个failures[item]记录,简单的说requeue次数越多,延时越大。令牌桶限速算法,不会关心同一个元素的是否重新入队。
参考文档
博客:《controller-runtime-client-go-rate-limiting》
书籍:《Kubernetes 源码剖析之 WorkQueue 队列》
标签:RateLimitingQueue,队列,限速,入队,item,算法,源码,failures,WorkQueue From: https://blog.csdn.net/xsw164711368/article/details/141303451