首页 > 编程语言 >Kubernetes:kube-scheduler 源码分析

Kubernetes:kube-scheduler 源码分析

时间:2024-02-01 11:15:59浏览次数:52  
标签:... sched err framework 源码 scheduler pod kube


0. 前言

[译] kubernetes:kube-scheduler 调度器代码结构概述 介绍了 kube-scheduler 的代码结构。本文围绕代码结构,从源码角度出发,分析 kube-scheduler 的调度逻辑。

1. 启动 kube-scheduler

kube-scheduler 使用 Cobra 框架初始化参数,配置和应用。

// kubernetes/cmd/kube-scheduler/scheduler.go
func main() {
    // 启动 kube-scheduler 入口
	command := app.NewSchedulerCommand()
	...
}

// kubernetes/cmd/kube-scheduler/app/server.go
func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
    // 创建 kube-scheduler 选项
	opts := options.NewOptions()

    cmd := &cobra.Command{
		Use: "kube-scheduler",
		...
		RunE: func(cmd *cobra.Command, args []string) error {
			return runCommand(cmd, opts, registryOptions...)
		},
        ...
    }
    ...
}

// 运行 kube-scheduler
func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Option) error {
	...
    // 创建 kube-scheduler 配置 cc
    // 创建 kube-scheduler 实例 sched
	cc, sched, err := Setup(ctx, opts, registryOptions...)
	if err != nil {
		return err
	}
	...
	return Run(ctx, cc, sched)
}

从启动命令来看,这里重点关注的是 Setup 函数。在该函数内,创建 kube-scheduler 配置 cc 和调度器实例 sched

func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {
    ...
    // 验证选项
	if errs := opts.Validate(); len(errs) > 0 {
		return nil, nil, utilerrors.NewAggregate(errs)
	}

    // 根据选项创建配置 c
	c, err := opts.Config(ctx)
	if err != nil {
		return nil, nil, err
	}

    // 补充配置为完整配置
    cc := c.Complete()

    // 外部注册插件
    outOfTreeRegistry := make(runtime.Registry)
	for _, option := range outOfTreeRegistryOptions {
		if err := option(outOfTreeRegistry); err != nil {
			return nil, nil, err
		}
	}

    ...
	// 创建调度器实例 sched
    sched, err := scheduler.New(ctx,
		cc.Client,
		cc.InformerFactory,
		cc.DynInformerFactory,
		recorderFactory,
		scheduler.WithComponentConfigVersion(cc.ComponentConfig.TypeMeta.APIVersion),
		scheduler.WithKubeConfig(cc.KubeConfig),
		scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
		scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
		scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
		scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
		scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
		scheduler.WithPodMaxInUnschedulablePodsDuration(cc.PodMaxInUnschedulablePodsDuration),
		scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
		scheduler.WithParallelism(cc.ComponentConfig.Parallelism),
		scheduler.WithBuildFrameworkCapturer(func(profile kubeschedulerconfig.KubeSchedulerProfile) {
			// Profiles are processed during Framework instantiation to set default plugins and configurations. Capturing them for logging
			completedProfiles = append(completedProfiles, profile)
		}),
	)

    ...
    return &cc, sched, nil
}

函数 scheduler.New 创建调度器实例 sched,进入函数内查看实例是如何创建的。

