
kube-scheduler Startup Analysis


Let's start with a description of the Kubernetes scheduler:

The Kubernetes scheduler is a control plane process which assigns Pods to Nodes. 
The scheduler determines which Nodes are valid placements for each Pod in the scheduling queue according to constraints and available resources. 
The scheduler then ranks each valid Node and binds the Pod to a suitable Node. 
Multiple different schedulers may be used within a cluster; kube-scheduler is the reference implementation. 

In short: kube-scheduler watches Pods and Nodes and is responsible for assigning Pods to Nodes.

Set a breakpoint in the Setup function at cmd/kube-scheduler/app/server.go:298 and follow the code from there:

// pkg/scheduler/scheduler.go:395

// NewInformerFactory creates a SharedInformerFactory and initializes a scheduler specific
// in-place podInformer.
func NewInformerFactory(cs clientset.Interface, resyncPeriod time.Duration) informers.SharedInformerFactory {
	informerFactory := informers.NewSharedInformerFactory(cs, resyncPeriod)
	informerFactory.InformerFor(&v1.Pod{}, newPodInformer)
	return informerFactory
}

The informerFactory is responsible for creating and managing informers, and an informer is a watcher of one resource type, such as Node or Pod. The informerFactory maintains a map that holds all of its informers:

// vendor/k8s.io/client-go/informers/factory.go:164

// InternalInformerFor returns the SharedIndexInformer for obj using an internal
// client.
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
	f.lock.Lock()
	defer f.lock.Unlock()

	informerType := reflect.TypeOf(obj)
	informer, exists := f.informers[informerType]
	if exists {
		return informer
	}

	resyncPeriod, exists := f.customResync[informerType]
	if !exists {
		resyncPeriod = f.defaultResync
	}

	informer = newFunc(f.client, resyncPeriod)
	f.informers[informerType] = informer

	return informer
}

At startup it first tries to fetch the podInformer from this map; if it is not found, a new one is created and stored in the map.

After the informerFactory is created, informers are created for Pods and for Nodes; the Pod informer is the one created by default.
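To see the same factory API from the consumer side, here is a minimal sketch, assuming a kubeconfig at an illustrative path; the typed accessors (Core().V1().Pods() and so on) are the standard client-go way to register informers in the factory's map:

package main

import (
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Build a clientset from a kubeconfig; the path is illustrative.
	config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		panic(err)
	}
	cs := kubernetes.NewForConfigOrDie(config)

	// One factory manages the informer map; asking twice for the same
	// resource type returns the same shared informer instance.
	factory := informers.NewSharedInformerFactory(cs, 30*time.Second)
	podInformer := factory.Core().V1().Pods().Informer()
	nodeInformer := factory.Core().V1().Nodes().Informer()

	// Both calls above only registered informers in the factory's map;
	// nothing lists or watches until factory.Start is called.
	_, _ = podInformer, nodeInformer
}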

Next, let's follow how the podInformer is created:

// pkg/scheduler/scheduler.go:500

// newPodInformer creates a shared index informer that returns only non-terminal pods.
func newPodInformer(cs clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
	selector := fmt.Sprintf("status.phase!=%v,status.phase!=%v", v1.PodSucceeded, v1.PodFailed)
	tweakListOptions := func(options *metav1.ListOptions) {
		options.FieldSelector = selector
	}
	return coreinformers.NewFilteredPodInformer(cs, metav1.NamespaceAll, resyncPeriod, nil, tweakListOptions)
}

The metav1.NamespaceAll argument shows that the podInformer created here watches Pods in all namespaces, while the field selector excludes Pods in the Succeeded and Failed phases.
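For comparison, client-go also exposes factory options that narrow what an informer watches; a minimal sketch, assuming the "default" namespace purely as an example, that reproduces the same non-terminal-Pod filter at the factory level (NewSharedInformerFactoryWithOptions, WithNamespace and WithTweakListOptions are standard client-go APIs):

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
)

// newScopedFactory is a hypothetical helper: a factory limited to one
// namespace that also filters out terminal Pods, mirroring newPodInformer.
func newScopedFactory(cs kubernetes.Interface) informers.SharedInformerFactory {
	return informers.NewSharedInformerFactoryWithOptions(
		cs,
		0, // no periodic resync
		informers.WithNamespace("default"), // instead of metav1.NamespaceAll
		informers.WithTweakListOptions(func(options *metav1.ListOptions) {
			options.FieldSelector = "status.phase!=Succeeded,status.phase!=Failed"
		}),
	)
}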

Digging deeper, we finally reach the ListWatch object:

// vendor/k8s.io/client-go/informers/core/v1/pod.go:58

// NewFilteredPodInformer constructs a new informer for Pod type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
	return cache.NewSharedIndexInformer(
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.CoreV1().Pods(namespace).List(context.TODO(), options)
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
			},
		},
		&corev1.Pod{},
		resyncPeriod,
		indexers,
	)
}

The ListWatch object here wraps the list and watch operations on Pods; the concrete logic only becomes visible by following the code at runtime.
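The same kind of ListWatch can also be constructed by hand with cache.NewListWatchFromClient, a standard client-go helper; a minimal sketch (podListWatch is a hypothetical name):

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

// podListWatch builds a ListWatch for Pods; under the hood it issues the
// same List and Watch calls shown in NewFilteredPodInformer above.
func podListWatch(cs kubernetes.Interface) *cache.ListWatch {
	return cache.NewListWatchFromClient(
		cs.CoreV1().RESTClient(), // typed REST client for the core/v1 group
		"pods",                   // resource name
		metav1.NamespaceAll,      // all namespaces, like the scheduler
		fields.Everything(),      // no field selector
	)
}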

Next, let's trace how list-watch actually runs. One step of the scheduler's startup is to start all of the informers:

// cmd/kube-scheduler/app/server.go:145

// Run executes the scheduler based on the given configuration. It only returns on error or when context is done.
func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
	// To help debugging, immediately log version
	klog.InfoS("Starting Kubernetes Scheduler", "version", version.Get())

	klog.InfoS("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))

	// Config registration.
	if cz, err := configz.New("componentconfig"); err == nil {
		cz.Set(cc.ComponentConfig)
	} else {
		return fmt.Errorf("unable to register configz: %s", err)
	}

	// Prepare the event broadcaster.
	cc.EventBroadcaster.StartRecordingToSink(ctx.Done())

	// Setup healthz checks.
	var checks []healthz.HealthChecker
	if cc.ComponentConfig.LeaderElection.LeaderElect {
		checks = append(checks, cc.LeaderElection.WatchDog)
	}

	waitingForLeader := make(chan struct{})
	isLeader := func() bool {
		select {
		case _, ok := <-waitingForLeader:
			// if channel is closed, we are leading
			return !ok
		default:
			// channel is open, we are waiting for a leader
			return false
		}
	}

	// Start up the healthz server.
	if cc.SecureServing != nil {
		handler := buildHandlerChain(newHealthzAndMetricsHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
		// TODO: handle stoppedCh and listenerStoppedCh returned by c.SecureServing.Serve
		if _, _, err := cc.SecureServing.Serve(handler, 0, ctx.Done()); err != nil {
			// fail early for secure handlers, removing the old error loop from above
			return fmt.Errorf("failed to start secure server: %v", err)
		}
	}

	// Start all informers.
	cc.InformerFactory.Start(ctx.Done())
	// DynInformerFactory can be nil in tests.
	if cc.DynInformerFactory != nil {
		cc.DynInformerFactory.Start(ctx.Done())
	}

	// Wait for all caches to sync before scheduling.
	cc.InformerFactory.WaitForCacheSync(ctx.Done())
	// DynInformerFactory can be nil in tests.
	if cc.DynInformerFactory != nil {
		cc.DynInformerFactory.WaitForCacheSync(ctx.Done())
	}

	// If leader election is enabled, runCommand via LeaderElector until done and exit.
	if cc.LeaderElection != nil {
		cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) {
				close(waitingForLeader)
				sched.Run(ctx)
			},
			OnStoppedLeading: func() {
				select {
				case <-ctx.Done():
					// We were asked to terminate. Exit 0.
					klog.InfoS("Requested to terminate, exiting")
					os.Exit(0)
				default:
					// We lost the lock.
					klog.ErrorS(nil, "Leaderelection lost")
					klog.FlushAndExit(klog.ExitFlushTimeout, 1)
				}
			},
		}
		leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
		if err != nil {
			return fmt.Errorf("couldn't create leader elector: %v", err)
		}
		leaderElector.Run(ctx)

		return fmt.Errorf("lost lease")
	}

	// Leader election is disabled, so runCommand inline until done.
	close(waitingForLeader)
	sched.Run(ctx)
	return fmt.Errorf("finished without leader elect")
}
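Note the leader-election branch above: sched.Run only starts inside OnStartedLeading, which is what lets several scheduler replicas run while exactly one actually schedules. The same pattern, stripped down, can be written against client-go's leaderelection package directly; a sketch in which the lock name, namespace, and identity are illustrative:

import (
	"context"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

// runWithLeaderElection is a hypothetical wrapper: run() is invoked only
// while this process holds the Lease lock.
func runWithLeaderElection(ctx context.Context, cs kubernetes.Interface, id string, run func(context.Context)) {
	lock := &resourcelock.LeaseLock{
		LeaseMeta:  metav1.ObjectMeta{Name: "my-scheduler", Namespace: "kube-system"},
		Client:     cs.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{Identity: id},
	}
	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock:            lock,
		LeaseDuration:   15 * time.Second,
		RenewDeadline:   10 * time.Second,
		RetryPeriod:     2 * time.Second,
		ReleaseOnCancel: true,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: run,
			OnStoppedLeading: func() {
				// lost the lock: stop doing leader-only work
			},
		},
	})
}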

 

Start iterates over the informers saved in the factory's map and launches each one that has not been started yet:

// vendor/k8s.io/client-go/informers/factory.go:128

// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
	f.lock.Lock()
	defer f.lock.Unlock()

	for informerType, informer := range f.informers {
		if !f.startedInformers[informerType] {
			go informer.Run(stopCh)
			f.startedInformers[informerType] = true
		}
	}
}
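Because started informers are recorded in startedInformers, Start is safe to call repeatedly; a later call simply picks up informers created after the previous one. A minimal usage sketch (startAndSync is a hypothetical helper) of the Start/WaitForCacheSync pair that the scheduler's Run function also uses:

import "k8s.io/client-go/informers"

// startAndSync starts every informer registered so far and blocks until
// each one has finished its initial List.
func startAndSync(factory informers.SharedInformerFactory, stopCh <-chan struct{}) {
	factory.Start(stopCh) // idempotent: each informer type is started once

	// WaitForCacheSync reports sync status per informer type.
	for informerType, synced := range factory.WaitForCacheSync(stopCh) {
		if !synced {
			panic("failed to sync informer cache for " + informerType.String())
		}
	}
}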

 

 

Let's look at the Run method of this podInformer instance:

// vendor/k8s.io/client-go/tools/cache/shared_informer.go:397 (the Run method)

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	if s.HasStarted() {
		klog.Warningf("The sharedIndexInformer has started, run more than once is not allowed")
		return
	}
	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
		KnownObjects:          s.indexer,
		EmitDeltaTypeReplaced: true,
	})

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError:     false,
		ShouldResync:     s.processor.shouldResync,

		Process:           s.HandleDeltas,
		WatchErrorHandler: s.watchErrorHandler,
	}

	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()

		s.controller = New(cfg)
		s.controller.(*controller).clock = s.clock
		s.started = true
	}()

	// Separate stop channel because Processor should be stopped strictly after controller
	processorStopCh := make(chan struct{})
	var wg wait.Group
	defer wg.Wait()              // Wait for Processor to stop
	defer close(processorStopCh) // Tell Processor to stop
	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
	wg.StartWithChannel(processorStopCh, s.processor.run)

	defer func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
		s.stopped = true // Don't want any new listeners
	}()
	s.controller.Run(stopCh)
}
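The processor started here via s.processor.run is what fans deltas out to registered listeners. In application code those listeners are added with AddEventHandler; a hedged sketch (addPodHandlers is a hypothetical helper):

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/klog/v2"
)

// addPodHandlers registers listeners on a shared informer; they are served
// by the processor goroutine started in Run above.
func addPodHandlers(podInformer cache.SharedIndexInformer) {
	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			pod := obj.(*v1.Pod)
			klog.InfoS("pod added", "pod", klog.KObj(pod))
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			// fired on real updates and on periodic resyncs
		},
		DeleteFunc: func(obj interface{}) {
			// obj may be a cache.DeletedFinalStateUnknown tombstone
		},
	})
}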

 

Continue into s.controller.Run():

// vendor/k8s.io/client-go/tools/cache/controller.go:128

// Run begins processing items, and will continue until a value is sent down stopCh or it is closed.
// It's an error to call Run more than once.
// Run blocks; call via go.
func (c *controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	go func() {
		<-stopCh
		c.config.Queue.Close()
	}()
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)
	r.ShouldResync = c.config.ShouldResync
	r.WatchListPageSize = c.config.WatchListPageSize
	r.clock = c.clock
	if c.config.WatchErrorHandler != nil {
		r.watchErrorHandler = c.config.WatchErrorHandler
	}

	c.reflectorMutex.Lock()
	c.reflector = r
	c.reflectorMutex.Unlock()

	var wg wait.Group

	wg.StartWithChannel(stopCh, r.Run)

	wait.Until(c.processLoop, time.Second, stopCh)
	wg.Wait()
}

 

wg.StartWithChannel(stopCh, r.Run) enters the Reflector's Run method. The Reflector is the client-side component that encapsulates the list-watch logic.
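To make that division of labor concrete, a Reflector can be driven by hand: it performs an initial List, then Watches, pushing everything it sees into a Store. A minimal sketch (watchPodsWithReflector is a hypothetical helper; lw can be the ListWatch built earlier):

import (
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/cache"
)

// watchPodsWithReflector runs a Reflector against the given ListWatch,
// filling the returned store with Pod objects.
func watchPodsWithReflector(lw cache.ListerWatcher, stopCh <-chan struct{}) cache.Store {
	store := cache.NewStore(cache.MetaNamespaceKeyFunc)
	// NewReflector(lister-watcher, expected type, destination store, resync period)
	r := cache.NewReflector(lw, &v1.Pod{}, store, 30*time.Second)
	go r.Run(stopCh) // List once, then Watch; re-list and re-watch on failure
	return store
}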
From: https://www.cnblogs.com/allenwas3/p/17580460.html
