首页 > 其他分享 >Kubernetes 控制器管理器的工作原理

Kubernetes 控制器管理器的工作原理

时间:2022-10-18 10:10:22浏览次数:67  
标签:控制器 管理器 err Kubernetes listener Controller 事件 sharedIndexInformer Informer

在 Kubernetes Master 节点中,有三个重要的组件:ApiServer、ControllerManager 和 Scheduler,它们共同负责整个集群的管理。在本文中,我们尝试梳理一下ControllerManager的工作流程和原理。

Kubernetes 控制器管理器的工作原理_模板类

什么是控制器管理器

根据官方文档:kube-controller-manager 运行控制器,这是处理集群中常规任务的后台线程。

例如,当通过 Deployment 创建的 Pod 异常退出时,RS Controller 会接受并处理退出并创建新的 Pod 以保持预期的副本数。


几乎每个特定的资源都由特定的 Controller 管理以维持预期的状态,而 Controller Manager 的职责是聚合所有 Controller:

  1. 提供基础设施以降低控制器实现的复杂性
  2. 启动和维护控制器的正常运行时间

这样,Controller 保证集群中的资源保持在预期状态,Controller Manager 确保 Controller 保持在预期状态。

 

 

控制器工作流程

在我们解释 Controller Manager 如何为 Controller 提供基础架构和运行时环境之前,让我们先了解一下 Controller 工作流程是什么样的。

从高维的角度来看,ControllerManager主要提供了分发事件的能力,而不同的Controller只需要注册相应的Handler即可等待接收和处理事件。

Kubernetes 控制器管理器的工作原理_事件分发_02

以Deployment Controller为例,其中的​​NewDeploymentController​​方法​​pkg/controller/deployment/deployment_controller.go​​包括Event Handler的注册,对于Deployment Controller,只需要根据不同的事件实现不同的处理逻辑,就可以实现对相应资源的管理。

dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc:    dc.addDeployment,UpdateFunc: dc.updateDeployment,// This will enter the sync loop and no-op, because the deployment has been deleted from the store.DeleteFunc: dc.deleteDeployment,
})
rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc: dc.addReplicaSet,UpdateFunc: dc.updateReplicaSet,DeleteFunc: dc.deleteReplicaSet,
})
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{DeleteFunc: dc.deletePod,
})

可以看到,在ControllerManager的帮助下,Controller的逻辑可以很纯粹的通过实现对应的EventHandler来完成,那么ControllerManager具体做了哪些工作呢?

 

 

控制器管理器架构

帮助 Controller Manager 进行事件分发的关键模块是 client-go,而更关键的模块之一是 informer。

kubernetes在github上提供了client-go的架构图,从中可以看出Controller是描述的下半部分(CustomController),而Controller Manager主要是完成的上半部分。

Kubernetes 控制器管理器的工作原理_模板类_03

 

 

Informer Factory

从上图中可以看出,Informer 是一个非常关键的“桥梁”,所以对 Informer 的管理是 Controller Manager 做的第一件事。

由于每个 Informer 都与 Api Server 保持一个 watch long 连接,因此这个单实例工厂通过为所有 Controller 提供一个唯一的入口点来获取 Informer,从而确保每种类型的 Informer 仅实例化一次。

这个单例工厂的初始化逻辑。

// NewSharedInformerFactoryWithOptions constructs a new instance of a SharedInformerFactory with additional options.
func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
factory := &sharedInformerFactory{
client: client,namespace: v1.NamespaceAll,
defaultResync: defaultResync,
informers: make(map[reflect.Type]cache.SharedIndexInformer),
startedInformers: make(map[reflect.Type]bool),
customResync: make(map[reflect.Type]time.Duration),}

// Apply all optionsfor _, opt := range options {
factory = opt(factory)}

return factory
}

 

从上面的初始化逻辑可以看出,其中最重要的部分​​sharedInformerFactory​​是名为 的map ​​informers​​,其中key是资源类型,value是关心该资源类型的Informer。每种类型的 Informer 只会被实例化一次并存储在map中。不同的 Controller 只有在需要相同的资源时才会得到相同的 Informer 实例。