func New(ctx context.Context,
	client clientset.Interface,
	informerFactory informers.SharedInformerFactory,
	dynInformerFactory dynamicinformer.DynamicSharedInformerFactory,
	recorderFactory profile.RecorderFactory,
	opts ...Option) (*Scheduler, error) {
    ...
    // 注册内置插件
    registry := frameworkplugins.NewInTreeRegistry()

    // merge 内置插件和外部注册插件
	if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil {
		return nil, err
	}

    // 注册指标
    metrics.Register()

    // 注册外部扩展器
	extenders, err := buildExtenders(logger, options.extenders, options.profiles)
	if err != nil {
		return nil, fmt.Errorf("couldn't build extenders: %w", err)
	}

    // 实例化 podLister 负责监控 pod 变化
    podLister := informerFactory.Core().V1().Pods().Lister()
    // 实例化 nodeLister 负责监控 node 变化
	nodeLister := informerFactory.Core().V1().Nodes().Lister()

    // 创建 snapshot,snapshot 作为缓存存在
	snapshot := internalcache.NewEmptySnapshot()

    ...
    // 创建 profiles,profiles 中存储的是调度器框架
	profiles, err := profile.NewMap(ctx, options.profiles, registry, recorderFactory,
		frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
		frameworkruntime.WithClientSet(client),
		frameworkruntime.WithKubeConfig(options.kubeConfig),
		frameworkruntime.WithInformerFactory(informerFactory),
		frameworkruntime.WithSnapshotSharedLister(snapshot),
		frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)),
		frameworkruntime.WithParallelism(int(options.parallelism)),
		frameworkruntime.WithExtenders(extenders),
		frameworkruntime.WithMetricsRecorder(metricsRecorder),
	)

    // 创建 preEnqueuePlugin 插件
    preEnqueuePluginMap := make(map[string][]framework.PreEnqueuePlugin)
    ...

    // 创建优先级队列 podQueue
    podQueue := internalqueue.NewSchedulingQueue(
		profiles[options.profiles[0].SchedulerName].QueueSortFunc(),
		informerFactory,
		internalqueue.WithPodInitialBackoffDuration(time.Duration(options.podInitialBackoffSeconds)*time.Second),
		internalqueue.WithPodMaxBackoffDuration(time.Duration(options.podMaxBackoffSeconds)*time.Second),
		internalqueue.WithPodLister(podLister),
		internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration),
		internalqueue.WithPreEnqueuePluginMap(preEnqueuePluginMap),
		internalqueue.WithQueueingHintMapPerProfile(queueingHintsPerProfile),
		internalqueue.WithPluginMetricsSamplePercent(pluginMetricsSamplePercent),
		internalqueue.WithMetricsRecorder(*metricsRecorder),
	)

    ...
    // 创建调度器缓存
    schedulerCache := internalcache.New(ctx, durationToExpireAssumedPod)
    ...

    // 实例化调度器
    sched := &Scheduler{
		Cache:                    schedulerCache,
		client:                   client,
		nodeInfoSnapshot:         snapshot,
		percentageOfNodesToScore: options.percentageOfNodesToScore,
		Extenders:                extenders,
		StopEverything:           stopEverything,
		SchedulingQueue:          podQueue,
		Profiles:                 profiles,
		logger:                   logger,
	}

    // 将队列的 Pop 方法赋值给 sched.NextPod
	sched.NextPod = podQueue.Pop
	...

    // 添加 Event 回调 handler
	if err = addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(queueingHintsPerProfile)); err != nil {
		return nil, fmt.Errorf("adding event handlers: %w", err)
	}

	return sched, nil
}

scheduler.New 创建了 snapshot, eventHandler, profiles(framework)cache 等对象,结合着调度框架将它们关联起来会更清晰。

2. 运行 kube-scheduler

创建完各个对象之后,接下来运行 kube-scheduler 将各个对象关联起来运行。

func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
    ...
    // 选举 leader
    waitingForLeader := make(chan struct{})
	isLeader := func() bool {
		select {
		case _, ok := <-waitingForLeader:
			// if channel is closed, we are leading
			return !ok
		default:
			// channel is open, we are waiting for a leader
			return false
		}
	}

    ...
    // 运行 informer
    startInformersAndWaitForSync := func(ctx context.Context) {
		// Start all informers.
		cc.InformerFactory.Start(ctx.Done())
		// DynInformerFactory can be nil in tests.
		if cc.DynInformerFactory != nil {
			cc.DynInformerFactory.Start(ctx.Done())
		}

		// Wait for all caches to sync before scheduling.
		cc.InformerFactory.WaitForCacheSync(ctx.Done())
		// DynInformerFactory can be nil in tests.
		if cc.DynInformerFactory != nil {
			cc.DynInformerFactory.WaitForCacheSync(ctx.Done())
		}

		// Wait for all handlers to sync (all items in the initial list delivered) before scheduling.
		if err := sched.WaitForHandlersSync(ctx); err != nil {
			logger.Error(err, "waiting for handlers to sync")
		}

		logger.V(3).Info("Handlers synced")
	}
	if !cc.ComponentConfig.DelayCacheUntilActive || cc.LeaderElection == nil {
		startInformersAndWaitForSync(ctx)
	}

    // leader 节点运行调度逻辑,暂略
    if cc.LeaderElection != nil {
        ...
    }

    close(waitingForLeader)
	sched.Run(ctx)
	return fmt.Errorf("finished without leader elect")
}

