Kubernetes CSI Plugin Registration (Part 2): Source Code Analysis of the Kubelet CSI Plugin Registration Mechanism

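The Kubernetes CSI plugin registration mechanism is implemented in two parts: the node-driver-registrar sidecar and the Kubelet's pluginManager. The first part is covered in "Kubernetes CSI Plugin Registration (Part 1): node-driver-registrar Source Code Analysis". This article focuses on the second part, the source code of the Kubelet's pluginManager module.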


Note: this article is based on Kubernetes 1.24.10.

2. How Kubelet CSI Plugin Registration Works

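The Kubelet's pluginManager module watches a specific directory, /var/lib/kubelet/plugins_registry (where /var/lib/kubelet is the Kubelet working directory and can be changed). The node-driver-registrar sidecar implements a specific interface (a gRPC service, RegistrationServer) that provides plugin registration information to the Kubelet, and it creates a socket in that directory. Each CSI plugin has its own Node Driver Registrar, so one Node Driver Registrar registers exactly one plugin, and the socket file is named <CSI plugin name>-reg.sock. The pluginManager module uses this socket to obtain information about the CSI plugin (mainly the CSI plugin name and the path of the CSI plugin's socket) and writes it into the annotations of the Kubernetes node, which completes the registration of the CSI plugin.

Putting the node-driver-registrar sidecar and the Kubelet's pluginManager together, the complete Kubernetes CSI plugin registration flow consists of the following steps: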

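Step 1

The Node Driver Registrar connects to the gRPC socket exposed by the CSI plugin and calls GetPluginInfo to obtain the CSI plugin's driver name.

Step 2

The Node Driver Registrar starts a socket in the /var/lib/kubelet/plugins_registry directory (named <CSI plugin name>-reg.sock). The socket exposes two methods, GetInfo and NotifyRegistrationStatus, through which plugin registration information is provided to the Kubelet. The Kubelet can discover this socket through its Watcher.

Step 3

The Kubelet watches the /var/lib/kubelet/plugins_registry/ directory through its Watcher. After discovering the socket, it calls the Node Driver Registrar's GetInfo method over that socket to obtain the CSI plugin type, the CSI plugin's driver name, the address of the gRPC socket exposed by the CSI plugin, and the versions the CSI plugin supports.

Step 4

The Kubelet calls the NodeGetInfo method over the gRPC socket exposed by the CSI plugin to obtain the plugin's nodeID and related information.

Step 5

Using the information obtained in the previous step, the Kubelet updates the annotations of the node and creates (or updates) a CSINode object.

Step 6

The Kubelet calls the NotifyRegistrationStatus method of the Node Driver Registrar container over the socket to report that the CSI plugin was registered successfully.

These six steps make up the Kubernetes CSI plugin registration mechanism.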
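
To make the Kubelet side of the handshake (steps 3 to 6) more concrete, here is a minimal in-memory sketch. It is not the Kubelet's code: the registrar and handler interfaces, the fake types, and the driver name and endpoint are all hypothetical stand-ins for the real gRPC service and plugin handlers, and the failure path (reporting an error back through NotifyRegistrationStatus) is an assumption of this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// pluginInfo holds the fields that, per step 3, GetInfo returns.
type pluginInfo struct {
	Type, Name, Endpoint string
	SupportedVersions    []string
}

// registrar is a hypothetical stand-in for the registrar's gRPC service.
type registrar interface {
	GetInfo() (pluginInfo, error)
	NotifyRegistrationStatus(registered bool, errMsg string)
}

// handler is a hypothetical, trimmed-down plugin handler.
type handler interface {
	ValidatePlugin(name, endpoint string, versions []string) error
	RegisterPlugin(name, endpoint string, versions []string) error
}

// register walks through steps 3 to 6 against in-memory fakes.
func register(r registrar, handlers map[string]handler) error {
	info, err := r.GetInfo() // step 3: ask the registrar about the plugin
	if err != nil {
		return err
	}
	h, ok := handlers[info.Type]
	if !ok {
		err = fmt.Errorf("no handler for plugin type %q", info.Type)
	} else if err = h.ValidatePlugin(info.Name, info.Endpoint, info.SupportedVersions); err == nil {
		// steps 4 and 5 happen inside the handler in this sketch
		err = h.RegisterPlugin(info.Name, info.Endpoint, info.SupportedVersions)
	}
	if err != nil {
		r.NotifyRegistrationStatus(false, err.Error()) // step 6, failure case
		return err
	}
	r.NotifyRegistrationStatus(true, "") // step 6, success case
	return nil
}

// fakeRegistrar records the status it was notified with.
type fakeRegistrar struct {
	info       pluginInfo
	registered bool
	lastErr    string
}

func (f *fakeRegistrar) GetInfo() (pluginInfo, error) { return f.info, nil }
func (f *fakeRegistrar) NotifyRegistrationStatus(ok bool, msg string) {
	f.registered, f.lastErr = ok, msg
}

// fakeCSIHandler remembers driver name -> endpoint.
type fakeCSIHandler struct{ drivers map[string]string }

func (h *fakeCSIHandler) ValidatePlugin(name, endpoint string, versions []string) error {
	if len(versions) == 0 {
		return errors.New("no supported versions reported")
	}
	return nil
}

func (h *fakeCSIHandler) RegisterPlugin(name, endpoint string, _ []string) error {
	h.drivers[name] = endpoint
	return nil
}

func main() {
	reg := &fakeRegistrar{info: pluginInfo{
		Type:              "CSIPlugin",
		Name:              "example.csi.driver", // made-up driver name
		Endpoint:          "/var/lib/kubelet/plugins/example/csi.sock",
		SupportedVersions: []string{"1.0.0"},
	}}
	h := &fakeCSIHandler{drivers: map[string]string{}}
	err := register(reg, map[string]handler{"CSIPlugin": h})
	fmt.Println(err, reg.registered, h.drivers["example.csi.driver"])
}
```

The point of the sketch is the ordering: the Kubelet pulls the plugin description from the registrar first, hands it to a type-specific handler, and only then reports the outcome back over the same socket.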

3. Source Code Analysis of the Kubelet pluginManager

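The Kubelet's pluginManager module relies on the gRPC service (RegistrationServer) provided by the node-driver-registrar sidecar to complete Kubernetes CSI plugin registration. The node-driver-registrar source code was analyzed in "Kubernetes CSI Plugin Registration (Part 1): node-driver-registrar Source Code Analysis". This article focuses on the source code of the Kubelet's pluginManager module, in two parts: the initialization of pluginManager, and its execution (processing logic).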

3.1 Source Code Analysis of pluginManager Initialization



The pluginManager is created along the following call chain:

  kubelet main() (cmd/kubelet/kubelet.go, line 40) --> code := run(command) (cmd/kubelet/kubelet.go, line 47) --> 
  return Run(....) (cmd/kubelet/app/server.go, line 267) --> err := run(....); (cmd/kubelet/app/server.go, line 406) --> 
  RunKubelet(....) (cmd/kubelet/app/server.go, line 760) --> createAndInitKubelet(....) (cmd/kubelet/app/server.go, line 1131) --> 
  kubelet.NewMainKubelet(....) (cmd/kubelet/app/server.go, line 1236)


	// pkg/kubelet/kubelet.go:740
	klet.pluginManager = pluginmanager.NewPluginManager(
		// sockDir here is /var/lib/kubelet/plugins_registry
		klet.getPluginsRegistrationDir(), /* sockDir */
		kubeDeps.Recorder,
	)



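When the pluginManager struct is created, sockDir is passed into its desiredStateOfWorldPopulator. In effect, pluginManager watches the plugins_registry directory, which is where the Node Driver Registrar (the component responsible for registering a CSI driver with the kubelet) creates the socket that exposes its service. pluginManager obtains the plugin information (including the plugin's socket address and the plugin name) through that socket, and registers or unregisters plugins as socket files are added to or removed from the directory.
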
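func NewPluginManager(
	sockDir string,
	recorder record.EventRecorder) PluginManager {
	asw := cache.NewActualStateOfWorld()
	dsw := cache.NewDesiredStateOfWorld()
	reconciler := reconciler.NewReconciler(
		operationexecutor.NewOperationExecutor(
			operationexecutor.NewOperationGenerator(recorder)),
		loopSleepDuration, dsw, asw,
	)

	pm := &pluginManager{
		// The watcher receives sockDir and fills dsw from it.
		desiredStateOfWorldPopulator: pluginwatcher.NewWatcher(
			sockDir,
			dsw,
		),
		reconciler:          reconciler,
		desiredStateOfWorld: dsw,
		actualStateOfWorld:  asw,
	}
	return pm
}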


Note that the pluginManager struct defines four fields: desiredStateOfWorldPopulator, actualStateOfWorld, desiredStateOfWorld, and reconciler.

type PluginManager interface {
	// Starts the plugin manager and all the asynchronous loops that it controls
	Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})

	// AddHandler adds the given plugin handler for a specific plugin type, which
	// will be added to the actual state of world cache so that it can be passed to
	// the desired state of world cache in order to be used during plugin
	// registration/deregistration
	AddHandler(pluginType string, pluginHandler cache.PluginHandler)
}

const (
	// loopSleepDuration is the amount of time the reconciler loop waits
	// between successive executions
	loopSleepDuration = 1 * time.Second
)

// pluginManager implements the PluginManager interface
type pluginManager struct {
	// desiredStateOfWorldPopulator (the plugin watcher) runs an asynchronous
	// periodic loop to populate the desiredStateOfWorld.
	desiredStateOfWorldPopulator *pluginwatcher.Watcher

	// reconciler runs an asynchronous periodic loop to reconcile the
	// desiredStateOfWorld with the actualStateOfWorld by triggering register
	// and unregister operations using the operationExecutor.
	reconciler reconciler.Reconciler

	// actualStateOfWorld is a data structure containing the actual state of
	// the world according to the manager: i.e. which plugins are registered.
	// The data structure is populated upon successful completion of register
	// and unregister actions triggered by the reconciler.
	actualStateOfWorld cache.ActualStateOfWorld

	// desiredStateOfWorld is a data structure containing the desired state of
	// the world according to the plugin manager: i.e. what plugins are registered.
	// The data structure is populated by the desired state of the world
	// populator (plugin watcher).
	desiredStateOfWorld cache.DesiredStateOfWorld
}

var _ PluginManager = &pluginManager{}


actualStateOfWorld stores information about the sockets exposed by Node Driver Registrar components whose plugin registration has already completed.

type ActualStateOfWorld interface {

	// GetRegisteredPlugins generates and returns a list of plugins
	// that are successfully registered plugins in the current actual state of world.
	GetRegisteredPlugins() []PluginInfo

	// AddPlugin add the given plugin in the cache.
	// An error will be returned if socketPath of the PluginInfo object is empty.
	// Note that this is different from desired world cache's AddOrUpdatePlugin
	// because for the actual state of world cache, there won't be a scenario where
	// we need to update an existing plugin if the timestamps don't match. This is
	// because the plugin should have been unregistered in the reconciler and therefore
	// removed from the actual state of world cache first before adding it back into
	// the actual state of world cache again with the new timestamp
	AddPlugin(pluginInfo PluginInfo) error

	// RemovePlugin deletes the plugin with the given socket path from the actual
	// state of world.
	// If a plugin does not exist with the given socket path, this is a no-op.
	RemovePlugin(socketPath string)

	// PluginExists checks if the given plugin exists in the current actual
	// state of world cache with the correct timestamp
	PluginExistsWithCorrectTimestamp(pluginInfo PluginInfo) bool
}

// NewActualStateOfWorld returns a new instance of ActualStateOfWorld
func NewActualStateOfWorld() ActualStateOfWorld {
	return &actualStateOfWorld{
		socketFileToInfo: make(map[string]PluginInfo),
	}
}

type actualStateOfWorld struct {

	// socketFileToInfo is a map containing the set of successfully registered plugins
	// The keys are plugin socket file paths. The values are PluginInfo objects
	socketFileToInfo map[string]PluginInfo
	sync.RWMutex
}

var _ ActualStateOfWorld = &actualStateOfWorld{}

// PluginInfo holds information of a plugin
type PluginInfo struct {
	SocketPath string
	Timestamp  time.Time
	Handler    PluginHandler
	Name       string
}


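desiredStateOfWorld stores information about the sockets that exist in the directory watched by pluginManager, that is, the sockets exposed by Node Driver Registrar components that are waiting to have their plugin registered.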

type DesiredStateOfWorld interface {
	// AddOrUpdatePlugin add the given plugin in the cache if it doesn't already exist.
	// If it does exist in the cache, then the timestamp of the PluginInfo object in the cache will be updated.
	// An error will be returned if socketPath is empty.
	AddOrUpdatePlugin(socketPath string) error

	// RemovePlugin deletes the plugin with the given socket path from the desired
	// state of world.
	// If a plugin does not exist with the given socket path, this is a no-op.
	RemovePlugin(socketPath string)

	// GetPluginsToRegister generates and returns a list of plugins
	// in the current desired state of world.
	GetPluginsToRegister() []PluginInfo

	// PluginExists checks if the given socket path exists in the current desired
	// state of world cache
	PluginExists(socketPath string) bool
}

// NewDesiredStateOfWorld returns a new instance of DesiredStateOfWorld.
func NewDesiredStateOfWorld() DesiredStateOfWorld {
	return &desiredStateOfWorld{
		socketFileToInfo: make(map[string]PluginInfo),
	}
}

type desiredStateOfWorld struct {

	// socketFileToInfo is a map containing the set of successfully registered plugins
	// The keys are plugin socket file paths. The values are PluginInfo objects
	socketFileToInfo map[string]PluginInfo
	sync.RWMutex
}

var _ DesiredStateOfWorld = &desiredStateOfWorld{}

The reconciler periodically compares desiredStateOfWorld with actualStateOfWorld and triggers register and unregister operations through the operationExecutor.



type Reconciler interface {
	// Run starts running the reconciliation loop which executes periodically,
	// checks if plugins are correctly registered or unregistered.
	// If not, it will trigger register/unregister operations to rectify.
	Run(stopCh <-chan struct{})

	// AddHandler adds the given plugin handler for a specific plugin type,
	// which will be added to the actual state of world cache.
	AddHandler(pluginType string, pluginHandler cache.PluginHandler)
}

// NewReconciler returns a new instance of Reconciler.
// loopSleepDuration - the amount of time the reconciler loop sleeps between
//	successive executions
//	syncDuration - the amount of time the syncStates sleeps between
//	successive executions
// operationExecutor - used to trigger register/unregister operations safely
//	(prevents more than one operation from being triggered on the same
//	socket path)
// desiredStateOfWorld - cache containing the desired state of the world
// actualStateOfWorld - cache containing the actual state of the world
func NewReconciler(
	operationExecutor operationexecutor.OperationExecutor,
	loopSleepDuration time.Duration,
	desiredStateOfWorld cache.DesiredStateOfWorld,
	actualStateOfWorld cache.ActualStateOfWorld) Reconciler {
	return &reconciler{
		operationExecutor:   operationExecutor,
		loopSleepDuration:   loopSleepDuration,
		desiredStateOfWorld: desiredStateOfWorld,
		actualStateOfWorld:  actualStateOfWorld,
		handlers:            make(map[string]cache.PluginHandler),
	}
}

type reconciler struct {
	operationExecutor   operationexecutor.OperationExecutor
	loopSleepDuration   time.Duration
	desiredStateOfWorld cache.DesiredStateOfWorld
	actualStateOfWorld  cache.ActualStateOfWorld
	handlers            map[string]cache.PluginHandler
}

var _ Reconciler = &reconciler{}

The desiredStateOfWorldPopulator is a pluginwatcher.Watcher: it watches sockDir and populates desiredStateOfWorld.


// Watcher is the plugin watcher
type Watcher struct {
	path                string
	fs                  utilfs.Filesystem
	fsWatcher           *fsnotify.Watcher
	desiredStateOfWorld cache.DesiredStateOfWorld
}

// NewWatcher provides a new watcher for socket registration
func NewWatcher(sockDir string, desiredStateOfWorld cache.DesiredStateOfWorld) *Watcher {
	return &Watcher{
		path:                sockDir,
		fs:                  &utilfs.DefaultFs{},
		desiredStateOfWorld: desiredStateOfWorld,
	}
}

3.2 Source Code Analysis of pluginManager Execution (Processing Logic)


The pluginManager is started along the following call chain:

  kubelet main() (cmd/kubelet/kubelet.go, line 40) --> code := run(command) (cmd/kubelet/kubelet.go, line 47) --> 
  return Run(....) (cmd/kubelet/app/server.go, line 267) --> err := run(....); (cmd/kubelet/app/server.go, line 406) --> 
  RunKubelet(....) (cmd/kubelet/app/server.go, line 760) --> startKubelet(....) (cmd/kubelet/app/server.go, line 1181) --> 
  go k.Run(....) (cmd/kubelet/app/server.go, line 1190) --> go wait.Until(kl.updateRuntimeUp, ...) (pkg/kubelet/kubelet.go, line 1438) --> 
  kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules) (pkg/kubelet/kubelet.go, line 2370; oneTimeInitializer is a sync.Once) -->
  go kl.pluginManager.Run(kl.sourcesReady, wait.NeverStop) (pkg/kubelet/kubelet.go, line 1396)
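
The call chain notes that oneTimeInitializer is a sync.Once, which is why the plugin manager is started only once even though updateRuntimeUp runs periodically. The following sketch shows just that pattern; the module type and its methods are hypothetical and unrelated to the Kubelet's own types.

```go
package main

import (
	"fmt"
	"sync"
)

// module is a hypothetical component with run-once initialization.
type module struct {
	once   sync.Once
	starts int
}

// update may be called many times (like a periodic health check),
// but start runs only on the first call.
func (m *module) update() {
	m.once.Do(m.start)
}

// start counts how often the one-time initialization actually ran.
func (m *module) start() { m.starts++ }

func main() {
	m := &module{}
	for i := 0; i < 3; i++ {
		m.update()
	}
	fmt.Println(m.starts) // prints 1
}
```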


	// Adding Registration Callback function for CSI Driver
	kl.pluginManager.AddHandler(pluginwatcherapi.CSIPlugin, plugincache.PluginHandler(csi.PluginHandler))
	// Adding Registration Callback function for Device Manager
	kl.pluginManager.AddHandler(pluginwatcherapi.DevicePlugin, kl.containerManager.GetPluginRegistrationHandler())
	// Start the plugin manager
	klog.V(4).InfoS("Starting plugin manager")
	go kl.pluginManager.Run(kl.sourcesReady, wait.NeverStop)


type PluginHandler interface {
	// Validate returns an error if the information provided by
	// the potential plugin is erroneous (unsupported version, ...)
	ValidatePlugin(pluginName string, endpoint string, versions []string) error
	// RegisterPlugin is called so that the plugin can be register by any
	// plugin consumer
	// Error encountered here can still be Notified to the plugin.
	RegisterPlugin(pluginName, endpoint string, versions []string) error
	// DeRegister is called once the pluginwatcher observes that the socket has
	// been deleted.
	DeRegisterPlugin(pluginName string)
}

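kl.pluginManager.Run (pkg/kubelet/pluginmanager/plugin_manager.go, line 109) does two things:

(1) pm.desiredStateOfWorldPopulator.Start(): continuously watches for changes to socket files under /var/lib/kubelet/plugins_registry/ (traversing every socket file in that directory and its subdirectories), and adds the Node Driver Registrar socket information to desiredStateOfWorld or removes it from desiredStateOfWorld.
(2) pm.reconciler.Run(): synchronizes dsw (desiredStateOfWorld) and asw (actualStateOfWorld), which performs the actual plugin registration and unregistration.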

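func (pm *pluginManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
	// Catch a crash and log it. With no extra handlers passed, the panic is
	// logged and the process then crashes, which makes the failure easier to debug.
	defer runtime.HandleCrash()

	// Start a watcher on the plugins_registry directory for create and delete events.
	// On a create event, the socket path is written to dsw.
	// On a delete event, the socket path is removed from dsw.
	if err := pm.desiredStateOfWorldPopulator.Start(stopCh); err != nil {
		klog.ErrorS(err, "The desired_state_of_world populator (plugin watcher) starts failed!")
		return
	}
	klog.V(2).InfoS("The desired_state_of_world populator (plugin watcher) starts")

	klog.InfoS("Starting Kubelet Plugin Manager")
	// Uses the familiar wait.Until to call rc.reconcile() periodically,
	// comparing dsw with asw and synchronizing them.
	go pm.reconciler.Run(stopCh)

	metrics.Register(pm.actualStateOfWorld, pm.desiredStateOfWorld)
	<-stopCh
	klog.InfoS("Shutting down Kubelet Plugin Manager")
}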

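pm.desiredStateOfWorldPopulator.Start(stopCh) (pkg/kubelet/pluginmanager/pluginwatcher/plugin_watcher.go):

Start creates a watcher on the plugins_registry directory and then walks the directory's existing children. Each subdirectory is added to the watcher, and for each socket file w.handleCreateEvent is called to add the socket to desiredStateOfWorld. This initial walk mainly serves to register plugins that already exist on the node when the kubelet service restarts.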


// Start watches for the creation and deletion of plugin sockets at the path
func (w *Watcher) Start(stopCh <-chan struct{}) error {
	klog.V(2).InfoS("Plugin Watcher Start", "path", w.path)

	// Creating the directory to be watched (/var/lib/kubelet/plugins_registry/)
	// if it doesn't exist yet,
	// and walks through the directory to discover the existing plugins.
	if err := w.init(); err != nil {
		return err
	}

	fsWatcher, err := fsnotify.NewWatcher()
	if err != nil {
		return fmt.Errorf("failed to start plugin fsWatcher, err: %v", err)
	}
	w.fsWatcher = fsWatcher

	// Pick up the sockets that already exist under plugins_registry on this node:
	// those exposed by Node Driver Registrar components for the various CSI
	// drivers (and possibly sockets exposed by device plugins).
	// Traverse plugin dir and add filesystem watchers before starting the plugin processing goroutine.
	if err := w.traversePluginDir(w.path); err != nil {
		klog.ErrorS(err, "Failed to traverse plugin socket path", "path", w.path)
	}

	// Start a goroutine that loops over fsnotify create and remove events under
	// plugins_registry (including watched subdirectories) and calls
	// w.handleCreateEvent or w.handleDeleteEvent.
	go func(fsWatcher *fsnotify.Watcher) {
		for {
			select {
			case event := <-fsWatcher.Events:
				//TODO: Handle errors by taking corrective measures
				if event.Op&fsnotify.Create == fsnotify.Create {
					err := w.handleCreateEvent(event)
					if err != nil {
						klog.ErrorS(err, "Error when handling create event", "event", event)
					}
				} else if event.Op&fsnotify.Remove == fsnotify.Remove {
					w.handleDeleteEvent(event)
				}
				continue
			case err := <-fsWatcher.Errors:
				if err != nil {
					klog.ErrorS(err, "FsWatcher received error")
				}
				continue
			case <-stopCh:
				w.fsWatcher.Close()
				return
			}
		}
	}(fsWatcher)

	return nil
}

// Walks through the plugin directory discover any existing plugin sockets.
// Ignore all errors except root dir not being walkable
func (w *Watcher) traversePluginDir(dir string) error {
	// watch the new dir
	err := w.fsWatcher.Add(dir)
	if err != nil {
		return fmt.Errorf("failed to watch %s, err: %v", w.path, err)
	}
	// traverse existing children in the dir
	return w.fs.Walk(dir, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			if path == dir {
				return fmt.Errorf("error accessing path: %s error: %v", path, err)
			}

			klog.ErrorS(err, "Error accessing path", "path", path)
			return nil
		}

		// do not call fsWatcher.Add twice on the root dir to avoid potential problems.
		if path == dir {
			return nil
		}

		switch mode := info.Mode(); {
		case mode.IsDir():
			if err := w.fsWatcher.Add(path); err != nil {
				return fmt.Errorf("failed to watch %s, err: %v", path, err)
			}
		case mode&os.ModeSocket != 0:
			event := fsnotify.Event{
				Name: path,
				Op:   fsnotify.Create,
			}
			//TODO: Handle errors by taking corrective measures
			if err := w.handleCreateEvent(event); err != nil {
				klog.ErrorS(err, "Error when handling create", "event", event)
			}
		default:
			klog.V(5).InfoS("Ignoring file", "path", path, "mode", mode)
		}

		return nil
	})
}



func (w *Watcher) handleCreateEvent(event fsnotify.Event) error {
	klog.V(6).InfoS("Handling create event", "event", event)

	fi, err := os.Stat(event.Name)
	// TODO: This is a workaround for Windows 20H2 issue for os.Stat(). Please see
	// microsoft/Windows-Containers#97 for details.
	// Once the issue is resolved, the following os.Lstat() is not needed.
	if err != nil && runtime.GOOS == "windows" {
		fi, err = os.Lstat(event.Name)
	}
	if err != nil {
		return fmt.Errorf("stat file %s failed: %v", event.Name, err)
	}

	if strings.HasPrefix(fi.Name(), ".") {
		klog.V(5).InfoS("Ignoring file (starts with '.')", "path", fi.Name())
		return nil
	}

	if !fi.IsDir() {
		isSocket, err := util.IsUnixDomainSocket(util.NormalizePath(event.Name))
		if err != nil {
			return fmt.Errorf("failed to determine if file: %s is a unix domain socket: %v", event.Name, err)
		}
		if !isSocket {
			klog.V(5).InfoS("Ignoring non socket file", "path", fi.Name())
			return nil
		}

		return w.handlePluginRegistration(event.Name)
	}

	// If the new entry is a directory, add it to the fsWatcher and process
	// everything inside it.
	return w.traversePluginDir(event.Name)
}

func (w *Watcher) handlePluginRegistration(socketPath string) error {
	if runtime.GOOS == "windows" {
		socketPath = util.NormalizePath(socketPath)
	}
	// Update desired state of world list of plugins
	// If the socket path does exist in the desired world cache, there's still
	// a possibility that it has been deleted and recreated again before it is
	// removed from the desired world cache, so we still need to call AddOrUpdatePlugin
	// in this case to update the timestamp
	klog.V(2).InfoS("Adding socket path or updating timestamp to desired state cache", "path", socketPath)
	err := w.desiredStateOfWorld.AddOrUpdatePlugin(socketPath)
	if err != nil {
		return fmt.Errorf("error adding socket path %s or updating timestamp to desired state cache: %v", socketPath, err)
	}
	return nil
}

func (dsw *desiredStateOfWorld) AddOrUpdatePlugin(socketPath string) error {
	dsw.Lock()
	defer dsw.Unlock()

	if socketPath == "" {
		return fmt.Errorf("socket path is empty")
	}
	if _, ok := dsw.socketFileToInfo[socketPath]; ok {
		klog.V(2).InfoS("Plugin exists in actual state cache, timestamp will be updated", "path", socketPath)
	}

	// Update the PluginInfo object.
	// Note that we only update the timestamp in the desired state of world, not the actual state of world
	// because in the reconciler, we need to check if the plugin in the actual state of world is the same
	// version as the plugin in the desired state of world
	dsw.socketFileToInfo[socketPath] = PluginInfo{
		SocketPath: socketPath,
		Timestamp:  time.Now(),
	}
	return nil
}



func (w *Watcher) handleDeleteEvent(event fsnotify.Event) {
	klog.V(6).InfoS("Handling delete event", "event", event)

	socketPath := event.Name
	klog.V(2).InfoS("Removing socket path from desired state cache", "path", socketPath)
	w.desiredStateOfWorld.RemovePlugin(socketPath)
}

func (dsw *desiredStateOfWorld) RemovePlugin(socketPath string) {
	dsw.Lock()
	defer dsw.Unlock()

	delete(dsw.socketFileToInfo, socketPath)
}





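To summarize: the kubelet's pluginManager watches a specific directory, and the Node Driver Registrar, the component responsible for registering a CSI driver with the kubelet, creates the socket that exposes its service in that directory. Each plugin has its own Node Driver Registrar, so one Node Driver Registrar registers exactly one plugin. pluginManager obtains the plugin information (including the plugin's socket address and the plugin name) through that socket, and registers or unregisters plugins as socket files are added to or removed from the directory.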




