Kubernetes组件在工作过程中需要大量监控并查询集群中的资源对象。以Deployment控制器为例,它需要实时关注Deployment和要控制的ReplicaSet的状态变更,实时收敛ReplicaSet的状态,使ReplicaSet与用户自定义的Deployment的状态保持一致。其他控制器也是如此,它们需要频繁查询所关注的资源对象,这势必会对API Server和ETCD造成查询负担。
组件和API Server之间采用HTTP通信,并且没有采用任何第三方中间件,需要保证组件之间通信消息的可靠性、顺序性和实时性。
上面所提到的问题,可以通过client-go的组件Informer来解决。在为一个自定义的CRD编写控制器时,最需要熟练掌握的也是Informer。
Informer的核心机制是List/Watch,当连接到API Server时,Informer会先获取(List模式)Kubernetes中所有用户关心的资源对象并存储到本地缓存中,之后会对这些资源对象进行监听(Watch模式),监控资源对象的变化,当资源对象发生变更时,会修改当前缓存中的数据,保证其与ETCD的数据一致。在Kubernetes的开发中,Informer的使用场景和优势如下。
1.当用户使用Informer查询Kubernetes中的资源对象时,查询的是本地的缓存,速度非常快,并且减轻了API Server和ETCD集群的查询压力。
2.Informer支持用户使用ShareInformer的AddEventHandler进行事件订阅和回调处理。事件订阅和回调处理是Kubernetes内置资源对象(Pod、Deployment、Service等)控制器、用户为自定义CRD编写控制器时需要使用的。
最优雅的开发实践是使用Clientset进行资源对象的增、删、改,并使用Informer去查询。
Informer的整体架构设计如下所示。
展现了Informer连接API Server后整个数据流向和方法调用,整体流程如下。
1)Controller控制Reflector的启动,调用List和Watch方法,从API Server先获取(List)所有的Kubernetes资源,并存入DeltaFIFO这个先进先出的队列,之后会监听(Watch)资源对象的变更事件,持续将后续收到的事件存入DeltaFIFO。
2)Controller的processLoop方法会将DeltaFIFO中的数据使用Pop方法取出来,并交给回调函数HandlerDetal,回调函数HandlerDetal负责处理取出来的数据。
3)当资源对象操作类型是Add、Update、Delete时,回调函数HandlerDetal将数据存储到Indexer。Indexer是对ThreadSafeMap的封装,ThreadSafeMap是一个并发安全的内存存储。存入后方便用户使用Informer的Lister进行资源的高效查询。
4)HandlerDetal除了将事件存入Indexer外,还会将数据通过distribute函数分发到ShareInformer,这样用户在使用informer.AddEventHandler函数时才会收到事件的通知并触发回调。
5)Kubernetes的控制器或者用户自定义的控制器在使用ShareInformer的AddEventHandler订阅事件时,在事件触发后,一般将数据通过workqueue.add方法存入工作队列,这个队列有一些特性非常好用。
6)Kubernetes的控制器或者用户自定义的控制器会使用get方法从工作队列中获取数据,并进行控制器的主要逻辑处理,不断收敛资源状态使之和资源定义一致。如果处理过程中发生错误(如调用更新资源方法时发生网络I/O错误),则将数据重新放回工作队列中并限速,等待下次处理。
需要注意步骤1)、步骤2)中提到的Controller和步骤5)步骤6)中提到的控制器是有区别的,前者是Informer的一个组件,源码路径是k8s.io/client-go/tools/cache/controller.go,用来控制整个Informer的启动流程;后者用来控制资源对象,收敛资源状态。
在client-go中,每个资源对象都有相应的Informer机制,用户在开发过程中直接使用的就是每个资源的Informer,这里先给出了Informer的具体使用示例,让读者明白如何使用,然后解析client-go的组件和它们的工作原理,其中的关键点如下。
GetKubeClient函数定义了如何通过一个配置文件生成Clientset,示例如下所示。
func GetKubeClient(cfgpath string) (*kubernetes.Clientset, *restclient.Config) { configfile := cfgpath kubeconfig, err := clientcmd.BuildConfigFromFlags("", configfile) if err != nil { logger.Errorf("BuildConfigFromFlags kube clientset err:", err) panic(err) } clientset, err := kubernetes.NewForConfig(kubeconfig) if err != nil { logger.Errorf("NewForConfig err:%s", err) panic(err) } logger.Info("GetKubeClient OK") return clientset, kubeconfig }
编写结构体KubeController,示例如下所示。
package main //KController 对象 type KubeController struct { kubeConfig *restclient.Config status int32 clusterId []string env []string clientset *kubernetes.Clientset factory informers.SharedInformerFactory //定义Deployment、Pod、Service等资源对象的Informer、Lister以及HasSynce ...... podInformer coreinformers.PodInformer podsLister corelisters.PodLister podsSynced cache.InformerSynced ...... }
编写NewKubeController函数,示例如下所示。
//创建KController对象 func NewKubeController(kubeConfig *restclient.Config, clientset *kubernetes. Clientset, defaultResync time.Duration) *KubeController { kc := &KubeController{kubeConfig: kubeConfig, clientset: clientset} //通过Clientset生成SharedInformerFactory //defaultResync参数可以控制reflector调用List的周期,如果设置为0,启动后获取 //(List)一次全量的资源对象并放入缓存,后续不会再同步 kc.factory = informers.NewSharedInformerFactory(clientset, defaultResync) //生成Deployment、Pod、Service等资源对象的Informer、Lister以及HasSynced ...... kc.podInformer = kc.factory.Core().V1().Pods() kc.podsLister = kc.podInformer.Lister() kc.podsSynced = kc.podInformer.Informer().HasSynced ...... return kc }
编写Run方法,启动Informer,示例如下所示。
//启动Factory,获取缓存 func (kc *KubeController) Run(stopPodch chan struct{}) { //defer close(stopPodCh) defer utilruntime.HandleCrash() defer logger.Error("KubeController shutdown") //传入停止的stopCh kc.factory.Start(stopPodch) //等待资源查询(List)完成后同步到缓存 if !cache.WaitForCacheSync(stopPodch, kc.nodesSynced, kc.deploymentsSynced, kc.podsSynced, kc.ingressesSynced, kc.servicesSynced, kc.configMapsSynced, kc.namespaceSynced) { utilruntime.HandleError(fmt.Errorf("timed out waiting for kuberesource caches to sync")) return } //同步成功,设置标志位status 为1 kc.status = 1 logger.Info("KubeController start") <-stopPodch }
编写main()函数,示例如下所示。
func main() { clientset, kubeConfig := GetKubeClient("conf/config") kc := NewKubeController(kubeConfig, clientset, time.Second*3000) stopPodch := make(chan struct{}) go func() { kc.Run(stopPodch) <-stopPodch }() //等待所有资源对象同步完成再继续 for { if kc.status == 1 { break } time.Sleep(time.Second * 1) fmt.Println("sleep 1S") } //使用Pod的Lister获取指定Pod的完整资源对象,打印Pod所在的Kubernetes节点名称 pod , err := kc.podsLister.Pods("tech-daily").Get("hello-omega-deployment- 7d8ff89d87-25kb4") if err != nil { logger.Errorf("get pods err:%s", err) } logger.Infof("the pods hostname is :%s", pod.Spec.NodeName) }
需要注意informers.NewSharedInformerFactory,Informer也被称为Shared Informer,在实际使用过程中,如果同一个资源的Informer被实例化多次,那么每一个Informer都会使用一个Reflector,并且每一个Reflector都会调用List/Watch,这样做导致的后果是会带来重复的序列化和反序列化,进而增加API Server的压力。
而Shared Informer跟它的命名一样,可以使同一类资源共享一个Reflector,从而避免重复的工作。Shared Informer定义了一个map的数据结构,用于存放所有的Informer字段,示例如下所示。源码路径为k8s.io/client-go/informers/factory.go。
type sharedInformerFactory struct { client kubernetes.Interface namespace string tweakListOptions internalinterfaces.TweakListOptionsFunc lock sync.Mutex defaultResync time.Duration customResync map[reflect.Type]time.Duration informers map[reflect.Type]cache.SharedIndexInformer startedInformers map[reflect.Type]bool }标签:kc,err,client,clientset,go,Informer,资源 From: https://www.cnblogs.com/muzinan110/p/17156638.html