
Developing and Using K8S Operators


Looking at it from the application side: why have so many Operator use cases appeared, why do so many middleware projects and vendors ship Operator-based deployment options, and what value do they provide?

Over time, enterprise deployment environments have evolved from physical machines -> virtual machines -> containers -> clusters. Today K8S is commonplace, and the capabilities of Kubernetes as the foundation for running enterprise applications are drawing more and more attention. In my view, enterprise use of a cluster falls into a few directions:

1. Application deployment: use the basic capabilities of containers to deploy applications and enjoy the benefits that containerization and cluster management bring.

2. Operators: when we use cluster capabilities to deploy an application, we may need to deploy a group of resources together, go through multiple deployment steps, depend on several objects, customize and transform parameters, watch for failures, or type commands by hand to recover when something breaks. Doing all of this manually takes many steps. If we instead develop an Operator that manages the object's resource definitions, watches its lifecycle, and handles its exception events, everything becomes much more convenient, lowering the barrier to using the component and reducing its operational burden.

3. Extending cluster capabilities: the cluster's built-in capabilities target general scenarios and do not account for the specifics of a given enterprise application, so we extend or add capabilities on top of them, for example developing our own scheduler or building customized elasticity.

Since we are talking about Operators, we should explain what an Operator is. If you are not familiar with the K8S development background, search around or read other posts; roughly speaking, K8S provides various extension frameworks, and you only need to follow the prescribed conventions to build one. You can also lean on development tooling such as kubebuilder; the demo makes it clear at a glance: https://github.com/kubernetes-sigs/kubebuilder/blob/master/docs/gif/kb-demo.v2.0.1.svg

Think about what developing an Operator requires. An Operator registers its own logic with the cluster, so you need to define your own CRD (optional), define your own watches, and define the handling logic that runs when the corresponding events are observed. Start with the simplest possible implementation: https://github.com/kubernetes/sample-controller

The controller registers event handlers for the AddFunc, UpdateFunc, and DeleteFunc events and puts the event messages onto a work queue.
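A minimal sketch of that registration, loosely following sample-controller; the trimmed-down Controller struct and the registerHandlers helper below are illustrative, not the repo's exact code:

import (
    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/util/workqueue"
)

// Controller is a trimmed-down stand-in for sample-controller's Controller type.
type Controller struct {
    workqueue workqueue.RateLimitingInterface
}

// enqueueFoo converts an object into its namespace/name key and adds it to the queue.
func (c *Controller) enqueueFoo(obj interface{}) {
    key, err := cache.MetaNamespaceKeyFunc(obj)
    if err != nil {
        utilruntime.HandleError(err)
        return
    }
    c.workqueue.Add(key)
}

// registerHandlers wires add/update/delete events from an informer into the work queue.
func (c *Controller) registerHandlers(informer cache.SharedIndexInformer) {
    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    c.enqueueFoo,
        UpdateFunc: func(old, new interface{}) { c.enqueueFoo(new) },
        DeleteFunc: c.enqueueFoo,
    })
}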

Workers then consume the queue; the concrete logic lives in the handler.
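Continuing the sketch above, the consumption side looks roughly like this, assuming the Controller also has a syncHandler(key string) error method that holds the actual business logic:

// runWorker keeps draining the work queue until it is shut down.
func (c *Controller) runWorker() {
    for c.processNextWorkItem() {
    }
}

// processNextWorkItem pops one key, hands it to syncHandler, and decides
// whether to retry with backoff or forget the key on success.
func (c *Controller) processNextWorkItem() bool {
    obj, shutdown := c.workqueue.Get()
    if shutdown {
        return false
    }
    defer c.workqueue.Done(obj)

    key := obj.(string)
    if err := c.syncHandler(key); err != nil {
        // Requeue rate-limited so transient failures are retried.
        c.workqueue.AddRateLimited(key)
        return true
    }
    // Success: stop tracking retries for this key.
    c.workqueue.Forget(obj)
    return true
}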

This is the very simplest kind of example, but it is enough to understand the whole process of defining a typical Operator.

The CRD definition and usage live at https://github.com/kubernetes/sample-controller/tree/master/artifacts/examples

The Operators shipped by middleware projects are broadly similar. Here we take two Redis operators as examples to see what they actually do.

First, Redis Cluster. Think about it: what are the core steps if you build one by hand?

Configure and start the services -> CLUSTER MEET IP PORT so the nodes discover each other -> CLUSTER ADDSLOTS to assign slots manually -> CLUSTER REPLICATE to set up master/replica relationships -> detect and recover from all kinds of failures.
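For a sense of what that involves, here is a rough Go sketch of those manual commands issued through the go-redis client; the addresses, slot range, and node ID are placeholders, and a real bootstrap would repeat this for every node:

package main

import (
    "context"
    "log"

    "github.com/go-redis/redis/v8"
)

func main() {
    ctx := context.Background()

    // Connect to one freshly started node (placeholder address).
    node := redis.NewClient(&redis.Options{Addr: "10.0.0.1:6379"})
    defer node.Close()

    // CLUSTER MEET: introduce another node so the two start gossiping.
    if err := node.ClusterMeet(ctx, "10.0.0.2", "6379").Err(); err != nil {
        log.Fatal(err)
    }

    // CLUSTER ADDSLOTS: manually assign a slot range to this master.
    if err := node.ClusterAddSlotsRange(ctx, 0, 5460).Err(); err != nil {
        log.Fatal(err)
    }

    // CLUSTER REPLICATE: run against a replica to attach it to a master
    // (the node ID below is a placeholder).
    replica := redis.NewClient(&redis.Options{Addr: "10.0.0.3:6379"})
    defer replica.Close()
    if err := replica.ClusterReplicate(ctx, "3c3a0c74aae0b56170ccb03a76b60cfe7dc1912e").Err(); err != nil {
        log.Fatal(err)
    }
}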

Steps like these are obviously tedious. Wouldn't it be much more convenient if the cluster could operate itself automatically? Hence Operators like this one:

https://github.com/ucloud/redis-cluster-operator

Deploying it per the docs fails with an error due to a version mismatch:

error: error validating "redis.kun_redisclusterbackups_crd.yaml": error validating data: [ValidationError(CustomResourceDefinition.spec): unknown field "additionalPrinterColumns" in io.k8s.apiextensions-apiserver.pkg.apis.apiextensions.v1.CustomResourceDefinitionSpec, ValidationError(CustomResourceDefinition.spec): unknown field "subresources" in io.k8s.apiextensions-apiserver.pkg.apis.apiextensions.v1.CustomResourceDefinitionSpec, ValidationError(CustomResourceDefinition.spec): unknown field "validation" in io.k8s.apiextensions-apiserver.pkg.apis.apiextensions.v1.CustomResourceDefinitionSpec]; if you choose to ignore these errors, turn validation off with --validate=false

The cluster's current CustomResourceDefinition API version is apiextensions.k8s.io/v1, while the component's manifests are written against apiVersion: apiextensions.k8s.io/v1beta1, hence the failure. What now?

Convert it? kubectl-convert? That did not work: https://kubernetes.io/docs/tasks/tools/install-kubectl-linux/

Back on GitHub, searching the issues turns one up: https://github.com/ucloud/redis-cluster-operator/issues/95. Following the manifest posted there works:

# Thus this yaml has been adapted for k8s v1.22 as per:
# https://kubernetes.io/docs/reference/using-api/deprecation-guide/#customresourcedefinition-v122
#
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: distributedredisclusters.redis.kun
spec:
  group: redis.kun
  names:
    kind: DistributedRedisCluster
    listKind: DistributedRedisClusterList
    plural: distributedredisclusters
    singular: distributedrediscluster
    shortNames:
    - drc
  scope: Namespaced
  versions:
  - name: v1alpha1
    served: true
    storage: true
    subresources:
      status: {}
    additionalPrinterColumns:
      - name: MasterSize
        jsonPath: .spec.masterSize
        description: The number of redis master node in the ensemble
        type: integer
      - name: Status
        jsonPath: .status.status
        description: The status of redis cluster
        type: string
      - name: Age
        jsonPath: .metadata.creationTimestamp
        type: date
      - name: CurrentMasters
        jsonPath: .status.numberOfMaster
        priority: 1
        description: The current master number of redis cluster
        type: integer
      - name: Images
        jsonPath: .spec.image
        priority: 1
        description: The image of redis cluster
        type: string
    schema:
      openAPIV3Schema:
        description: DistributedRedisCluster is the Schema for the distributedredisclusters API
        type: object
        properties:
          apiVersion:
            description: 'APIVersion defines the versioned schema of this representation
              of an object. Servers should convert recognized schemas to the latest
              internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#resources'
            type: string
          kind:
            description: 'Kind is a string value representing the REST resource this
              object represents. Servers may infer this from the endpoint the client
              submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#types-kinds'
            type: string
          metadata:
            type: object
          spec:
            description: DistributedRedisClusterSpec defines the desired state of DistributedRedisCluster
            type: object
            properties:
              masterSize:
                format: int32
                type: integer
                minimum: 1
                maximum: 3
              clusterReplicas:
                format: int32
                type: integer
                minimum: 1
                maximum: 3
              serviceName:
                type: string
                pattern: '[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*'
              annotations:
                additionalProperties:
                  type: string
                type: object
              config:
                additionalProperties:
                  type: string
                type: object
              passwordSecret:
                additionalProperties:
                  type: string
                type: object
              storage:
                type: object
                properties:
                  type:
                    type: string
                  size:
                    type: string
                  class:
                    type: string
                  deleteClaim:
                    type: boolean
              image:
                type: string
              securityContext:
                description: 'SecurityContext defines the security options the container should be run with'
                type: object
                properties:
                  allowPrivilegeEscalation:
                    type: boolean
                  privileged:
                    type: boolean
                  readOnlyRootFilesystem:
                    type: boolean
                  capabilities:
                    type: object
                    properties:
                      add:
                        items:
                          type: string
                        type: array
                      drop:
                        items:
                          type: string
                        type: array
                  sysctls:
                    items:
                      type: object
                      properties:
                        name:
                          type: string
                        value:
                          type: string
                      required:
                        - name
                        - value
                    type: array
              resources:
                type: object
                properties:
                  requests:
                    type: object
                    additionalProperties:
                      type: string
                  limits:
                    type: object
                    additionalProperties:
                      type: string
              toleRations:
                type: array
                items:
                  type: object
                  properties:
                    effect:
                      type: string
                    key:
                      type: string
                    operator:
                      type: string
                    tolerationSeconds:
                      type: integer
                      format: int64
                    value:
                      type: string
          status:
            description: DistributedRedisClusterStatus defines the observed state of DistributedRedisCluster
            type: object
            properties:
              numberOfMaster:
                type: integer
                format: int32
              reason:
                type: string
              status:
                type: string
              maxReplicationFactor:
                type: integer
                format: int32
              minReplicationFactor:
                type: integer
                format: int32

---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: redisclusterbackups.redis.kun
spec:
  group: redis.kun
  names:
    kind: RedisClusterBackup
    listKind: RedisClusterBackupList
    plural: redisclusterbackups
    singular: redisclusterbackup
    shortNames:
      - drcb
  scope: Namespaced
  versions:
    - name: v1alpha1
      # Each version can be enabled/disabled by Served flag.
      served: true
      # One and only one version must be marked as the storage version.
      storage: true
      subresources:
        status: {}
      additionalPrinterColumns:
        - jsonPath: .metadata.creationTimestamp
          name: Age
          type: date
        - jsonPath: .status.phase
          description: The phase of redis cluster backup
          name: Phase
          type: string
      schema:
        openAPIV3Schema:
          description: RedisClusterBackup is the Schema for the redisclusterbackups
            API
          properties:
            apiVersion:
              description: 'APIVersion defines the versioned schema of this representation
                of an object. Servers should convert recognized schemas to the latest
                internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#resources'
              type: string
            kind:
              description: 'Kind is a string value representing the REST resource this
                object represents. Servers may infer this from the endpoint the client
                submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#types-kinds'
              type: string
            metadata:
              type: object
            spec:
              description: RedisClusterBackupSpec defines the desired state of RedisClusterBackup
              type: object
            status:
              description: RedisClusterBackupStatus defines the observed state of RedisClusterBackup
              type: object
          type: object

Looking at the code as a whole (ignoring some parts), the entry point is main.go: it creates a manager, registers the Scheme definitions and the controllers with K8S, and starts the manager.
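As a minimal controller-runtime style sketch of that entry point (the scheme registration and setup calls here are generic assumptions rather than the operator's exact code):

package main

import (
    "os"

    "k8s.io/apimachinery/pkg/runtime"
    clientgoscheme "k8s.io/client-go/kubernetes/scheme"
    ctrl "sigs.k8s.io/controller-runtime"
)

func main() {
    // One Scheme holds both the built-in types and the operator's own API group.
    scheme := runtime.NewScheme()
    _ = clientgoscheme.AddToScheme(scheme)
    // _ = redisv1alpha1.AddToScheme(scheme) // the operator's CRD types (assumed import)

    // The manager owns the shared cache, the client, and all registered controllers.
    mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme})
    if err != nil {
        os.Exit(1)
    }

    // Controllers/Reconcilers would be registered with mgr here.

    // Block until SIGTERM/SIGINT.
    if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
        os.Exit(1)
    }
}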

The directory layout largely follows the overall K8S coding style: cmd defines the entry point, pkg holds the main program, apis describes the API definitions, controller defines the concrete watches and event handling, and the remaining utility packages make it easier to operate on K8S, talk to Redis, create the various resources, and so on.

Taking the cluster controller as an example, a Reconciler is registered.

Watch logic is added so that create, update, and delete events are all observed, each with its corresponding handling logic.
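A sketch of that wiring in the older controller-runtime style this project uses; the import path for the CRD types and the always-true predicate funcs are assumptions for illustration:

import (
    redisv1alpha1 "github.com/ucloud/redis-cluster-operator/pkg/apis/redis/v1alpha1"

    "sigs.k8s.io/controller-runtime/pkg/controller"
    "sigs.k8s.io/controller-runtime/pkg/event"
    "sigs.k8s.io/controller-runtime/pkg/handler"
    "sigs.k8s.io/controller-runtime/pkg/manager"
    "sigs.k8s.io/controller-runtime/pkg/predicate"
    "sigs.k8s.io/controller-runtime/pkg/reconcile"
    "sigs.k8s.io/controller-runtime/pkg/source"
)

// add registers the reconciler with the manager and watches the primary CRD.
func add(mgr manager.Manager, r reconcile.Reconciler) error {
    c, err := controller.New("distributedrediscluster-controller", mgr,
        controller.Options{Reconciler: r})
    if err != nil {
        return err
    }

    // Watch create/update/delete events on DistributedRedisCluster objects;
    // each event enqueues a reconcile.Request for the affected object.
    pred := predicate.Funcs{
        CreateFunc: func(e event.CreateEvent) bool { return true },
        UpdateFunc: func(e event.UpdateEvent) bool { return true },
        DeleteFunc: func(e event.DeleteEvent) bool { return true },
    }
    return c.Watch(
        &source.Kind{Type: &redisv1alpha1.DistributedRedisCluster{}},
        &handler.EnqueueRequestForObject{},
        pred,
    )
}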

The reconcile logic: fetch the cluster definition, compare the observed state against the desired state, build the cluster, and detect and automatically recover from failures.

// Reconcile reads that state of the cluster for a DistributedRedisCluster object and makes changes based on the state read
// and what is in the DistributedRedisCluster.Spec
// Note:
// The Controller will requeue the Request to be processed again if the returned error is non-nil or
// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
func (r *ReconcileDistributedRedisCluster) Reconcile(request reconcile.Request) (reconcile.Result, error) {
    reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)
    reqLogger.Info("Reconciling DistributedRedisCluster")

    // Fetch the DistributedRedisCluster instance
    instance := &redisv1alpha1.DistributedRedisCluster{}
    err := r.client.Get(context.TODO(), request.NamespacedName, instance)
    if err != nil {
        if errors.IsNotFound(err) {
            return reconcile.Result{}, nil
        }
        return reconcile.Result{}, err
    }

    ctx := &syncContext{
        cluster:   instance,
        reqLogger: reqLogger,
    }

    err = r.ensureCluster(ctx)
    if err != nil {
        switch GetType(err) {
        case StopRetry:
            reqLogger.Info("invalid", "err", err)
            return reconcile.Result{}, nil
        }
        reqLogger.WithValues("err", err).Info("ensureCluster")
        newStatus := instance.Status.DeepCopy()
        SetClusterScaling(newStatus, err.Error())
        r.updateClusterIfNeed(instance, newStatus, reqLogger)
        return reconcile.Result{RequeueAfter: requeueAfter}, nil
    }

    matchLabels := getLabels(instance)
    redisClusterPods, err := r.statefulSetController.GetStatefulSetPodsByLabels(instance.Namespace, matchLabels)
    if err != nil {
        return reconcile.Result{}, Kubernetes.Wrap(err, "GetStatefulSetPods")
    }

    ctx.pods = clusterPods(redisClusterPods.Items)
    reqLogger.V(6).Info("debug cluster pods", "", ctx.pods)
    ctx.healer = clustermanger.NewHealer(&heal.CheckAndHeal{
        Logger:     reqLogger,
        PodControl: k8sutil.NewPodController(r.client),
        Pods:       ctx.pods,
        DryRun:     false,
    })
    err = r.waitPodReady(ctx)
    if err != nil {
        switch GetType(err) {
        case Kubernetes:
            return reconcile.Result{}, err
        }
        reqLogger.WithValues("err", err).Info("waitPodReady")
        newStatus := instance.Status.DeepCopy()
        SetClusterScaling(newStatus, err.Error())
        r.updateClusterIfNeed(instance, newStatus, reqLogger)
        return reconcile.Result{RequeueAfter: requeueAfter}, nil
    }

    password, err := statefulsets.GetClusterPassword(r.client, instance)
    if err != nil {
        return reconcile.Result{}, Kubernetes.Wrap(err, "getClusterPassword")
    }

    admin, err := newRedisAdmin(ctx.pods, password, config.RedisConf(), reqLogger)
    if err != nil {
        return reconcile.Result{}, Redis.Wrap(err, "newRedisAdmin")
    }
    defer admin.Close()

    clusterInfos, err := admin.GetClusterInfos()
    if err != nil {
        if clusterInfos.Status == redisutil.ClusterInfosPartial {
            return reconcile.Result{}, Redis.Wrap(err, "GetClusterInfos")
        }
    }

    requeue, err := ctx.healer.Heal(instance, clusterInfos, admin)
    if err != nil {
        return reconcile.Result{}, Redis.Wrap(err, "Heal")
    }
    if requeue {
        return reconcile.Result{RequeueAfter: requeueAfter}, nil
    }

    ctx.admin = admin
    ctx.clusterInfos = clusterInfos
    err = r.waitForClusterJoin(ctx)
    if err != nil {
        switch GetType(err) {
        case Requeue:
            reqLogger.WithValues("err", err).Info("requeue")
            return reconcile.Result{RequeueAfter: requeueAfter}, nil
        }
        newStatus := instance.Status.DeepCopy()
        SetClusterFailed(newStatus, err.Error())
        r.updateClusterIfNeed(instance, newStatus, reqLogger)
        return reconcile.Result{}, err
    }

    // mark .Status.Restore.Phase = RestorePhaseRestart, will
    // remove init container and restore volume that referenced in stateulset for
    // dump RDB file from backup, then the redis master node will be restart.
    if instance.IsRestoreFromBackup() && instance.IsRestoreRunning() {
        reqLogger.Info("update restore redis cluster cr")
        instance.Status.Restore.Phase = redisv1alpha1.RestorePhaseRestart
        if err := r.crController.UpdateCRStatus(instance); err != nil {
            return reconcile.Result{}, err
        }
        if err := r.ensurer.UpdateRedisStatefulsets(instance, getLabels(instance)); err != nil {
            return reconcile.Result{}, err
        }
        waiter := &waitStatefulSetUpdating{
            name:                  "waitMasterNodeRestarting",
            timeout:               60 * time.Second,
            tick:                  5 * time.Second,
            statefulSetController: r.statefulSetController,
            cluster:               instance,
        }
        if err := waiting(waiter, ctx.reqLogger); err != nil {
            return reconcile.Result{}, err
        }
        return reconcile.Result{Requeue: true}, nil
    }

    // restore succeeded, then update cr and wait for the next Reconcile loop
    if instance.IsRestoreFromBackup() && instance.IsRestoreRestarting() {
        reqLogger.Info("update restore redis cluster cr")
        instance.Status.Restore.Phase = redisv1alpha1.RestorePhaseSucceeded
        if err := r.crController.UpdateCRStatus(instance); err != nil {
            return reconcile.Result{}, err
        }
        // set ClusterReplicas = Backup.Status.ClusterReplicas,
        // next Reconcile loop the statefulSet's replicas will increase by ClusterReplicas, then start the slave node
        instance.Spec.ClusterReplicas = instance.Status.Restore.Backup.Status.ClusterReplicas
        if err := r.crController.UpdateCR(instance); err != nil {
            return reconcile.Result{}, err
        }
        return reconcile.Result{}, nil
    }

    if err := admin.SetConfigIfNeed(instance.Spec.Config); err != nil {
        return reconcile.Result{}, Redis.Wrap(err, "SetConfigIfNeed")
    }

    status := buildClusterStatus(clusterInfos, ctx.pods, instance, reqLogger)
    if is := r.isScalingDown(instance, reqLogger); is {
        SetClusterRebalancing(status, "scaling down")
    }
    reqLogger.V(4).Info("buildClusterStatus", "status", status)
    r.updateClusterIfNeed(instance, status, reqLogger)

    instance.Status = *status
    if needClusterOperation(instance, reqLogger) {
        reqLogger.Info(">>>>>> clustering")
        err = r.syncCluster(ctx)
        if err != nil {
            newStatus := instance.Status.DeepCopy()
            SetClusterFailed(newStatus, err.Error())
            r.updateClusterIfNeed(instance, newStatus, reqLogger)
            return reconcile.Result{}, err
        }
    }

    newClusterInfos, err := admin.GetClusterInfos()
    if err != nil {
        if clusterInfos.Status == redisutil.ClusterInfosPartial {
            return reconcile.Result{}, Redis.Wrap(err, "GetClusterInfos")
        }
    }
    newStatus := buildClusterStatus(newClusterInfos, ctx.pods, instance, reqLogger)
    SetClusterOK(newStatus, "OK")
    r.updateClusterIfNeed(instance, newStatus, reqLogger)
    return reconcile.Result{RequeueAfter: time.Duration(reconcileTime) * time.Second}, nil
}

The Sentinel Operator works on the same principles, so we will not repeat the details here:

https://github.com/ucloud/redis-operator

From: https://www.cnblogs.com/it-worker365/p/17058369.html
