首页 > 其他分享 >K8s Controller 开发指南

K8s Controller 开发指南

时间:2024-02-28 12:01:39浏览次数:19  
标签:指南 cache err work queue Controller key K8s

一个 Kubernetes 控制器是一个主动的协调过程,它会监视某个对象的期望 desired 状态,并且还会监视实际 current 状态,然后不断的尝试使当前的实际状态更接近期望的状态。最简单的实现方式是一个循环:

for {
  desired := getDesiredState()
  current := getCurrentState()
  makeChanges(desired, current)
}

 

开发要点

1. 使用 workqueue.Interface 来确保多个 worker 不会同时处理同一个项目(namespace/name),大部分的控制器都需要监控多个资源的状态变化,例如 ReplicaSet Controller 需要监听 ReplicaSet 本身以及所有管理的 pod 的变化,但几乎所有控制器都可以总结为基于 Owner 关系,例如 ReplicaSet 控制器需要对删除的 Pod 做出反应,可以通过监听 pod 的删除事件并通过 pod 的 OwnerRef 关系得到 ReplicaSet 并将其加入队列来实现。

 

2.对 controller 而言,即使是同一种类型的资源的顺序也没有保证。例如在明确先创建 ns1/pod1,再创建 ns2/pod2,对控制器而言(或者说你编写的 reconcile 业务函数而言)可能是先观察到 ns2/pod2 的创建。其实也比较好理解,毕竟有多个并发的 worker 从 queue 中取出资源对象进行处理,每个资源对象处理的时间不一样。在编写控制器时,应该避免依赖于事件的特定顺序。

 

3. 基于水平触发来驱动,而不是边缘触发。获取到的 desired 状态本身是没有状态变化的信息,例如从 apiserver Get 到一个 pod,这个 pod 对象本身不包含这个 pod 上发生的一些 update 信息,我们应该用获取到最新的期望状态,并比较当前实际的状态来 reconcile。另一个方式是在资源的 status 中记录控制器对该资源做出的最新的决策信息,这样,即使控制器在一段时间内关闭,它也能够根据对象的状态来决定是否需要处理该对象。这种方式可以确保控制器的行为是可预测和一致的。

 

可以看到对于 controller 关注的多个资源,例如 replicaSet 和 pod,这些资源的状态变化对应的事件,最终都是对应到 enqueue replicaSet 资源,然后从 informer cache 中查询到 replicaSet 的 desired 状态进行 reconcile,也就是说到 reconcile 阶段已经不再关注触发的事件是 add/update/delete 的哪种类型,只是作为一个触发

type Controller struct {
    // pods gives cached access to pods.
    pods informers.PodLister
    podsSynced cache.InformerSynced

    // queue is where incoming work is placed to de-dup and to allow "easy"
    // rate limited requeues on errors
    queue workqueue.RateLimitingInterface
}

func NewController(pods informers.PodInformer) *Controller {
    c := &Controller{
        pods: pods.Lister(),
        podsSynced: pods.Informer().HasSynced,
        queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "controller-name"),
    }
    
    // register event handlers to fill the queue with pod creations, updates and deletions
    pods.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            key, err := cache.MetaNamespaceKeyFunc(obj)
            if err == nil {
                c.queue.Add(key)
            }
        },
        UpdateFunc: func(old interface{}, new interface{}) {
            key, err := cache.MetaNamespaceKeyFunc(new)
            if err == nil {
                c.queue.Add(key)
            }
        },
        DeleteFunc: func(obj interface{}) {
            // IndexerInformer uses a delta nodeQueue, therefore for deletes we have to use this
            // key function.
            key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
            if err == nil {
                c.queue.Add(key)
            }
        },
    },)
    
    return c
}

func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
    // don't let panics crash the process
    defer utilruntime.HandleCrash()
    // make sure the work queue is shutdown which will trigger workers to end
    defer c.queue.ShutDown()

    klog.Infof("Starting <NAME> controller")

    // wait for your secondary caches to fill before starting your work
    if !cache.WaitForCacheSync(stopCh, c.podsSynced) {
        return
    }

    // start up your worker threads based on threadiness.  Some controllers
    // have multiple kinds of workers
    for i := 0; i < threadiness; i++ {
        // runWorker will loop until "something bad" happens.  The .Until will
        // then rekick the worker after one second
        go wait.Until(c.runWorker, time.Second, stopCh)
    }

    // wait until we're told to stop
    <-stopCh
    klog.Infof("Shutting down <NAME> controller")
}

func (c *Controller) runWorker() {
    // hot loop until we're told to stop.  processNextWorkItem will
    // automatically wait until there's work available, so we don't worry
    // about secondary waits
    for c.processNextWorkItem() {
    }
}