对于Controller Manager来说,保持所有Informer正常工作是所有Controller正常工作的基本条件。通过这个​​sharedInformerFactory​​map维护所有informer实例,所以​​sharedInformerFactory​​也负责提供一个统一的启动入口。

// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
f.lock.Lock()
defer f.lock.Unlock()

for informerType, informer := range f.informers {if !f.startedInformers[informerType] {go informer.Run(stopCh)
f.startedInformers[informerType] = true}}
}

Controller Manager启动时,最重要的是通过​​Start​​这个工厂的方法运行所有的Informer。

 

 

Informer creation

以下是这些 Informer 的创建方式,Controller Manager​​NewControllerInitializers​​在​​cmd/kube-controller-manager/app/controllermanager.go​​. 由于代码冗长,这里仅提供部署控制器的示例。

初始化部署控制器的逻辑​​startDeploymentController​​在​​cmd/kube-controller-manager/app/apps.go​​.

func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}] {return nil, false, nil}
dc, err := deployment.NewDeploymentController(
ctx.InformerFactory.Apps().V1().Deployments(),
ctx.InformerFactory.Apps().V1().ReplicaSets(),
ctx.InformerFactory.Core().V1().Pods(),
ctx.ClientBuilder.ClientOrDie("deployment-controller"),)if err != nil {return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)}
go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)return nil, true, nil
}

最关键的逻辑在 中​​deployment.NewDeploymentController​​,实际上是创建了 Deployment Controller,创建函数的前三个参数分别是 Deployment、ReplicaSet 和 Pod 的 Informer。如您所见,Informer 的单例工厂提供了一个入口点,用于使用 ApiGroup 作为路径创建具有不同资源的 Informer。

但是,重要的是要注意这一点。​​Apps().V1().Deployments()​​ 返回 type 的实例​​deploymentInformer​​,但​​deploymentInformer​​不是真正的 Informer(尽管它的 Informer 名称)。它只是一个模板类,其主要功能是为创建专注于部署的 Informer 提供模板作为特定资源。

// Deployments returns a DeploymentInformer.
func (v *version) Deployments() DeploymentInformer {return &deploymentInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}

 

创建 Informer 的真正逻辑在​​deploymentInformer.Informer()​​( ​​client-go/informers/apps/v1/deployment.go​​) 中,是默认的 Deployment Informer 创建模板方法,通过将资源实例和该模板方法传递给 Informer 工厂​​f.defaultInformer​​的方法来创建只关注 Deployment 资源的 Informer。​​InformerFor​

func (f *deploymentInformer) Informer() cache.SharedIndexInformer {return f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer)
}

简要说明。

  1. 您可以通过 Informer 工厂获取特定类型的 Informer 模板类(即​​deploymentInformer​​本例中)
  2. ​Informer()​​实际上,为特定资源创建 Informer 的是 Informer 模板类的方法。
  3. 该​​Informer()​​方法只是通过​​InformerFor​​Informer 工厂创建真正的 Informer

这里使用了模板方法(设计模式),虽然有点绕,但是可以参考下图梳理一下。理解它的关键是 Informer 的差异化创建逻辑委托给了模板类

Kubernetes 控制器管理器的工作原理_实例化_04

最后,命名的结构​​sharedIndexInformer​​将被实例化,并实际承担 Informer 的职责。它也是注册到 Informer 工厂映射的实例。

 

 

Informer operation

由于真正的 Informer 实例是一个类型的对象​​sharedIndexInformer​​,当 Informer 工厂启动时(通过执行​​Start​​方法),它​​sharedIndexInformer​​就是实际运行的。

​sharedIndexInformer​​是client-go中的一个组件,它的方法​​Run​​有几十行,但是工作量很大。这是我们进入控制器管理器最有趣的部分的地方。


func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()

fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

cfg := &Config{Queue: fifo,ListerWatcher: s.listerWatcher,ObjectType: s.objectType,FullResyncPeriod: s.resyncCheckPeriod,RetryOnError: false,ShouldResync: s.processor.shouldResync,

Process: s.HandleDeltas,}

func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()

s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true}()

// Separate stop channel because Processor should be stopped strictly after controller
processorStopCh := make(chan struct{})var wg wait.Group
defer wg.Wait() // Wait for Processor to stopdefer close(processorStopCh) // Tell Processor to stop
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
wg.StartWithChannel(processorStopCh, s.processor.run)

