首页 > 其他分享 >Informer 使用

Informer 使用

时间:2022-12-26 12:22:51浏览次数:50  
标签:err deployment 使用 metav1 v1 Deployment Informer

Informer

介绍

想要获取资源,可以使用 clientset:

// 使用 clientset 获取 Deployments
deployments, err := clientset.AppsV1().Deployments("default").
List(context.Background(), metav1.ListOptions{})

如果想要监听并获取资源的更新,那么如果用以上的操作,只能将上述代码套上一个 for 循环来不断的执行 List() 操作,这显然是不合理的。

实际上除了常用的 CRUD 操作之外,我们还可以进行 Watch 操作,可以监听资源对象的增、删、改、查操作,这样我们就可以根据自己的业务逻辑去处理这些数据了。

Deployment 对象提供的操作接口:CURD,Watch:

// DeploymentInterface has methods to work with Deployment resources.
type DeploymentInterface interface {
    Create(ctx context.Context, deployment *v1.Deployment, opts metav1.CreateOptions) (*v1.Deployment, error)
    Update(ctx context.Context, deployment *v1.Deployment, opts metav1.UpdateOptions) (*v1.Deployment, error)
    UpdateStatus(ctx context.Context, deployment *v1.Deployment, opts metav1.UpdateOptions) (*v1.Deployment, error)
    Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error
    DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error
    Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.Deployment, error)
    List(ctx context.Context, opts metav1.ListOptions) (*v1.DeploymentList, error)
    Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
    Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *v1.Deployment, err error)
    Apply(ctx context.Context, deployment *appsv1.DeploymentApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Deployment, err error)
    ApplyStatus(ctx context.Context, deployment *appsv1.DeploymentApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Deployment, err error)
    GetScale(ctx context.Context, deploymentName string, options metav1.GetOptions) (*autoscalingv1.Scale, error)
    UpdateScale(ctx context.Context, deploymentName string, scale *autoscalingv1.Scale, opts metav1.UpdateOptions) (*autoscalingv1.Scale, error)
    ApplyScale(ctx context.Context, deploymentName string, scale *applyconfigurationsautoscalingv1.ScaleApplyConfiguration, opts metav1.ApplyOptions) (*autoscalingv1.Scale, error)

    DeploymentExpansion
}

Watch 通过一个 event 接口来监听对象的所有变化(增加、删除、更新):

// Interface can be implemented by anything that knows how to watch and report changes.
type Interface interface {
    // Stop stops watching. Will close the channel returned by ResultChan(). Releases
    // any resources used by the watch.
    Stop()

    // ResultChan returns a chan which will receive all the events. If an error occurs
    // or Stop() is called, the implementation will close this channel and
    // release any resources used by the watch.
    ResultChan() <-chan Event
}
// Event represents a single event to a watched resource.
// +k8s:deepcopy-gen=true
type Event struct {
    Type EventType

    // Object is:
    //  * If Type is Added or Modified: the new state of the object.
    //  * If Type is Deleted: the state of the object immediately before deletion.
    //  * If Type is Bookmark: the object (instance of a type being watched) where
    //    only ResourceVersion field is set. On successful restart of watch from a
    //    bookmark resourceVersion, client is guaranteed to not get repeat event
    //    nor miss any events.
    //  * If Type is Error: *api.Status is recommended; other types may make sense
    //    depending on context.
    Object runtime.Object
}
// Object interface must be supported by all API types registered with Scheme. Since objects in a scheme are
// expected to be serialized to the wire, the interface an Object must provide to the Scheme allows
// serializers to set the kind, version, and group the object is represented as. An Object may choose
// to return a no-op ObjectKindAccessor in cases where it is not expected to be serialized.
type Object interface {
    GetObjectKind() schema.ObjectKind
    DeepCopyObject() Object
}
// EventType defines the possible types of events.
type EventType string

const (
    Added    EventType = "ADDED"
    Modified EventType = "MODIFIED"
    Deleted  EventType = "DELETED"
    Bookmark EventType = "BOOKMARK"
    Error    EventType = "ERROR"
)

