首页 > 编程语言 >Kubernetes: client-go 源码剖析(一)

Kubernetes: client-go 源码剖析(一)

时间:2024-08-22 18:26:10浏览次数:8  
标签:return cache client 源码 func go informer

kubernetes:client-go 系列文章:

0. 前言

在看 kube-scheduler 组件的过程中遇到了 kube-scheduler 对于 client-go 的调用,泛泛的理解调用过程总有种隔靴搔痒的感觉,于是调转头先把 client-go 理清楚在回来看 kube-scheduler

为什么要看 client-go,并且要深入到原理,源码层面去看。很简单,因为它很重要。重要在两方面:

  1. kubernetes 组件通过 client-go 和 kube-apiserver 交互。
  2. client-go 简单,易用,大部分基于 Kubernetes 做二次开发的应用,在和 kube-apiserver 交互时会使用 client-go

当然,不仅在于使用,理解层面,对于我们学习代码开发,架构等也有帮助。

1. client-go 客户端对象

client-go 支持四种客户端对象,分别是 RESTClientClientSetDynamicClient 和 DiscoveryClient

image

组件或者二次开发的应用可以通过这四种客户端对象和 kube-apiserver 交互。其中,RESTClient 是最基础的客户端对象,它封装了 HTTP Request,实现了 RESTful 风格的 APIClientSet 基于 RESTClient,封装了对于 Resource 和 Version 的请求方法。DynamicClient 相比于 ClientSet 提供了全资源,包括自定义资源的请求方法。DiscoveryClient 用于发现 kube-apiserver 支持的资源组,资源版本和资源信息。

每种客户端适用的场景不同,主要是对 HTTP Request 做了层层封装,具体的代码实现可参考 client-go 客户端对象

2. informer 机制

仅仅封装 HTTP Request 是不够的,组件通过 client-go 和 kube-apiserver 交互,必然对实时性,可靠性等有很高要求。试想,如果 ETCD 中存储的数据和组件通过 client-go 从 ETCD 获取的数据不匹配的话,那将会是一个非常严重的问题。

如何实现 client-go 的实时性,可靠性?client-go 给出的答案是:informer 机制。

image

                                  client-go informer 流程图

informer 机制的核心组件包括:

  • Reflector: 主要负责两类任务:
    1. 通过 client-go 客户端对象 list kube-apiserver 资源,并且 watch kube-apiserver 资源变更。
    2. 作为生产者,将获取的资源放入 Delta FIFO 队列。
  • Informer: 主要负责三类任务:
    1. 作为消费者,将 Reflector 放入队列的资源拿出来。
    2. 将资源交给 indexer 组件。
    3. 交给 indexer 组件之后触发回调函数,处理回调事件。
  • Indexerindexer 组件负责将资源信息存入到本地内存数据库(实际是 map 对象),该数据库作为缓存存在,其资源信息和 ETCD 中的资源信息完全一致(得益于 watch 机制)。因此,client-go 可以从本地 indexer 中读取相应的资源,而不用每次都从 kube-apiserver 中获取资源信息。这也实现了 client-go 对于实时性的要求。

接下来从源码角度看各个组件的处理流程,力图做到知其然,知其所以然。

2 informer 源码分析

直接阅读 informer 源码是非常晦涩难懂的,这里通过 informer 的代码示例开始学习:

package main

import (
	"log"
	"time"

	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
    // 解析 kubeconfig
	config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		panic(err)
	}

    // 创建 ClientSet 客户端对象
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	stopCh := make(chan struct{})
	defer close(stopCh)

    // 创建 sharedInformers
	sharedInformers := informers.NewSharedInformerFactory(clientset, time.Minute)
    // 创建 informer
	informer := sharedInformers.Core().V1().Pods().Informer()

    // 创建 Event 回调 handler
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			mObj := obj.(v1.Object)
			log.Printf("New Pod Added to Store: %s", mObj.GetName())
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			oObj := oldObj.(v1.Object)
			nObj := newObj.(v1.Object)
			log.Printf("%s Pod Updated to %s", oObj.GetName(), nObj.GetName())
		},
		DeleteFunc: func(obj interface{}) {
			mObj := obj.(v1.Object)
			log.Printf("Pod Deleted from Store: %s", mObj.GetName())
		},
	})

    // 运行 informer
	informer.Run(stopCh)
}

