摘要
Kubegres 完全使用 Kubebuilder V3 版本开发, 在对 Kubegres 进行代码解析前, 首先了解一下 Kubebuilder, 本文尝试理清几个问题:- 如何使用 Kubebuilder 生成 Controller/Operator 项目?
- 项目结构是什么, 每个文件的作用是什么?
- 具体到最重要的几个文件, 代码如何组织, 功能区划分?
Kubebuilder 架构
Process: 运行 main.go, 每个 Cluster 一个, 如果使用 leader election, 则会有多个进程. Manager: main.go 最后运行的就是 Manager.Start(), Manager 处理 leader election, exports metrics, webhook certs, cacheds events, 将 clients, broadcast events 交给 Controllers, 处理 signals 和 shutdown. Client: 与 kube-api server 交互, 处理认证和协议, 实现了读写分离. Cache: 存储最近 watched 和 GET 到的对象, 以供 Controllers 和 Webhooks 使用, Cache 要使用到 clients. Controller: 对于 CRD 来说, 每一个 CRD 对应一个 Kind, 每一个 Kind 对应一个 Controller, 否则 Kind 只能存储而无法编排, Controller 会对该 Kind 进行调谐. Controller 拥有它创建的 resources. Controller 如果读获取在 Caches 则读取, 否则走 Clients, 使用 Clients 进行 CREATE, UPDATE, DELETE, 这个可能要仔细分析 Cache 的相关实现. Controller 从 Predicate 获得 events, 每获得一个 event, 就调用一次 Reconciler. 负责事件的 back-off(回退), queuing 和 re-queuing. Webhook: 每个被调谐的 Kind 要么有 Webhook, 要么没有 Webhook, 接收 AdmissionRequest 请求, Webhook 可以分为两类, validating admission webhook 和 mutating admission webhook, validating 的用于验证传入的对象, mutating 的用于提供默认值. mutating 的会被优先调用. 本机的 Webhook 调试会用到 Cert-Manager, 这是因为 kube-api server 通过 HTTPS POST 访问 Webhook Server, 要开启 HTTPS server, 需要ca, key, cert 三件套, ca 是证书, key 是私钥, cert 是公钥. kube-api server 访问 Webhook 要指定证书, Webhook Server 启动需要 key 和 cert. 可以使用 Cert-Manager 为 Webhook Server 颁发自签名证书, 因为仅有 kube-api Server 会访问它. 时间关系, 不在此展开. Kubebuilder 本质上是对 controller-runtime, client-go 等包的进一步整合和优化, 使其更易用, 不过说到 Controller, 还有两张架构图也很常见.
下面这两张 Controller 的架构图更为熟悉和一致一些. Reflector List & Watch kube-api server, 拿到 Object 之后压入 Delta FIFO Queue, Informer 从 Queue 中 Pop 出 Object, 触发 Event Handler, 同时将 Object 交于 Indexer, 压入本地缓存, 也就是 Store, Event Handler 将处理后 Object 的 Key 压入 Workqueue, 再从 Workqueue 交予 Worker. 同样的底层, 为什么会出现这样的差别呢? 这也就是 Kubebuilder Operator 模式的作用所在, Kubebuilder 进行了重新的抽象, 在底层替我们做了大量的工作, 用户只需要关注 Reconcile 的实现, 其本质也就是实现 syncHandler. Kubebuilder 的底层实现, 可以参阅 深入了解kubebuilder_深入kubebuilder开发_qingwave的博客-CSDN博客, 时间关系, 在此不做展开.
项目初始化和创建 API
项目初始化
# create a project directory, and then run the init command. mkdir project cd project # we'll use a domain of tutorial.kubebuilder.io, # so all API groups will be <group>.tutorial.kubebuilder.io. kubebuilder init --domain tutorial.kubebuilder.io --repo tutorial.kubebuilder.io/project--repo 指定的是 module path, 如果项目目录不在 GOPATH 中, 是必须的. 这里主要看 main.go 的结构
package main import ( "flag" "fmt" "os" // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) // to ensure that exec-entrypoint and run can make use of them. _ "k8s.io/client-go/plugin/pkg/client/auth" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/log/zap" // +kubebuilder:scaffold:imports )可以看到, 核心的库就是 controller-runtine, 我们将直接操作 ctrl 完成整个 main.go 的实现.
var ( scheme = runtime.NewScheme() setupLog = ctrl.Log.WithName("setup") ) func init() { utilruntime.Must(clientgoscheme.AddToScheme(scheme)) //+kubebuilder:scaffold:scheme }Scheme 是什么? AddToScheme() 是将 clientgoscheme 添加到新建的空 scheme 中.
func NewScheme() *Scheme { s := &Scheme{ gvkToType: map[schema.GroupVersionKind]reflect.Type{}, typeToGVK: map[reflect.Type][]schema.GroupVersionKind{}, unversionedTypes: map[reflect.Type]schema.GroupVersionKind{}, unversionedKinds: map[string]reflect.Type{}, fieldLabelConversionFuncs: map[schema.GroupVersionKind]FieldLabelConversionFunc{}, defaulterFuncs: map[reflect.Type]func(interface{}){}, versionPriority: map[string][]string{}, schemeName: naming.GetNameFromCallsite(internalPackages...), } s.converter = conversion.NewConverter(nil) // Enable couple default conversions by default. utilruntime.Must(RegisterEmbeddedConversions(s)) utilruntime.Must(RegisterStringConversions(s)) return s }GVK, 即 Group Version Kind, Scheme 提供了从 GVK 到 Go Types 的双向 Mapping. 在创建 Manager 的时候, Scheme 会作为参数传入. Group Version Kind, 用于描述 Kubernetes API, Group 是一系列相关功能的集合, 每个 Group 包含多个 Versions, 标注不同的实现版本. 每一个 Group-Version 含有一个或多个 API 类型, 每一种类型, 我们称之为一个 Kind. Kind 在不同的 Versions 要做到兼容, 把新 Kind 的参数调用老 Kind, 数据不能丢失, 更不能出错. 还有一个 Resource 的概念, Resource 是 a use of a Kind, 即对 Kind 实际上的使用. 对于 CRD, Kind 和 Resource 一一对应. 举个例子, /apis/batch/v1/jobs, 这里面 batch 是 Group, v1 是 Version, jobs 是 Resource, Job 是 Kind. 所以类似的还有一个 GVR 的概念.
func main() { var metricsAddr string var enableLeaderElection bool var probeAddr string flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.") flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") flag.BoolVar(&enableLeaderElection, "leader-elect", false, "Enable leader election for controller manager. "+ "Enabling this will ensure there is only one active controller manager.") opts := zap.Options{ Development: true, } opts.BindFlags(flag.CommandLine) flag.Parse() ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts))) mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ Scheme: scheme, MetricsBindAddress: metricsAddr, Port: 9443, HealthProbeBindAddress: probeAddr, LeaderElection: enableLeaderElection, LeaderElectionID: "80807133.tutorial.kubebuilder.io", }) if err != nil { setupLog.Error(err, "unable to start manager") os.Exit(1) }获取命令行参数, 指定 Logger, 创建 Manager, 如上面的架构图, Manager 会管理 Controllers, Client 和 Cache. 我们只需要把 Manager Start 起来, 它会去运行 controllers 和 webhooks. Manager 会接受一个 graceful shutdown 信号, 这在部署到 Kubernetes 时就可以支持 graceful pod termination.
if err = (&controllers.CronJobReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "CronJob") os.Exit(1) } if err = (&batchv1.CronJob{}).SetupWebhookWithManager(mgr); err != nil { setupLog.Error(err, "unable to create webhook", "webhook", "CronJob") os.Exit(1) } // +kubebuilder:scaffold:builder if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { setupLog.Error(err, "unable to set up health check") os.Exit(1) } if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil { setupLog.Error(err, "unable to set up ready check") os.Exit(1) } setupLog.Info("starting manager") if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { setupLog.Error(err, "problem running manager") os.Exit(1) } }设置 Controller, Webhook 到 Manager, 设置 healthz, readyz, 最后 mgr.Start().
创建 API
kubebuilder create api --group batch --version v1 --kind CronJob此时的目录结构
├── Dockerfile ├── Makefile ├── PROJECT ├── README.md ├── api │ └── v1 │ ├── cronjob_types.go │ ├── cronjob_webhook.go │ ├── groupversion_info.go │ ├── webhook_suite_test.go │ └── zz_generated.deepcopy.go ├── bin ├── config │ ├── certmanager │ ├── crd │ ├── default │ ├── manager │ ├── prometheus │ ├── rbac │ ├── samples ├── controllers │ ├── cronjob_controller.go │ └── suite_test.go ├── cover.out ├── go.mod ├── go.sum ├── hack └── main.go我们只需要关注两个文件, api/v1/cronjob_types.go 和 controllers/cronjob_controller.go. api/v1/cronjob_types.go
// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! // NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. // CronJobSpec defines the desired state of CronJob type CronJobSpec struct { // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster // Important: Run "make" to regenerate code after modifying this file } // CronJobStatus defines the observed state of CronJob type CronJobStatus struct { // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster // Important: Run "make" to regenerate code after modifying this file }CronJobSpec 代表我们期望的状态, CronJobStatus 代表当前集群和其他对象当前实际的状态.
//+kubebuilder:object:root=true //+kubebuilder:subresource:status // CronJob is the Schema for the cronjobs API type CronJob struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` Spec CronJobSpec `json:"spec,omitempty"` Status CronJobStatus `json:"status,omitempty"` } //+kubebuilder:object:root=true // CronJobList contains a list of CronJob type CronJobList struct { metav1.TypeMeta `json:",inline"` metav1.ListMeta `json:"metadata,omitempty"` Items []CronJob `json:"items"` } func init() { SchemeBuilder.Register(&CronJob{}, &CronJobList{}) }CronJob 和 CronJobList 结构体对应于实际的 Kind CronJob, TypeMeta 和 ObjectMeta 对应于我们熟悉的 YAML 文件的 metadata 部分, TypeMeta 描述 API Version 和 Kind, ObjectMeta 描述 name, Namespace 和 labels. CronJobList 是 CronJob 的复数形式, 在 LIST 方法中使用. //+kubebuilder:object:root=true 标明 CronJob 这个 struct 是个 Kind. 最后把 CronJob 和 CronJobList 注册到 Scheme 中, 建立从 Go Struct 到 GVK 的对应关系. controllers/cronjob_controller.go
// CronJobReconciler reconciles a CronJob object type CronJobReconciler struct { client.Client Scheme *runtime.Scheme }这里的 Client, 已经实现了读写分离, 即 GET 和 LIST, 如果 Cache 里面有则去 Cache 里面读, 否则去 kube-api server, CREATE, UPDATE, DELETE 则直接去 API Server, 具体到与 API Server 的通信, Client 主要有两个作用, 第一是建立 HTTPS 连接, 第二是处理 Kubernetes 的认证和权限. Scheme 则就是一个 Mapping.
func (r *CronJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { _ = log.FromContext(ctx) // 一些设置, 比如 log // 更新 Status // 根据 Spec 进行调谐 return ctrl.Result{}, nil }Reconcile 的核心逻辑其实就是围绕 CornJobStatus 和 CronJobSpec 展开, 更新 Status, 根据 Spec 进行调谐, 也就是 syncHandler. 它的返回值提一下, 有四种: 1. 报错 (也会触发 requeue)
return ctrl.Result{}, err2. 不报错
return ctrl.Result{Requeue: true}, nil3. Reconcile 完成, 停止
return ctrl.Result{}, nil4. 一段时间之后再次执行 Reconcile
return ctrl.Result{RequeueAfter: nextRun.Sub(r.Now()))}, nil需要注意的是, 报错也会触发 Requeue.
func (r *MemcachedReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&cachev1alpha1.Memcached{}). Owns(&appsv1.Deployment{}). Complete(r) }这里就是把 Reconciler 注册到 Manager, 需要注意的是 For 和 Owns. For定义对象类型为reconciled,并配置ControllerManagedBy通过reconciled对象来响应创建/删除/更新事件。 owned定义了由 ControllerManagedBy 生成的对象类型,并配置 ControllerManagedBy 通过协调所有者对象来响应创建/删除/更新事件。