defer func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.stopped = true // Don't want any new listeners}()
s.controller.Run(stopCh)
}

启动逻辑​​sharedIndexInformer​​做了几件事。

  1. 创建一个名为 的队列​​fifo​​。
  2. 创建并运行一个名为​​controller​​.
  3. 开始了​​cacheMutationDetector​​。
  4. 开始了​​processor​​。

这些术语(或组件)在上一篇文章中没有提到,但这四件事是 Controller Manager 工作的核心,因此我将在下面逐一介绍。

 

 

sharedIndexInformer

​sharedIndexInformer​​是一个共享的 Informer 框架,不同的 Controller 只需要提供一个模板类(就像​​deploymentInformer​​上面提到的)来创建一个特定于他们需要的 Informer。

​sharedIndexInformer​​包含一堆工具来完成 Informer 的工作,主要代码在​​client-go/tools/cache/shared_informer.go​​. 它的创建逻辑也在其中。

// NewSharedIndexInformer creates a new instance for the listwatcher.
func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
realClock := &clock.RealClock{}
sharedIndexInformer := &sharedIndexInformer{
processor: &sharedProcessor{clock: realClock},
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
listerWatcher: lw,
objectType: objType,
resyncCheckPeriod: defaultEventHandlerResyncPeriod,
defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
clock: realClock,}return sharedIndexInformer
}

在创建逻辑中,有几点需要注意:

  1. processor:提供EventHandler注册和事件分发的功能
  2. indexer:提供资源缓存功能
  3. listerWatcher:由模板类提供,包含特定资源的List和Watch方法
  4. objectType:用于标记要关注的具体资源类型
  5. cacheMutationDetector:监控Informer的缓存

此外,它还包含​​DeltaFIFO​​队列和​​controller​​上面的启动逻辑中提到的,下面分别介绍。

 

 

sharedProcessor

处理器是 sharedIndexInformer 中一个非常有趣的组件。ControllerManager通过一个Informer单例工厂保证不同的Controller共享同一个Informer,但是不同的Controller在共享的Informer上注册了不同的Handler。

处理器是管理注册的Handler并将事件分发给不同的Handler的组件。

type sharedProcessor struct {
listenersStarted bool
listenersLock sync.RWMutex
listeners []*processorListener
syncingListeners []*processorListener
clock clock.Clock
wg wait.Group
}

sharedProcessor 工作的核心围绕着​​listeners​​.

当我们向 Informer 注册一个 Handler 时,它最终会被转换为一个名为​​processorListener​

func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {ret := &processorListener{
nextCh: make(chan interface{}),
addCh: make(chan interface{}),
handler: handler,
pendingNotifications: *buffer.NewRingGrowing(bufferSize),
requestedResyncPeriod: requestedResyncPeriod,
resyncPeriod: resyncPeriod,}

ret.determineNextResync(now)

return ret
}

 

该实例主要包含两个通道和外部注册的 Handler 方法。此处实例化的​​processorListener​​对象最终将被添加到​​sharedProcessor.listeners​​列表中。

func (p *sharedProcessor) addListener(listener *processorListener) {
p.listenersLock.Lock()
defer p.listenersLock.Unlock()

p.addListenerLocked(listener)if p.listenersStarted {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)}
}

 

如图所示,Controller 中的 Handler 方法最终会添加到 Listener 中,Listener 会附加到​​sharedProcessor​

Kubernetes 控制器管理器的工作原理_模板类_05

前面说过,启动时​​sharedIndexInformer​​会运行​​sharedProcessor​​,启动的逻辑​​sharedProcessor​​与这些有关​​listeners​​。

func (p *sharedProcessor) run(stopCh <-chan struct{}) {
func() {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()for _, listener := range p.listeners {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)}
p.listenersStarted = true}()<-stopCh
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()for _, listener := range p.listeners {
close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop}
p.wg.Wait() // Wait for all .pop() and .run() to stop
}

可以看到,启动的时候会依次​​sharedProcessor​​执行 的​​run​​和​​pop​​方法,所以现在来看这两个方法。​​listener​

 

 

Starting the listener

