A Kubernetes controller is an active reconciliation process: it watches both an object's desired state and its actual current state, and continually works to bring the current state closer to the desired one. The simplest possible implementation is a loop:
```go
for {
    desired := getDesiredState()
    current := getCurrentState()
    makeChanges(desired, current)
}
```
Development guidelines
1. Use workqueue.Interface to ensure that multiple workers never process the same item (namespace/name) at the same time. Most controllers need to watch state changes on more than one resource: the ReplicaSet controller, for example, watches ReplicaSets themselves as well as every Pod they manage. Almost all of these relationships reduce to owner references. When the ReplicaSet controller needs to react to a deleted Pod, it can watch Pod delete events, resolve the owning ReplicaSet through the Pod's OwnerRef, and enqueue that ReplicaSet (see the sketch after this list).
2. For a controller, ordering is not guaranteed even among resources of the same kind. Even if ns1/pod1 is clearly created before ns2/pod2, the controller (that is, your reconcile business function) may observe the creation of ns2/pod2 first. This is easy to see: multiple concurrent workers pull resource objects off the queue, and each object takes a different amount of time to process. When writing a controller, avoid depending on any particular order of events.
3. Drive the controller with level triggering, not edge triggering. The desired state you fetch carries no information about how it changed: a Pod you Get from the apiserver does not tell you which updates have happened to it. Instead, fetch the latest desired state and compare it with the actual current state to reconcile. A complementary technique is to record the controller's most recent decision about a resource in that resource's status; then even if the controller is down for a while, it can determine from the object's state alone whether the object still needs work. This keeps the controller's behavior predictable and consistent.
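To make point 1 concrete, here is a minimal sketch, not taken from the original post, of a Pod delete handler that maps the event back to the owning ReplicaSet. It assumes a Controller with a queue field shaped like the skeleton further down; metav1.GetControllerOf and cache.DeletedFinalStateUnknown are real apimachinery/client-go APIs, while handlePodDelete itself is hypothetical.

```go
import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

// handlePodDelete is a hypothetical DeleteFunc for the pod informer: instead
// of enqueueing the pod itself, it resolves the owning ReplicaSet through the
// pod's OwnerReference and enqueues that, so reconcile always runs on the owner.
func (c *Controller) handlePodDelete(obj interface{}) {
	pod, ok := obj.(*corev1.Pod)
	if !ok {
		// On deletes the informer may hand us a tombstone wrapping the last
		// known state of the object, so unwrap it first.
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			return
		}
		pod, ok = tombstone.Obj.(*corev1.Pod)
		if !ok {
			return
		}
	}
	// Only react if the controlling owner is a ReplicaSet.
	ref := metav1.GetControllerOf(pod)
	if ref == nil || ref.Kind != "ReplicaSet" {
		return
	}
	// Enqueue the owner's namespace/name key, the same format that
	// MetaNamespaceKeyFunc produces (owners live in the pod's namespace).
	c.queue.Add(pod.Namespace + "/" + ref.Name)
}
```

Wiring it up just means registering handlePodDelete as the DeleteFunc in the pod informer's event handler registration.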
Notice that for the multiple resources a controller cares about, such as ReplicaSets and Pods, the events for all of their state changes ultimately map to the same action: enqueue the ReplicaSet, then look up the ReplicaSet's desired state from the informer cache and reconcile it. In other words, by the time reconcile runs it no longer matters whether the triggering event was an add, update, or delete; the event is nothing more than a trigger. The skeleton below illustrates this pattern:
```go
import (
	"fmt"
	"time"

	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	informers "k8s.io/client-go/informers/core/v1"
	listers "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog/v2"
)

type Controller struct {
	// pods gives cached access to pods.
	pods       listers.PodLister
	podsSynced cache.InformerSynced

	// queue is where incoming work is placed to de-dup and to allow "easy"
	// rate limited requeues on errors
	queue workqueue.RateLimitingInterface
}

func NewController(pods informers.PodInformer) *Controller {
	c := &Controller{
		pods:       pods.Lister(),
		podsSynced: pods.Informer().HasSynced,
		queue:      workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "controller-name"),
	}

	// register event handlers to fill the queue with pod creations, updates and deletions
	pods.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(obj)
			if err == nil {
				c.queue.Add(key)
			}
		},
		UpdateFunc: func(old interface{}, new interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(new)
			if err == nil {
				c.queue.Add(key)
			}
		},
		DeleteFunc: func(obj interface{}) {
			// IndexerInformer uses a delta queue, therefore for deletes we have to use this
			// key function.
			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
			if err == nil {
				c.queue.Add(key)
			}
		},
	})

	return c
}

func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
	// don't let panics crash the process
	defer utilruntime.HandleCrash()
	// make sure the work queue is shutdown which will trigger workers to end
	defer c.queue.ShutDown()

	klog.Infof("Starting <NAME> controller")

	// wait for your secondary caches to fill before starting your work
	if !cache.WaitForCacheSync(stopCh, c.podsSynced) {
		return
	}

	// start up your worker threads based on threadiness. Some controllers
	// have multiple kinds of workers
	for i := 0; i < threadiness; i++ {
		// runWorker will loop until "something bad" happens. The .Until will
		// then rekick the worker after one second
		go wait.Until(c.runWorker, time.Second, stopCh)
	}

	// wait until we're told to stop
	<-stopCh
	klog.Infof("Shutting down <NAME> controller")
}

func (c *Controller) runWorker() {
	// hot loop until we're told to stop. processNextWorkItem will
	// automatically wait until there's work available, so we don't worry
	// about secondary waits
	for c.processNextWorkItem() {
	}
}

// processNextWorkItem deals with one key off the queue. It returns false
// when it's time to quit.
func (c *Controller) processNextWorkItem() bool {
	// pull the next work item from queue. It should be a key we use to lookup
	// something in a cache
	key, quit := c.queue.Get()
	if quit {
		return false
	}
	// you always have to indicate to the queue that you've completed a piece of
	// work
	defer c.queue.Done(key)

	// do your work on the key. This method contains your "do stuff" logic
	err := c.syncHandler(key.(string))
	if err == nil {
		// if you had no error, tell the queue to stop tracking history for your
		// key. This will reset things like failure counts for per-item rate
		// limiting
		c.queue.Forget(key)
		return true
	}

	// there was a failure so be sure to report it. This method allows for
	// pluggable error handling which can be used for things like
	// cluster-monitoring
	utilruntime.HandleError(fmt.Errorf("%v failed with: %v", key, err))

	// since we failed, we should requeue the item to work on later. This
	// method will add a backoff to avoid hotlooping on particular items
	// (they're probably still not going to work right away) and overall
	// controller protection (everything I've done is broken, this controller
	// needs to calm down or it can starve other useful work) cases.
	c.queue.AddRateLimited(key)
	return true
}
```
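The skeleton stops short of syncHandler, which is where controller-specific business logic lives. Below is a minimal, hypothetical sketch of the level-triggered style from point 3: regardless of which event caused the enqueue, it re-reads the latest desired state from the lister and drives the world toward it. cache.SplitMetaNamespaceKey and apierrors.IsNotFound are real APIs, while checkActualState and makeChanges are placeholder stubs, not anything from client-go.

```go
import (
	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/client-go/tools/cache"
)

// syncHandler reconciles one "namespace/name" key from the queue. It is
// level-triggered: it never asks which event (add/update/delete) fired, it
// only compares the latest desired state against the current state.
func (c *Controller) syncHandler(key string) error {
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	// Re-read the latest desired state from the informer cache; whatever
	// intermediate updates happened in between are irrelevant.
	pod, err := c.pods.Pods(namespace).Get(name)
	if apierrors.IsNotFound(err) {
		// The object is gone: clean up whatever we created for it and stop.
		return nil
	}
	if err != nil {
		return err
	}
	// Placeholder stubs: observe reality, then converge it toward the desired state.
	current := checkActualState(pod)
	return makeChanges(pod, current)
}

// checkActualState and makeChanges stand in for controller-specific logic.
func checkActualState(pod *corev1.Pod) interface{}           { return nil }
func makeChanges(pod *corev1.Pod, current interface{}) error { return nil }
```

A fuller version would also write the controller's latest decision into the object's status, as point 3 suggests, so that a restarted controller can tell from the object alone whether work remains.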
From: https://www.cnblogs.com/orchidzjl/p/18039884