首页 > 其他分享 >client-go核心组件Informer

client-go核心组件Informer

时间:2023-02-26 14:33:06浏览次数:50  
标签:kc err client clientset go Informer 资源

Kubernetes组件在工作过程中需要大量监控并查询集群中的资源对象。以Deployment控制器为例,它需要实时关注Deployment和要控制的ReplicaSet的状态变更,实时收敛ReplicaSet的状态,使ReplicaSet与用户自定义的Deployment的状态保持一致。其他控制器也是如此,它们需要频繁查询所关注的资源对象,这势必会对API Server和ETCD造成查询负担。

组件和API Server之间采用HTTP通信,并且没有采用任何第三方中间件,需要保证组件之间通信消息的可靠性、顺序性和实时性。

上面所提到的问题,可以通过client-go的组件Informer来解决。在为一个自定义的CRD编写控制器时,最需要熟练掌握的也是Informer。

Informer的核心机制是List/Watch,当连接到API Server时,Informer会先获取(List模式)Kubernetes中所有用户关心的资源对象并存储到本地缓存中,之后会对这些资源对象进行监听(Watch模式),监控资源对象的变化,当资源对象发生变更时,会修改当前缓存中的数据,保证其与ETCD的数据一致。在Kubernetes的开发中,Informer的使用场景和优势如下。

1.当用户使用Informer查询Kubernetes中的资源对象时,查询的是本地的缓存,速度非常快,并且减轻了API Server和ETCD集群的查询压力。

2.Informer支持用户使用ShareInformer的AddEventHandler进行事件订阅和回调处理。事件订阅和回调处理是Kubernetes内置资源对象(Pod、Deployment、Service等)控制器、用户为自定义CRD编写控制器时需要使用的。

最优雅的开发实践是使用Clientset进行资源对象的增、删、改,并使用Informer去查询。

Informer的整体架构设计如下所示。

展现了Informer连接API Server后整个数据流向和方法调用,整体流程如下。

1)Controller控制Reflector的启动,调用List和Watch方法,从API Server先获取(List)所有的Kubernetes资源,并存入DeltaFIFO这个先进先出的队列,之后会监听(Watch)资源对象的变更事件,持续将后续收到的事件存入DeltaFIFO。

2)Controller的processLoop方法会将DeltaFIFO中的数据使用Pop方法取出来,并交给回调函数HandlerDetal,回调函数HandlerDetal负责处理取出来的数据。

3)当资源对象操作类型是Add、Update、Delete时,回调函数HandlerDetal将数据存储到Indexer。Indexer是对ThreadSafeMap的封装,ThreadSafeMap是一个并发安全的内存存储。存入后方便用户使用Informer的Lister进行资源的高效查询。

4)HandlerDetal除了将事件存入Indexer外,还会将数据通过distribute函数分发到ShareInformer,这样用户在使用informer.AddEventHandler函数时才会收到事件的通知并触发回调。

5)Kubernetes的控制器或者用户自定义的控制器在使用ShareInformer的AddEventHandler订阅事件时,在事件触发后,一般将数据通过workqueue.add方法存入工作队列,这个队列有一些特性非常好用。

6)Kubernetes的控制器或者用户自定义的控制器会使用get方法从工作队列中获取数据,并进行控制器的主要逻辑处理,不断收敛资源状态使之和资源定义一致。如果处理过程中发生错误(如调用更新资源方法时发生网络I/O错误),则将数据重新放回工作队列中并限速,等待下次处理。

需要注意步骤1)、步骤2)中提到的Controller和步骤5)步骤6)中提到的控制器是有区别的,前者是Informer的一个组件,源码路径是k8s.io/client-go/tools/cache/controller.go,用来控制整个Informer的启动流程;后者用来控制资源对象,收敛资源状态。

