From an application perspective, why have so many Operator use cases emerged, and why do so many middleware projects and vendors ship Operator-based deployment options? What value do they provide?
As the industry has evolved, enterprise deployment environments have moved from physical machines to virtual machines to containers to clusters, and Kubernetes is now commonplace. As the foundation enterprise applications run on, Kubernetes' capabilities attract ever more attention. In my view, enterprise applications on a cluster break down into a few directions:
1. Application deployment: use the basic capabilities of containers to deploy applications and reap the benefits of containerization and cluster management.
2. Operator-based applications: when we deploy an application with cluster capabilities, we may need to deploy a group of resources together, go through multiple deployment steps, depend on multiple objects, customize and transform parameters, monitor for failures, and type recovery commands by hand when something breaks. Done purely by hand, that is a lot of steps. If we instead develop an Operator that manages the objects' resource definitions, watches their lifecycle, and handles their failure events, everything becomes much more convenient, lowering both the barrier to using the component and the operational burden.
3. Extending the cluster itself: the cluster's built-in capabilities target the common case and do not reflect the specifics of a given enterprise's applications, so they need to be extended or supplemented on top of what the cluster already offers, for example by writing your own scheduler or building your own tailored elasticity features.
Since we are going to talk about Operators, we should first say what an Operator is. If you are unfamiliar with the background of Kubernetes development, search around or consult other posts; roughly speaking, Kubernetes provides a range of extension frameworks, and you only need to follow the prescribed conventions to build one. You can also lean on development tools such as kubebuilder; the demo makes it clear at a glance: https://github.com/kubernetes-sigs/kubebuilder/blob/master/docs/gif/kb-demo.v2.0.1.svg
Think about what developing an Operator takes. An Operator registers its own logic with the cluster, so you need to define your CRD (optional), define your watches, and define the handling logic for the events those watches deliver. Start with the simplest possible implementation: https://github.com/kubernetes/sample-controller
The controller registers event handlers — AddFunc, UpdateFunc, and DeleteFunc — each of which pushes an event message onto a work queue.
A worker consumes the queue; the actual processing lives in the handler.
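To make the pattern concrete, here is a minimal sketch of the same informer-plus-workqueue structure in plain client-go; it watches built-in Deployments instead of a CRD, and the print statement stands in for real handler logic:

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

func main() {
	// In-cluster config; use clientcmd.BuildConfigFromFlags when developing out of cluster.
	cfg, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	factory := informers.NewSharedInformerFactory(client, 30*time.Second)
	informer := factory.Apps().V1().Deployments().Informer()

	// Register handlers for the three event types; each just enqueues a namespace/name key.
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			if key, err := cache.MetaNamespaceKeyFunc(obj); err == nil {
				queue.Add(key)
			}
		},
		UpdateFunc: func(old, new interface{}) {
			if key, err := cache.MetaNamespaceKeyFunc(new); err == nil {
				queue.Add(key)
			}
		},
		DeleteFunc: func(obj interface{}) {
			if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err == nil {
				queue.Add(key)
			}
		},
	})

	stopCh := make(chan struct{})
	factory.Start(stopCh)
	cache.WaitForCacheSync(stopCh, informer.HasSynced)

	// Worker: drain the queue; each key is handled by what would be the reconcile logic.
	for {
		key, shutdown := queue.Get()
		if shutdown {
			return
		}
		fmt.Println("reconcile", key) // real logic: compare desired state with actual state
		queue.Done(key)
	}
}
```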
The above is the simplest possible example, but it is enough to understand the full shape of a typical Operator.
The CRD definition and example usage are at https://github.com/kubernetes/sample-controller/tree/master/artifacts/examples
The Operators that middleware projects ship are much the same. Let's take two Redis Operators as examples and see what this thing actually does.
First, a Redis Cluster. Think about it: if you created one by hand, what would the core steps be?
Configure and start the services -> CLUSTER MEET IP PORT so the nodes discover one another -> CLUSTER ADDSLOTS to assign hash slots by hand -> CLUSTER REPLICATE to set up master/replica relationships -> all kinds of failure detection and recovery (sketched in code below).
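For a feel of what those commands look like when issued programmatically, here is a minimal sketch assuming the go-redis client; the addresses, the slot range, and the master node ID are placeholders, and a real bootstrap would repeat this for every node:

```go
package main

import (
	"context"
	"fmt"

	"github.com/go-redis/redis/v8"
)

func main() {
	ctx := context.Background()

	// Connect to one of the would-be masters (address is a placeholder).
	node := redis.NewClient(&redis.Options{Addr: "10.0.0.1:6379"})
	defer node.Close()

	// CLUSTER MEET: introduce another node so the two start gossiping.
	if err := node.ClusterMeet(ctx, "10.0.0.2", "6379").Err(); err != nil {
		panic(err)
	}

	// CLUSTER ADDSLOTS: this master takes the first third of the 16384 slots.
	if err := node.ClusterAddSlotsRange(ctx, 0, 5460).Err(); err != nil {
		panic(err)
	}

	// CLUSTER REPLICATE: run against a replica, pointing it at a master's node ID.
	replica := redis.NewClient(&redis.Options{Addr: "10.0.0.4:6379"})
	defer replica.Close()
	if err := replica.ClusterReplicate(ctx, "<master-node-id>").Err(); err != nil {
		panic(err)
	}

	fmt.Println("cluster bootstrap commands issued")
}
```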
The steps above are obviously tedious. It would be far more convenient if the cluster could run them automatically, and so Operators like this one appeared:
https://github.com/ucloud/redis-cluster-operator
Deploying per the docs, a version mismatch throws an error:
```
error: error validating "redis.kun_redisclusterbackups_crd.yaml": error validating data: [ValidationError(CustomResourceDefinition.spec): unknown field "additionalPrinterColumns" in io.k8s.apiextensions-apiserver.pkg.apis.apiextensions.v1.CustomResourceDefinitionSpec, ValidationError(CustomResourceDefinition.spec): unknown field "subresources" in io.k8s.apiextensions-apiserver.pkg.apis.apiextensions.v1.CustomResourceDefinitionSpec, ValidationError(CustomResourceDefinition.spec): unknown field "validation" in io.k8s.apiextensions-apiserver.pkg.apis.apiextensions.v1.CustomResourceDefinitionSpec]; if you choose to ignore these errors, turn validation off with --validate=false
```
The cluster's current CustomResourceDefinition API version is apiextensions.k8s.io/v1, while the component's manifests are based on apiVersion: apiextensions.k8s.io/v1beta1, hence the failure. In v1, validation, subresources, and additionalPrinterColumns are no longer top-level spec fields; they move under each entry of spec.versions (as schema, subresources, and additionalPrinterColumns), and a structural schema is mandatory. So what to do?
Convert it? kubectl-convert? No luck: https://kubernetes.io/docs/tasks/tools/install-kubectl-linux/
Back to the GitHub issues, and sure enough there is one: https://github.com/ucloud/redis-cluster-operator/issues/95. Applying the fix a contributor posted there works:
```yaml
# Thus this yaml has been adapted for k8s v1.22 as per:
# https://kubernetes.io/docs/reference/using-api/deprecation-guide/#customresourcedefinition-v122
#
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: distributedredisclusters.redis.kun
spec:
  group: redis.kun
  names:
    kind: DistributedRedisCluster
    listKind: DistributedRedisClusterList
    plural: distributedredisclusters
    singular: distributedrediscluster
    shortNames:
      - drc
  scope: Namespaced
  versions:
    - name: v1alpha1
      served: true
      storage: true
      subresources:
        status: {}
      additionalPrinterColumns:
        - name: MasterSize
          jsonPath: .spec.masterSize
          description: The number of redis master node in the ensemble
          type: integer
        - name: Status
          jsonPath: .status.status
          description: The status of redis cluster
          type: string
        - name: Age
          jsonPath: .metadata.creationTimestamp
          type: date
        - name: CurrentMasters
          jsonPath: .status.numberOfMaster
          priority: 1
          description: The current master number of redis cluster
          type: integer
        - name: Images
          jsonPath: .spec.image
          priority: 1
          description: The image of redis cluster
          type: string
      schema:
        openAPIV3Schema:
          description: DistributedRedisCluster is the Schema for the distributedredisclusters API
          type: object
          properties:
            apiVersion:
              description: 'APIVersion defines the versioned schema of this representation
                of an object. Servers should convert recognized schemas to the latest
                internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#resources'
              type: string
            kind:
              description: 'Kind is a string value representing the REST resource this
                object represents. Servers may infer this from the endpoint the client
                submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#types-kinds'
              type: string
            metadata:
              type: object
            spec:
              description: DistributedRedisClusterSpec defines the desired state of DistributedRedisCluster
              type: object
              properties:
                masterSize:
                  format: int32
                  type: integer
                  minimum: 1
                  maximum: 3
                clusterReplicas:
                  format: int32
                  type: integer
                  minimum: 1
                  maximum: 3
                serviceName:
                  type: string
                  pattern: '[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*'
                annotations:
                  additionalProperties:
                    type: string
                  type: object
                config:
                  additionalProperties:
                    type: string
                  type: object
                passwordSecret:
                  additionalProperties:
                    type: string
                  type: object
                storage:
                  type: object
                  properties:
                    type:
                      type: string
                    size:
                      type: string
                    class:
                      type: string
                    deleteClaim:
                      type: boolean
                image:
                  type: string
                securityContext:
                  description: 'SecurityContext defines the security options the container
                    should be run with'
                  type: object
                  properties:
                    allowPrivilegeEscalation:
                      type: boolean
                    privileged:
                      type: boolean
                    readOnlyRootFilesystem:
                      type: boolean
                    capabilities:
                      type: object
                      properties:
                        add:
                          items:
                            type: string
                          type: array
                        drop:
                          items:
                            type: string
                          type: array
                    sysctls:
                      items:
                        type: object
                        properties:
                          name:
                            type: string
                          value:
                            type: string
                        required:
                          - name
                          - value
                      type: array
                resources:
                  type: object
                  properties:
                    requests:
                      type: object
                      additionalProperties:
                        type: string
                    limits:
                      type: object
                      additionalProperties:
                        type: string
                toleRations:
                  type: array
                  items:
                    type: object
                    properties:
                      effect:
                        type: string
                      key:
                        type: string
                      operator:
                        type: string
                      tolerationSeconds:
                        type: integer
                        format: int64
                      value:
                        type: string
            status:
              description: DistributedRedisClusterStatus defines the observed state of DistributedRedisCluster
              type: object
              properties:
                numberOfMaster:
                  type: integer
                  format: int32
                reason:
                  type: string
                status:
                  type: string
                maxReplicationFactor:
                  type: integer
                  format: int32
                minReplicationFactor:
                  type: integer
                  format: int32
---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: redisclusterbackups.redis.kun
spec:
  group: redis.kun
  names:
    kind: RedisClusterBackup
    listKind: RedisClusterBackupList
    plural: redisclusterbackups
    singular: redisclusterbackup
    shortNames:
      - drcb
  scope: Namespaced
  versions:
    - name: v1alpha1
      # Each version can be enabled/disabled by Served flag.
      served: true
      # One and only one version must be marked as the storage version.
      storage: true
      subresources:
        status: {}
      additionalPrinterColumns:
        - jsonPath: .metadata.creationTimestamp
          name: Age
          type: date
        - jsonPath: .status.phase
          description: The phase of redis cluster backup
          name: Phase
          type: string
      schema:
        openAPIV3Schema:
          description: RedisClusterBackup is the Schema for the redisclusterbackups API
          properties:
            apiVersion:
              description: 'APIVersion defines the versioned schema of this representation
                of an object. Servers should convert recognized schemas to the latest
                internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#resources'
              type: string
            kind:
              description: 'Kind is a string value representing the REST resource this
                object represents. Servers may infer this from the endpoint the client
                submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#types-kinds'
              type: string
            metadata:
              type: object
            spec:
              description: RedisClusterBackupSpec defines the desired state of RedisClusterBackup
              type: object
            status:
              description: RedisClusterBackupStatus defines the observed state of RedisClusterBackup
              type: object
          type: object
```
Skimming the code for the overall logic (details omitted): the entry point is main.go, which creates a manager, registers the scheme definitions and controllers with it, and starts the manager.
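For reference, a minimal sketch of that entry-point shape written against modern controller-runtime; the actual project uses the older operator-sdk v0 scaffolding, so names and packages differ, and the commented-out wiring line is purely illustrative:

```go
package main

import (
	"os"

	"k8s.io/apimachinery/pkg/runtime"
	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
	ctrl "sigs.k8s.io/controller-runtime"
)

var scheme = runtime.NewScheme()

func main() {
	// Register the built-in types (a real operator also adds its CRD types here).
	if err := clientgoscheme.AddToScheme(scheme); err != nil {
		os.Exit(1)
	}

	// Create the manager: it owns the shared caches, clients, and controllers.
	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme})
	if err != nil {
		os.Exit(1)
	}

	// Controllers would be wired to the manager here, e.g.:
	// ctrl.NewControllerManagedBy(mgr).For(&redisv1alpha1.DistributedRedisCluster{}).Complete(reconciler)

	// Block until the process receives a termination signal.
	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
		os.Exit(1)
	}
}
```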
The layout broadly follows the usual Kubernetes coding conventions: cmd defines the entry point, pkg holds the main program, apis holds the API type definitions, controller holds the actual watches and event handling, and the remaining utility packages make it easier to talk to Kubernetes, access Redis, create the various resources, and so on.
Take the cluster controller as an example: it plugs in a Reconciler,
and registers watch logic so that creates, updates, and deletes are all observed and trigger the corresponding handling (a sketch of that wiring follows).
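A sketch of that wiring against the older controller-runtime API this generation of operator-sdk projects was built on; the function name and import path mirror the project's layout, but treat the details as assumptions:

```go
package controller

import (
	redisv1alpha1 "github.com/ucloud/redis-cluster-operator/pkg/apis/redis/v1alpha1"
	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/manager"
	"sigs.k8s.io/controller-runtime/pkg/predicate"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
	"sigs.k8s.io/controller-runtime/pkg/source"
)

// add wires the Reconciler to the manager and registers the watches.
func add(mgr manager.Manager, r reconcile.Reconciler) error {
	c, err := controller.New("distributedrediscluster-controller", mgr, controller.Options{Reconciler: r})
	if err != nil {
		return err
	}
	// Creates, updates, and deletes of the CR all enqueue a reconcile.Request;
	// the predicate can filter events (e.g. skip status-only updates) before they hit the queue.
	pred := predicate.Funcs{
		CreateFunc: func(e event.CreateEvent) bool { return true },
		UpdateFunc: func(e event.UpdateEvent) bool { return true },
		DeleteFunc: func(e event.DeleteEvent) bool { return true },
	}
	return c.Watch(&source.Kind{Type: &redisv1alpha1.DistributedRedisCluster{}}, &handler.EnqueueRequestForObject{}, pred)
}
```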
The reconcile logic: fetch the cluster definition, check the current state against the desired state, build out the cluster, and detect failures and recover from them automatically.
```go
// Reconcile reads that state of the cluster for a DistributedRedisCluster object and makes changes based on the state read
// and what is in the DistributedRedisCluster.Spec
// Note:
// The Controller will requeue the Request to be processed again if the returned error is non-nil or
// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
func (r *ReconcileDistributedRedisCluster) Reconcile(request reconcile.Request) (reconcile.Result, error) {
	reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)
	reqLogger.Info("Reconciling DistributedRedisCluster")

	// Fetch the DistributedRedisCluster instance
	instance := &redisv1alpha1.DistributedRedisCluster{}
	err := r.client.Get(context.TODO(), request.NamespacedName, instance)
	if err != nil {
		if errors.IsNotFound(err) {
			return reconcile.Result{}, nil
		}
		return reconcile.Result{}, err
	}

	ctx := &syncContext{
		cluster:   instance,
		reqLogger: reqLogger,
	}

	err = r.ensureCluster(ctx)
	if err != nil {
		switch GetType(err) {
		case StopRetry:
			reqLogger.Info("invalid", "err", err)
			return reconcile.Result{}, nil
		}
		reqLogger.WithValues("err", err).Info("ensureCluster")
		newStatus := instance.Status.DeepCopy()
		SetClusterScaling(newStatus, err.Error())
		r.updateClusterIfNeed(instance, newStatus, reqLogger)
		return reconcile.Result{RequeueAfter: requeueAfter}, nil
	}

	matchLabels := getLabels(instance)
	redisClusterPods, err := r.statefulSetController.GetStatefulSetPodsByLabels(instance.Namespace, matchLabels)
	if err != nil {
		return reconcile.Result{}, Kubernetes.Wrap(err, "GetStatefulSetPods")
	}

	ctx.pods = clusterPods(redisClusterPods.Items)
	reqLogger.V(6).Info("debug cluster pods", "", ctx.pods)
	ctx.healer = clustermanger.NewHealer(&heal.CheckAndHeal{
		Logger:     reqLogger,
		PodControl: k8sutil.NewPodController(r.client),
		Pods:       ctx.pods,
		DryRun:     false,
	})
	err = r.waitPodReady(ctx)
	if err != nil {
		switch GetType(err) {
		case Kubernetes:
			return reconcile.Result{}, err
		}
		reqLogger.WithValues("err", err).Info("waitPodReady")
		newStatus := instance.Status.DeepCopy()
		SetClusterScaling(newStatus, err.Error())
		r.updateClusterIfNeed(instance, newStatus, reqLogger)
		return reconcile.Result{RequeueAfter: requeueAfter}, nil
	}

	password, err := statefulsets.GetClusterPassword(r.client, instance)
	if err != nil {
		return reconcile.Result{}, Kubernetes.Wrap(err, "getClusterPassword")
	}

	admin, err := newRedisAdmin(ctx.pods, password, config.RedisConf(), reqLogger)
	if err != nil {
		return reconcile.Result{}, Redis.Wrap(err, "newRedisAdmin")
	}
	defer admin.Close()

	clusterInfos, err := admin.GetClusterInfos()
	if err != nil {
		if clusterInfos.Status == redisutil.ClusterInfosPartial {
			return reconcile.Result{}, Redis.Wrap(err, "GetClusterInfos")
		}
	}

	requeue, err := ctx.healer.Heal(instance, clusterInfos, admin)
	if err != nil {
		return reconcile.Result{}, Redis.Wrap(err, "Heal")
	}
	if requeue {
		return reconcile.Result{RequeueAfter: requeueAfter}, nil
	}

	ctx.admin = admin
	ctx.clusterInfos = clusterInfos
	err = r.waitForClusterJoin(ctx)
	if err != nil {
		switch GetType(err) {
		case Requeue:
			reqLogger.WithValues("err", err).Info("requeue")
			return reconcile.Result{RequeueAfter: requeueAfter}, nil
		}
		newStatus := instance.Status.DeepCopy()
		SetClusterFailed(newStatus, err.Error())
		r.updateClusterIfNeed(instance, newStatus, reqLogger)
		return reconcile.Result{}, err
	}

	// mark .Status.Restore.Phase = RestorePhaseRestart, will
	// remove init container and restore volume that referenced in stateulset for
	// dump RDB file from backup, then the redis master node will be restart.
	if instance.IsRestoreFromBackup() && instance.IsRestoreRunning() {
		reqLogger.Info("update restore redis cluster cr")
		instance.Status.Restore.Phase = redisv1alpha1.RestorePhaseRestart
		if err := r.crController.UpdateCRStatus(instance); err != nil {
			return reconcile.Result{}, err
		}
		if err := r.ensurer.UpdateRedisStatefulsets(instance, getLabels(instance)); err != nil {
			return reconcile.Result{}, err
		}
		waiter := &waitStatefulSetUpdating{
			name:                  "waitMasterNodeRestarting",
			timeout:               60 * time.Second,
			tick:                  5 * time.Second,
			statefulSetController: r.statefulSetController,
			cluster:               instance,
		}
		if err := waiting(waiter, ctx.reqLogger); err != nil {
			return reconcile.Result{}, err
		}
		return reconcile.Result{Requeue: true}, nil
	}

	// restore succeeded, then update cr and wait for the next Reconcile loop
	if instance.IsRestoreFromBackup() && instance.IsRestoreRestarting() {
		reqLogger.Info("update restore redis cluster cr")
		instance.Status.Restore.Phase = redisv1alpha1.RestorePhaseSucceeded
		if err := r.crController.UpdateCRStatus(instance); err != nil {
			return reconcile.Result{}, err
		}
		// set ClusterReplicas = Backup.Status.ClusterReplicas,
		// next Reconcile loop the statefulSet's replicas will increase by ClusterReplicas, then start the slave node
		instance.Spec.ClusterReplicas = instance.Status.Restore.Backup.Status.ClusterReplicas
		if err := r.crController.UpdateCR(instance); err != nil {
			return reconcile.Result{}, err
		}
		return reconcile.Result{}, nil
	}

	if err := admin.SetConfigIfNeed(instance.Spec.Config); err != nil {
		return reconcile.Result{}, Redis.Wrap(err, "SetConfigIfNeed")
	}

	status := buildClusterStatus(clusterInfos, ctx.pods, instance, reqLogger)
	if is := r.isScalingDown(instance, reqLogger); is {
		SetClusterRebalancing(status, "scaling down")
	}
	reqLogger.V(4).Info("buildClusterStatus", "status", status)
	r.updateClusterIfNeed(instance, status, reqLogger)

	instance.Status = *status
	if needClusterOperation(instance, reqLogger) {
		reqLogger.Info(">>>>>> clustering")
		err = r.syncCluster(ctx)
		if err != nil {
			newStatus := instance.Status.DeepCopy()
			SetClusterFailed(newStatus, err.Error())
			r.updateClusterIfNeed(instance, newStatus, reqLogger)
			return reconcile.Result{}, err
		}
	}

	newClusterInfos, err := admin.GetClusterInfos()
	if err != nil {
		if clusterInfos.Status == redisutil.ClusterInfosPartial {
			return reconcile.Result{}, Redis.Wrap(err, "GetClusterInfos")
		}
	}
	newStatus := buildClusterStatus(newClusterInfos, ctx.pods, instance, reqLogger)
	SetClusterOK(newStatus, "OK")
	r.updateClusterIfNeed(instance, newStatus, reqLogger)
	return reconcile.Result{RequeueAfter: time.Duration(reconcileTime) * time.Second}, nil
}
```
The Sentinel Operator works on the same principles, so there is no need to go over it again:
https://github.com/ucloud/redis-operator