执行结果如下:

# go run informer.go 
2023/12/14 12:00:26 New Pod Added to Store: prometheus-alertmanager-0
2023/12/14 12:01:26 prometheus-alertmanager-0 Pod Updated to prometheus-alertmanager-0

上述代码示例分为三部分:创建 informer,创建 informer 的 EventHandler,运行 informer。下面,通过这三部分流程介绍 client-go 的核心组件。

2.1 创建 informer

创建 informer 分为两步。

1)创建工厂 sharedInformerFactory

// sharedInformers factory 
sharedInformers := informers.NewSharedInformerFactory(clientset, time.Minute)

// client-go/informers/factory.go
func NewSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration) SharedInformerFactory {
	return NewSharedInformerFactoryWithOptions(client, defaultResync)
}

func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
	factory := &sharedInformerFactory{
		client:           client,
		namespace:        v1.NamespaceAll,
		defaultResync:    defaultResync,
		informers:        make(map[reflect.Type]cache.SharedIndexInformer),
		startedInformers: make(map[reflect.Type]bool),
		customResync:     make(map[reflect.Type]time.Duration),
	}

	// Apply all options
	for _, opt := range options {
		factory = opt(factory)
	}

	return factory
}

sharedInformerFactory 实现了 SharedInformerFactory 接口,该工厂负责创建 informer

2)创建 informer

// 创建 informer
informer := sharedInformers.Core().V1().Pods().Informer()

// 调用 Core 方法
func (f *sharedInformerFactory) Core() core.Interface {
	return core.New(f, f.namespace, f.tweakListOptions)
}

func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface {
	return &group{factory: f, namespace: namespace, tweakListOptions: tweakListOptions}
}

// 调用 V1 方法
func (g *group) V1() v1.Interface {
	return v1.New(g.factory, g.namespace, g.tweakListOptions)
}

func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface {
	return &version{factory: f, namespace: namespace, tweakListOptions: tweakListOptions}
}

// 调用 Pods 方法
func (v *version) Pods() PodInformer {
	return &podInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}

经过层层构建创建 podInformer 对象,该对象实现了 PodInformer 接口,调用接口的 Informer 方法创建 informer 对象:

func (f *podInformer) Informer() cache.SharedIndexInformer {
	return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}

podInformer.Informer 实际调用的是 sharedInformerFactory.InformerFor

func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
	f.lock.Lock()
	defer f.lock.Unlock()

    // 反射出资源对象 obj 的 type 
	informerType := reflect.TypeOf(obj)

    // 读取并判断资源对象的 informer
	informer, exists := f.informers[informerType]
	if exists {
		return informer
	}

	...

    // 调用 newFunc 创建 informer
	informer = newFunc(f.client, resyncPeriod)

    // 将 type:informer 加入到 factory 的 informers 中
	f.informers[informerType] = informer

	return informer
}

从 InformerFor 方法可以看出,sharedInformerFactory 的 share 体现在同一个资源类型共享 informer

这么设计在于,每个 informer 包括一个 ReflectorReflector 通过访问 kube-apiserver 实现 ListAndWatch 操作。共享 informer 实际是共享 Reflector,这种共享机制将减少 Reflector 对于 kube-apiserver 的访问,降低 kube-apiserver 的负载,节约资源。

继续看,创建 informer 的 newFunc 函数做了什么:

informer = newFunc(f.client, resyncPeriod)

// client-go/informers/core/v1/pod.go
func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
	return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}

func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
	return cache.NewSharedIndexInformer(
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.CoreV1().Pods(namespace).List(context.TODO(), options)
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
			},
		},
		&corev1.Pod{},
		resyncPeriod,
		indexers,
	)
}