在client-go中,每个资源对象都有相应的Informer机制,用户在开发过程中直接使用的就是每个资源的Informer,这里先给出了Informer的具体使用示例,让读者明白如何使用,然后解析client-go的组件和它们的工作原理,其中的关键点如下。

GetKubeClient函数定义了如何通过一个配置文件生成Clientset,示例如下所示。

func GetKubeClient(cfgpath string) (*kubernetes.Clientset, *restclient.Config) {
    configfile := cfgpath
    kubeconfig, err := clientcmd.BuildConfigFromFlags("", configfile)
    if err != nil {
        logger.Errorf("BuildConfigFromFlags kube clientset err:", err)
        panic(err)
    }
    clientset, err := kubernetes.NewForConfig(kubeconfig)

    if err != nil {
        logger.Errorf("NewForConfig err:%s", err)
        panic(err)
    }
    logger.Info("GetKubeClient OK")
    return clientset, kubeconfig
}

编写结构体KubeController,示例如下所示。

package main
//KController 对象
type KubeController struct {
    kubeConfig         *restclient.Config
    status             int32
    clusterId          []string
    env                []string
    clientset          *kubernetes.Clientset
    factory            informers.SharedInformerFactory
//定义Deployment、Pod、Service等资源对象的Informer、Lister以及HasSynce
    ......   
    podInformer        coreinformers.PodInformer
    podsLister         corelisters.PodLister
    podsSynced         cache.InformerSynced
    ......
}

编写NewKubeController函数,示例如下所示。

//创建KController对象
func NewKubeController(kubeConfig *restclient.Config, clientset *kubernetes.
    Clientset, defaultResync time.Duration) *KubeController {
    kc := &KubeController{kubeConfig: kubeConfig, clientset: clientset}
    //通过Clientset生成SharedInformerFactory
    //defaultResync参数可以控制reflector调用List的周期,如果设置为0,启动后获取
    //(List)一次全量的资源对象并放入缓存,后续不会再同步
    kc.factory = informers.NewSharedInformerFactory(clientset, defaultResync)
    //生成Deployment、Pod、Service等资源对象的Informer、Lister以及HasSynced
    ......
    kc.podInformer = kc.factory.Core().V1().Pods()
    kc.podsLister = kc.podInformer.Lister()
    kc.podsSynced = kc.podInformer.Informer().HasSynced
    ......
    return kc
}

编写Run方法,启动Informer,示例如下所示。

