首页 > 编程语言 >Kubernetes:kubelet 源码分析之 pod 创建流程

Kubernetes:kubelet 源码分析之 pod 创建流程

时间:2024-05-20 15:29:18浏览次数:36  
标签:... Kubernetes err ctx kl kubelet 源码 pod


0. 前言

kubelet 是运行在 Kubernetes 节点上的“节点代理”,用来管理节点。

image

kubelet 主要负责所在节点上的资源对象的管理,例如 Pod 资源对象的创建,删除,监控,驱逐及生命周期管理等。

1. kubelet 源码分析

1.1 kubelet 模块

kubelet 包括的模块如下图:

image

从图中可以看出,kubelet 的模块众多,每个模块负责不同的功能。本文将围绕创建 Pod 流程有取舍的介绍 kubelet 各个模块。

在开始流程介绍前,让我们通过 kubelet 工作原理图将各个模块串联起来,这对于我们的源码分析是相当有帮助的。

image

1.2 kubelet 启动及调试

下载 Kubernetes 源码,配置调试参数:

{
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Kubelet",
            "type": "go",
            "request": "launch",
            "mode": "auto",
            "program": "${fileDirname}",
            "args": [
                "--container-runtime-endpoint=/run/k3s/containerd/containerd.sock",
                "-v=5",
                "--port=10251",
                "--kubeconfig=/root/.kube/config",
            ]
        }
    ]
}

打断点进入 kubelet:

image

kubelet 使用 Cobra 作为应用命令行框架,和 kube-schedulerkube-apiserver 初始化过程类似,其流程如下:

image

这里,简要给出初始化示例代码:

// kubernetes/cmd/kubelet/app/server.go
func NewKubeletCommand() *cobra.Command {
    // 解析 flags
	cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError)
	cleanFlagSet.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
	kubeletFlags := options.NewKubeletFlags()

    // 获取 kubelet 配置
    kubeletConfig, err := options.NewKubeletConfiguration()

    cmd := &cobra.Command{
		...
        RunE: func(cmd *cobra.Command, args []string) error {
			...

            // 构建 kubeletServer
            kubeletServer := &options.KubeletServer{
				KubeletFlags:         *kubeletFlags,
				KubeletConfiguration: *kubeletConfig,
			}

            // 构建 kubeletDeps,kubeletDeps 是运行 kubelet 需要的依赖项
            kubeletDeps, err := UnsecuredDependencies(kubeletServer, utilfeature.DefaultFeatureGate)

            ...
            return Run(ctx, kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate)
        }
    }
}

进入 Run 函数运行 kubelet

// kubernetes/cmd/kubelet/app/server.go
func Run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) error {
	...
	if err := run(ctx, s, kubeDeps, featureGate); err != nil {
		return fmt.Errorf("failed to run Kubelet: %w", err)
	}
	return nil
}

func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) (err error) {
    ...
    if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
		return err
	}
    ...
}

run 函数的内容比较多,我们直接忽略,有重点的看 RunKubelet

// kubernetes/cmd/kubelet/app/server.go
func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
    ...
    k, err := createAndInitKubelet(kubeServer,
		kubeDeps,
		hostname,
		hostnameOverridden,
		nodeName,
		nodeIPs)
    if err != nil {
		return fmt.Errorf("failed to create kubelet: %w", err)
	}

    ...
    if runOnce {
		...
	} else {
		startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
		klog.InfoS("Started kubelet")
	}
	return nil
}

这里 createAndInitKubelet 创建 kubelet 对象,该对象在 startKubelet 中运行:

// kubernetes/cmd/kubelet/app/server.go
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
	// start the kubelet
	go k.Run(podCfg.Updates())

	// start the kubelet server
	if enableServer {
		go k.ListenAndServe(kubeCfg, kubeDeps.TLSOptions, kubeDeps.Auth, kubeDeps.TracerProvider)
	}
	if kubeCfg.ReadOnlyPort > 0 {
		go k.ListenAndServeReadOnly(netutils.ParseIPSloppy(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
	}
	go k.ListenAndServePodResources()
}

startKubelet 调用 kubelet.Run 方法运行 kubelet。我们直接进入 kubelet.Run 方法看其中做了什么。

// kubernetes/pkg/kubelet/kubelet.go
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
    ...
    // 初始化模块是初始化不依赖于 container runtime 的模块
    if err := kl.initializeModules(); err != nil {
		kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
		klog.ErrorS(err, "Failed to initialize internal modules")
		os.Exit(1)
	}

    ...
    kl.syncLoop(ctx, updates, kl)
}

