首页 > 编程语言 >kubegres 源码解析(二) kubebuilder简介

kubegres 源码解析(二) kubebuilder简介

时间:2023-04-05 12:11:52浏览次数:55  
标签:Kind err ctrl kubegres CronJob 源码 go kubebuilder

摘要

Kubegres 完全使用 Kubebuilder V3 版本开发, 在对 Kubegres 进行代码解析前, 首先了解一下 Kubebuilder, 本文尝试理清几个问题:
  • 如何使用 Kubebuilder 生成 Controller/Operator 项目?
  • 项目结构是什么, 每个文件的作用是什么?
  • 具体到最重要的几个文件, 代码如何组织, 功能区划分?
出于时间关系考虑, 先略过测试的部分, 当然测试是非常重要的环节, 尤其对于 DevOps 来说.

Kubebuilder 架构

 

 

 

Process: 运行 main.go, 每个 Cluster 一个, 如果使用 leader election, 则会有多个进程. Manager: main.go 最后运行的就是 Manager.Start(), Manager 处理 leader election, exports metrics, webhook certs, cacheds events, 将 clients, broadcast events 交给 Controllers, 处理 signals 和 shutdown. Client: 与 kube-api server 交互, 处理认证和协议, 实现了读写分离. Cache: 存储最近 watched 和 GET 到的对象, 以供 Controllers 和 Webhooks 使用, Cache 要使用到 clients. Controller: 对于 CRD 来说, 每一个 CRD 对应一个 Kind, 每一个 Kind 对应一个 Controller, 否则 Kind 只能存储而无法编排, Controller 会对该 Kind 进行调谐. Controller 拥有它创建的 resources. Controller 如果读获取在 Caches 则读取, 否则走 Clients, 使用 Clients 进行 CREATE, UPDATE, DELETE, 这个可能要仔细分析 Cache 的相关实现. Controller 从 Predicate 获得 events, 每获得一个 event, 就调用一次 Reconciler. 负责事件的 back-off(回退), queuing 和 re-queuing. Webhook: 每个被调谐的 Kind 要么有 Webhook, 要么没有 Webhook, 接收 AdmissionRequest 请求, Webhook 可以分为两类, validating admission webhook 和 mutating admission webhook, validating 的用于验证传入的对象, mutating 的用于提供默认值. mutating 的会被优先调用. 本机的 Webhook 调试会用到 Cert-Manager, 这是因为 kube-api server 通过 HTTPS POST 访问 Webhook Server, 要开启 HTTPS server, 需要ca, key, cert 三件套, ca 是证书, key 是私钥, cert 是公钥. kube-api server 访问 Webhook 要指定证书, Webhook Server 启动需要 key 和 cert. 可以使用 Cert-Manager 为 Webhook Server 颁发自签名证书, 因为仅有 kube-api Server 会访问它. 时间关系, 不在此展开. Kubebuilder 本质上是对 controller-runtime, client-go 等包的进一步整合和优化, 使其更易用, 不过说到 Controller, 还有两张架构图也很常见.

 

 

下面这两张 Controller 的架构图更为熟悉和一致一些. Reflector List & Watch kube-api server, 拿到 Object 之后压入 Delta FIFO Queue, Informer 从 Queue 中 Pop 出 Object, 触发 Event Handler, 同时将 Object 交于 Indexer, 压入本地缓存, 也就是 Store, Event Handler 将处理后 Object 的 Key 压入 Workqueue, 再从 Workqueue 交予 Worker. 同样的底层, 为什么会出现这样的差别呢? 这也就是 Kubebuilder Operator 模式的作用所在, Kubebuilder 进行了重新的抽象, 在底层替我们做了大量的工作, 用户只需要关注 Reconcile 的实现, 其本质也就是实现 syncHandler. Kubebuilder 的底层实现, 可以参阅 深入了解kubebuilder_深入kubebuilder开发_qingwave的博客-CSDN博客, 时间关系, 在此不做展开.

项目初始化和创建 API

项目初始化

# create a project directory, and then run the init command.
mkdir project
cd project
# we'll use a domain of tutorial.kubebuilder.io,
# so all API groups will be <group>.tutorial.kubebuilder.io.
kubebuilder init --domain tutorial.kubebuilder.io --repo tutorial.kubebuilder.io/project
--repo 指定的是 module path, 如果项目目录不在 GOPATH 中, 是必须的. 这里主要看 main.go 的结构
package main