Run 函数内包含三部分处理:

  • 选举 leader 节点。如果是单节点,则跳过选举。
  • 运行 informer,负责监控 pod 和 node 变化。
  • 运行调度器

进入 sched.Run 查看调度器是如何运行的。

func (sched *Scheduler) Run(ctx context.Context) {
	...
    // 从队列中去需要调度的 pod
	sched.SchedulingQueue.Run(logger)

	// 调度 pod
	go wait.UntilWithContext(ctx, sched.scheduleOne, 0)

	<-ctx.Done()
	...
}

sched.Run 主要做了两件事。从优先级队列中取用于调度的 pod,然后通过 sched.scheduleOne 调度该 pod。

首先,看取调度 pod 的过程,如下。

func (p *PriorityQueue) Run(logger klog.Logger) {
	go wait.Until(func() {
		p.flushBackoffQCompleted(logger)
	}, 1.0*time.Second, p.stop)
	go wait.Until(func() {
		p.flushUnschedulablePodsLeftover(logger)
	}, 30*time.Second, p.stop)
}

优先级队列由 ActiveQBackoffQUnschedulableQ 组成,其逻辑关系如下。

PriorityQueue.Run 中启动两个 goroutine 分别运行 p.flushBackoffQCompletedp.flushUnschedulablePodsLeftover 方法。p.flushBackoffQCompleted 将处于 BackOffQ 的 pod 移到 ActiveQp.flushUnschedulablePodsLeftoverUnschedulableQ 的 pod 移到 ActiveQ 或者 BackOffQ。详细取调度 pod 的逻辑可查看 kube-scheduler 调度队列

接着,进入 sched.scheduleOne 查看 pod 是怎么调度的。

func (sched *Scheduler) scheduleOne(ctx context.Context) {
	...
	// 获取需要调度的 pod
	podInfo, err := sched.NextPod(logger)

	...
	// 进入调度循环调度 pod
	scheduleResult, assumedPodInfo, status := sched.schedulingCycle(schedulingCycleCtx, state, fwk, podInfo, start, podsToActivate)
	if !status.IsSuccess() {
		sched.FailureHandler(schedulingCycleCtx, fwk, assumedPodInfo, status, scheduleResult.nominatingInfo, start)
		return
	}

	// 进入绑定循环绑定 pod
	go func() {
		...
		status := sched.bindingCycle(bindingCycleCtx, state, fwk, scheduleResult, assumedPodInfo, start, podsToActivate)
		...
	}()
}

sched.scheduleOne 主要包括三部分:获取需要调度的 pod,进入调度循环调度 pod 和进入绑定循环绑定 pod。其逻辑结构如下。

进一步,查看每一部分的源码。

2.1 sched.NextPod 获取需要调度的 pod

func (p *PriorityQueue) Pop(logger klog.Logger) (*framework.QueuedPodInfo, error) {
	...
	for p.activeQ.Len() == 0 {
		if p.closed {
			logger.V(2).Info("Scheduling queue is closed")
			return nil, nil
		}

		// 如果 activeQ 没有 pod 的话,阻塞等待
		p.cond.Wait()
	}

	// 从 activeQ 中取 pod
	obj, err := p.activeQ.Pop()
	if err != nil {
		return nil, err
	}
	pInfo := obj.(*framework.QueuedPodInfo)
	...

	return pInfo, nil
}

sched.NextPod 的逻辑主要是看 activeQ 队列中有没有 pod,如果有的话,取 pod 调度。如果没有的话,阻塞等待,直到 activeQ 中有 pod。

2.2 sched.schedulingCycle 调度 pod

func (sched *Scheduler) schedulingCycle(
	ctx context.Context,
	state *framework.CycleState,
	fwk framework.Framework,
	podInfo *framework.QueuedPodInfo,
	start time.Time,
	podsToActivate *framework.PodsToActivate,
) (ScheduleResult, *framework.QueuedPodInfo, *framework.Status) {
	...
	// 调度 Pod
	scheduleResult, err := sched.SchedulePod(ctx, fwk, state, pod)
	...

	assumedPodInfo := podInfo.DeepCopy()
	assumedPod := assumedPodInfo.Pod
	err = sched.assume(logger, assumedPod, scheduleResult.SuggestedHost)
	...

	// 运行 Reserve 插件的 Reserve 方法
	if sts := fwk.RunReservePluginsReserve(ctx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
		...
	}

	// 运行 Permit 插件
	runPermitStatus := fwk.RunPermitPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
	if !runPermitStatus.IsWait() && !runPermitStatus.IsSuccess() {
		...
	}

	...
	return scheduleResult, assumedPodInfo, nil
}