Kubelet.Run 中包括了不少操作,这里还是抓重点看 Kubelet.syncLoop 主逻辑做了什么。

// kubernetes/pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoop(ctx context.Context, updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
	klog.InfoS("Starting kubelet main sync loop")

    // syncTicker 每秒检测一次是否有需要同步的 pod workers
    syncTicker := time.NewTicker(time.Second)
	defer syncTicker.Stop()

    // 每两秒检测一次是否有需要清理的 pod
	housekeepingTicker := time.NewTicker(housekeepingPeriod)
	defer housekeepingTicker.Stop()
    ...
	for {
		...
		kl.syncLoopMonitor.Store(kl.clock.Now())
		if !kl.syncLoopIteration(ctx, updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
			break
		}
		kl.syncLoopMonitor.Store(kl.clock.Now())
	}
}

func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
    case u, open := <-configCh:
        ...
        switch u.Op {
		case kubetypes.ADD:
			klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
			// After restarting, kubelet will get all existing pods through
			// ADD as if they are new pods. These pods will then go through the
			// admission process and *may* be rejected. This can be resolved
			// once we have checkpointing.
			handler.HandlePodAdditions(u.Pods)
            ...
        }
    }
}

Kubelet.syncLoopIteration 包括多个操作管道的行为,这里仅以 configCh 管道为例,看创建 pod 的行为。

handler.HandlePodAdditions(u.Pods) 这里打断点,然后创建 pod:

# helm install test .
NAME: test
LAST DEPLOYED: Sun May 19 15:34:54 2024
NAMESPACE: default
STATUS: deployed

image

I0519 15:34:54.577769 1801325 kubelet.go:2410] "SyncLoop ADD" source="api" pods=["default/test-6d47479b6b-pphb2"]

进入 handler.HandlePodAdditions

func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
	...
    for _, pod := range pods {
        // 获取 podManager 模块中记录的 pods
		existingPods := kl.podManager.GetPods()

        // 更新 podManager 中的 pod
        kl.podManager.AddPod(pod)

        // 根据 pod 的属性判断当前 pod 是不是 mirrorPod
        // mirrorPod 是仅受 kubelet 管理的,对 kubernetes 不可见的 pod
        pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)
        if wasMirror {
            ...
        }

        // 判断 pod 是否处于 termination 状态
        if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
            activePods := kl.filterOutInactivePods(existingPods)
            if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
                ...
            } else {
                // 判断 pod 是否可以运行在当前 node
                if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
					kl.rejectPod(pod, reason, message)
					continue
				}
            }
        }

        kl.podWorkers.UpdatePod(UpdatePodOptions{
			Pod:        pod,
			MirrorPod:  mirrorPod,
			UpdateType: kubetypes.SyncPodCreate,
			StartTime:  start,
		})
    }
}

这里,podManager 模块负责存储和访问 pod 的信息,维持 static pod 和 mirror pods 的关系,podManager 会被 statusManager/volumeManager/runtimeManager 调用,podManger 记录所有被管理的 pod。

继续往下看 podWorkers.UpdatePod

# kubernetes/pkg/kubelet/pod_workers.go
func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
    ...
    status, ok := p.podSyncStatuses[uid]
    if !ok {
        klog.V(4).InfoS("Pod is being synced for the first time", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
        firstTime = true
		status = &podSyncStatus{
			syncedAt: now,
			fullname: kubecontainer.BuildPodFullName(name, ns),
		}
        ...
        p.podSyncStatuses[uid] = status
    }

    ...
    // 创建一个 pod worker 协程,如果该协程不存在的话
    podUpdates, exists := p.podUpdates[uid]
	if !exists {
        podUpdates = make(chan struct{}, 1)
		p.podUpdates[uid] = podUpdates
        ...
        go func() {
			defer runtime.HandleCrash()
			defer klog.V(3).InfoS("Pod worker has stopped", "podUID", uid)
			p.podWorkerLoop(uid, outCh)
		}()
    }
}