import (
    "flag"
    "fmt"
    "os"

    // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
    // to ensure that exec-entrypoint and run can make use of them.
    _ "k8s.io/client-go/plugin/pkg/client/auth"

    "k8s.io/apimachinery/pkg/runtime"
    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    clientgoscheme "k8s.io/client-go/kubernetes/scheme"
    _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/cache"
    "sigs.k8s.io/controller-runtime/pkg/healthz"
    "sigs.k8s.io/controller-runtime/pkg/log/zap"
    // +kubebuilder:scaffold:imports
)
可以看到, 核心的库就是 controller-runtine, 我们将直接操作 ctrl 完成整个 main.go 的实现.
var (
    scheme   = runtime.NewScheme()
    setupLog = ctrl.Log.WithName("setup")
)

func init() {
    utilruntime.Must(clientgoscheme.AddToScheme(scheme))

    //+kubebuilder:scaffold:scheme
}
Scheme 是什么? AddToScheme() 是将 clientgoscheme 添加到新建的空 scheme 中.
func NewScheme() *Scheme {
    s := &Scheme{
        gvkToType:                 map[schema.GroupVersionKind]reflect.Type{},
        typeToGVK:                 map[reflect.Type][]schema.GroupVersionKind{},
        unversionedTypes:          map[reflect.Type]schema.GroupVersionKind{},
        unversionedKinds:          map[string]reflect.Type{},
        fieldLabelConversionFuncs: map[schema.GroupVersionKind]FieldLabelConversionFunc{},
        defaulterFuncs:            map[reflect.Type]func(interface{}){},
        versionPriority:           map[string][]string{},
        schemeName:                naming.GetNameFromCallsite(internalPackages...),
    }
    s.converter = conversion.NewConverter(nil)

    // Enable couple default conversions by default.
    utilruntime.Must(RegisterEmbeddedConversions(s))
    utilruntime.Must(RegisterStringConversions(s))
    return s
}
GVK, 即 Group Version Kind, Scheme 提供了从 GVK 到 Go Types 的双向 Mapping. 在创建 Manager 的时候, Scheme 会作为参数传入. Group Version Kind, 用于描述 Kubernetes API, Group 是一系列相关功能的集合, 每个 Group 包含多个 Versions, 标注不同的实现版本. 每一个 Group-Version 含有一个或多个 API 类型, 每一种类型, 我们称之为一个 Kind. Kind 在不同的 Versions 要做到兼容, 把新 Kind 的参数调用老 Kind, 数据不能丢失, 更不能出错. 还有一个 Resource 的概念, Resource 是 a use of a Kind, 即对 Kind 实际上的使用. 对于 CRD, Kind 和 Resource 一一对应. 举个例子, /apis/batch/v1/jobs, 这里面 batch 是 Group, v1 是 Version, jobs 是 Resource, Job 是 Kind. 所以类似的还有一个 GVR 的概念.
func main() {
    var metricsAddr string
    var enableLeaderElection bool
    var probeAddr string
    flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
    flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
    flag.BoolVar(&enableLeaderElection, "leader-elect", false,
        "Enable leader election for controller manager. "+
            "Enabling this will ensure there is only one active controller manager.")
    opts := zap.Options{
        Development: true,
    }
    opts.BindFlags(flag.CommandLine)
    flag.Parse()

    ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))

    mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
        Scheme:                 scheme,
        MetricsBindAddress:     metricsAddr,
        Port:                   9443,
        HealthProbeBindAddress: probeAddr,
        LeaderElection:         enableLeaderElection,
        LeaderElectionID:       "80807133.tutorial.kubebuilder.io",
    })
    if err != nil {
        setupLog.Error(err, "unable to start manager")
        os.Exit(1)
    }
获取命令行参数, 指定 Logger, 创建 Manager, 如上面的架构图, Manager 会管理 Controllers, Client 和 Cache. 我们只需要把 Manager Start 起来, 它会去运行 controllers 和 webhooks. Manager 会接受一个 graceful shutdown 信号, 这在部署到 Kubernetes 时就可以支持 graceful pod termination.
    if err = (&controllers.CronJobReconciler{
        Client: mgr.GetClient(),
        Scheme: mgr.GetScheme(),
    }).SetupWithManager(mgr); err != nil {
        setupLog.Error(err, "unable to create controller", "controller", "CronJob")
        os.Exit(1)
    }

    if err = (&batchv1.CronJob{}).SetupWebhookWithManager(mgr); err != nil {
        setupLog.Error(err, "unable to create webhook", "webhook", "CronJob")
        os.Exit(1)
    }
    // +kubebuilder:scaffold:builder

    if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
        setupLog.Error(err, "unable to set up health check")
        os.Exit(1)
    }
    if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
        setupLog.Error(err, "unable to set up ready check")
        os.Exit(1)
    }

    setupLog.Info("starting manager")
    if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
        setupLog.Error(err, "problem running manager")
        os.Exit(1)
    }
}
设置 Controller, Webhook 到 Manager, 设置 healthz, readyz, 最后 mgr.Start().

