channel问题
channel是go协程间通信的主要方式。
channel预设容量,很难评估,不支持动态扩容。
k8s的client-go提供了基于切片的线程安全的并发队列,解耦生产者与消费者,提供了去重、限速、重试加入队列等功能。
k8s controller处理事件官方例子
生产者
// 创建一个work queue
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
// 将一个待处理的对象添加work queue
queue.Add(key)
消费者
// workers代表处理并发处work queue中任务的协程数
for i := 0; i < workers; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}
// runWorker是一个死循环,通过processNextItem从队列中取出key进行处理,然后取出next key继续处理
func (c *Controller) runWorker() {
for c.processNextItem() {
}
}
// processNextItem就是真正的处理逻辑了,
func (c *Controller) processNextItem() bool {
// quit为true代表队列已关闭
key, quit := c.queue.Get()
if quit {
return false
}
// 处理完后将key从队列中移除,并且该操作是并发安全的,后续会进行详细分析
defer c.queue.Done(key)
// 调用实际业务代码对每个key进行处理
err := c.syncToStdout(key.(string))
// 如果处理异常则进行错误处理
c.handleErr(err, key)
return true
}
// 错误处理代码,对上一函数中可能处理失败的key进行重试等操作
func (c *Controller) handleErr(err error, key interface{}) {
if err == nil {
// 如果处理没有错误,调用Forget方法将key从限速队列中移除
c.queue.Forget(key)
return
}
// NumRequeues方法返回了一个key重入队列的次数,若超过设定的阈值,则从限速队列中移除,不再处理
if c.queue.NumRequeues(key) < 5 {
klog.Infof("Error syncing pod %v: %v", key, err)
// 若重入队列次数没超过阈值,则添加到限速队列
c.queue.AddRateLimited(key)
return
}
c.queue.Forget(key)
// 错误上报
runtime.HandleError(err)
}
限速队列由多个队列一起完成。
基本队列work queue
接口
type Interface interface {
// 添加元素
Add(item interface{})
// 队列长度
Len() int
// 返回队首元素,并返队列队列是否关闭
Get() (item interface{}, shutdown bool)
// 处理完一个元素后从队列中删除
Done(item interface{})
// 关闭队列
ShutDown()
// 返回队列是否关闭
ShuttingDown() bool
}
实现
type Type struct {
// interface{}类型的切片,存储具体的key
queue []t
// 存储需要被处理的元素
dirty set
// 存储正在处理的元素
processing set
// 用于唤醒其他协程队列已满足条件可以继续处理,如队列由空变为非空
cond *sync.Cond
// 标记队列是否关闭
shuttingDown bool
// 监控指标相关字段
metrics queueMetrics
unfinishedWorkUpdatePeriod time.Duration
clock clock.Clock
}
shutdown标记通知消费者、监控组件等队列是否关闭。
Add方法
func (q *Type) Add(item interface{}) {
// 加锁互斥保证线程安全
q.cond.L.Lock()
defer q.cond.L.Unlock()
if q.shuttingDown {
return
}
// 去重
if q.dirty.has(item) {
return
}
// 记录监控指标
q.metrics.add(item)
q.dirty.insert(item)
// 当有相同元素正在处理时,同样进行去重操作,不予入队
if q.processing.has(item) {
return
}
// 最终添加到队列
q.queue = append(q.queue, item)
q.cond.Signal()
}
Add方法会先把key放入dirty和queue中,去重;若该key正在被处理,则不会加入,防止业务错误。
Get
func (q *Type) Get() (item interface{}, shutdown bool) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
// 如果队列元素为空且没有关闭则等待其他goroutine唤醒
for len(q.queue) == 0 && !q.shuttingDown {
q.cond.Wait()
}
// 若已经关闭则应该返回
if len(q.queue) == 0 {
// We must be shutting down.
return nil, true
}
// 从queue字段取出一个元素
item, q.queue = q.queue[0], q.queue[1:]
// 监控相关
q.metrics.get(item)
// 把key加入processing集合并从dirty删除,这样相同的key可以在当前key处理完后继续入队处理
q.processing.insert(item)
q.dirty.delete(item)
return item, false
}
key从dirty集合和queue中移除,放入processing。
Done & ShutDown
func (q *Type) Done(item interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.metrics.done(item)
q.processing.delete(item)
// 若key在处理的过程中,又再次被加入到队列,由Add方法可知,当key在processing中时,Add操作只是把key放到了dirty集合,并没有放入queue中,因此
// 相同的key处理完从processing中移除后,需要把key再放入到queue中,防止key被遗漏
if q.dirty.has(item) {
q.queue = append(q.queue, item)
q.cond.Signal()
}
}
func (q *Type) ShutDown() {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.shuttingDown = true
q.cond.Broadcast()
}
通过dirty和processing两个集合,work queue实现了去重,防止了相同key被同时处理的错误。
延迟队列DelayingQueue
接口
type DelayingInterface interface {
Interface
// 经过duration时间后item被重新加入队列
AddAfter(item interface{}, duration time.Duration)
}
延迟队列在work queue基础上实现,继承了Interface接口,多了AddAfter方法,通过设置指定的duration来达到限速的目的。
实现
func newDelayingQueue(clock clock.Clock, q Interface, name string) *delayingType {
ret := &delayingType{
Interface: q,
clock: clock,
heartbeat: clock.NewTicker(maxWait),
stopCh: make(chan struct{}),
waitingForAddCh: make(chan *waitFor, 1000),
metrics: newRetryMetrics(name),
}
go ret.waitingLoop()
return ret
}
newDelayingQueue返回一个delayingType类型的限速队列,启动一个waitingLoop协程处理被添加的key。
AddAfter
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
if q.ShuttingDown() {
return
}
q.metrics.retry()
// 没有延迟直接加入queue
if duration <= 0 {
q.Add(item)
return
}
select {
case <-q.stopCh:
// 将封装后的key放入waitingForAddCh channel
case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
}
}
将key和延迟时间封装成一个waitFor struct,readyAt是key应该加入到队列的时间,放入到waitingForAddCh中,waitingLoop协程会异步处理。默认waitingForAddCh的大小为1000,当channel满时添加key会被block。
waitingLoop
func (q *delayingType) waitingLoop() {
defer utilruntime.HandleCrash()
never := make(<-chan time.Time)
// 记录等待队列中第一个key需要的等待的时间
var nextReadyAtTimer clock.Timer
// 基于堆实现的优先级队列,需要最早被加入到队列的key放在最前面
waitingForQueue := &waitForPriorityQueue{}
heap.Init(waitingForQueue)
waitingEntryByData := map[t]*waitFor{}
for {
if q.Interface.ShuttingDown() {
return
}
now := q.clock.Now()
// 判断堆顶元素是否到期需要加入队列
for waitingForQueue.Len() > 0 {
entry := waitingForQueue.Peek().(*waitFor)
// 如果还没到期,继续等待或者继续监听后续key加入事件
if entry.readyAt.After(now) {
break
}
// 从堆顶弹出元素添加到队列
entry = heap.Pop(waitingForQueue).(*waitFor)
q.Add(entry.data)
delete(waitingEntryByData, entry.data)
}
nextReadyAt := never
if waitingForQueue.Len() > 0 {
if nextReadyAtTimer != nil {
nextReadyAtTimer.Stop()
}
entry := waitingForQueue.Peek().(*waitFor)
nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
nextReadyAt = nextReadyAtTimer.C()
}
select {
case <-q.stopCh:
return
// 等待心跳时间过期
case <-q.heartbeat.C():
// 等待对顶元素时间过期
case <-nextReadyAt:
// 从waitingForAddCh中取元素,若已经到期直接加入到队列,否则加入堆中等待处理
case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) {
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
q.Add(waitEntry.data)
}
// 尝试再取一个
drained := false
for !drained {
select {
case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) {
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
q.Add(waitEntry.data)
}
default:
drained = true
}
}
}
}
}
监听waitingForAddCh channel,取出待添加到队列的key,如果已经到期则直接加入,否则将key放入到堆中,然后每次从堆中取出最先过期的key进行判断处理。
限速队列RateLimitingQueue
接口
type RateLimitingInterface interface {
DelayingInterface
// 限速器rate limiter指定的时间到期后将item加入队列
AddRateLimited(item interface{})
// 将item从限速器删除,不再进行重试加入,但还是需要调用Done方法删除item
Forget(item interface{})
// 返回item被重新入队列的次数
NumRequeues(item interface{}) int
}
RateLimitingInterface是在DelayingInterface的基础上多了三个方法,调用NewNamedRateLimitingQueue方法传入RateLimiter,调用时可以传入不同的限速器ratelimiter实现,官方提供了四种rate Limiter实现,分别是BucketRateLimiter、ItemExponentialFailureRateLimiter、ItemFastSlowRateLimiter和MaxOfRateLimiter。
k8s中默认的控制器限速器初始化使用混合限速器。
RateLimiter需要实现三个方法
type RateLimiter interface {
// 返回key需要等待加入队列的时间
When(item interface{}) time.Duration
// 取消key的重试
Forget(item interface{})
// 记录一个key被重试了多少次
NumRequeues(item interface{}) int
}
BucketRateLimiter
基于token令牌桶的限速方法,通过三方库golang.org/x/time/rate实现。
rate.NewLimiter(rate.Limit(10), 100)
10代表每秒往桶中放入token的数量,100代表初始化token数量,前100个元素直接通过,第101个元素等待100ms,第102个元素等待200ms。
ItemExponentialFailureRateLimiter
指数退避算法,有两个主要参数baseDelay和maxDelay。baseDelay表示推迟的基数,每次添加相同的key对应的延迟加入时间会指数递增;maxDelay表示延迟时间的上限。
// 每次进来exp指数加1
exp := r.failures[item]
r.failures[item] = r.failures[item] + 1
backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
if backoff > math.MaxInt64 {
return r.maxDelay
}
baseDelay是10,maxDelay是1000,同一个key第一次进行需要等待的时间为10*2^1,第二次为10*2^2,以此类推。
ItemFastSlowRateLimiter
定义了两个时间fastDelay、lowDelay以及达到fastDelay的阈值maxFastAttempts。
r.failures[item] = r.failures[item] + 1
if r.failures[item] <= r.maxFastAttempts {
return r.fastDelay
}
return r.slowDelay
当重新加入队列的次数小于阈值maxFastAttempts,需要等待的时间为fastDelay,超过阈值则需要等待更长的时间slowDelay。
MaxOfRateLimiter
MaxOfRateLimiter则是多个RateLimiter的组合,需要延迟的时间为各个RateLimiter的时间最大值。
参考资料
https://monokaix.github.io/2021/09/12/workqueue/
标签:队列,限速,queue,item,key,interface,k8s From: https://www.cnblogs.com/WJQ2017/p/17706874.html