func (p *podWorkers) podWorkerLoop(podUID types.UID, podUpdates <-chan struct{}) {
    var lastSyncTime time.Time
    for range podUpdates {
        // startPodSync 判断 pod 是否可以被同步
		ctx, update, canStart, canEverStart, ok := p.startPodSync(podUID)

        ...
        err := func() error {
            var status *kubecontainer.PodStatus
            var err error
            switch {
			case update.Options.RunningPod != nil:
            default:
                status, err = p.podCache.GetNewerThan(update.Options.Pod.UID, lastSyncTime)
                ...
            }
        }

        switch {
			case update.WorkType == TerminatedPod:
            ...
            default:
				isTerminal, err = p.podSyncer.SyncPod(ctx, update.Options.UpdateType, update.Options.Pod, update.Options.MirrorPod, status)
			}

            lastSyncTime = p.clock.Now()
			return err
		}()

        ...
    }
}

这里,要注意的是 podWorkers.podCache.GetNewerThan 获取的是最新的 pod 状态。其中,PLEG 获取 container runtime 的 pod 状态,存入 podCache 中。podCache 中的 pod 状态和 kubeletkube-apiserver 获取的 pod 状态做对比,以获取最新的 pod 状态。

接着,进入 podWorkers.podSyncer.SyncPod 同步 pod:

func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
    ...
	klog.V(4).InfoS("SyncPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
    ...
    // 生成 apiPodStatus 以同步至 statusManager
    apiPodStatus := kl.generateAPIPodStatus(pod, podStatus, false)
    ...
    // 获取 statusManager 中存储的 pod 状态
    existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
    ...
    // 调用 statusManager 同步 pod 状态
    kl.statusManager.SetPodStatus(pod, apiPodStatus)

    ...
    // ensure the kubelet knows about referenced secrets or configmaps used by the pod
    if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
		if kl.secretManager != nil {
			kl.secretManager.RegisterPod(pod)
		}
		if kl.configMapManager != nil {
			kl.configMapManager.RegisterPod(pod)
		}
	}

    // 创建 pod container manager
    pcm := kl.containerManager.NewPodContainerManager()
    ...

    // Make data directories for the pod
	if err := kl.makePodDataDirs(pod); err != nil {
		...
	}

    // Wait for volumes to attach/mount
	if err := kl.volumeManager.WaitForAttachAndMount(ctx, pod); err != nil {
		...
	}

    // Fetch the pull secrets for the pod
	pullSecrets := kl.getPullSecretsForPod(pod)

	// Ensure the pod is being probed
	kl.probeManager.AddPod(pod)

    ...
    result := kl.containerRuntime.SyncPod(sctx, pod, podStatus, pullSecrets, kl.backOff)
    ...
}

Kubelet.SyncPod 首先更新 statusManager 中 pod 的状态信息,接着开始创建 pod 所需要的资源,如 data directoriesvolumessecrets。在调用 container runtime 同步 pod 前,将 pod 添加到 probeManger 模块,以检测 pod 状态。这里关于 probeManger 模块的详细内容可参考。

进入 Kubelet.containerRuntime.SyncPod 查看 container runtime 是怎么同步 pod 的。