由于监听器包含注册到Controller的Handler方法,所以监听器最重要的作用就是在事件发生时触发这些方法,并​​listener.run​​不断从​​nextCh​​通道中获取事件并执行相应的处理程序。

func (p *processorListener) run() {// this call blocks until the channel is closed.  When a panic happens during the notification// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)// the next notification will be attempted.  This is usually better than the alternative of never// delivering again.
stopCh := make(chan struct{})
wait.Until(func() {// this gives us a few quick retries before a long pause and then a few more quick retries
err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {for next := range p.nextCh {switch notification := next.(type) {case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)case addNotification:
p.handler.OnAdd(notification.newObj)case deleteNotification:
p.handler.OnDelete(notification.oldObj)default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))}}// the only way to get here is if the p.nextCh is empty and closedreturn true, nil})

// the only way to get here is if the p.nextCh is empty and closedif err == nil {
close(stopCh)}}, 1*time.Minute, stopCh)
}

可以看到​​listener.run​​不断从​​nextCh​​通道中获取事件,但是通道中的事件是​​nextCh​​从哪里来的呢?将​​listener.pop​​事件放入​​nextCh​​.

​listener.pop​​是一个非常聪明和有趣的逻辑。

func (p *processorListener) pop() {
defer utilruntime.HandleCrash()defer close(p.nextCh) // Tell .run() to stop

var nextCh chan<- interface{}var notification interface{}for {select {case nextCh <- notification:// Notification dispatchedvar ok bool
notification, ok = p.pendingNotifications.ReadOne()if !ok { // Nothing to pop
nextCh = nil // Disable this select case}case notificationToAdd, ok := <-p.addCh:if !ok {return}if notification == nil { // No notification to pop (and pendingNotifications is empty)// Optimize the case - skip adding to pendingNotifications
notification = notificationToAdd
nextCh = p.nextCh
} else { // There is already a notification waiting to be dispatched
p.pendingNotifications.WriteOne(notificationToAdd)}}}
}

 

​listener​​包含两个通道​​addCh​​的原因​​nextCh​​是:Informer 无法预测​​listener.handler​​消耗事件的速度是否比产生事件的速度快,因此它添加了一个名为​​pendingNotifications​​. 队列来保存没有被及时消费的事件。

Kubernetes 控制器管理器的工作原理_模板类_06

​pop​​一方面,该方法不断获取最新事件,​​addCh​​以确保生产者不会阻塞。然后它确定缓冲区是否存在,如果存在,则将事件添加到缓冲区,如果不存在,则尝试将其推送到​​nextCh​​.

另一方面,它确定缓冲区中是否还有任何事件,如果还有库存,它会不断将其传递给​​nextCh​​.

该​​pop​​方法实现了一种带有缓冲区的分发机制,该缓冲区允许事件不断地从 传递​​addCh​​到​​nextCh​​。但是问题来了,这些​​addCh​​事件是从哪里来的?

源代码非常简单,​​listener​​有一个​​add​​以事件为输入的方法,它将新事件推送到​​addCh​​. 该​​add​​方法由​​sharedProcessor​​管理所有 s 的​​listener s​​调用。


 

 

如上所述,​​sharedProcessor​​负责管理所有的Handler和分发事件,但​​distribute​​真正分发的是方法。

func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()

if sync {for _, listener := range p.syncingListeners {
listener.add(obj)}} else {for _, listener := range p.listeners {
listener.add(obj)}}
}

到目前为止,我们对一个部分有了更清晰的了解:

  1. Controller 向 Informer 注册 Handler。
  2. Informer 通过​​sharedProcessor​​.
  3. Informer 接收事件并通过​​sharedProcessor.distribute​​.
  4. Controller由对应的Handler触发处理自己的逻辑

那么剩下的问题是 Informer 事件从何而来?

 

 

DeltaFIFO

在分析 Informer fetch 事件之前,需要提前告知的一个非常有趣的小工具​​fifo​​是​​sharedIndexInformer.Run​​.

fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

DeltaFIFO 是一个非常有趣的队列,其代码定义在​​client-go/tools/cache/delta_fifo.go​​. 对于队列来说,最重要的肯定是 Add 和 Pop 方法。DeltaFIFO 提供了几种 Add 方法,虽然根据不同的事件类型(add/update/delete/sync)来区分不同的方法,但最终都是执行​​queueActionLocked​​。