这个接口虽然我们可以直接去使用,但是实际上不建议这么使用。因为往往集群中的资源较多,我们需要自己在客户端维护一套缓存,而这个维护的成本也是非常大的,为此 client-go 提供了自己的实现机制,那就是 InformersInformers 是这个事件接口和带索引查找功能的内存缓存的组合,这样也是目前常用的方法。Informers 第一次被调用的时候首先会在客户端调用 List 来获取全量的对象集合,然后通过 Watch 来获取增量的对象集合。

运行原理

一个控制器每次需要获取对象的时候都要访问 APIServer,这会给系统带来很高的负载,Informers 的内存缓存就是用来解决这个问题的,此外 Informers 还可以几乎实时的监控对象的变化,而不需要轮询请求,这样就可以保证客户端的缓存数据与服务端的一致,就可以大大降低 APIServer 的压力了。

上图展示了 Informer 的基本处理流程:

  • 以 events 事件的方式从 APIServer 中获取数据

  • 提供一个类似客户端的 Lister 接口,从内存中 get 和 list 对象

  • 为 添加、删除、更新注册事件处理程序

此外 Informers 也有错误处理方式,当长期运行的 watch 连接中断时,它们会尝试使用另一个 watch 请求来恢复连接,在不丢失任何事件的情况下恢复事件流。如果中断的时间较长,而且 APIServer 丢失了事件(etcd 在新的 watch 请求成功之前从数据库中清除了这些事件),那么 Informers 就会重新 List 全量数据。

而且在重新 List 全量操作的时候还可以配置一个重新同步的周期参数,用于协调内存缓存数据和业务逻辑的数据一致性,每次过了该周期后,注册的事件处理程序就将被所有的对象调用,通常这个周期参数以分为单位,比如10分钟或者30分钟。

注意:重新同步是纯内存操作,不会触发对服务器的调用。

Informers 的这些高级特性以及超强的鲁棒性,都足以让我们不去直接使用客户端的 Watch() 方法来处理自己的业务逻辑,而且在 Kubernetes 中也有很多地方都有使用到 Informers。但是在使用 Informers 的时候,通常每个 GroupVersionResource(GVR)只实例化一个 Informers,但是有时候我们在一个应用中往往有使用多种资源对象的需求,这个时候为了方便共享 Informers,我们可以通过使用共享 Informer 工厂来实例化一个 Informer。

共享 Informer 工厂允许我们在应用中为同一个资源共享 Informer,也就是说不同的控制器循环可以使用相同的 watch 连接到后台的 APIServer,例如,kube-controller-manager 中的控制器数据量就非常多,但是对于每个资源(比如 Pod),在这个进程中只有一个 Informer。

示例

package main

import (
    "flag"
    "fmt"
    "os"
    "path/filepath"
    "time"

    v1 "k8s.io/api/apps/v1"
    "k8s.io/apimachinery/pkg/labels"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    var err error
    var config *rest.Config

    var kubeconfig *string

    if home := homeDir(); home != "" {
        kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
    } else {
        kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
    }
    flag.Parse()

    // 使用 ServiceAccount 创建集群配置 (InCluster模式)
    if config, err = rest.InClusterConfig(); err != nil {
        // 使用 KubeConfig 文件创建集群配置
        if config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig); err != nil {
            panic(err.Error())
        }
    }

    // 创建 ClientSet
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err.Error())
    }

    // 初始化 informer factory (这里设置每 30s 重新List一次)
    informerFactory := informers.NewSharedInformerFactory(clientset, time.Second*30)
    // 对 Deployment 监听
    deploymentInformer := informerFactory.Apps().V1().Deployments()
    // 创建 Informer (相当于注册到工厂去,这样下面启动的时候就回去 List & Watch 对应的资源)
    informer := deploymentInformer.Informer()
    // 创建 Lister
    lister := deploymentInformer.Lister()
    // 注册事件处理程序
    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    onAdd,
        UpdateFunc: onUpdate,
        DeleteFunc: onDelete,
    })

    stopper := make(chan struct{})
    defer close(stopper)

    // 启动 informer, List & Watch
    informerFactory.Start(stopper)
    // 等待所有启动的 Informer 缓存被同步
    informerFactory.WaitForCacheSync(stopper)

    // 从本地缓存中获取 default namespace 中所有的 deployment 列表
    deployments, err := lister.Deployments("default").List(labels.Everything())
    if err != nil {
        fmt.Println(err.Error())
    }
    for idx, deployment := range deployments {
        fmt.Printf("%d -> %s\n", idx+1, deployment.Name)
    }
    <-stopper
}