// kubernetes/pkg/kubelet/kuberuntime/kuberuntime_manager.go
func (m *kubeGenericRuntimeManager) SyncPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
	// Step 1: Compute sandbox and container changes.
	podContainerChanges := m.computePodActions(ctx, pod, podStatus)
    ...
    // Step 2: Kill the pod if the sandbox has changed.
	if podContainerChanges.KillPod {
        ...
    } else {
        // Step 3: kill any running containers in this pod which are not to keep.
        for containerID, containerInfo := range podContainerChanges.ContainersToKill {
            ...
        }
    }

    ...
    // Step 4: Create a sandbox for the pod if necessary.
    podSandboxID := podContainerChanges.SandboxID
	if podContainerChanges.CreateSandbox {
        ...
        createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod))
		result.AddSyncResult(createSandboxResult)
        ...
        podSandboxID, msg, err = m.createPodSandbox(ctx, pod, podContainerChanges.Attempt)
        if err != nil {
            ...
        }

        // 调用 runtime cri 接口查询创建的 pod sandbox 状态
        resp, err := m.runtimeService.PodSandboxStatus(ctx, podSandboxID, false)
        ...
    }

    configPodSandboxResult := kubecontainer.NewSyncResult(kubecontainer.ConfigPodSandbox, podSandboxID)
	result.AddSyncResult(configPodSandboxResult)

    start := func(ctx context.Context, typeName, metricLabel string, spec *startSpec) error {
        ...
        klog.V(4).InfoS("Creating container in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod))
        ...
        if msg, err := m.startContainer(ctx, podSandboxID, podSandboxConfig, spec, pod, podStatus, pullSecrets, podIP, podIPs); err != nil {
            ...
        }
        ...
    }

    // Step 5: start ephemeral containers
	for _, idx := range podContainerChanges.EphemeralContainersToStart {
		start(ctx, "ephemeral container", metrics.EphemeralContainer, ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx]))
	}

    if !utilfeature.DefaultFeatureGate.Enabled(features.SidecarContainers) {
        ...
    } else {
        // Step 6: start init containers.
		for _, idx := range podContainerChanges.InitContainersToStart {
            container := &pod.Spec.InitContainers[idx]
			// Start the next init container.
			if err := start(ctx, "init container", metrics.InitContainer, containerStartSpec(container)); err != nil {
                ...
            }

            // Successfully started the container; clear the entry in the failure
			klog.V(4).InfoS("Completed init container for pod", "containerName", container.Name, "pod", klog.KObj(pod))
        }
    }

    // Step 7: For containers in podContainerChanges.ContainersToUpdate[CPU,Memory] list, invoke UpdateContainerResources
	if isInPlacePodVerticalScalingAllowed(pod) {
		if len(podContainerChanges.ContainersToUpdate) > 0 || podContainerChanges.UpdatePodResources {
			m.doPodResizeAction(pod, podStatus, podContainerChanges, result)
		}
	}

    // Step 8: start containers in podContainerChanges.ContainersToStart.
	for _, idx := range podContainerChanges.ContainersToStart {
		start(ctx, "container", metrics.Container, containerStartSpec(&pod.Spec.Containers[idx]))
	}

	return

Kubelet.containerRuntime.SyncPod 中通过调用 runtime cri 接口创建 pod sandbox 和 container。以创建 pod sandbox 为例,在 kubeGenericRuntimeManager.createPodSandbox 中调用 kubeGenericRuntimeManager.instrumentedRuntimeService.RunPodSandbox 创建 pod sandbox:

func (m *kubeGenericRuntimeManager) createPodSandbox(ctx context.Context, pod *v1.Pod, attempt uint32) (string, string, error) {
    podSandboxConfig, err := m.generatePodSandboxConfig(pod, attempt)
    if err != nil {
        ...
    }

    // 创建 pod 的 log 目录
    err = m.osInterface.MkdirAll(podSandboxConfig.LogDirectory, 0755)
    ...

    podSandBoxID, err := m.runtimeService.RunPodSandbox(ctx, podSandboxConfig, runtimeHandler)
    if err != nil {
        ...
    }

    return podSandBoxID, "", nil
}

func (in instrumentedRuntimeService) RunPodSandbox(ctx context.Context, config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error) {
	...
	out, err := in.service.RunPodSandbox(ctx, config, runtimeHandler)
	...
	return out, err
}

func (r *remoteRuntimeService) RunPodSandbox(ctx context.Context, config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error) {
	...
	klog.V(10).InfoS("[RemoteRuntimeService] RunPodSandbox", "config", config, "runtimeHandler", runtimeHandler, "timeout", timeout)
	...
	resp, err := r.runtimeClient.RunPodSandbox(ctx, &runtimeapi.RunPodSandboxRequest{
		Config:         config,
		RuntimeHandler: runtimeHandler,
	})
    ...
}