创建 API

kubebuilder create api --group batch --version v1 --kind CronJob
此时的目录结构
├── Dockerfile
├── Makefile
├── PROJECT
├── README.md
├── api
│   └── v1
│       ├── cronjob_types.go
│       ├── cronjob_webhook.go
│       ├── groupversion_info.go
│       ├── webhook_suite_test.go
│       └── zz_generated.deepcopy.go
├── bin

├── config
│   ├── certmanager

│   ├── crd

│   ├── default

│   ├── manager

│   ├── prometheus

│   ├── rbac

│   ├── samples


├── controllers
│   ├── cronjob_controller.go
│   └── suite_test.go
├── cover.out
├── go.mod
├── go.sum
├── hack

└── main.go
我们只需要关注两个文件, api/v1/cronjob_types.go 和 controllers/cronjob_controller.go. api/v1/cronjob_types.go
// EDIT THIS FILE!  THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required.  Any new fields you add must have json tags for the fields to be serialized.

// CronJobSpec defines the desired state of CronJob
type CronJobSpec struct {
    // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
    // Important: Run "make" to regenerate code after modifying this file
}

// CronJobStatus defines the observed state of CronJob
type CronJobStatus struct {
    // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
    // Important: Run "make" to regenerate code after modifying this file
}
CronJobSpec 代表我们期望的状态, CronJobStatus 代表当前集群和其他对象当前实际的状态.
//+kubebuilder:object:root=true
//+kubebuilder:subresource:status

