Shared Informer 源码分析
我们分析了 Indexer 组件的实现,实际上最开始的时候我们在 Informer 示例中通过 Informer 的 Lister 获取的资源对象数据就来自于 Indexer,当然除了 Lister 之外最重要的就是资源对象事件监听的操作,这些都是在 SharedInformer 中去实现的,所以我们需要去分析下 SharedInformer 的实现,这样就可以完整的将前面的内容串联起来了。
SharedInfomer
我们平时说的 Informer 其实就是 SharedInformer,它是可以共享使用的。如果同一个资源的 Informer 被实例化多次,那么就会运行多个 ListAndWatch 操作,这会加大 APIServer 的压力。而 SharedInformer 通过一个 map 来让同一类资源的 Informer 实现共享一个 Refelctor,这样就不会出现上面这个问题了。接下来我们先来查看 SharedInformer 的具体实现:
// k8s.io/client-go/tools/cache/shared_informer.go
type SharedInformer interface {
// 添加资源事件处理器,当有资源变化时就会通过回调通知使用者
AddEventHandler(handler ResourceEventHandler)
// 需要周期同步的资源事件处理器
AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
// 获取一个 Store 对象,前面我们讲解了很多实现 Store 的结构
GetStore() Store
// 获取一个 Controller,下面会详细介绍,主要是用来将 Reflector 和 DeltaFIFO 组合到一起工作
GetController() Controller
// SharedInformer 的核心实现,启动并运行这个 SharedInformer
// 当 stopCh 关闭时候,informer 才会退出
Run(stopCh <-chan struct{})
// 告诉使用者全量的对象是否已经同步到了本地存储中
HasSynced() bool
// 最新同步资源的版本
LastSyncResourceVersion() string
}
// 在 SharedInformer 基础上扩展了添加和获取 Indexers 的能力
type SharedIndexInformer interface {
SharedInformer
// 在启动之前添加 indexers 到 informer 中
AddIndexers(indexers Indexers) error
GetIndexer() Indexer
}
如果我们要处理资源的事件的话,就需要添加一个事件处理器,传入一个 ResourceEventHandler
接口,其定义如下所示:
// k8s.io/client-go/tools/cache/controller.go
type ResourceEventHandler interface {
// 添加对象回调函数
OnAdd(obj interface{})
// 更新对象回调函数
OnUpdate(oldObj, newObj interface{})
// 删除对象回调函数
OnDelete(obj interface{})
}
然后接下来我们来看看 SharedIndexInformer
的具体实现类的定义:
// k8s.io/client-go/tools/cache/shared_informer.go
type sharedIndexInformer struct {
// Indexer也是一种Store,这个我们知道的,Controller负责把Reflector和FIFO逻辑串联起来
// 所以这两个变量就涵盖了开篇那张图里面的Reflector、DeltaFIFO和LocalStore(cache)
indexer Indexer
// 在 Controller 中将 Reflector 和 DeltaFIFO 关联了起来
controller Controller
// 对 ResourceEventHandler 进行了一层层封装,统一由 sharedProcessor 管理
processor *sharedProcessor
// 监控对象在一个时间窗口内是否发生了变化
cacheMutationDetector MutationDetector
// 用于 Reflector 中真正执行 ListAndWatch 的操作
listerWatcher ListerWatcher
// informer 中要处理的对象
objectType runtime.Object
// 定期同步周期
resyncCheckPeriod time.Duration
// 任何通过 AddEventHandler 添加的处理程序的默认重新同步的周期
defaultEventHandlerResyncPeriod time.Duration
clock clock.Clock
// 启动、停止标记
started, stopped bool
startedLock sync.Mutex
blockDeltas sync.Mutex
}
Controller
上面我们看到在 sharedIndexInformer 中定义了一个 Controller,这里的 Controller 并不是我们比较熟悉的 kube-controller-manager 管理的各种控制器,这里的 Controller 定义在 client-go/tools/cache/controller.go
中,目的是用来把 Reflector、DeltaFIFO 这些组件组合起来形成一个相对固定的、标准的处理流程。我们先来看下 Controller 的定义:
// k8s.io/client-go/tools/cache/controller.go
// Controller 的抽象接口
type Controller interface {
// Run 函数主要做两件事,一件是构造并运行一个 Reflector 反射器,将对象/通知从 Config 的
// ListerWatcher 送到 Config 的 Queue 队列,并在该队列上调用 Resync 操作
// 另外一件事就是不断从队列中弹出对象,并使用 Config 的 ProcessFunc 进行处理
Run(stopCh <-chan struct{})
HasSynced() bool // APIServer 中的资源对象是否同步到了 Store 中
LastSyncResourceVersion() string // 最新的资源版本号
}
因为 Controller 把多个模块整合起来实现了一套业务逻辑,所以在创建Controller 的时候需要提供一些配置:
// k8s.io/client-go/tools/cache/controller.go
type Config struct {
Queue // 资源对象的队列,其实就是一个 DeltaFIFO
ListerWatcher // 用来构造 Reflector 的
Process ProcessFunc // DeltaFIFO 队列 Pop 的时候回调函数,用于处理弹出的对象
ObjectType runtime.Object // 对象类型,也就是 Reflector 中使用的
FullResyncPeriod time.Duration // 全量同步周期,在 Reflector 中使用
ShouldResync ShouldResyncFunc // Reflector 中是否需要 Resync 操作
RetryOnError bool // 出现错误是否需要重试
}
Controller 自己构造 Reflector 获取对象,Reflector 作为 DeltaFIFO 生产者持续监控 APIServer 的资源变化并推送到队列中。Controller 的 Run() 就是是队列的消费者,从队列中弹出对象并调用 Process() 进行处理。接下来我们来看 Controller 的一个具体实现 controller:
// k8s.io/client-go/tools/cache/controller.go
// controller是 Controller 的一个具体实现
type controller struct {
config Config // 配置
reflector *Reflector // 反射器
reflectorMutex sync.RWMutex // 反射器的锁
clock clock.Clock // 时钟
}
// 控制器核心实现
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
// 新建一个协程,如果收到系统退出的信号就关闭队列
go func() {
<-stopCh
c.config.Queue.Close()
}()
// 实例化一个 Reflector,传入的参数都是从 Config 中获取的
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
r.ShouldResync = c.config.ShouldResync
r.clock = c.clock
// 将反射器给到controller
c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()
// 等待所有协程执行完毕
var wg wait.Group
defer wg.Wait()
// StartWithChannel 会启动协程执行 Reflector.Run(),接收到 stopCh 信号才会退出协程
wg.StartWithChannel(stopCh, r.Run)
// wait.Unitl() 就是周期性的调用 c.processLoop() 操作处理弹出的对象
wait.Until(c.processLoop, time.Second, stopCh)
}
从上面的核心函数 Run 的实现方式来看,该函数中主要就是实例化一个 Reflector,然后启动一个协程去执行这个反射器的 Run 函数,这个 Run 函数前面我们已经讲解过就是去调用 ListAndWatch
函数进行 List 和 Watch 操作,这个操作中具体的实现就是 Config 中的 ListerWatcher
。然后的一个核心就是 processLoop() 函数的实现:
// k8s.io/client-go/tools/cache/controller.go
// 处理队列弹出的对象
func (c *controller) processLoop() {
// 死循环,不断从队列中弹出对象来处理
for {
// 从队列中弹出一个对象,然后处理这个对象
// 真正处理的是通过 Config 传递进来的 Process 函数
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
// 如果队列关闭了那就直接退出了
if err == ErrFIFOClosed {
return
}
// 如果配置的是错误后允许重试
if c.config.RetryOnError {
// 如果错误可以再重试那么将弹出的对象重新入队列进行处理
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}
上面的代码其实核心的处理就是从 DeltaFIFO 中不断 Pop 出一个对象,然后交给 Config 传递进来的 Process 函数去处理,这个函数是在 SharedInformer 中初始化的时候传递进来的。
sharedProcessor
然后上面 SharedIndexInformer
的实现中还有一个比较重要的属性就是 sharedProcessor
,就是专门来处理事件的,通过 AddEventHandler 函数添加的处理器就会被封装成 processorListener,然后通过 sharedProcessor 管理起来,其定义如下所示:
// k8s.io/client-go/tools/cache/shared_informer.go
// sharedProcessor 有一个 processorListener 的集合,可以向它的监听器分发事件通知对象。
type sharedProcessor struct {
listenersStarted bool // 所有处理器是否已经启动
listenersLock sync.RWMutex // 读写锁?
listeners []*processorListener // 通用的处理器列表
syncingListeners []*processorListener // 需要定时同步的处理器列表
clock clock.Clock
wg wait.Group
}
type processorListener struct {
nextCh chan interface{}
addCh chan interface{} // 添加事件的通道
handler ResourceEventHandler
// pendingNotifications 是一个无边界的环形缓冲区,用于保存所有尚未分发的通知。
pendingNotifications buffer.RingGrowing
requestedResyncPeriod time.Duration
resyncPeriod time.Duration
nextResync time.Time
resyncLock sync.Mutex
}
processorListener 中就包含一个资源事件处理器,那么我们是如何传递事件进来的呢?首先我们来看看添加一个处理器是如何实现的:
// k8s.io/client-go/tools/cache/shared_informer.go
func (p *processorListener) add(notification interface{}) {
p.addCh <- notification
}
可以看到添加事件很简单,直接通过 addCh 这个通道接收,notification 就是我们所说的事件,也就是前面我们常说的 DeltaFIFO 输出的 Deltas。上面我们可以看到 addCh 是定义成的一个无缓冲通道,所以这个 add() 函数就是一个事件分发器,从 DeltaFIFO 中弹出的对象要逐一送到多个处理器,如果处理器没有及时处理 addCh 则会阻塞住:
// k8s.io/client-go/tools/cache/shared_informer.go
func (p *processorListener) pop() {
defer utilruntime.HandleCrash()
defer close(p.nextCh) // 通知 run() 函数停止
var nextCh chan<- interface{}
var notification interface{}
// 死循环
for {
select {
// nextCh 还没初始化时,会被阻塞
case nextCh <- notification:
// 如果发送成功了(下面的 run 函数中消耗了数据后),从缓冲中再取一个事件出来
var ok bool
notification, ok = p.pendingNotifications.ReadOne()
if !ok { // 没有事件被 Pop,设置 nextCh 为 nil
nextCh = nil // Disable 这个 select 的 case
}
// 从 p.addCh 通道中读取一个事件,消费 addCh 通道
case notificationToAdd, ok := <-p.addCh:
if !ok { // 如果关闭了,则退出
return
}
// notification 为空说明还没发送任何事件给处理器(pendingNotifications 为空)
if notification == nil {
// 把刚刚获取的事件通过 p.nextCh 发送给处理器
notification = notificationToAdd
nextCh = p.nextCh
} else {
// 上一个事件还没发送完成(已经有一个通知等待发送),就先放到缓冲通道中
p.pendingNotifications.WriteOne(notificationToAdd)
}
}
}
}
pop() 函数的实现的利用了 golang 的 select 来同时操作多个 channel ,select 的 case 表达式都没有满足求值条件,那么 select 语句就会被阻塞,直到至少有一个 case 表达式满足条件为止,如果多个 case 语句同时满足则随机选择一个 case 执行。接下来,我们看看从 nextCh 读取事件后是如何处理的:
// k8s.io/client-go/tools/cache/shared_informer.go
func (p *processorListener) run() {
// 当关闭 stopCh 后才会退出
stopCh := make(chan struct{})
wait.Until(func() {
// 不断从 nextCh 通道中取数据
for next := range p.nextCh {
// 判断事件类型
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
}
}
// 当 p.nextCh 是空的且是关闭的时候才能到达这里,关闭 stopCh
close(stopCh)
}, 1*time.Second, stopCh)
}
run() 和 pop() 是 processorListener 的两个最核心的函数,processorListener 就是实现了事件的缓冲和处理,在没有事件的时候可以阻塞处理器,当事件较多是可以把事件缓冲起来,实现了事件分发器与处理器的异步处理。processorListener 的 run() 和 pop() 函数其实都是通过 sharedProcessor 启动的协程来调用的,所以下面我们再来对 sharedProcessor 进行分析了。首先看下如何添加一个 processorListener:
// k8s.io/client-go/tools/cache/shared_informer.go
// 添加处理器
func (p *sharedProcessor) addListener(listener *processorListener) {
p.listenersLock.Lock() // 加锁
defer p.listenersLock.Unlock()
// 调用 addListenerLocked 函数
p.addListenerLocked(listener)
// 如果事件处理列表中的处理器已经启动了,则手动启动下面的两个协程
// 相当于启动后了
if p.listenersStarted {
// 通过 wait.Group 启动两个协程,就是上面我们提到的 run 和 pop 函数
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
}
// 将处理器添加到处理器的列表中
func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
// 添加到通用处理器列表中
p.listeners = append(p.listeners, listener)
// 添加到需要定时同步的处理器列表中
p.syncingListeners = append(p.syncingListeners, listener)
}
这里添加处理器的函数 addListener 其实在 sharedIndexInformer 中的 AddEventHandler 函数中就会调用这个函数来添加处理器。然后就是事件分发的函数实现:
// k8s.io/client-go/tools/cache/shared_informer.go
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
// sync 表示 obj 对象是否是同步事件对象
// 将对象分发给每一个事件处理器列表中的处理器
if sync {
for _, listener := range p.syncingListeners {
listener.add(obj)
}
} else {
for _, listener := range p.listeners {
listener.add(obj)
}
}
然后就是将 sharedProcessor 运行起来:
// k8s.io/client-go/tools/cache/shared_informer.go
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
func() {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
// 遍历所有的处理器,为处理器启动两个后台协程:run 和 pop 操作
// 后续添加的处理器就是在上面的 addListener 中去启动的
for _, listener := range p.listeners {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
// 标记为所有处理器都已启动
p.listenersStarted = true
}()
// 等待退出信号
<-stopCh
// 接收到退出信号后,关闭所有的处理器
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
// 遍历所有处理器
for _, listener := range p.listeners {
// 关闭 addCh,pop 会停止,pop 会通知 run 停止
close(listener.addCh)
}
// 等待所有协程退出,就是上面所有处理器中启动的两个协程 pop 与 run
p.wg.Wait()
}
到这里 sharedProcessor 就完成了对 ResourceEventHandler 的封装处理,当然最终 sharedProcessor 还是在 SharedInformer 中去调用的。
SharedInformer 实现
接下来我们就来看下 SharedInformer 的具体实现:
// k8s.io/client-go/tools/cache/shared_informer.go
// 为 listwatcher 创建一个新的实例,用于 Reflector 从 apiserver 获取资源
// 所以需要外部提供一个资源类型
func NewSharedInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration) SharedInformer {
// 调用 NewSharedIndexInformer 实现
return NewSharedIndexInformer(lw, exampleObject, defaultEventHandlerResyncPeriod, Indexers{})
}
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object,
defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
realClock := &clock.RealClock{}
sharedIndexInformer := &sharedIndexInformer{
~~~~ processor: &sharedProcessor{clock: realClock},
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
listerWatcher: lw,
objectType: exampleObject,
resyncCheckPeriod: defaultEventHandlerResyncPeriod,
defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
clock: realClock,
}
return sharedIndexInformer
}
实例化 SharedInformer 比较简单,实例化完成后就可以添加事件处理器了:
// k8s.io/client-go/tools/cache/shared_informer.go
// 使用默认的同步周期添加事件处理器
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
}
// 真正的添加事件处理器的实现
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
s.startedLock.Lock()
defer s.startedLock.Unlock()
// 如果已经结束了,那就直接返回了
if s.stopped {
klog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler)
return
}
// 如果同步周期>0
if resyncPeriod > 0 {
// 同步周期不能小于最小的时间
if resyncPeriod < minimumResyncPeriod {
klog.Warningf("resyncPeriod %d is too small. Changing
it to the minimum allowed value of %d", resyncPeriod, minimumResyncPeriod)
resyncPeriod = minimumResyncPeriod
}
//
if resyncPeriod < s.resyncCheckPeriod {
// 如果已经启动了,那就用 resyncCheckPeriod 这个周期
if s.started {
klog.Warningf("resyncPeriod %d is smaller than resyncCheckPeriod
%d and the informer has already started. Changing it to %d", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod)
resyncPeriod = s.resyncCheckPeriod
} else {
// 如果事件处理器的同步周期小于当前的 resyncCheckPeriod 且还没启动
// 则更新 resyncCheckPeriod 为 resyncPeriod
// 并相应调整所有监听器的同步周期
s.resyncCheckPeriod = resyncPeriod
s.processor.resyncCheckPeriodChanged(resyncPeriod)
}
}
}
// 新建事件处理器
listener := newProcessListener(handler, resyncPeriod,
determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
// 如果没有启动,那么就直接添加处理器就可以了
if !s.started {
// 上面我们分析过添加事件处理器
s.processor.addListener(listener)
return
}
// blockDeltas 提供了一种方法来停止所有的事件分发,以便后面的事件处理器可以安全地加入 SharedInformer。
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
// 添加处理器
s.processor.addListener(listener)
// 因为到这里证明 SharedInformer 已经启动了,可能很多对象已经让其他处理器处理过了
// 所以这些对象就不会再通知新添加的处理器了,所以这里遍历 indexer 中的所有对象去通知新添加的处理器
for _, item := range s.indexer.List() {
listener.add(addNotification{newObj: item})
}
}
事件处理器添加完过后就要看下 SharedInformer 是如何把事件分发给每个处理器的了:
// k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
// 新建一个 DeltaFIFO
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: s.indexer,
EmitDeltaTypeReplaced: true,
})
// 用于构造 Controller 的配置
cfg := &Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
ShouldResync: s.processor.shouldResync,
// Controller 调用 DeltaFIFO 的 Pop 函数传入这个回调函数来处理弹出的对象
Process: s.HandleDeltas,
}
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
// 新建一个 Controller 并标记为已经启动
s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()
processorStopCh := make(chan struct{})
var wg wait.Group
defer wg.Wait() // 等待处理器停止
defer close(processorStopCh) // 通知处理器停止
// 启动两个协程
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
wg.StartWithChannel(processorStopCh, s.processor.run)
defer func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
// 标记为已停止
s.stopped = true
}()
// 启动 Controller
s.controller.Run(stopCh)
}
sharedIndexInformer 通过 Run() 函数启动了 Controller 和 sharedProcess,Controller 通过 DeltaFIFO 的 Pop 函数弹出 Deltas 对象,并使用 HandleDeltas 函数来处理这个对象,前面其实我们就讲解过。这个函数把 Deltas 转换为 sharedProcess 需要的各种Notification 类型。
// k8s.io/client-go/tools/cache/shared_informer.go
// DeltaFIFO 的对象被 Pop 后的处理函数
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
// 因为 Deltas 是 Delta 列表,里面包含一个对象的多个操作
// 所以要从最老的 Delta 到最新的 Delta 遍历处理
for _, d := range obj.(Deltas) {
switch d.Type { // 根据对象操作类型进行处理
// 同步、替换、添加、更新类型
case Sync, Replaced, Added, Updated:
s.cacheMutationDetector.AddObject(d.Object)
// 如果 indexer 中有这个对象,则当成更新事件进行处理
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
if err := s.indexer.Update(d.Object); err != nil {
return err
}
isSync := false
switch {
case d.Type == Sync:
isSync = true
case d.Type == Replaced:
if accessor, err := meta.Accessor(d.Object); err == nil {
if oldAccessor, err := meta.Accessor(old); err == nil {
isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
}
}
}
// 通知处理器处理事件
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
// 将对象添加到 indexer 存储中
if err := s.indexer.Add(d.Object); err != nil {
return err
}
// 然后通知处理器处理事件
s.processor.distribute(addNotification{newObj: d.Object}, false)
}
// 删除类型
case Deleted:
// 从 indexer 中删除对象
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
// 通知所有的处理器对象被删除了
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
}
return nil
}
到这里我们就将整个 SharedInformer 的流程就梳理清楚了,最后我们再来总结下 SharedInformer 的整个流程:
-
通过 Reflector 实现资源对象的 List 和 Watch 操作
-
将通过 List 全量列举的对象存储在 Indexer 中,然后再 Watch 资源,一旦有变化就更新 Indexer,并在 Indexer 中采用 Namespace 做对象索引
-
更新到 Indexer 的过程通过 DeltaFIFO 实现有顺序的更新,因为资源状态是通过全量+增量的方式实现同步的,所以顺序错误会造成状态不一致
-
使用者可以注册回调函数,在更新到 Indexer 的同时通知使用者去处理,为了保证回调处理不被某一个处理器阻塞,SharedInformer 实现了processorListener 异步缓冲处理
-
整个过程是通过 Controller 来驱动的