sched.schedulingCycle 包含几个步骤:sched.SchedulePod 调度 Pod,将调度的还未绑定的 Pod 作为 assumedPod 添加到缓存,运行 Reserve 插件和 Permit 插件。

首先,看 sched.SchedulePod 是怎么调度 Pod 的。

func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
	feasibleNodes, diagnosis, err := sched.findNodesThatFitPod(ctx, fwk, state, pod)
	if err != nil {
		return result, err
	}
	...
}

sched.SchedulePod 中,sched.findNodesThatFitPod 为 Pod 寻找合适的节点。

// kubernetes/pkg/scheduler/schedule_one.go
func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*framework.NodeInfo, framework.Diagnosis, error) {
	...
	// 从 snapshot 中取所有节点
	allNodes, err := sched.nodeInfoSnapshot.NodeInfos().List()
	if err != nil {
		return nil, diagnosis, err
	}

	preRes, s := fwk.RunPreFilterPlugins(ctx, state, pod)
	if !s.IsSuccess() {
		...
	}

	...
	// 寻找 pod 可调用的节点
	feasibleNodes, err := sched.findNodesThatPassFilters(ctx, fwk, state, pod, &diagnosis, nodes)
	...
}

// kubernetes/pkg/scheduler/schedule_one.go
func (sched *Scheduler) findNodesThatPassFilters(
	ctx context.Context,
	fwk framework.Framework,
	state *framework.CycleState,
	pod *v1.Pod,
	diagnosis *framework.Diagnosis,
	nodes []*framework.NodeInfo) ([]*framework.NodeInfo, error) {
	...
	checkNode := func(i int) {
		...
		status := fwk.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo)
	}
	...
}

// kubernetes/pkg/scheduler/framework/runtime/framework.go
func (f *frameworkImpl) RunFilterPluginsWithNominatedPods(ctx context.Context, state *framework.CycleState, pod *v1.Pod, info *framework.NodeInfo) *framework.Status {
	...
	for i := 0; i < 2; i++ {
		...
		// 运行 Filter 插件
		status = f.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse)
		if !status.IsSuccess() && !status.IsRejected() {
			return status
		}
	}

	return status
}

sched.findNodesThatFitPod 运行 Filter 插件获取可用的节点 feasibleNodes。接着,如果可用的节点只有一个,则返回调度结果。如果有多个节点则运行 priority 插件寻找最合适的节点作为调度节点。逻辑如下。

func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
	...
	feasibleNodes, diagnosis, err := sched.findNodesThatFitPod(ctx, fwk, state, pod)
	if err != nil {
		return result, err
	}

	...
	if len(feasibleNodes) == 1 {
		return ScheduleResult{
			SuggestedHost:  feasibleNodes[0].Node().Name,
			EvaluatedNodes: 1 + len(diagnosis.NodeToStatusMap),
			FeasibleNodes:  1,
		}, nil
	}

	priorityList, err := sched.prioritizeNodes(ctx, fwk, state, pod, feasibleNodes)
	if err != nil {
		return result, err
	}

	host, _, err := selectHost(priorityList, numberOfHighestScoredNodesToReport)
	...

	return ScheduleResult{
		SuggestedHost:  host,
		EvaluatedNodes: len(feasibleNodes) + len(diagnosis.NodeToStatusMap),
		FeasibleNodes:  len(feasibleNodes),
	}, err

获得调度结果 scheduleResult 后,在 sched.schedulingCycle 中的 sched.assume 将 assumePod 的 NodeName 更新为调度的节点 host,并且将 assumePod 添加到缓存中。缓存允许运行假定的操作,该操作将 Pod 临时存储在缓存中,使得 Pod 看起来像已经在快照的所有消费者的指定节点上运行那样。假定操作忽视了 kube-apiserver 和 Pod 实际更新的时间,从而增加调度器的吞吐量。

func (sched *Scheduler) assume(logger klog.Logger, assumed *v1.Pod, host string) error {
	assumed.Spec.NodeName = host

	if err := sched.Cache.AssumePod(logger, assumed); err != nil {
		logger.Error(err, "Scheduler cache AssumePod failed")
		return err
	}
	...
	return nil
}