func homeDir() string {
    if h := os.Getenv("HOME"); h != "" {
        return h
    } else {
        return os.Getenv("USERPOFILE") // windows
    }
}

func onAdd(obj interface{}) {
    deploy := obj.(*v1.Deployment)
    fmt.Printf("add a deployment: %s\n", deploy.Name)
}

func onUpdate(old, new interface{}) {
    oldDeploy := old.(*v1.Deployment)
    newDeploy := new.(*v1.Deployment)
    fmt.Printf("update deployment: %s -> %s\n", oldDeploy.Name, newDeploy.Name)
}

func onDelete(obj interface{}) {
    deploy := obj.(*v1.Deployment)
    fmt.Printf("delete a deployment: %s\n", deploy.Name)
}

上面的代码运行可以获得 default 命名空间之下的所有 Deployment 信息以及整个集群的 Deployment 数据:

$ go run main.go
add a deployment: dingtalk-hook
add a deployment: spin-orca
add a deployment: argocd-server
add a deployment: istio-egressgateway
add a deployment: vault-agent-injector
add a deployment: rook-ceph-osd-0
add a deployment: rook-ceph-osd-2
add a deployment: code-server
......
1 -> nginx
2 -> helloworld-v1
3 -> productpage-v1
4 -> details-v1
......

这是因为我们首先通过 Informer 注册了事件处理程序,这样当我们启动 Informer 的时候首先会将集群的全量 Deployment 数据同步到本地的缓存中,会触发 AddFunc 这个回调函数,然后我们又在下面使用 Lister() 来获取 default 命名空间下面的所有 Deployment 数据,这个时候的数据是从本地的缓存中获取的,所以就看到了上面的结果,由于我们还配置了每30s重新全量 List 一次,所以正常每30s我们也可以看到所有的 Deployment 数据出现在 UpdateFunc 回调函数下面,我们也可以尝试去删除一个 Deployment,同样也会出现对应的 DeleteFunc 下面的事件。

架构

下图是整个 client-go 完整的架构图,或者是说我们要去实现一个自定义控制器的一个整体的流程,其中黄色图标是开发者需要自行开发的部分,而其他部分是 client-go 已经提供的,直接使用即可。由于 client-go 非常复杂,我们这里先对最核心的部分 Informer 来说明。

Reflector(反射器)

Reflector 用于监控(Watch)指定的 Kubernetes 资源,当监控的资源发生变化时,触发相应的变更事件,例如 Add 事件、Update 事件、Delete 事件,并将其资源对象存放到本地缓存 DeltaFIFO 中。

DeltaFIFO

DeltaFIFO 是一个生产者-消费者的队列,生产者是 Reflector,消费者是 Pop 函数,FIFO 是一个先进先出的队列,而 Delta 是一个资源对象存储,它可以保存资源对象的操作类型,例如 Add 操作类型、Update 操作类型、Delete 操作类型、Sync 操作类型等。

Indexer

Indexer 是 client-go 用来存储资源对象并自带索引功能的本地存储,Reflector 从 DeltaFIFO 中将消费出来的资源对象存储至 Indexer。Indexer 与 Etcd 集群中的数据保持完全一致。这样我们就可以很方便地从本地存储中读取相应的资源对象数据,而无须每次从远程 APIServer 中读取,以减轻服务器的压力。