// queueActionLocked appends to the delta list for the object.
// Caller must lock first.
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {id, err := f.KeyOf(obj)if err != nil {return KeyError{obj, err}}

// If object is supposed to be deleted (last event is Deleted),// then we should ignore Sync events, because it would result in// recreation of this object.if actionType == Sync && f.willObjectBeDeletedLocked(id) {return nil}

newDeltas := append(f.items[id], Delta{actionType, obj})
newDeltas = dedupDeltas(newDeltas)

if len(newDeltas) > 0 {if _, exists := f.items[id]; !exists {
f.queue = append(f.queue, id)}
f.items[id] = newDeltas
f.cond.Broadcast()} else {// We need to remove this from our map (extra items in the queue are// ignored if they are not in the map).delete(f.items, id)}return nil
}

 

该​​queueActionLocked​​方法的第一个参数 actionType 是事件类型。

const (Added   DeltaType = "Added"   // watch api 获得的创建事件Updated DeltaType = "Updated" // watch api 获得的更新事件Deleted DeltaType = "Deleted" // watch api 获得的删除事件Sync DeltaType = "Sync"       // 触发了 List Api,需要刷新缓存
)

 

事件类型和入队方式表明这是一个具有业务功能的队列,而不仅仅是“先进先出”,入队方式有两个非常巧妙的设计。

  1. 队列中的事件会先判断资源是否有未消费的事件,然后进行适当的处​​理。
  2. 如果 list 方法发现资源已经被删除,则不处理。

第二点比较容易理解,如果触发了列表请求,发现要处理的资源已经被删除了,那么就不需要再排队了。第一点需要和out of queue方法一起看。

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()for {for len(f.queue) == 0 {// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.// When Close() is called, the f.closed is set and the condition is broadcasted.// Which causes this loop to continue and return from the Pop().if f.IsClosed() {return nil, ErrFIFOClosed}

f.cond.Wait()}
id := f.queue[0]
f.queue = f.queue[1:]if f.initialPopulationCount > 0 {
f.initialPopulationCount--}
item, ok := f.items[id]if !ok {// Item may have been deleted subsequently.continue}delete(f.items, id)
err := process(item)if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err}// Don't need to copyDeltas here, because we're transferring// ownership to the caller.return item, err
}
}

 

DeltaFIFO 的​​Pop​​方法有一个输入,即处理函数。当它从队列中出来时,DeltaFIFO会首先根据资源id获取资源所有的事件,然后交给handler函数。

工作流程如图所示。

Kubernetes 控制器管理器的工作原理_实例化_07

一般来说,DeltaFIFO的queue方法首先判断资源是否已经在​​items​​,如果是,则资源还没有被消费(仍然在排队),所以直接将事件追加到​​items[resource_id]​​。如果不在 中​​items​​,则​​items[resource_id]​​创建 then 并将资源 id 附加到​​queue​​.

DeltaFIFO out-of-queue 方法从 获取队列顶部的资源 id ​​queue​​,然后从 获取该资源的所有事件​​items​​,最后调用该方法​​PopProcessFunc​​传入的类型处理程序​​Pop​​。


所以,DeltaFIFO 的特点是队列中的(资源的)事件,当它从队列中出来时,它获取队列中第一个资源的所有事件。这种设计确保不会因为某个资源疯狂地创建事件而导致饥饿,从而使其他资源没有机会被处理。

 

 

控制器 controller

DeltaFIFO 是一个非常重要的组件,唯一真正使它有价值的是 Informer ​​controller​​。

虽然 K8s 源代码确实使用了这个词​​controller​​,但这​​controller​​不是像部署控制器那样的资源控制器。相反,它是一个自上而下的事件控制器(从 API 服务器获取事件并将它们发送到 Informer 进行处理)。

职责​​controller​​是双重的。

  1. 通过 List-Watch 从 Api Server 获取事件并将事件推送到 DeltaFIFO
  2. ​HandleDeltas​​以 的方法​​sharedIndexInformer​​作为参数调用 DeltaFIFO 的 Pop 方法

定义​​controller​​很简单,其核心是​​Reflector​​。