// CronJob is the Schema for the cronjobs API
type CronJob struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`

    Spec   CronJobSpec   `json:"spec,omitempty"`
    Status CronJobStatus `json:"status,omitempty"`
}

//+kubebuilder:object:root=true

// CronJobList contains a list of CronJob
type CronJobList struct {
    metav1.TypeMeta `json:",inline"`
    metav1.ListMeta `json:"metadata,omitempty"`
    Items           []CronJob `json:"items"`
}

func init() {
    SchemeBuilder.Register(&CronJob{}, &CronJobList{})
}
CronJob 和 CronJobList 结构体对应于实际的 Kind CronJob, TypeMeta 和 ObjectMeta 对应于我们熟悉的 YAML 文件的 metadata 部分, TypeMeta 描述 API Version 和 Kind, ObjectMeta 描述 name, Namespace 和 labels. CronJobList 是 CronJob 的复数形式, 在 LIST 方法中使用. //+kubebuilder:object:root=true 标明 CronJob 这个 struct 是个 Kind. 最后把 CronJob 和 CronJobList 注册到 Scheme 中, 建立从 Go Struct 到 GVK 的对应关系.   controllers/cronjob_controller.go
// CronJobReconciler reconciles a CronJob object
type CronJobReconciler struct {
    client.Client
    Scheme *runtime.Scheme
}
这里的 Client, 已经实现了读写分离, 即 GET 和 LIST, 如果 Cache 里面有则去 Cache 里面读, 否则去 kube-api server, CREATE, UPDATE, DELETE 则直接去 API Server, 具体到与 API Server 的通信, Client 主要有两个作用, 第一是建立 HTTPS 连接, 第二是处理 Kubernetes 的认证和权限. Scheme 则就是一个 Mapping.
func (r *CronJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    _ = log.FromContext(ctx)

    // 一些设置, 比如 log
    // 更新 Status
    // 根据 Spec 进行调谐

    return ctrl.Result{}, nil
}
Reconcile 的核心逻辑其实就是围绕 CornJobStatus 和 CronJobSpec 展开, 更新 Status, 根据 Spec 进行调谐, 也就是 syncHandler. 它的返回值提一下, 有四种: 1. 报错 (也会触发 requeue)
return ctrl.Result{}, err
2. 不报错
return ctrl.Result{Requeue: true}, nil
3. Reconcile 完成, 停止
return ctrl.Result{}, nil
4. 一段时间之后再次执行 Reconcile
return ctrl.Result{RequeueAfter: nextRun.Sub(r.Now()))}, nil
需要注意的是, 报错也会触发 Requeue.
func (r *MemcachedReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&cachev1alpha1.Memcached{}).
        Owns(&appsv1.Deployment{}).
        Complete(r)
}
这里就是把 Reconciler 注册到 Manager, 需要注意的是 For 和 Owns. For定义对象类型为reconciled,并配置ControllerManagedBy通过reconciled对象来响应创建/删除/更新事件。 owned定义了由 ControllerManagedBy 生成的对象类型,并配置 ControllerManagedBy 通过协调所有者对象来响应创建/删除/更新事件。

总结

本文简述了 Kubebuilder 架构, 并介绍了它所生成项目的结构, 主要的文件和功能区域划分, 建立一个大概的印象, 具体的细节在之后 Kubegres 源码解析中进行详细叙述.

标签:Kind,err,ctrl,kubegres,CronJob,源码,go,kubebuilder
From: https://www.cnblogs.com/senjougahara/p/17289092.html

相关文章

  • Zookeeper Session源码
    我们说客户端与服务端建立连接交互的时候会创建一个Session与之对应,那假设客户端请求来了,服务端是如何处理的?Session又是如何创建出来的?我们先来看第一个问题:服务端如何处理客户端发来的请求?一、如何处理请求所谓的请求全称是网络请求,涉及到网络就少不了Socket通信,ZooKeep......
  • 多任务版TCP服务端程序开发案例+源码
    Python进阶篇-系列文章全篇......
  • Android 构建工具--AAPT2源码解析(一)
    一、什么是AAPT2在Android开发过程中,我们通过Gradle命令,启动一个构建任务,最终会生成构建产物“APK”文件。常规APK的构建流程如下:(引用自Google官方文档)编译所有的资源文件,生成资源表和R文件;编译Java文件并把class文件打包为dex文件;打包资源和dex文件,生成未签名的APK文件;签名APK生成......
  • flask:cbv源码分析、模板语法、请求与响应、session及源码分析、闪现(flash)、请求扩展
    目录一、cbv源码分析1.1基于类的视图写法1.2源码分析1.3分析源码,查找不传别名的时候为什么函数名会变成别名1.4flask的路由注册使用装饰器,如果写了一个登录认证装饰器,那么应该放在路由装饰器上还是下?1.5dispatch_request讲解1.6知识点总结二、模板语法2.1py2.2html三、请......
  • Spring——springboot启动源码分析
    摘要主要介绍的有关于Spring的Spring的事务注解原理和实战(SpringFramework)一、什么是事务的传播?简单的理解就是多个事务方法相互调用时,事务如何在这些方法间传播。:举个栗子,方法A是一个事务的方法,方法A执行过程中调用了方法B,那么方法B有无事务以及方法B对事务的要求不同都会对......
  • 深入剖析 RocketMQ 源码 - 负载均衡机制
    一、引言RocketMQ是一款优秀的分布式消息中间件,在各方面的性能都比目前已有的消息队列要好,RocketMQ默认采用长轮询的拉模式,单机支持千万级别的消息堆积,可以非常好的应用在海量消息系统中。RocketMQ主要由Producer、Broker、Consumer、Namesvr等组件组成,其中Producer负责生产消......
  • cbv分析、模板、请求与响应、session及源码分析、闪现、请求扩展
    cbv分析#基于类的视图,写法fromflaskimportFlask,requestfromflask.viewsimportView,MethodViewapp=Flask(__name__)app.debug=True#视图类,继承MethodView,类中写跟请求方式同名的方法即可,之前学的所有都一致classIndexView(MethodView):defget(s......
  • SpringBoot——注解@SpringBootConfiguration源码分析
    摘要SpringMVC是一个MVC开源框架,用来代替Struts。它是Spring项目里面的一个重要组成部分,能与SpringIOC容器紧密结合,以及拥有松耦合、方便配置、代码分离等特点,让JAVA程序员开发WEB项目变得更加容易。SpringMVC的异常处理?1.web.xml中异常处理通常为了给用户提供良好......
  • Spring——spring MVC源码分析与实战
    摘要SpringWebMVC是基于ServletAPI构建的原始Web框架,基于servlet3.0的规范实现的web框架。springmvc主要实现请求的接受,请求解析,请求响应等步骤。本文将详细的介绍springMVC的源码和springMVC的启动流程与相关原理。一、传统springMVC执行流程用户发送请求至前端控制器Dispa......
  • JDK源码——集合类Iterator、 Collection类
    摘要主要是讲解这个集合的原理类相关的类。参看:https://zhuanlan.zhihu.com/p/165393520这个图由Map指向Collection的Produces并不是说Map是Collection的一个子类(子接口),这里的意思是指Map的KeySet获取到的一个视图是Collection的子接口。我们可以看到集合有两个基本接口:Map和Collec......