这里理论知识太多,直接去查看源码显得有一定困难,我们可以用一个实际的示例来进行说明,比如现在我们删除一个 Pod,一个 Informers 的执行流程是怎样的:

  1. 首先初始化 Informer,Reflector 通过 List 接口获取所有的 Pod 对象

  2. Reflector 拿到所有 Pod 后,将全部 Pod 放到 Store(本地缓存)中

  3. 如果有人调用 Lister 的 List/Get 方法获取 Pod,那么 Lister 直接从 Store 中去拿数据

  4. Informer 初始化完成后,Reflector 开始 Watch Pod 相关的事件

  5. 此时如果我们删除 Pod1,那么 Reflector 会监听到这个事件,然后将这个事件发送到 DeltaFIFO 中

  6. DeltaFIFO 首先先将这个事件存储在一个队列中,然后去操作 Store 中的数据,删除其中的 Pod1

  7. DeltaFIFO 然后 Pop 这个事件到事件处理器(资源事件处理器)中进行处理

  8. LocalStore 会周期性地把所有的 Pod 信息重新放回 DeltaFIFO 中去

标签:err,deployment,使用,metav1,v1,Deployment,Informer
From: https://www.cnblogs.com/geraldkohn/p/17005515.html

相关文章

  • Centos7.8误删Python2.7之后,导致yum和Python命令无法使用
    Centos7.8误删Python2.7之后,导致yum和Python命令无法使用先简单介绍下我的情况与背景:我在昨天写一个模块,跑Python脚本报错,由于我不熟习Python2,3之间语法有差异,导致......
  • 为什么阿里的Java开发规范中禁止使用Executors创建线程池?
    一.问题概述最近壹哥有个学生出去面试,面试官的一个问题是:在开发中你使用什么方式创建线程池?这个学生答曰:使用jdk中自带的工厂类Executors创建线程池!该学生回答完问题后,感......
  • babel的使用(关于使用async报错的问题)
    一、配置文件.babelrc.babelrc文件存放在项目的根目录下。{"presets":[],"plugins":[]}presets字段设定转码规则,你可以根据需要安装。$npmin......
  • 使用Babel将ES6代码转为ES5代码,从而在现有环境执行。
    https://blog.csdn.net/weixin_44797182/article/details/127622359前言在线转码https://babeljs.io/repl/#https://es6console.com/1.快速入门(1)ES6的某些高级语法在浏......
  • VScode 使用 emmet
    背景在很多的编辑场合,很多时候回出现很多逻辑性的问题。可能觉得html是一门没有逻辑的语言,实际上,它是有一定的思想编辑的。后来出现了emmet,这个不仅仅是一种快捷方式,同......
  • c++ 使用socket实现C/S端文件的下载传输
    首先是服务器端,大致说下流程:服务器创建线程去处理应答accept(),当接受到客户端连接请求时,首先获取要发送的指定的文件数据总大小给客户端,接着就是循环读取要发送的文件数据......
  • 从零演示如何基于 IDL 方式来定义 Dubbo 服务并使用 Triple 协议
    使用IDL定义服务具有更好的跨语言友好性,然而Triple协议并不是和IDL强绑定的,也可以使用JavaInterface+Pojo的方式定义服务并启用Triple协议,具体可参见示例。更......
  • 如何使用 YonBuilder 进行报表分析?
    报表是基于业务元数据、业务模型、数据模型等数据来源展示与分析业务的重要工具,在YonBuilder中可以通过简单拖拽、选择,快速生成报表分析,提升报表开发效率。本期通过员工信息......
  • 如何使用hutool进行AES加密和解密?
    如何使用hutool进行AES加密和解密?下面直接贴出工具类,有需要的小伙伴可以直接拿去用。importcn.hutool.crypto.asymmetric.AsymmetricCrypto;importcn.hutool.crypto.......
  • 使用opensl 的BufferQueueAudioPlayer对wav文件的播放
    创建音频引擎之后,读取wav文件到内存,然后使用BufferQueueAudioPlayer进行播放。这里在读取wav文件的时候需要对wav文件的前44个类似头信息进行解析,然后在进行播放的时候,在......