type controller struct {config         Config
reflector *Reflector
reflectorMutex sync.RWMutex
clock clock.Clock
}

 

​controllerr​​的代码​​Reflector​​比较繁琐但是很简单,就是通过​​listerWatcher ​​​​sharedIndexInformer​​中定义的​​list-watch​​,将获取到的事件推送到​​DeltaFIFO​​中。

控制器启动后,启动​​Reflector​​,然后执行​​processLoop​​,这是一个死循环,不断从DeltaFIFO中读取资源事件,并交给​​sharedIndexInformer​​(分配给config.Process)的​​HandleDeltas​​方法。

func (c *controller) processLoop() {for {
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))if err != nil {if err == ErrFIFOClosed {return}if c.config.RetryOnError {// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)}}}
}

 

如果我们看一下 sharedIndexInformer 的 HandleDeltas 方法,我们可以看到整个事件消费过程是有效的。

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()

// from oldest to newestfor _, d := range obj.(Deltas) {switch d.Type {case Sync, Added, Updated:
isSync := d.Type == Sync
s.cacheMutationDetector.AddObject(d.Object)if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {if err := s.indexer.Update(d.Object); err != nil {return err
}
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)} else {if err := s.indexer.Add(d.Object); err != nil {return err
}
s.processor.distribute(addNotification{newObj: d.Object}, isSync)}case Deleted:if err := s.indexer.Delete(d.Object); err != nil {return err
}
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)}}return nil
}

 

前面我们了解到,该​​processor.attribute​​方法将事件分发给 all ​​listeners​​,并​​controller​​使用​​Reflector​​来从 ApiServer 中获取事件并放入队列中,然后通过​​processLoop​​为要处理的资源从队列中取出事件,最后调用​​processor.attribute​​via的​​HandleDeltas​​方法​​sharedIndexInformer​​。所有事件,最后​​processor.attribute​​是通过 的​​HandleDeltas​​方法调用的​​sharedIndexInformer​​。

因此,我们可以按如下方式组织整个事件流。

Kubernetes 控制器管理器的工作原理_实例化_08

 

Indexer

上面,我们整理了从事件接收到分发的​​所有逻辑,但是在sharedIndexInformer的HandleDeltas方法中,有一些逻辑比较有意思,就是所有的事件都是​​s.indexer​​先更新再分发。

前文提到,Indexer是一个线程安全的存储,作为缓存来缓解资源控制器(Controller)查询资源时对ApiServer的压力。


当有事件更新时,会先刷新Indexer中的缓存,然后将事件分发给资源控制器,资源控制器会先从Indexer获取资源详情,从而减少对APIServer的不必要查询请求。


Indexer存储的具体实现在client-go/tools/cache/thread_safe_store.go中,数据存储在​​threadSafeMap​​.

type threadSafeMap struct {lock  sync.RWMutex
items map[string]interface{}

// indexers maps a name to an IndexFunc
indexers Indexers// indices maps a name to an Index
indices Indices
}

 

本质上,​​threadSafeMap​​是一个带有读/写锁的映射,除此之外还可以定义索引,有趣的是由两个字段实现。

  1. ​Indexers​​是一个定义了多个索引函数的map,key是indexName,value是索引函数(计算资源的索引值)。
  2. ​Indices​​保存索引值和数据key的映射关系,​​Indices​​是一个两级的map,第一级的key是indexName,对应​​Indexers​​并决定用什么方法计算索引值,value是一个保存关联的map “索引值-资源键”关联。

相关逻辑比较简单,如下图所示。

Kubernetes 控制器管理器的工作原理_模板类_09

MutationDetector

更新数据的​​HandleDeltas​​方法除了.​​sharedIndexInformer​​​​s.indexer​​​​s.cacheMutationDetector​

开头提到,在​​sharedIndexInformer​​启动的时候,也会启动一个​​cacheMutationDetector​​来监控索引器的缓存。

因为 indexer 缓存实际上是一个指针,所以多个 Controller 访问 indexer 的缓存资源实际上得到的是同一个资源实例。如果一个Controller玩不好,修改了一个资源的属性,肯定会影响其他Controller的正确性。

当 Informer 接收到新事件时,MutationDetector 会保存指向资源的指针(索引器也是如此)和资源的深层副本。通过周期性地检查指针指向的资源是否与开头存储的深拷贝相匹配,我们就知道缓存的资源是否被修改过。