// kubernetes/pkg/scheduler/internal/cache/cache.go
func (cache *cacheImpl) AssumePod(logger klog.Logger, pod *v1.Pod) error {
	...
	return cache.addPod(logger, pod, true)
}

继续如 调度框架 所示,在 sched.schedulingCycle 中执行 ReservePermit 插件,插件执行通过后调度周期返回 Pod 的调度结果。

接着,进入绑定周期。

2.3 绑定周期

绑定周期是一个异步的 goroutine,负责将调度到节点的 Pod 发送给 kube-apiserver。进入绑定周期查看绑定逻辑的实现。

// kubernetes/pkg/scheduler/schedule_one.go
func (sched *Scheduler) scheduleOne(ctx context.Context) {
	...
	// 调度周期返回调度结果
	scheduleResult, assumedPodInfo, status := sched.schedulingCycle(schedulingCycleCtx, state, fwk, podInfo, start, podsToActivate)
	if !status.IsSuccess() {
		sched.FailureHandler(schedulingCycleCtx, fwk, assumedPodInfo, status, scheduleResult.nominatingInfo, start)
		return
	}

	// 绑定周期绑定调度结果
	go func() {
		...
		status := sched.bindingCycle(bindingCycleCtx, state, fwk, scheduleResult, assumedPodInfo, start, podsToActivate)
		if !status.IsSuccess() {
			sched.handleBindingCycleError(bindingCycleCtx, state, fwk, assumedPodInfo, start, scheduleResult, status)
			return
		}
		...
	}()
}

func (sched *Scheduler) bindingCycle(
	ctx context.Context,
	state *framework.CycleState,
	fwk framework.Framework,
	scheduleResult ScheduleResult,
	assumedPodInfo *framework.QueuedPodInfo,
	start time.Time,
	podsToActivate *framework.PodsToActivate) *framework.Status {
	...
	// 运行 Permit 插件
	if status := fwk.WaitOnPermit(ctx, assumedPod); !status.IsSuccess() {
		...
	}

	// 运行 PreBind 插件
	if status := fwk.RunPreBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost); !status.IsSuccess() {
		...
	}

	// 运行 Bind 插件
	if status := sched.bind(ctx, fwk, assumedPod, scheduleResult.SuggestedHost, state); !status.IsSuccess() {
		return status
	}

	// 运行 PostBind 插件
	fwk.RunPostBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
	...
}

可以看到,绑定周期运行一系列插件进行绑定,进入 Bind 插件查看绑定的行为。

func (sched *Scheduler) bind(ctx context.Context, fwk framework.Framework, assumed *v1.Pod, targetNode string, state *framework.CycleState) (status *framework.Status) {
	...
	return fwk.RunBindPlugins(ctx, state, assumed, targetNode)
}

// kubernetes/pkg/scheduler/framework/runtime/framework.go
func (f *frameworkImpl) RunBindPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) {
	...
	for _, pl := range f.bindPlugins {
		status = f.runBindPlugin(ctx, pl, state, pod, nodeName)
		if status.IsSkip() {
			continue
		}
		...
	}
	...
}

func (f *frameworkImpl) runBindPlugin(ctx context.Context, bp framework.BindPlugin, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
	...
	status := bp.Bind(ctx, state, pod, nodeName)
	...
	return status
}

// kubernetes/pkg/scheduler/plugins/defaultbinder/default_binder.go
func (b DefaultBinder) Bind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status {
	...
	logger.V(3).Info("Attempting to bind pod to node", "pod", klog.KObj(p), "node", klog.KRef("", nodeName))
	binding := &v1.Binding{
		ObjectMeta: metav1.ObjectMeta{Namespace: p.Namespace, Name: p.Name, UID: p.UID},
		Target:     v1.ObjectReference{Kind: "Node", Name: nodeName},
	}
	err := b.handle.ClientSet().CoreV1().Pods(binding.Namespace).Bind(ctx, binding, metav1.CreateOptions{})
	if err != nil {
		return framework.AsStatus(err)
	}
	return nil
}

在 Bind 插件中调用 ClientSet 的 Bind 方法将 Pod 和 node 绑定的结果发给 kube-apiserver,实现绑定操作。

3. 总结