newFunc 实际调用的是 NewFilteredPodInformer 函数,在函数内创建 cache.ListAndWatch 对象,对象中包括 ListFunc 和 WatchFunc 回调函数,回调函数内调用 ClientSet 实现 list 和 watch 资源对象。

继续看 cache.NewSharedIndexInformer

// client-go/tools/cache/shared_informer.go
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
	return NewSharedIndexInformerWithOptions(
		lw,
		exampleObject,
		SharedIndexInformerOptions{
			ResyncPeriod: defaultEventHandlerResyncPeriod,
			Indexers:     indexers,
		},
	)
}

func NewSharedIndexInformerWithOptions(lw ListerWatcher, exampleObject runtime.Object, options SharedIndexInformerOptions) SharedIndexInformer {
	realClock := &clock.RealClock{}

	return &sharedIndexInformer{
		indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers),
		processor:                       &sharedProcessor{clock: realClock},
		listerWatcher:                   lw,
		objectType:                      exampleObject,
		objectDescription:               options.ObjectDescription,
		resyncCheckPeriod:               options.ResyncPeriod,
		defaultEventHandlerResyncPeriod: options.ResyncPeriod,
		clock:                           realClock,
		cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
	}
}

在 NewSharedIndexInformerWithOptions 函数内创建 informer sharedIndexInformer。可以看到,sharedIndexInformer 内包括了 indexer 核心组件。

informer 创建完成。接下来为 informer 添加回调函数 EventHandler

2.2 创建 EventHandler

代码实现如下:

informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc: func(obj interface{}) {
        mObj := obj.(v1.Object)
        log.Printf("New Pod Added to Store: %s", mObj.GetName())
    },
    UpdateFunc: func(oldObj, newObj interface{}) {
        oObj := oldObj.(v1.Object)
        nObj := newObj.(v1.Object)
        log.Printf("%s Pod Updated to %s", oObj.GetName(), nObj.GetName())
    },
    DeleteFunc: func(obj interface{}) {
        mObj := obj.(v1.Object)
        log.Printf("Pod Deleted from Store: %s", mObj.GetName())
    },
})

创建 EventHandler 的 handler 中包括三种回调函数:AddFuncUpdateFunc 和 DeleteFunc,三种回调函数分别在资源有增加,变更,删除时触发。

在 sharedIndexInformer.AddEventHandler 内,将 handler 传递给 sharedIndexInformer.AddEventHandlerWithResyncPeriod 方法,该方法主要创建 listener 对象:

// client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) (ResourceEventHandlerRegistration, error) {
	return s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
}

func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error) {
    ...
	listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize, s.HasSynced)

    if !s.started {
		return s.processor.addListener(listener), nil
	}
    ...
}

// client-go/tools/cache/shared_informer.go
func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int, hasSynced func() bool) *processorListener {
	ret := &processorListener{
		nextCh:                make(chan interface{}),
		addCh:                 make(chan interface{}),
		handler:               handler,
		syncTracker:           &synctrack.SingleFileTracker{UpstreamHasSynced: hasSynced},
		pendingNotifications:  *buffer.NewRingGrowing(bufferSize),
		requestedResyncPeriod: requestedResyncPeriod,
		resyncPeriod:          resyncPeriod,
	}

	ret.determineNextResync(now)

	return ret
}

func (p *sharedProcessor) addListener(listener *processorListener) ResourceEventHandlerRegistration {
    ...

	p.listeners[listener] = true
    ...

	return listener
}

listener 对象包含通道 addCh 和 nextCh,以及 handler 等对象。最后将 listener 存入 sharedIndexInformer.sharedProcessor 中。

创建完 informer 的 EventHandler,接下来该运行 informer 了。


芝兰生于空谷,不以无人而不芳。   分类: Kubernetes 好文要顶 关注我 收藏该文 微信分享 lubanseven 
粉丝 - 26 关注 - 3
会员号:4187     +加关注 2 0     升级成为会员   « 上一篇: 回顾云原生
» 下一篇: Kubernetes: client-go 源码剖析(二)

标签:return,cache,client,源码,func,go,informer
From: https://www.cnblogs.com/cheyunhua/p/18374492