// kubernetes/vendor/k8s.io/cri-api/pkg/apis/runtime/v1/api.pb.go
func (c *runtimeServiceClient) RunPodSandbox(ctx context.Context, in *RunPodSandboxRequest, opts ...grpc.CallOption) (*RunPodSandboxResponse, error) {
	out := new(RunPodSandboxResponse)
	err := c.cc.Invoke(ctx, "/runtime.v1.RuntimeService/RunPodSandbox", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

可以看到,这里通过调用 cri 接口的 /runtime.v1.RuntimeService/RunPodSandbox 创建 pod sandbox,至于创建 container 也是类似,调用 runtime cri 的接口实现创建 pod 的 container。

2. 小结

本文从 kubelet 源码层面介绍了 pod 创建的流程,后续将重点看 runtime 是如何工作的。


标签:...,Kubernetes,err,ctx,kl,kubelet,源码,pod
From: https://www.cnblogs.com/xingzheanan/p/18202067

相关文章

  • 排队叫号系统项目源码开发搭建
    一款基于PHP开发的多项目多场景排队叫号系统,支持大屏幕投屏,语音播报叫号,可用于餐厅排队取餐、美甲店排队取号、排队领取、排队就诊、排队办理业务等诸多场景,助你轻松应对各种排队取号叫号场景。采用GatewayWorker开发完成的 程序开发:PHP+MySQL程序演示:http://jh1.yetukeji.t......
  • cmake以源码的方式引入第三方项目
    最前#本文将介绍一种以源码的方式引入第三方库的方法准备#主项目,需要引用第三方库的某些函数第三方库,以源码的形式提供给主项目使用注意:本文的背景:已经将第三方源码下载好。一个例子#我这里准备一个简单的项目,调用第三方库 fmt;其中fmt是以源码的形式引入项目cmake......
  • MySQL全文索引源码剖析之Insert语句执行过程
    本文分享自华为云社区《MySQL全文索引源码剖析之Insert语句执行过程》,作者:GaussDB数据库。1.背景介绍全文索引是信息检索领域的一种常用的技术手段,用于全文搜索问题,即根据单词,搜索包含该单词的文档,比如在浏览器中输入一个关键词,搜索引擎需要找到所有相关的文档,并且按相关性......
  • lodash已死?radash库方法介绍及源码解析 —— 函数柯里化 + Number篇
    写在前面tips:点赞+收藏=学会!主页有更多其他篇章的方法,欢迎访问查看。本篇我们继续介绍radash中函数柯里化和Number相关的方法使用和源码解析。函数柯里化chain:创建一个函数链并依次执行使用说明功能描述:用于创建一个函数链,该链依次执行一系列函数,每个函数的输出......
  • 关于学习VUE源码的感受! 学习VUE源码最好的方式 !!!
    仓库地址仓库whoelse666mini-vue崔学社mini-vue文章导航Vue3源码实战课|构建你自己的Vue3|掌握源码最有效的学习方法就是手写一遍!Vue3源码实战课阮一峰推荐最佳学习vue3源码的利器-mini-vue学习源码经历过程vue从出来到现在也有好些年了,相信几乎所所有从事......
  • 全网首一份!你最需要的PPTP MS-CHAP V2 挑战响应编程模拟计算教程!代码基于RFC2759,附全
    本文基于网络密码课上的实验本来想水一水就过去,代码就网上找找,不行就GPT写,但是!一份都找不到,找到的代码都是跑不了的,总会是就是乱七八糟。所以准备认真的写一份。代码编译成功的前提是要预先装好openssl库!本随笔主要有三个内容:编写程序,模拟计算NTResponse、AuthenticatorRespo......
  • 64-SpringBoot源码分析
    Starter是什么?我们如何使用这些Starter?为什么包扫描只会扫描核心启动类所在的包及其子包?在SpringBoot启动过程中,是如何完成自动配置的?内嵌Tomcat是如何创建并启动的?引入了web场景对应的Starter,SpringMVC是如何完成自动装配的?1.源码环境构建https://gith......
  • 一对一视频聊天源码,水印功能实现方案不容错过
    一对一视频聊天源码,水印功能实现方案不容错过一、基于原图生成水印图片(后端)这种方案就是将原图片添加水印之后生成了新图片,后续在一对一视频聊天源码前端页面进行展示是后端接口不返回原图片,而是返回带有水印的图片即可。这种方式最大的优点就是安全,因为水印图......
  • 一对一视频源码,实现限流对优化系统性能尤为重要
    一对一视频源码,实现限流对优化系统性能尤为重要,主要内容为滑动日志,令牌桶,漏桶三种限流算法的Java实现获取连接许可的接口一、滑动日志用一个有序集合来存储所有请求的时间戳,以空间换时间的方式来简化计算二、令牌桶利用延迟计算来维护令牌数量三、漏桶漏桶算法原理类似......
  • 一对一视频聊天源码,JDBC数据源隔离方法
    在开发一对一视频聊天源码时,数据隔离需要对DB,Redis,RabbitMQ进行数据隔离,接下来主要介绍一下JDBC数据源隔离方法。通过实现Spring动态数据源AbstractRoutingDataSource,通过ThreadLocal识别出来压测数据,如果是压测数据就路由到影子库,如果是正常流量则路由到主库,通过流量识别的改......