
Kubernetes: kube-controller-manager Source Code Analysis

Date: 2024-03-11 16:48:14


0. Preface

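In the Kubernetes architecture, the controller manager is the component that runs never-ending control loops and is responsible for driving cluster resources toward their desired state. It watches resource state through kube-apiserver and compares the current state with the desired state. When the two differ, it acts through kube-apiserver until the current state matches the desired state.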
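
The compare-and-act idea can be sketched in a few lines of Go. This is only an illustration, not code from Kubernetes: the action type and the reconcile function are hypothetical names, and the "cluster" is reduced to a single replica count.

```go
package main

import "fmt"

// action describes what one reconcile step decided to do (hypothetical type).
type action struct {
	create int // number of replicas to create
	remove int // number of replicas to delete
}

// reconcile compares the desired and current replica counts and returns
// the action needed to make them match. It does not mutate any state.
func reconcile(desired, current int) action {
	switch {
	case current < desired:
		return action{create: desired - current}
	case current > desired:
		return action{remove: current - desired}
	default:
		return action{}
	}
}

func main() {
	desired, current := 3, 1
	// A real controller loops until it is stopped; three steps are enough here.
	for step := 0; step < 3; step++ {
		a := reconcile(desired, current)
		current += a.create - a.remove
		fmt.Printf("step=%d action=%+v current=%d\n", step, a, current)
	}
}
```

Once the current count equals the desired count, every further step returns an empty action, which is the steady state a control loop aims for.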


1. kube-controller-manager

This section analyzes how kube-controller-manager works from the source code.

kube-controller-manager uses Cobra as its command-line framework. Its initialization is similar to that of kube-scheduler and kube-apiserver, and proceeds as follows:

[Figure: kube-controller-manager initialization flow (image not included)]

A brief excerpt of the initialization code:

# kubernetes/cmd/kube-controller-manager/app/controllermanager.go
func NewControllerManagerCommand() *cobra.Command {
	// Create the options
	s, err := options.NewKubeControllerManagerOptions()
	...
	cmd := &cobra.Command{
		...
		RunE: func(cmd *cobra.Command, args []string) error {
			...
			// Build the configuration from the options
			c, err := s.Config(KnownControllers(), ControllersDisabledByDefault(), ControllerAliases())
			if err != nil {
				return err
			}
			...
			return Run(context.Background(), c.Complete())
		},
		...
	}
	...
}
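
The options, then config, then Run sequence can be imitated with the standard library alone. The sketch below is an assumption-laden stand-in: it uses the flag package instead of Cobra, and the options and config types, the flag names, and the run function are all hypothetical and far smaller than the real ones.

```go
package main

import (
	"errors"
	"flag"
	"fmt"
)

// options holds raw flag values (hypothetical).
type options struct {
	leaderElect     bool
	concurrentSyncs int
}

// config is the validated form of options that run consumes (hypothetical).
type config struct {
	leaderElect     bool
	concurrentSyncs int
}

// parse binds the flags to an options value and parses args.
func parse(args []string) (*options, error) {
	o := &options{}
	fs := flag.NewFlagSet("demo-controller-manager", flag.ContinueOnError)
	fs.BoolVar(&o.leaderElect, "leader-elect", true, "enable leader election")
	fs.IntVar(&o.concurrentSyncs, "concurrent-syncs", 5, "number of sync workers")
	if err := fs.Parse(args); err != nil {
		return nil, err
	}
	return o, nil
}

// toConfig validates the options and converts them into a config.
func (o *options) toConfig() (*config, error) {
	if o.concurrentSyncs < 1 {
		return nil, errors.New("concurrent-syncs must be at least 1")
	}
	return &config{leaderElect: o.leaderElect, concurrentSyncs: o.concurrentSyncs}, nil
}

// run stands in for the real Run function and only reports what it received.
func run(c *config) string {
	return fmt.Sprintf("leaderElect=%t workers=%d", c.leaderElect, c.concurrentSyncs)
}

func main() {
	o, err := parse([]string{"--leader-elect=false", "--concurrent-syncs=2"})
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	c, err := o.toConfig()
	if err != nil {
		fmt.Println("config error:", err)
		return
	}
	fmt.Println(run(c))
}
```

Keeping validation in the conversion step means run never sees an invalid configuration, which is the point of separating options from config.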

Next, step into the Run function to see how kube-controller-manager runs.

# kubernetes/cmd/kube-controller-manager/app/controllermanager.go
func Run(ctx context.Context, c *config.CompletedConfig) error {
	...
	run := func(ctx context.Context, controllerDescriptors map[string]*ControllerDescriptor) {
		// Build the controller context
		controllerContext, err := CreateControllerContext(logger, c, rootClientBuilder, clientBuilder, ctx.Done())
		if err != nil {
			logger.Error(err, "Error building controller context")
			klog.FlushAndExit(klog.ExitFlushTimeout, 1)
		}

		// Start the controllers; this is the main run logic
		if err := StartControllers(ctx, controllerContext, controllerDescriptors, unsecuredMux, healthzHandler); err != nil {
			logger.Error(err, "Error starting controllers")
			klog.FlushAndExit(klog.ExitFlushTimeout, 1)
		}

		// Start the informers
		controllerContext.InformerFactory.Start(stopCh)
		controllerContext.ObjectOrMetadataInformerFactory.Start(stopCh)
		close(controllerContext.InformersStarted)

		<-ctx.Done()
	}

	// No leader election, run directly
	if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
		// Build the controller descriptors
		controllerDescriptors := NewControllerDescriptors()
		controllerDescriptors[names.ServiceAccountTokenController] = saTokenControllerDescriptor
		run(ctx, controllerDescriptors)
		return nil
	}
	...
}

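Like kube-scheduler, kube-controller-manager is deployed as multiple replicas of which only one is active at a time, so it relies on leader election to decide which replica runs as the leader. That mechanism is not covered here; see the article "Kubernetes leader election source code analysis" for details.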

Now to running the controller manager. First, NewControllerDescriptors registers a descriptor for each resource controller.

# kubernetes/cmd/kube-controller-manager/app/controllermanager.go
func NewControllerDescriptors() map[string]*ControllerDescriptor {
	register := func(controllerDesc *ControllerDescriptor) {
		...
		controllers[name] = controllerDesc
	}

	...
	// register adds each resource controller's descriptor
	register(newEndpointsControllerDescriptor())
	register(newEndpointSliceControllerDescriptor())
	register(newEndpointSliceMirroringControllerDescriptor())
	register(newReplicationControllerDescriptor())
	register(newPodGarbageCollectorControllerDescriptor())
	register(newResourceQuotaControllerDescriptor())
	...

	return controllers
}

# kubernetes/cmd/kube-controller-manager/app/apps.go
func newReplicaSetControllerDescriptor() *ControllerDescriptor {
	return &ControllerDescriptor{
		name:     names.ReplicaSetController,
		aliases:  []string{"replicaset"},
		initFunc: startReplicaSetController,
	}
}

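Each controller descriptor carries an initFunc field, which holds the function that starts that controller (startReplicaSetController for the ReplicaSet controller).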
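
A registry of this kind can be sketched as a map from controller name to descriptor. The types below are hypothetical simplifications, and unlike the excerpt above, this register returns an error for an incomplete or duplicate descriptor so that the behavior is easy to test.

```go
package main

import (
	"fmt"
	"sort"
)

// initFunc starts one controller and reports whether it was started (hypothetical signature).
type initFunc func(name string) (started bool, err error)

// descriptor is a simplified stand-in for a controller descriptor.
type descriptor struct {
	name    string
	aliases []string
	init    initFunc
}

// registry maps a controller name to its descriptor.
type registry map[string]*descriptor

// register adds d and rejects incomplete or duplicate descriptors.
func (r registry) register(d *descriptor) error {
	if d.name == "" || d.init == nil {
		return fmt.Errorf("descriptor %q is incomplete", d.name)
	}
	if _, exists := r[d.name]; exists {
		return fmt.Errorf("controller %q registered twice", d.name)
	}
	r[d.name] = d
	return nil
}

// names returns the registered controller names in sorted order.
func (r registry) names() []string {
	out := make([]string, 0, len(r))
	for name := range r {
		out = append(out, name)
	}
	sort.Strings(out)
	return out
}

func main() {
	r := registry{}
	start := func(name string) (bool, error) {
		fmt.Println("starting", name)
		return true, nil
	}
	for _, d := range []*descriptor{
		{name: "replicaset-controller", aliases: []string{"replicaset"}, init: start},
		{name: "endpoints-controller", init: start},
	} {
		if err := r.register(d); err != nil {
			fmt.Println("register error:", err)
		}
	}
	fmt.Println(r.names())
}
```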

Inside run, StartControllers starts the controllers.

# kubernetes/cmd/kube-controller-manager/app/controllermanager.go
func StartControllers(ctx context.Context, controllerCtx ControllerContext, controllerDescriptors map[string]*ControllerDescriptor,
	unsecuredMux *mux.PathRecorderMux, healthzHandler *controllerhealthz.MutableHealthzHandler) error {
	...
	// Iterate over the controller descriptors
	for _, controllerDesc := range controllerDescriptors {
		if controllerDesc.RequiresSpecialHandling() {
			continue
		}

		// Start the resource controller
		check, err := StartController(ctx, controllerCtx, controllerDesc, unsecuredMux)
		if err != nil {
			return err
		}
		if check != nil {
			// HealthChecker should be present when controller has started
			controllerChecks = append(controllerChecks, check)
		}
	}

	...
	return nil
}

func StartController(ctx context.Context, controllerCtx ControllerContext, controllerDescriptor *ControllerDescriptor,
	unsecuredMux *mux.PathRecorderMux) (healthz.HealthChecker, error) {
	...
	// Get the start function from the controller descriptor
	initFunc := controllerDescriptor.GetInitFunc()

	// Start the resource controller
	ctrl, started, err := initFunc(klog.NewContext(ctx, klog.LoggerWithName(logger, controllerName)), controllerCtx, controllerName)
	if err != nil {
		logger.Error(err, "Error starting controller", "controller", controllerName)
		return nil, err
	}
	...
}
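
The start loop above follows a simple shape: skip descriptors that are handled elsewhere, call each init function, stop at the first error, and collect a health check for every controller that started. A minimal sketch of that shape follows. All types and names are hypothetical, and it iterates over a slice rather than a map so that the order is deterministic.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotReady is a sample failure returned by an init function.
var errNotReady = errors.New("not ready")

// healthCheck is a placeholder for a per-controller health checker.
type healthCheck struct{ name string }

// descriptor is a simplified controller descriptor.
type descriptor struct {
	name    string
	special bool // started separately, so the loop skips it
	init    func() (started bool, err error)
}

// startAll starts every non-special controller in order. It returns one
// health check per started controller and stops at the first error.
func startAll(descs []*descriptor) ([]healthCheck, error) {
	var checks []healthCheck
	for _, d := range descs {
		if d.special {
			continue
		}
		started, err := d.init()
		if err != nil {
			return checks, fmt.Errorf("starting %s: %w", d.name, err)
		}
		if started {
			checks = append(checks, healthCheck{name: d.name})
		}
	}
	return checks, nil
}

func main() {
	ok := func() (bool, error) { return true, nil }
	disabled := func() (bool, error) { return false, nil }
	failing := func() (bool, error) { return false, errNotReady }

	checks, err := startAll([]*descriptor{
		{name: "token", special: true, init: failing},
		{name: "replicaset", init: ok},
		{name: "disabled", init: disabled},
	})
	fmt.Println(checks, err)

	_, err = startAll([]*descriptor{{name: "broken", init: failing}})
	fmt.Println(err)
}
```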

Kubernetes has many controllers. The ReplicaSet controller serves as the example here to show how a controller runs.

Step into the ReplicaSet controller's initFunc to see how the controller is started.

# kubernetes/cmd/kube-controller-manager/app/apps.go
func startReplicaSetController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
	go replicaset.NewReplicaSetController(
		klog.FromContext(ctx),
		controllerContext.InformerFactory.Apps().V1().ReplicaSets(),
		controllerContext.InformerFactory.Core().V1().Pods(),
		controllerContext.ClientBuilder.ClientOrDie("replicaset-controller"),
		replicaset.BurstReplicas,
	).Run(ctx, int(controllerContext.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs))
	return nil, true, nil
}

Calling initFunc actually calls startReplicaSetController. It builds the controller with replicaset.NewReplicaSetController and runs ReplicaSetController.Run in a new goroutine. replicaset.NewReplicaSetController registers the informers' event handlers, and ReplicaSetController.Run processes the items that those event handlers add to the queue. The structure is illustrated below:

[Figure: ReplicaSet controller structure (image not included)]
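
One detail of the `go NewX(...).Run(...)` form is worth spelling out: in Go, the receiver expression and the arguments of a go statement are evaluated in the calling goroutine, and only the method call itself runs in the new goroutine. The sketch below demonstrates this with hypothetical types; it does not use any Kubernetes code.

```go
package main

import "fmt"

// controller is a hypothetical stand-in for a real controller.
type controller struct{ name string }

// constructed records constructor calls. Only the calling goroutine touches it.
var constructed []string

// newController plays the role of a constructor such as NewReplicaSetController.
func newController(name string) *controller {
	constructed = append(constructed, name)
	return &controller{name: name}
}

// Run reports its name and returns. A real Run blocks until its context is cancelled.
func (c *controller) Run(done chan<- string) {
	done <- c.name
}

// start mirrors the shape of `go NewX(...).Run(...)` and returns how many
// constructor calls have completed by the time the go statement returns.
func start(name string, done chan<- string) int {
	go newController(name).Run(done)
	return len(constructed)
}

func main() {
	done := make(chan string)
	n := start("replicaset", done)
	fmt.Println("constructor calls finished before Run was observed:", n)
	fmt.Println("Run executed for:", <-done)
}
```

So the constructor, including its event handler registration, has finished before the start function returns. Only Run is asynchronous.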

First, look at what replicaset.NewReplicaSetController does.

# kubernetes/pkg/controller/replicaset/replica_set.go
func NewReplicaSetController(logger klog.Logger, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicaSetController {
	...
	return NewBaseController(logger, rsInformer, podInformer, kubeClient, burstReplicas,
		apps.SchemeGroupVersion.WithKind("ReplicaSet"),
		"replicaset_controller",
		"replicaset",
		controller.RealPodControl{
			KubeClient: kubeClient,
			Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replicaset-controller"}),
		},
		eventBroadcaster,
	)
}

func NewBaseController(logger klog.Logger, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
	gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface, eventBroadcaster record.EventBroadcaster) *ReplicaSetController {

	rsc := &ReplicaSetController{
		GroupVersionKind: gvk,
		kubeClient:       kubeClient,
		podControl:       podControl,
		eventBroadcaster: eventBroadcaster,
		burstReplicas:    burstReplicas,
		expectations:     controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName),
	}

	rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			rsc.addRS(logger, obj)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			rsc.updateRS(logger, oldObj, newObj)
		},
		DeleteFunc: func(obj interface{}) {
			rsc.deleteRS(logger, obj)
		},
	})
	...

	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			rsc.addPod(logger, obj)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			rsc.updatePod(logger, oldObj, newObj)
		},
		DeleteFunc: func(obj interface{}) {
			rsc.deletePod(logger, obj)
		},
	})
	...

	rsc.syncHandler = rsc.syncReplicaSet

	return rsc
}

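The function creates the ReplicaSetController and registers event handlers on rsInformer and podInformer. The informers watch kube-apiserver for changes to ReplicaSets and Pods, and each kind of change (add, update, delete) triggers the corresponding event handler.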
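
The handler-to-queue wiring can be sketched without client-go. In the toy below, the informer type simply stores handlers and lets the caller inject events. The object, handlers, informer, and controller types are hypothetical, and the queue is a plain slice.

```go
package main

import "fmt"

// object is a hypothetical stand-in for a ReplicaSet or Pod.
type object struct{ namespace, name string }

// handlers mirrors the add/update/delete callback shape with concrete types.
type handlers struct {
	onAdd    func(obj object)
	onUpdate func(oldObj, newObj object)
	onDelete func(obj object)
}

// informer is a toy that stores handlers and dispatches injected events to them.
type informer struct{ hs []handlers }

func (i *informer) addEventHandler(h handlers) { i.hs = append(i.hs, h) }

func (i *informer) emitAdd(o object) {
	for _, h := range i.hs {
		if h.onAdd != nil {
			h.onAdd(o)
		}
	}
}

func (i *informer) emitUpdate(oldObj, newObj object) {
	for _, h := range i.hs {
		if h.onUpdate != nil {
			h.onUpdate(oldObj, newObj)
		}
	}
}

func (i *informer) emitDelete(o object) {
	for _, h := range i.hs {
		if h.onDelete != nil {
			h.onDelete(o)
		}
	}
}

// controller only keeps a queue of "namespace/name" keys.
type controller struct{ queue []string }

func key(o object) string { return o.namespace + "/" + o.name }

// newController registers handlers that enqueue the key of the changed object.
func newController(inf *informer) *controller {
	c := &controller{}
	inf.addEventHandler(handlers{
		onAdd:    func(o object) { c.queue = append(c.queue, key(o)) },
		onUpdate: func(_, n object) { c.queue = append(c.queue, key(n)) },
		onDelete: func(o object) { c.queue = append(c.queue, key(o)) },
	})
	return c
}

func main() {
	inf := &informer{}
	c := newController(inf)
	rs := object{namespace: "default", name: "test-6d47479b6b"}
	inf.emitAdd(rs)
	inf.emitUpdate(rs, rs)
	inf.emitDelete(rs)
	fmt.Println(c.queue)
}
```

The handlers do no real work themselves. They only record which object needs attention, and the workers do the processing later.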

Next, look at what Run does.

# kubernetes/pkg/controller/replicaset/replica_set.go
func (rsc *ReplicaSetController) Run(ctx context.Context, workers int) {
	...
	// Wait until the informer caches have synced with kube-apiserver
	if !cache.WaitForNamedCacheSync(rsc.Kind, ctx.Done(), rsc.podListerSynced, rsc.rsListerSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		// Each worker processes items from the queue
		go wait.UntilWithContext(ctx, rsc.worker, time.Second)
	}

	<-ctx.Done()
}

func (rsc *ReplicaSetController) worker(ctx context.Context) {
	// worker keeps looping until the queue is shut down
	for rsc.processNextWorkItem(ctx) {
	}
}

func (rsc *ReplicaSetController) processNextWorkItem(ctx context.Context) bool {
	// Take the next key from the queue
	key, quit := rsc.queue.Get()
	if quit {
		return false
	}
	defer rsc.queue.Done(key)

	// Process the key taken from the queue
	err := rsc.syncHandler(ctx, key.(string))
	if err == nil {
		rsc.queue.Forget(key)
		return true
	}

	...
	return true
}

As shown, rsc.syncHandler processes the items in the queue, and rsc.syncHandler actually points to ReplicaSetController.syncReplicaSet.
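
The worker pattern can be sketched with a small hand-written queue. This is not the client-go workqueue: the queue type below is a hypothetical simplification that only deduplicates waiting keys and supports shutdown. It leaves out Done, Forget, and rate-limited retries.

```go
package main

import (
	"fmt"
	"sync"
)

// queue is a minimal FIFO of string keys that drops duplicates of keys still waiting.
type queue struct {
	mu      sync.Mutex
	cond    *sync.Cond
	items   []string
	pending map[string]bool
	closed  bool
}

func newQueue() *queue {
	q := &queue{pending: map[string]bool{}}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// add enqueues key unless it is already waiting or the queue is shut down.
func (q *queue) add(key string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.closed || q.pending[key] {
		return
	}
	q.pending[key] = true
	q.items = append(q.items, key)
	q.cond.Signal()
}

// get blocks until a key is available. quit is true once the queue has been
// shut down and drained.
func (q *queue) get() (key string, quit bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 && !q.closed {
		q.cond.Wait()
	}
	if len(q.items) == 0 {
		return "", true
	}
	key = q.items[0]
	q.items = q.items[1:]
	delete(q.pending, key)
	return key, false
}

// shutDown wakes all waiting workers so that they can exit.
func (q *queue) shutDown() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.closed = true
	q.cond.Broadcast()
}

// worker calls syncFn for each key until get reports quit and returns the
// keys that were synced successfully.
func worker(q *queue, syncFn func(key string) error) []string {
	var processed []string
	for {
		key, quit := q.get()
		if quit {
			return processed
		}
		if err := syncFn(key); err != nil {
			continue // a real controller would re-queue the key with backoff
		}
		processed = append(processed, key)
	}
}

func main() {
	q := newQueue()
	q.add("default/test-6d47479b6b")
	q.add("default/test-6d47479b6b") // duplicate while waiting, dropped
	q.add("default/other")
	q.shutDown()
	fmt.Println(worker(q, func(key string) error { return nil }))
}
```

Because the queue holds keys rather than objects, several events for the same ReplicaSet collapse into one sync, and the sync always reads the latest state from the cache.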

With the code structure clear, the following example of deleting a Pod shows how kube-controller-manager behaves in practice.

1.1 Pod deletion example

1.1.1 Setup

Create a ReplicaSet as follows:

# helm list
NAME    NAMESPACE       REVISION        UPDATED                                 STATUS          CHART           APP VERSION
test    default         1               2024-02-29 16:24:43.896757193 +0800 CST deployed        test-0.1.0      1.16.0

# kubectl get replicaset
NAME                       DESIRED   CURRENT   READY   AGE
test-6d47479b6b            1         1         1       10d

# kubectl get pods
NAME                             READY   STATUS    RESTARTS   AGE
test-6d47479b6b-5k6cb            1/1     Running   0          9d

Now delete the pod and observe how kube-controller-manager reacts.

1.1.2 Execution flow

Delete the pod:

# kubectl delete pods test-6d47479b6b-5k6cb

After the pod is deleted, the podInformer's event handler receives the change and calls ReplicaSetController.deletePod:

func (rsc *ReplicaSetController) deletePod(logger klog.Logger, obj interface{}) {
	pod, ok := obj.(*v1.Pod)

	...
	logger.V(4).Info("Pod deleted", "delete_by", utilruntime.GetCaller(), "deletion_timestamp", pod.DeletionTimestamp, "pod", klog.KObj(pod))
	...
	rsc.queue.Add(rsKey)
}

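ReplicaSetController.deletePod does not enqueue the pod itself. It enqueues rsKey, the key of the ReplicaSet that owns the deleted pod. A worker's ReplicaSetController.processNextWorkItem then takes that key from the queue and passes it to ReplicaSetController.syncReplicaSet.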
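
How a deleted pod turns into a ReplicaSet key is elided in the excerpt above. A minimal sketch, assuming the owner is found through the pod's controller owner reference and the key has the form namespace/name, could look like this. The pod and ownerRef types and both functions are hypothetical.

```go
package main

import "fmt"

// ownerRef is a simplified owner reference.
type ownerRef struct {
	kind       string
	name       string
	controller bool // true for the owner that manages the object
}

// pod is a simplified pod with only the fields this sketch needs.
type pod struct {
	namespace string
	name      string
	owners    []ownerRef
}

// controllerOf returns the owner reference marked as the controller, if any.
func controllerOf(p pod) (ownerRef, bool) {
	for _, ref := range p.owners {
		if ref.controller {
			return ref, true
		}
	}
	return ownerRef{}, false
}

// rsKeyFor returns the "namespace/name" key of the ReplicaSet that controls p.
// It reports false for orphans and for pods controlled by another kind.
func rsKeyFor(p pod) (string, bool) {
	ref, ok := controllerOf(p)
	if !ok || ref.kind != "ReplicaSet" {
		return "", false
	}
	return p.namespace + "/" + ref.name, true
}

func main() {
	deleted := pod{
		namespace: "default",
		name:      "test-6d47479b6b-5k6cb",
		owners:    []ownerRef{{kind: "ReplicaSet", name: "test-6d47479b6b", controller: true}},
	}
	if key, ok := rsKeyFor(deleted); ok {
		fmt.Println("enqueue", key)
	}
}
```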

func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string) error {
	...
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	...

	// Look up the ReplicaSet identified by the key
	rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
	...

	// List all pods in the ReplicaSet's namespace
	allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
	if err != nil {
		return err
	}

	// Ignore inactive pods.
	filteredPods := controller.FilterActivePods(logger, allPods)

	// Claim the pods that belong to this ReplicaSet.
	// Its only pod was deleted, so filteredPods is empty here.
	filteredPods, err = rsc.claimPods(ctx, rs, selector, filteredPods)
	if err != nil {
		return err
	}

	// The ReplicaSet's pod was deleted,
	// so rsc.manageReplicas is called
	var manageReplicasErr error
	if rsNeedsSync && rs.DeletionTimestamp == nil {
		manageReplicasErr = rsc.manageReplicas(ctx, filteredPods, rs)
	}
	...
}
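
The "Ignore inactive pods" step can be pictured as a plain filter. The sketch below assumes that a pod counts as inactive when it has finished (phase Succeeded or Failed) or is already being deleted. The pod type and both functions are hypothetical stand-ins, not the real helpers.

```go
package main

import "fmt"

// pod is a simplified pod with only the fields the filter needs.
type pod struct {
	name     string
	phase    string // for example "Pending", "Running", "Succeeded", "Failed"
	deleting bool   // true when a deletion timestamp is set
}

// isActive reports whether p still counts toward the replica total.
// Assumption: finished pods and pods being deleted do not count.
func isActive(p pod) bool {
	return p.phase != "Succeeded" && p.phase != "Failed" && !p.deleting
}

// filterActive returns the active pods in their original order.
func filterActive(pods []pod) []pod {
	var out []pod
	for _, p := range pods {
		if isActive(p) {
			out = append(out, p)
		}
	}
	return out
}

func main() {
	pods := []pod{
		{name: "a", phase: "Running"},
		{name: "b", phase: "Running", deleting: true},
		{name: "c", phase: "Failed"},
		{name: "d", phase: "Pending"},
	}
	for _, p := range filterActive(pods) {
		fmt.Println(p.name)
	}
}
```

Under this assumption, a pod that is terminating no longer counts toward the total, so a replacement can be created before the old pod has fully disappeared.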

Continue into ReplicaSetController.manageReplicas:

func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
	diff := len(filteredPods) - int(*(rs.Spec.Replicas))
	...
	if diff < 0 {
		// (elided) diff is negated and capped at rsc.burstReplicas before this point
		logger.V(2).Info("Too few replicas", "replicaSet", klog.KObj(rs), "need", *(rs.Spec.Replicas), "creating", diff)
		...
		successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
			err := rsc.podControl.CreatePods(ctx, rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
			if err != nil {
				if apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
					// if the namespace is being terminated, we don't have to do
					// anything because any creation will fail
					return nil
				}
			}
			return err
		})
		...
	}
	...
}

When the number of filteredPods is smaller than the Replicas value defined in the ReplicaSet's spec, rsc.podControl.CreatePods is called to create the missing pods:

func (r RealPodControl) CreatePods(ctx context.Context, namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error {
	return r.CreatePodsWithGenerateName(ctx, namespace, template, controllerObject, controllerRef, "")
}

func (r RealPodControl) CreatePodsWithGenerateName(ctx context.Context, namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference, generateName string) error {
	...
	return r.createPods(ctx, namespace, pod, controllerObject)
}

func (r RealPodControl) createPods(ctx context.Context, namespace string, pod *v1.Pod, object runtime.Object) error {
	...
	newPod, err := r.KubeClient.CoreV1().Pods(namespace).Create(ctx, pod, metav1.CreateOptions{})
	...
	logger.V(4).Info("Controller created pod", "controller", accessor.GetName(), "pod", klog.KObj(newPod))
	...

	return nil
}

Next, return to ReplicaSetController.syncReplicaSet:

func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string) error {
	...
	newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)
	updatedRS, err := updateReplicaSetStatus(logger, rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
	if err != nil {
		return err
	}
	...
}

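Although a replacement pod has already been created, filteredPods here is still the empty list computed earlier in this sync, so updateReplicaSetStatus sets the ReplicaSet's current replica count in its status to 0.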

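Updating the ReplicaSet's status in turn triggers the ReplicaSet event handler, which leads to another pass through ReplicaSetController.syncReplicaSet. By then, if the replacement pod has been created, filteredPods contains it, and updateReplicaSetStatus brings the ReplicaSet's current state up to the desired state.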
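
The two passes can be replayed in a toy model. Everything in the sketch below is hypothetical: replicaSet and cluster are reduced to counters, and observe stands for the informer cache catching up with the newly created pod. The real controller also tracks in-flight creations so that a second pass does not create duplicates, which this model leaves out.

```go
package main

import "fmt"

// replicaSet keeps only the desired and the reported replica counts.
type replicaSet struct {
	specReplicas   int
	statusReplicas int
}

// cluster separates pods already visible in the cache from pods that were
// created but have not been observed yet.
type cluster struct {
	observedPods int
	pendingPods  int
}

// syncOnce performs one sync pass. The status is computed from the pods that
// were observed at the start of the pass, even if the pass creates new ones.
func syncOnce(rs *replicaSet, c *cluster) {
	observed := c.observedPods
	if missing := rs.specReplicas - observed; missing > 0 {
		c.pendingPods += missing // create the missing pods
	}
	rs.statusReplicas = observed
}

// observe models the cache catching up with the created pods.
func (c *cluster) observe() {
	c.observedPods += c.pendingPods
	c.pendingPods = 0
}

func main() {
	rs := &replicaSet{specReplicas: 1, statusReplicas: 1}
	c := &cluster{} // the only pod was just deleted

	syncOnce(rs, c)
	fmt.Printf("pass 1: status=%d pending=%d\n", rs.statusReplicas, c.pendingPods)

	c.observe()
	syncOnce(rs, c)
	fmt.Printf("pass 2: status=%d pending=%d\n", rs.statusReplicas, c.pendingPods)
}
```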

2. Summary

This article walked through how kube-controller-manager runs and used a pod deletion example to show how it keeps resource state under control.


From: https://www.cnblogs.com/xingzheanan/p/18066472
