1、概述
Kubernetes的CSI Plugin注册机制的实现分为两个部分,第一部分是 sidecar "node-driver-registrar",第二部分是Kubelet的pluginManager,第一部分详细内容请参见《Kubernetes CSI插件注册(一)—— node-driver-registrar源码分析》,本文主要讲后者,即Kubelet的pluginManager模块的源码。
工程源码:https://github.com/kubernetes/kubernetes/tree/release-1.24
注意:本文基于Kubernetes 1.24.10版本编写。
2、Kubelet CSI插件注册原理
Kubelet的pluginManager模块会监听特定的目录 /var/lib/kubelet/plugins_registry(其中 /var/lib/kubelet 是Kubelet工作路径,可以变更), sidecar "node-driver-registrar"实现了特定的接口(GRPC服务,RegistrationServer)向Kubelet提供插件注册信息并创建一个socket放到该目录下(每个csi plugin会对应一个Node Driver Registrar组件,也就是说,一个Node Driver Registrar只负责一个plugin的注册工作,socket文件命名规则为:csi插件名-reg.sock),Kubelet的pluginManager模块会通过该socket获取CSI Plugin的信息(主要是csi plugin name和csi plugin socket的路径),并将其写入k8s node节点的annotations,从而实现CSI Plugin的注册操作。 结合sidecar "node-driver-registrar"和Kubelet的pluginManager可得出Kubernetes CSI插件注册的完整步骤,步骤如下:![](/i/l/?n=23&i=blog/624219/202302/624219-20230213111638087-941028349.png)
第 1 步
Node Driver Registrar 连接 csi plugin 组件暴露的 grpc 服务 socket 地址,调用 GetPluginInfo 接口,获取 csi plugin 的 driver 名称。
第 2 步
在/var/lib/kubelet/plugins_registry目录下启动一个 socket(socket文件名规则,csi插件名-reg.sock),对外暴露 GetInfo 和 NotifyRegistrationStatus 两个方法向Kubelet提供插件注册信息。Kubelet 通过 Watcher 可以发现该 socket。
第 3 步
Kubelet 通过 Watcher 监控/var/lib/kubelet/plugins_registry/
目录,发现上述 socket 后,通过该 socket 调用 Node-Driver-Registrar 的 GetInfo 方法,获取 csi plugin 类型、csi plugin 的 driver 名称、csi plugin 暴露的 grpc 服务 socket 地址以及 csi plugin 支持的版本。
第 4 步
Kubelet 通过 csi plugin 组件暴露的 grpc 服务 socket 地址对其 NodeGetInfo 方法进行调用,获取 csi plugin 的 nodeID 等信息。
第 5 步
Kubelet 根据上一步获得的信息,去更新 node 节点的 Annotations信息,同时创建(或更新)一个 CSINode 对象。
第 6 步
Kubelet 通过 socket 调用 Node-Driver-Registrar 容器的 NotifyRegistrationStatus 方法,通知注册 csi plugin 成功。
通过以上 6 步就实现了Kubernetes CSI Plugin注册机制。
3、Kubelet pluginManger源码分析
Kubelet的pluginManager模块借助sidecar "node-driver-registrar"提供的GRPC服务(即RegistrationServer)完成Kubernetes CSI插件注册,在《Kubernetes CSI插件注册(一)—— node-driver-registrar源码分析》这篇博文中分析了node-driver-registrar工程源码,在此篇博文中重点分析下Kubelet的pluginManager模块的源码,分析将分为pluginManager的初始化分析以及pluginManager的运行(处理逻辑)两部分。3.1 pluginManager的初始化源码分析
NewMainKubelet():
Kubelet进程启动时,会调用NewMainKubelet()方法初始化Kubelet,紧接着NewMainKubelet()方法会调用pluginmanager.NewPluginManager来初始化pluginManager,所以把NewMainKubelet()作为分析入口,Kubelet程序从main方法启动到调用NewMainKubelet()方法的调用栈如下:
kubelet的main()方法 (cmd/kubelet/kubelet.go 40行) --> code := run(command) (cmd/kubelet/kubelet.go 47行) --> return Run(....)(cmd/kubelet/app/server.go 267行)--> err := run(....);(cmd/kubelet/app/server.go 406行)--> RunKubelet(....)(cmd/kubelet/app/server.go 760行)--> createAndInitKubelet(....) (cmd/kubelet/app/server.go 1131行)--> kubelet.NewMainKubelet(....)(cmd/kubelet/app/server.go 1236行)
NewMainKubelet()中调用pluginmanager.NewPluginManager来初始化pluginManager。
// pkg/kubelet/kubelet:740 klet.pluginManager = pluginmanager.NewPluginManager( // 这里sockDir的值就是 /var/lib/kubelet/plugins_registry klet.getPluginsRegistrationDir(), /* sockDir */ kubeDeps.Recorder, )
接着看下pluginManager结构体对象初始化及pluginManager的结构体的结构。
PluginManager结构体的初始化(pkg/kubelet/pluginmanager/plugin_manager.go):
调用创建pluginManager结构体对象时会将sockDir传参进入pluginManager的desiredStateOfWorldPopulator结构体当中,相当于pluginManager会监听plugins_registry
目录(负责向kubelet注册csi driver的组件Node Driver Registrar会创建暴露服务的socket在该目录下),pluginManager通过Node Driver Registrar组件暴露的socket获取plugin信息(包括plugin的socket地址、plugin名称等),从而最终做到根据该目录下socket文件的新增/删除来做相应的plugin注册/取消注册操作。
func NewPluginManager( sockDir string, recorder record.EventRecorder) PluginManager { asw := cache.NewActualStateOfWorld() dsw := cache.NewDesiredStateOfWorld() reconciler := reconciler.NewReconciler( operationexecutor.NewOperationExecutor( operationexecutor.NewOperationGenerator( recorder, ), ), loopSleepDuration, dsw, asw, ) pm := &pluginManager{ desiredStateOfWorldPopulator: pluginwatcher.NewWatcher( sockDir, dsw, ), reconciler: reconciler, desiredStateOfWorld: dsw, actualStateOfWorld: asw, } return pm }
PluginManager结构体(pkg/kubelet/pluginmanager/plugin_manager.go):
注意看,pluginManager结构体中定义了desiredStateOfWorldPopulator、actualStateOfWorld、desiredStateOfWorld 和 reconciler 字段。
type PluginManager interface { // Starts the plugin manager and all the asynchronous loops that it controls Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) // AddHandler adds the given plugin handler for a specific plugin type, which // will be added to the actual state of world cache so that it can be passed to // the desired state of world cache in order to be used during plugin // registration/deregistration AddHandler(pluginType string, pluginHandler cache.PluginHandler) } const ( // loopSleepDuration is the amount of time the reconciler loop waits // between successive executions loopSleepDuration = 1 * time.Second ) // pluginManager implements the PluginManager interface type pluginManager struct { // desiredStateOfWorldPopulator (the plugin watcher) runs an asynchronous // periodic loop to populate the desiredStateOfWorld. desiredStateOfWorldPopulator *pluginwatcher.Watcher // reconciler runs an asynchronous periodic loop to reconcile the // desiredStateOfWorld with the actualStateOfWorld by triggering register // and unregister operations using the operationExecutor. reconciler reconciler.Reconciler // actualStateOfWorld is a data structure containing the actual state of // the world according to the manager: i.e. which plugins are registered. // The data structure is populated upon successful completion of register // and unregister actions triggered by the reconciler. actualStateOfWorld cache.ActualStateOfWorld // desiredStateOfWorld is a data structure containing the desired state of // the world according to the plugin manager: i.e. what plugins are registered. // The data structure is populated by the desired state of the world // populator (plugin watcher). desiredStateOfWorld cache.DesiredStateOfWorld } var _ PluginManager = &pluginManager{}如果没怎么看过其他的k8s代码,可能对k8s的actualStateOfWorld(后文简称asw)和desiredStateOfWorld(后文简称dsw)不熟悉。
在遵循声明式管理的k8s中,dsw一般对应了k8s资源的spec,即期望状态。asw一般对应了k8s资源的status,即实际状态。k8s最常见的一个处理框架,即是监测到外界变化后,先将其写入dsw,然后在reconciler中对dsw和asw进行比较,做相应的处理后,再把dsw中的变化写入asw,让两者同步。
在pluginManger中,也有着相似的处理:
asw中存放的是已经在k8s中已成功注册的plugin信息,dsw中存放的是期望注册的plugin信息,后续会讲到Kubelet是如何监测到plugin的变化并将其存储到dsw,最终同步到asw。
actualStateOfWorld(pkg/kubelet/pluginmanager/cache/actual_state_of_world.go):
actualStateOfWorld结构体中存放的是已经完成了plugin注册操作的Node Driver Registrar组件暴露的socket相关信息。
type ActualStateOfWorld interface { // GetRegisteredPlugins generates and returns a list of plugins // that are successfully registered plugins in the current actual state of world. GetRegisteredPlugins() []PluginInfo // AddPlugin add the given plugin in the cache. // An error will be returned if socketPath of the PluginInfo object is empty. // Note that this is different from desired world cache's AddOrUpdatePlugin // because for the actual state of world cache, there won't be a scenario where // we need to update an existing plugin if the timestamps don't match. This is // because the plugin should have been unregistered in the reconciler and therefore // removed from the actual state of world cache first before adding it back into // the actual state of world cache again with the new timestamp AddPlugin(pluginInfo PluginInfo) error // RemovePlugin deletes the plugin with the given socket path from the actual // state of world. // If a plugin does not exist with the given socket path, this is a no-op. RemovePlugin(socketPath string) // PluginExists checks if the given plugin exists in the current actual // state of world cache with the correct timestamp PluginExistsWithCorrectTimestamp(pluginInfo PluginInfo) bool } // NewActualStateOfWorld returns a new instance of ActualStateOfWorld func NewActualStateOfWorld() ActualStateOfWorld { return &actualStateOfWorld{ socketFileToInfo: make(map[string]PluginInfo), } } type actualStateOfWorld struct { // socketFileToInfo is a map containing the set of successfully registered plugins // The keys are plugin socket file paths. The values are PluginInfo objects socketFileToInfo map[string]PluginInfo sync.RWMutex } var _ ActualStateOfWorld = &actualStateOfWorld{} // PluginInfo holds information of a plugin type PluginInfo struct { SocketPath string Timestamp time.Time Handler PluginHandler Name string }
desiredStateOfWorld(pkg/kubelet/pluginmanager/cache/desired_state_of_world.go):
desiredStateOfWorld结构体中存放的是在pluginManager监听目录下存在的希望完成plugin注册操作的Node Driver Registrar组件暴露的socket相关信息。
type DesiredStateOfWorld interface { // AddOrUpdatePlugin add the given plugin in the cache if it doesn't already exist. // If it does exist in the cache, then the timestamp of the PluginInfo object in the cache will be updated. // An error will be returned if socketPath is empty. AddOrUpdatePlugin(socketPath string) error // RemovePlugin deletes the plugin with the given socket path from the desired // state of world. // If a plugin does not exist with the given socket path, this is a no-op. RemovePlugin(socketPath string) // GetPluginsToRegister generates and returns a list of plugins // in the current desired state of world. GetPluginsToRegister() []PluginInfo // PluginExists checks if the given socket path exists in the current desired // state of world cache PluginExists(socketPath string) bool } // NewDesiredStateOfWorld returns a new instance of DesiredStateOfWorld. func NewDesiredStateOfWorld() DesiredStateOfWorld { return &desiredStateOfWorld{ socketFileToInfo: make(map[string]PluginInfo), } } type desiredStateOfWorld struct { // socketFileToInfo is a map containing the set of successfully registered plugins // The keys are plugin socket file paths. The values are PluginInfo objects socketFileToInfo map[string]PluginInfo sync.RWMutex } var _ DesiredStateOfWorld = &desiredStateOfWorld{}
reconciler(pkg/kubelet/pluginmanager/reconciler/reconciler.go):
reconciler对比desiredStateOfWorld与actualStateOfWorld做调谐,做plugin的注册操作/取消注册操作。
type Reconciler interface { // Run starts running the reconciliation loop which executes periodically, // checks if plugins are correctly registered or unregistered. // If not, it will trigger register/unregister operations to rectify. Run(stopCh <-chan struct{}) // AddHandler adds the given plugin handler for a specific plugin type, // which will be added to the actual state of world cache. AddHandler(pluginType string, pluginHandler cache.PluginHandler) } // NewReconciler returns a new instance of Reconciler. // // loopSleepDuration - the amount of time the reconciler loop sleeps between // // successive executions // syncDuration - the amount of time the syncStates sleeps between // successive executions // // operationExecutor - used to trigger register/unregister operations safely // // (prevents more than one operation from being triggered on the same // socket path) // // desiredStateOfWorld - cache containing the desired state of the world // actualStateOfWorld - cache containing the actual state of the world func NewReconciler( operationExecutor operationexecutor.OperationExecutor, loopSleepDuration time.Duration, desiredStateOfWorld cache.DesiredStateOfWorld, actualStateOfWorld cache.ActualStateOfWorld) Reconciler { return &reconciler{ operationExecutor: operationExecutor, loopSleepDuration: loopSleepDuration, desiredStateOfWorld: desiredStateOfWorld, actualStateOfWorld: actualStateOfWorld, handlers: make(map[string]cache.PluginHandler), } } type reconciler struct { operationExecutor operationexecutor.OperationExecutor loopSleepDuration time.Duration desiredStateOfWorld cache.DesiredStateOfWorld actualStateOfWorld cache.ActualStateOfWorld handlers map[string]cache.PluginHandler sync.RWMutex } var _ Reconciler = &reconciler{}
Watcher(pkg/kubelet/pluginmanager/pluginwatcher/plugin_watcher.go):
监听plugins_registry
目录下socket文件的新增/删除,进而更新dsw中存放的plugin信息。// Watcher is the plugin watcher type Watcher struct { path string fs utilfs.Filesystem fsWatcher *fsnotify.Watcher desiredStateOfWorld cache.DesiredStateOfWorld } // NewWatcher provides a new watcher for socket registration func NewWatcher(sockDir string, desiredStateOfWorld cache.DesiredStateOfWorld) *Watcher { return &Watcher{ path: sockDir, fs: &utilfs.DefaultFs{}, desiredStateOfWorld: desiredStateOfWorld, } }
3.2 pluginManager的运行(处理逻辑)源码分析
上面介绍了pluginManager的初始化,接下来介绍pluginManager的运行也即Run方法进行分析,分析一下pluginManager的处理逻辑。
因为调用逻辑比较复杂,这里直接跳过了调用过程的分析,直接进入kl.pluginManager.Run()的分析,下面只给出该方法的一个调用链:
kubelet的main()方法 (cmd/kubelet/kubelet.go 40行) --> code := run(command) (cmd/kubelet/kubelet.go 47行) --> return Run(....)(cmd/kubelet/app/server.go 267行)--> err := run(....);(cmd/kubelet/app/server.go 406行)--> RunKubelet(....)(cmd/kubelet/app/server.go 760行)--> startKubelet(....) (cmd/kubelet/app/server.go 1181行)--> go k.Run(....)(cmd/kubelet/app/server.go 1190行)--> go wait.Until(kl.updateRuntimeUp, ...) (pkg/kubelet/kubelet.go 1438行) --> kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules) (pkg/kubelet/kubelet.go 2370行,oneTimeInitializer是sync.Once函数)--> go kl.pluginManager.Run(kl.sourcesReady, wait.NeverStop) (pkg/kubelet/kubelet.go 1396行)
注意,在执行kl.pluginManager.Run()方法前,pluginManager对象添加了CSI插件注册回调方法和设备插件回调方法。
// Adding Registration Callback function for CSI Driver kl.pluginManager.AddHandler(pluginwatcherapi.CSIPlugin, plugincache.PluginHandler(csi.PluginHandler)) // Adding Registration Callback function for Device Manager kl.pluginManager.AddHandler(pluginwatcherapi.DevicePlugin, kl.containerManager.GetPluginRegistrationHandler()) // Start the plugin manager klog.V(4).InfoS("Starting plugin manager") go kl.pluginManager.Run(kl.sourcesReady, wait.NeverStop)
插件handler结构体必须实现插件验证方法、插件注册方法以及插件删除方法。
type PluginHandler interface { // Validate returns an error if the information provided by // the potential plugin is erroneous (unsupported version, ...) ValidatePlugin(pluginName string, endpoint string, versions []string) error // RegisterPlugin is called so that the plugin can be register by any // plugin consumer // Error encountered here can still be Notified to the plugin. RegisterPlugin(pluginName, endpoint string, versions []string) error // DeRegister is called once the pluginwatcher observes that the socket has // been deleted. DeRegisterPlugin(pluginName string) }
kl.pluginManager.Run(pkg/kubelet/pluginmanager/plugin_manager.go 109行):
下面直接看到kl.pluginManager.Run的代码,该方法主要逻辑有两个:
(1)pm.desiredStateOfWorldPopulator.Start():持续监听/var/lib/kubelet/plugins_registry/目录下socket文件的变化事件(遍历/var/lib/kubelet/plugins_registry/目录包括其子目录下的所有socket文件),将Node Driver Registrar的socket信息写入desiredStateOfWorld中/从desiredStateOfWorld中删除;
(2)pm.reconciler.Run(): 对dsw和asw进行了同步,完成了plugin的注册/取消注册的操作。
func (pm *pluginManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) { // 捕获崩溃并记录错误,默认不传参的话,在程序发送崩溃时,在控制台打印一下崩溃日志后再崩溃,方便技术人员排查程序错误 defer runtime.HandleCrash() /* 对 plugins_registry 目录启动了一个watcher,监听create和delete的操作 监听到 create 事件,会将socket路径写入 dsw 监听到 delete 事件,会将socket路径从 dsw 移除 */ pm.desiredStateOfWorldPopulator.Start(stopCh) klog.V(2).InfoS("The desired_state_of_world populator (plugin watcher) starts") klog.InfoS("Starting Kubelet Plugin Manager") /* 调用了常见的 wait.Until,以cron的形式去调rc.reconcile(),对比dsw和asw,并进行同步 */ go pm.reconciler.Run(stopCh) metrics.Register(pm.actualStateOfWorld, pm.desiredStateOfWorld) <-stopCh klog.InfoS("Shutting down Kubelet Plugin Manager") }
pm.desiredStateOfWorldPopulator.Start(stopCh) (pkg/kubelet/pluginmanager/pluginwatcher/plugin_watcher.go):
对 plugins_registry 目录启动了一个watcher,然后遍历plugins_registry目录(遍历目录中的现有子级),如果是dir便添加到watcher中监控,如果是socket文件便调用w.handleCreateEvent,将该socket加入到desiredStateOfWorld中,主要用于当节点kubelet服务重启时,注册节点已存在的kubelet插件。
之后运行一个goroutine,持续监听plugins_registry注册目录的变化事件:
(1)当变化事件为新增事件时,即plugins_registry目录下多了文件,则调用w.handleCreateEvent,如果多的文件是socket文件类型,将该socket加入到desiredStateOfWorld中;
(2)当变化事件为删除事件时,即plugins_registry目录下删除了文件,则调用w.handleDeleteEvent,,如果删除的文件是socket文件类型,将该socket从desiredStateOfWorld中删除。
// Start watches for the creation and deletion of plugin sockets at the path func (w *Watcher) Start(stopCh <-chan struct{}) error { klog.V(2).InfoS("Plugin Watcher Start", "path", w.path) // 如果plugins_registry目录(/var/lib/kubelet/plugins_registry/)目录不存在的话便创建plugins_registry目录 // Creating the directory to be watched if it doesn't exist yet, // and walks through the directory to discover the existing plugins. if err := w.init(); err != nil { return err } fsWatcher, err := fsnotify.NewWatcher() if err != nil { return fmt.Errorf("failed to start plugin fsWatcher, err: %v", err) } w.fsWatcher = fsWatcher /* 在开始插件处理goroutine之前,遍历plugins_registry目录(遍历目录中的现有子级),如果是dir便添加到fsWtacher对象中监控,如果是socket文件便 新增fsnotify事件,并调用w.handleCreateEvent,将该socket加入到dsw中,如果是其他普通文件便忽略掉。其主要作用是当节点kubelet服务重启时,将 当前节点plugins_registry目录下已经存在的各种csi类型的Node Driver Registrar组件暴露的socket文件(当然也可能是设备插件暴露的socket文件) 注册到dsw,以使得kubelet的pluginManager组件完成各种csi组件的注册。 */ // Traverse plugin dir and add filesystem watchers before starting the plugin processing goroutine. if err := w.traversePluginDir(w.path); err != nil { klog.ErrorS(err, "Failed to traverse plugin socket path", "path", w.path) } // 启动协程,for循环监听plugins_registry目录(遍历目录中的现有子级)下产生的fsnotify.create和fsnotify.delete的事件,并调用w.handleCreateEvent方法 go func(fsWatcher *fsnotify.Watcher) { for { select { case event := <-fsWatcher.Events: //TODO: Handle errors by taking corrective measures if event.Op&fsnotify.Create == fsnotify.Create { err := w.handleCreateEvent(event) if err != nil { klog.ErrorS(err, "Error when handling create event", "event", event) } } else if event.Op&fsnotify.Remove == fsnotify.Remove { w.handleDeleteEvent(event) } continue case err := <-fsWatcher.Errors: if err != nil { klog.ErrorS(err, "FsWatcher received error") } continue case <-stopCh: w.fsWatcher.Close() return } } }(fsWatcher) return nil } // Walks through the plugin directory discover any existing plugin sockets. // Ignore all errors except root dir not being walkable func (w *Watcher) traversePluginDir(dir string) error { // watch the new dir err := w.fsWatcher.Add(dir) if err != nil { return fmt.Errorf("failed to watch %s, err: %v", w.path, err) } // traverse existing children in the dir return w.fs.Walk(dir, func(path string, info os.FileInfo, err error) error { if err != nil { if path == dir { return fmt.Errorf("error accessing path: %s error: %v", path, err) } klog.ErrorS(err, "Error accessing path", "path", path) return nil } // do not call fsWatcher.Add twice on the root dir to avoid potential problems. if path == dir { return nil } switch mode := info.Mode(); { case mode.IsDir(): if err := w.fsWatcher.Add(path); err != nil { return fmt.Errorf("failed to watch %s, err: %v", path, err) } case mode&os.ModeSocket != 0: event := fsnotify.Event{ Name: path, Op: fsnotify.Create, } //TODO: Handle errors by taking corrective measures if err := w.handleCreateEvent(event); err != nil { klog.ErrorS(err, "Error when handling create", "event", event) } default: klog.V(5).InfoS("Ignoring file", "path", path, "mode", mode) } return nil }) }
w.handleCreateEvent()(pkg/kubelet/pluginmanager/pluginwatcher/plugin_watcher.go):
w.handleCreateEvent()主要逻辑:
(1)判断新增事件是否为文件,且是否是socket文件;
(2)是socket文件,则调用w.handlePluginRegistration做处理,主要是将该socket加入到desiredStateOfWorld中。
func (w *Watcher) handleCreateEvent(event fsnotify.Event) error { klog.V(6).InfoS("Handling create event", "event", event) fi, err := os.Stat(event.Name) // TODO: This is a workaround for Windows 20H2 issue for os.Stat(). Please see // microsoft/Windows-Containers#97 for details. // Once the issue is resvolved, the following os.Lstat() is not needed. if err != nil && runtime.GOOS == "windows" { fi, err = os.Lstat(event.Name) } if err != nil { return fmt.Errorf("stat file %s failed: %v", event.Name, err) } if strings.HasPrefix(fi.Name(), ".") { klog.V(5).InfoS("Ignoring file (starts with '.')", "path", fi.Name()) return nil } if !fi.IsDir() { isSocket, err := util.IsUnixDomainSocket(util.NormalizePath(event.Name)) if err != nil { return fmt.Errorf("failed to determine if file: %s is a unix domain socket: %v", event.Name, err) } if !isSocket { klog.V(5).InfoS("Ignoring non socket file", "path", fi.Name()) return nil } return w.handlePluginRegistration(event.Name) } // 如果新增的是dir,再将dir及dir中所有文件添加到fsWtacher对象中监控 return w.traversePluginDir(event.Name) } func (w *Watcher) handlePluginRegistration(socketPath string) error { if runtime.GOOS == "windows" { socketPath = util.NormalizePath(socketPath) } // Update desired state of world list of plugins // If the socket path does exist in the desired world cache, there's still // a possibility that it has been deleted and recreated again before it is // removed from the desired world cache, so we still need to call AddOrUpdatePlugin // in this case to update the timestamp klog.V(2).InfoS("Adding socket path or updating timestamp to desired state cache", "path", socketPath) err := w.desiredStateOfWorld.AddOrUpdatePlugin(socketPath) if err != nil { return fmt.Errorf("error adding socket path %s or updating timestamp to desired state cache: %v", socketPath, err) } return nil } func (dsw *desiredStateOfWorld) AddOrUpdatePlugin(socketPath string) error { dsw.Lock() defer dsw.Unlock() if socketPath == "" { return fmt.Errorf("socket path is empty") } if _, ok := dsw.socketFileToInfo[socketPath]; ok { klog.V(2).InfoS("Plugin exists in actual state cache, timestamp will be updated", "path", socketPath) } // Update the PluginInfo object. // Note that we only update the timestamp in the desired state of world, not the actual state of world // because in the reconciler, we need to check if the plugin in the actual state of world is the same // version as the plugin in the desired state of world dsw.socketFileToInfo[socketPath] = PluginInfo{ SocketPath: socketPath, Timestamp: time.Now(), } return nil }
w.handleDeleteEvent(event)(pkg/kubelet/pluginmanager/pluginwatcher/plugin_watcher.go):
w.handleDeleteEvent()主要逻辑:
(1)将socket从desiredStateOfWorld中删除。
func (w *Watcher) handleDeleteEvent(event fsnotify.Event) { klog.V(6).InfoS("Handling delete event", "event", event) socketPath := event.Name klog.V(2).InfoS("Removing socket path from desired state cache", "path", socketPath) w.desiredStateOfWorld.RemovePlugin(socketPath) } func (dsw *desiredStateOfWorld) RemovePlugin(socketPath string) { dsw.Lock() defer dsw.Unlock() delete(dsw.socketFileToInfo, socketPath) }
pm.reconciler.Run(stopCh)(pkg/kubelet/pluginmanager/reconciler/reconciler.go):
pm.reconciler.Run()主要逻辑为对比desiredStateOfWorld与actualStateOfWorld做调谐,做plugin的注册操作/取消注册操作。具体逻辑如下:
(1)对比actualStateOfWorld,如果desiredStateOfWorld中没有该socket信息,或者desiredStateOfWorld中该socket的Timestamp值与actualStateOfWorld中的不相等(即plugin更新了),则说明该plugin需要取消注册(更新的plugin需先取消注册,然后再次注册),调用rc.operationExecutor.UnregisterPlugin做plugin取消注册操作;
(2)对比desiredStateOfWorld,如果actualStateOfWorld中没有该socket信息,或者desiredStateOfWorld中该socket的Timestamp值与actualStateOfWorld中的不相等,则调用rc.operationExecutor.RegisterPlugin做plugin注册操作。
4、总结
kubelet的pluginManager会监听某个特定目录,而负责向kubelet注册csi driver的组件Node Driver Registrar会创建暴露服务的socket在该目录下(每个plugin会对应一个Node Driver Registrar组件,也就是说,一个Node Driver Registrar只负责一个plugin的注册工作),pluginManager通过Node Driver Registrar组件暴露的socket获取plugin信息(包括plugin的socket地址、plugin名称等),从而最终做到根据该目录下socket文件的新增/删除来做相应的plugin注册/取消注册操作。
plugin注册完成后,后续kubelet将通过plugin暴露的socket与plugin进行通信,做存储挂载/解除挂载等操作。
参考:https://juejin.cn/post/7126222522755842055
参考:https://www.cnblogs.com/lianngkyle/p/14906274.html
标签:插件,CSI,socket,plugin,kubelet,注册,go,pluginManager,desiredStateOfWorld From: https://www.cnblogs.com/zhangmingcheng/p/17107702.html