相关文章

  • BanG Dream! It's MyGO!!!!!
    BanGDream!It'sMyGO!!!!!题目描述在“BanGDream!It'sMyGO!!!”的世界里,各个乐团的演出和排练场地像星星一样被连接在一起,形成了一张美丽的网络图。每个乐团都有自己独特的演出场地和练习室,这些地点通过各种路径互相连接,组成了一张复杂的图谱。koala作为一名热爱音乐的乐......
  • 【源码+论文】基于springboot的信息技术知识竞赛系统的设计与实现
    系统包含:源码+论文所用技术:SpringBoot+Vue+SSM+Mybatis+Mysql免费提供给大家参考或者学习,获取资料请私聊我目录第1章绪论 11.1选题动因 11.2目的和意义 11.3论文结构安排 2第2章开发环境与技术 32.1MYSQL数据库 32.2Tomcat介绍 32.3vue技术 42.4Sp......
  • 【论文+源码】基于springboot搭建的疫情管理系统
    系统包含:源码+论文所用技术:SpringBoot+Vue+SSM+Mybatis+Mysql免费提供给大家参考或者学习,获取资料请私聊我目录目录 III1绪论 11.1研究背景 11.2目的和意义 11.3论文结构安排 22相关技术 32.1springboot框架介绍 32.2B/S结构介绍 32.3Mysql数据......
  • candence allego 差分信号设置
    一、设置差分对1、Logic→AssignDifferential;2、依次点击要建立差分对的走线,并在DiffPairname处给差分对命名。二、差分规则Setup→Constraint→ConstraintManager,进入线束约束管理器,在线束约束管理器界面,左侧有一个WorksheetSelector,在WorksheetSelector里选择Phys......
  • 基于SpringBoot+Vue的学生作业管理系统的详细设计和实现(25年最新,附源码+论文+部署讲
    文章目录1.前言2.系统演示录像3.论文参考4.代码运行展示图5.技术框架5.1SpringBoot技术介绍5.2Vue技术介绍6.可行性分析7.系统测试7.1系统测试的目的7.2系统功能测试8.数据库表设计9.代码参考10.数据库脚本11.找我做程序,有什么保障?12.联系我们1.前......
  • PriorityQueue源码解析
    PriorityQueue优先级队列:默认每次取出权值最小的元素,元素的大小评判可以通过元素自身的自然顺序,也可以在构造时传入比较器进行定义顺序规则。用法//不传比较器PriorityQueue<Integer>pq=newPriorityQueue<>();pq.add(3);pq.add(4);pq.add(1);pq.add(2);//输出顺序1......
  • ArrayDeque源码解读
    ArrayDequeArrayDeque和LinkedList是Deque的两个通用实现,在使用Queue时,由于Queue只是一个接口,因此创建Queue时也会使用ArrayDeque为了实现在数组两端进行操作元素的需求,因此ArrayDeque使用循环数组作为底层数据结构,同时,ArrayDeque中定义了head和tail两个指针指向头和尾因为是循......
  • java+vue计算机毕设旅游景点预约系统【源码+开题+论文】
    本系统(程序+源码)带文档lw万字以上文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景随着旅游业的蓬勃发展,人们对旅游体验的需求日益个性化与高效化。传统的旅游预订方式往往存在信息不对称、购票流程繁琐、景点拥堵等问题,影响了游客的......
  • java+vue计算机毕设开放实验室网上预约系统【源码+开题+论文】
    本系统(程序+源码)带文档lw万字以上文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景随着高等教育体系的不断发展和教育资源的日益丰富,实验室作为培养学生实践能力和创新精神的重要场所,其使用效率与管理水平成为衡量高校教学质量的重要......
  • java+vue计算机毕设农资电子监管系统的设计与实现【源码+开题+论文】
    本系统(程序+源码)带文档lw万字以上文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景随着农业现代化的不断推进,农资产品的流通与管理成为保障农业生产高效、安全的重要环节。传统农资管理模式存在信息不对称、监管难度大、效率低下等问......