//启动Factory,获取缓存
func (kc *KubeController) Run(stopPodch chan struct{}) {
    //defer close(stopPodCh)
    defer utilruntime.HandleCrash()
    defer logger.Error("KubeController shutdown")
    //传入停止的stopCh
    kc.factory.Start(stopPodch)
    //等待资源查询(List)完成后同步到缓存
    if !cache.WaitForCacheSync(stopPodch, kc.nodesSynced, kc.deploymentsSynced, 
        kc.podsSynced,
        kc.ingressesSynced, kc.servicesSynced, kc.configMapsSynced, kc.namespaceSynced) {
        utilruntime.HandleError(fmt.Errorf("timed out waiting for kuberesource 
            caches to sync"))
        return
    }
    //同步成功,设置标志位status 为1
    kc.status = 1
    logger.Info("KubeController start")
    <-stopPodch
}

编写main()函数,示例如下所示。

func main() {
    clientset, kubeConfig := GetKubeClient("conf/config")
    kc := NewKubeController(kubeConfig, clientset, time.Second*3000)
    stopPodch := make(chan struct{})
    go func() {
        kc.Run(stopPodch)
        <-stopPodch
    }()
    //等待所有资源对象同步完成再继续
    for {
        if kc.status == 1 {
            break
        }
        time.Sleep(time.Second * 1)
        fmt.Println("sleep 1S")
    }
    //使用Pod的Lister获取指定Pod的完整资源对象,打印Pod所在的Kubernetes节点名称
    pod , err := kc.podsLister.Pods("tech-daily").Get("hello-omega-deployment-
        7d8ff89d87-25kb4")
    if err != nil {
        logger.Errorf("get pods err:%s", err)
    }
    logger.Infof("the pods hostname is :%s", pod.Spec.NodeName)
}

需要注意informers.NewSharedInformerFactory,Informer也被称为Shared Informer,在实际使用过程中,如果同一个资源的Informer被实例化多次,那么每一个Informer都会使用一个Reflector,并且每一个Reflector都会调用List/Watch,这样做导致的后果是会带来重复的序列化和反序列化,进而增加API Server的压力。

而Shared Informer跟它的命名一样,可以使同一类资源共享一个Reflector,从而避免重复的工作。Shared Informer定义了一个map的数据结构,用于存放所有的Informer字段,示例如下所示。源码路径为k8s.io/client-go/informers/factory.go。

type sharedInformerFactory struct {
    client           kubernetes.Interface
    namespace        string
    tweakListOptions internalinterfaces.TweakListOptionsFunc
    lock             sync.Mutex
    defaultResync    time.Duration
    customResync     map[reflect.Type]time.Duration
    informers map[reflect.Type]cache.SharedIndexInformer
    startedInformers map[reflect.Type]bool
}

标签:kc,err,client,clientset,go,Informer,资源
From: https://www.cnblogs.com/muzinan110/p/17156638.html

相关文章

  • 「文档数据库之争」MongoDB和CouchDB的比较
    MongoDB和CouchDB都是基于文档的NoSQL数据库类型。文档数据库又称mdocumentstore,通常用于存储半结构化数据的文档格式及其详细描述。它允许创建和更新程序,而不需要引用主模......
  • 论文阅读_AlphaGo_Zero
    论文信息name_en:MasteringthegameofGowithouthumanknowledgename_ch:在没有人类知识的情况下掌握围棋游戏paper_addr:http://www.nature.com/articles/nature2......
  • go 单元测试
    go单元测试单元测试单元测试的写法:首先文件的命名方式xxx_test.go函数的命名方式funcTestxxx(t*testing.T)运行测试用例gotestxxx_test.go例如文件fmt.go......
  • Windows 下搭建 googletest 测试框架(C/C++)
    主要分为以下几个部分环境准备源代码准备googletest与测试代码编译执行一、环境准备1.MinGW可以直接下载MinGW,x86_64-posix-sjlj下载地址也可以通过下载带编......
  • Django 框架基础9:视图(V)类视图、中间件
    1、类视图类视图是采用面向对象的思路定义类视图①继承自django.views的View。②不同的请求方式有不同的业务逻辑.类视图的方法就直接采用http请求名字作为函数名,如,ge......
  • go中的map和锁
    Go中的map和锁声明和初始化只声明,vargMapmap[string]string使用var声明声明初始化varhMap=map[string]string使用make初始化packagemainimport"fmt"......
  • django.template.exceptions.TemplateDoesNotExist: rest_framework/api.html
    django.template.exceptions.TemplateDoesNotExist:rest_framework/api.html报错,使用postman进行提交请求是能正常调用的,但是使用浏览器就会抛出这个错误这是因为没在set......
  • Go语言初尝
    概述对于语言设计之争,唯一需要牢记的一句话是:如果把C变成C++,那么C就消失了。Go是一个轻量级的简洁的支持并发的现代语言,可以用于探索性个人项目,这是我想学这......
  • Go编程实战:博客备份
    在“博客备份”一文中,编写了两个python脚本,然后使用一个命令行将博客备份流程串联起来。这里,我们将使用Go来实现。不太熟悉Go语言的读者,可以阅读之前写的两篇文章......
  • python-djanggo 实现读取excel 表格在网页中展示
    1.准备读取数据放到项目文件夹下   2.熟悉表结构    3.准备处理依赖库pipinstall-ihttps://pypi.tuna.tsinghua.edu.cn/simplepandasopenpyxl  ......