本文从源码角度分析了 kube-scheduler 的调度流程,力图做到知其然知其所以然。


标签:...,sched,err,framework,源码,scheduler,pod,kube
From: https://www.cnblogs.com/xingzheanan/p/18000774

相关文章

  • Mybatis 源码系列:领略设计模式在 Mybatis 其中的应用
    目录一、Builder模式二、工厂模式三、单例模式四、代理模式五、组合模式六、模板方式模式七、适配器模式八、装饰器模式九、迭代器模式虽然我们都知道有23种设计模式,但是大多停留在概念层面,真实开发中很少遇到,Mybatis源码中使用了大量的设计模式,阅读源码并观察设计模式在其中的应......
  • 龙蜥8.6 源码安装python3.12
    ​ 闲来无事用虚拟机安装了一下龙蜥系统。[root@localhosthome]#cat/etc/*release*AnolisOSrelease8.6NAME="AnolisOS"VERSION="8.6"ID="anolis"ID_LIKE="rhelfedoracentos"VERSION_ID="8.6"PLATFORM_ID="platform:an......
  • 用 Python 实现ChatGPT OpenAI(直接上源码)
    网上一大堆教程,好多讲的很墨迹,你需要折腾半天才能调试通,up这里给大家直接上源码干货。详细教程后面补充,着急使用的可以直接拿走调试说明到openai里面替换你自己的app_keyhttps://platform.openai.com/登录账号登录之后,点击右上角“Personal”,展开菜单,找到“ViewAP......
  • Pass Artifact between tfx compoents when running with kubeflow pipeline
    WhatisArtifact?AnArtifactisafileordirectoryproducedbyatfxcomponent,whichcanbepassedtoadownstreamcomponent,andthenthedownstreamcomponentcanuseit.HowdoestfxpassanArtifactbetweencomponents?tfxpipelinehasanargument......
  • Corretto-11源码-Java命令入口
    背景由于工作中需要开发编译器,开始阅读JavaC和JDK源码了解相关过程,并做出相关整理参考本文参考ChatGPT相关解释(很多内容都是杜撰,不可信),进行自我理解后整理发出项目https://github.com/corretto/corretto-11入口(src/java.base/share/native/libjli/java.c)入口文件为java.c......
  • 基于springboot开发的工作流系统,bpmn.js,vue源码及功能分析(activiti)
    前言activiti工作流引擎项目,企业erp、oa、hr、crm等企事业办公系统轻松落地,一套完整并且实际运用在多套项目中的案例,满足日常业务流程审批需求。一、项目形式springboot+vue+activiti集成了activiti在线编辑器,流行的前后端分离部署开发模式,快速开发平台,可插拔工作流服务。工作......
  • Semaphore源码阅读
    目录简介代码分析成员变量方法SyncNonFairSyncFairSync本人的源码阅读主要聚焦于类的使用场景,一般只在java层面进行分析,没有深入到一些native方法的实现。并且由于知识储备不完整,很可能出现疏漏甚至是谬误,欢迎指出共同学习本文基于corretto-17.0.9源码,参考本文时请打开相应的源......
  • 一站式企事业内部培训考学平台源码及功能剖析,在线移动培训考学平台,企业版抖音
    企业培训考学知识库管理系统是一个综合性的平台,用于支持企业的培训和考试需求。1.文档管理及在线预览:1.系统支持上传各种类型的文档,如Word、PDF、PPT、Excel等。2.用户可以直接在线预览这些文档,无需下载。3.对于视频格式的资料,系统也提供了在线播放的功能。2.在线考试与试题......
  • 使用kubectl中的强制删除pod命令
    1.使用kubectl中的强制删除命令kubectldeletepodnginx-nmmp--force--grace-period=02.删除非正常的poda.查询出所有要删除的podNamekubectlgetpo|grepUnknown|awk'{print$1}'b.批量删除kubectldeletepod`kubectlgetpo|grepUnknown|awk'{print$1}'`-......
  • 在K8S中,apiservice与kube-schedule高可用原理?
    在Kubernetes(简称K8s)中,为了实现高可用性(HA),不同的组件有不同的机制:kube-apiserver高可用原理:负载均衡:在一个集群中,通常会部署多个kube-apiserver实例,并通过负载均衡器(如云服务商的负载均衡服务或硬件负载均衡器,或者是内部软件如NGINX等)对外提供统一入口。这样可以确保即使有单......