但是,是否启用监控会受到环境变量的影响​​KUBE_CACHE_MUTATION_DETECTOR​​。如果未设置此环境变量,​​sharedIndexInformer​​ 将实例化​​dummyMutationDetector​​并且在启动后不执行任何操作。

如果​​KUBE_CACHE_MUTATION_DETECTOR​​为​​true​​,sharedIndexInformer 实例化​​defaultCacheMutationDetector​​,它以 1s 的间隔执行缓存的定期检查,如果发现缓存被修改,则触发故障处理函数,如果未定义该函数,则触发恐慌。

 

 

概括

本文对ControllerManager进行了狭义的解释,因为它不包括具体的资源管理器(Controller),而只是解释了ControllerManager是如何“管理控制器”的。

可以看到ControllerManager做了很多工作来保证Controller可以只关注自己关心的事件,而这项工作的核心就是Informer。当您了解 Informer 如何与其他组件一起工作时,就会清楚控制器管理器为资源管理器铺平了道路。

 

Kubernetes 控制器管理器的工作原理_实例化_10

 

标签:控制器,管理器,err,Kubernetes,listener,Controller,事件,sharedIndexInformer,Informer
From: https://blog.51cto.com/u_7731123/5765247

相关文章

  • kubernetes插件管理器krew
    kubernetes插件管理器krew1.介绍Krew是kubectl插件的包管理工具。借助Krew,可以轻松地使用kubectlplugin:发现插件、安装和管理插件。使用类似apt、dnf或者brew。对于kub......
  • 实验6:开源控制器实践——RYU
    实验6:开源控制器实践——RYU(一)基本要求搭建下图所示SDN拓扑,协议使用OpenFlow1.0,并连接Ryu控制器,通过Ryu的图形界面查看网络拓扑阅读Ryu文档的TheFirstApplica......
  • 实验5:开源控制器实践——POX
    实验5:开源控制器实践——POX一、实验目的能够理解POX控制器的工作原理;通过验证POX的forwarding.hub和forwarding.l2_learning模块,初步掌握POX控制器的使用方法;够运......
  • 实验5:开源控制器实践——POX
    一、实验目的能够理解POX控制器的工作原理;通过验证POX的forwarding.hub和forwarding.l2_learning模块,初步掌握POX控制器的使用方法;能够运用POX控制器编写自定义网络......
  • 实验6:开源控制器实践——RYU
    实验6:开源控制器实践——RYU一、基本要求1.搭建下图所示SDN拓扑,协议使用OpenFlow1.0,并连接Ryu控制器,通过Ryu的图形界面查看网络拓扑。建立拓扑并连接Ryu控制器,浏览......
  • K8s---Kubernetes简介
    Kubernetes简介:kubernetes,简称k8s,是因为KuberneteS之间是由8个字符组成的,是一个开源的,用于管理云平台中多个主机上的容器化应用,也称容器的资源管理器、容器的编排工具......
  • 实验6:开源控制器实践——RYU
    实验目的能够独立部署RYU控制器;能够理解RYU控制器实现软件定义的集线器原理;能够理解RYU控制器实现软件定义的交换机原理。实验要求(一)基本要求搭建下图所示SDN拓扑,协......
  • 一起学kubernetes系列(3)‌K8S的WebUI:Dashboard&Kuboard
    ​本篇介绍2套kubernetes的WEBUI管理控制台Kubernetes官方Dashboard部署KubernetesDashboard是Kubernetes的官方WebUI。使用KubernetesDashboard,您可以:向Kubern......
  • 一起学kubernetes系列(1)‌明明白白安装Kubernetes1.16.2
    ​前言首次安装Kubernetes我们采用kubeadm来安装单Master节点的方式,安装最新版的Kubernetes和Calico,一步一步循序渐进。介绍Master:集群控制管理节点,所有的命令都经由master......
  • 实验6:开源控制器实践——RYU
    一、实验目的1.能够独立部署RYU控制器;2.能够理解RYU控制器实现软件定义的集线器原理;3.能够理解RYU控制器实现软件定义的交换机原理。二、实验环境Ubuntu20.04Deskto......