// processNextWorkItem deals with one key off the queue.  It returns false
// when it's time to quit.
func (c *Controller) processNextWorkItem() bool {
    // pull the next work item from queue.  It should be a key we use to lookup
    // something in a cache
    key, quit := c.queue.Get()
    if quit {
        return false
    }
    // you always have to indicate to the queue that you've completed a piece of
    // work
    defer c.queue.Done(key)

    // do your work on the key.  This method will contains your "do stuff" logic
    err := c.syncHandler(key.(string))
    if err == nil {
        // if you had no error, tell the queue to stop tracking history for your
        // key. This will reset things like failure counts for per-item rate
        // limiting
        c.queue.Forget(key)
        return true
    }

    // there was a failure so be sure to report it.  This method allows for
    // pluggable error handling which can be used for things like
    // cluster-monitoring
    utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err))

    // since we failed, we should requeue the item to work on later.  This
    // method will add a backoff to avoid hotlooping on particular items
    // (they're probably still not going to work right away) and overall
    // controller protection (everything I've done is broken, this controller
    // needs to calm down or it can starve other useful work) cases.
    c.queue.AddRateLimited(key)

    return true
}

 

标签:指南,cache,err,work,queue,Controller,key,K8s
From: https://www.cnblogs.com/orchidzjl/p/18039884

相关文章

  • 性能测试-TPS估算指南
    TPS(TransactionsPerSecond)即每秒处理事务数,是衡量系统性能的重要指标。以下是几种常见的TPS计算方法:普通计算方法计算公式:TPS=总请求数/总时间示例:假设在2019年第32周,系统有4.13万的浏览量,即总请求数约为41300。总时间按一周计算,即24小时*3600秒。计算:TPS=41300/......
  • socket io 微服务 k8s 解决方案
    spingboot+socketio依赖对于socketio-client2.x版本<dependency><groupId>com.corundumstudio.socketio</groupId><artifactId>netty-socketio</artifactId><version>1.7.19</version></dependency>注册Naco......
  • ASP.NET Core MVC应用模型的构建[3]: Controller的收集
    从编程的角度来看,一个MVC应用是由一系列Controller类型构建而成的,所以对于一个代表应用模型的ApplicationModel对象来说,它的核心就是Controllers属性返回的一组ControllerModel对象,每个ControllerModel对象是应用模型针对Controller类型的描述。一、ControllerModel二、实例演示......
  • 在K8S中,Pod优雅终止过程是什么?
    在Kubernetes(K8s)中,Pod的优雅终止过程是一个有序的过程,旨在确保Pod中运行的应用程序能够平滑关闭,释放资源,并尽可能减少因突然关闭带来的数据丢失和服务中断。以下是Pod优雅终止的一般步骤:删除Pod请求用户或控制器发出删除Pod的请求,比如通过kubectldeletepod<pod-name>......
  • 在K8S中,helm是什么?如何使用?
    在Kubernetes(K8s)中,Helm是一个用于简化Kubernetes应用程序部署和管理的包管理工具。它采用了声明式配置和模板化的概念,使得Kubernetes资源的打包、安装、升级、回滚等操作变得更加方便和高效。Helm主要由两个核心组件构成:HelmCLI(命令行接口):提供了一系列命令,让用户能够在本......
  • 在K8S中,etcd数据库中数据如何更改?
    在Kubernetes(K8s)中,etcd中的数据更改主要发生在以下几个场景中:创建、更新和删除Kubernetes资源对象:当用户在Kubernetes中创建(POST)、更新(PUT)或删除(DELETE)一个资源对象(如Pod、Service、Deployment等)时,KubernetesAPI服务器会将这些更改转换为对etcd的相应操作。API服务器会将更改......
  • 在K8S中,etcd是什么类型数据库?
    在Kubernetes(K8s)中,etcd是一个高度可靠、分布式的键值存储数据库,用于存储集群的共享配置和服务发现信息。etcd在Kubernetes架构中扮演着核心组件的角色,是集群状态数据的关键存储后端。1.etcd的类型:键值存储(Key-ValueStore):etcd最基本的功能是作为键值存储,它允许用户存储和......
  • 在K8S中,etcdctl如何使用?
    在Kubernetes(K8s)中,etcdctl是用于直接与etcd集群交互的命令行工具,主要用于管理和调试etcd存储中的键值对数据。etcd是K8s集群的核心组件之一,它作为一个高可用的分布式键值存储系统,用于保存集群的所有重要配置数据。以下是如何在Kubernetes环境中使用etcdctl的基本......
  • EasyDarwin视频直播点播平台:编辑拉流转推流实战指南
    在当今数字化时代,视频直播和点播已经成为人们获取信息和娱乐消遣的重要方式。为了满足不同场景下的直播需求,EasyDarwin视频直播点播平台提供了功能强大的支持。本文将重点介绍在EasyDarwin平台上进行拉流转推流操作,并深入探讨推流名称、源地址、控制选项等关键要素。一、EasyDarw......
  • k8s架构解析
    Kubernetes(K8s)是一个开源的容器编排平台,用于自动化部署、扩展和管理容器化应用程序。Kubernetes由多个组件组成,每个组件都扮演着不同的角色。以下是Kubernetes中一些主要组件的详细说明:kube-apiserver:API服务器是Kubernetes集群的中心,提供了资源操作的唯一入口。它负责接收......