CreateSpecChecker
The SpecChecker struct holds a resourcesStates field, which is compared against the values defined in the Spec.

rc.SpecChecker = checker.CreateSpecChecker(rc.KubegresContext, rc.ResourcesStates)

type SpecChecker struct {
    kubegresContext ctx.KubegresContext
    resourcesStates states.ResourcesStates
}
CreateCustomConfigSpecHelper
Records and handles the differences between the CustomConfig and the DefaultConfig.

rc.CustomConfigSpecHelper = template.CreateCustomConfigSpecHelper(rc.KubegresContext, rc.ResourcesStates)

type CustomConfigSpecHelper struct {
    kubegresContext ctx.KubegresContext
    resourcesStates states.ResourcesStates
}
CreateResourcesCreatorFromTemplate
Loads the predefined templates that serve as the parameters for creating resources; the part worth a closer look here is ResourceTemplateLoader.

rc.ResourcesCreatorFromTemplate = template.CreateResourcesCreatorFromTemplate(rc.KubegresContext, rc.CustomConfigSpecHelper, resourceTemplateLoader)

type ResourcesCreatorFromTemplate struct {
    kubegresContext        ctx.KubegresContext
    customConfigSpecHelper CustomConfigSpecHelper
    templateFromFiles      ResourceTemplateLoader
}

ResourceTemplateLoader is just a struct containing a log field, but the YAML files Kubegres defines for Service, StatefulSet, CronJob and ConfigMap are all loaded into their corresponding structs by ResourceTemplateLoader's methods, which rely on scheme.Codecs.UniversalDeserializer().Decode.
type ResourceTemplateLoader struct {
    log log2.LogWrapper
}

func (r *ResourceTemplateLoader) LoadBaseConfigMap() (configMap core.ConfigMap, err error) {
    obj, err := r.decodeYaml(yaml.BaseConfigMapTemplate)
    ...
}

func (r *ResourceTemplateLoader) LoadPrimaryService() (serviceTemplate core.Service, err error) {
    serviceTemplate, err = r.loadService(yaml.PrimaryServiceTemplate)
    return serviceTemplate, err
}

func (r *ResourceTemplateLoader) LoadReplicaService() (serviceTemplate core.Service, err error) {
    return r.loadService(yaml.ReplicaServiceTemplate)
}

func (r *ResourceTemplateLoader) LoadPrimaryStatefulSet() (statefulSetTemplate apps.StatefulSet, err error) {
    return r.loadStatefulSet(yaml.PrimaryStatefulSetTemplate)
}

func (r *ResourceTemplateLoader) LoadReplicaStatefulSet() (statefulSetTemplate apps.StatefulSet, err error) {
    return r.loadStatefulSet(yaml.ReplicaStatefulSetTemplate)
}

func (r *ResourceTemplateLoader) LoadBackUpCronJob() (cronJob batch.CronJob, err error) {
    obj, err := r.decodeYaml(yaml.BackUpCronJobTemplate)
    ...
}

func (r *ResourceTemplateLoader) decodeYaml(yamlContents string) (runtime.Object, error) {
    decode := scheme.Codecs.UniversalDeserializer().Decode
    obj, _, err := decode([]byte(yamlContents), nil, nil)
    ...
}

UniversalDeserializer can convert any stored data it recognizes into a Go object satisfying runtime.Object. For example, yaml.PrimaryServiceTemplate is a const string holding the YAML of a Service; UniversalDeserializer turns it into a core.Service object.
PrimaryServiceTemplate = `apiVersion: v1
kind: Service
metadata:
  name: postgres-name
  namespace: default
  labels:
    app: postgres-name
    replicationRole: primary
spec:
  clusterIP: None
  ports:
    - protocol: TCP
      port: 5432
  selector:
    app: postgres-name
    replicationRole: primary
`
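To illustrate how that decoding works outside of Kubegres, here is a minimal, self-contained sketch (not Kubegres code) that feeds a Service YAML string through scheme.Codecs.UniversalDeserializer().Decode and type-asserts the result to *core.Service:

package main

import (
    "fmt"

    core "k8s.io/api/core/v1"
    "k8s.io/client-go/kubernetes/scheme"
)

const serviceYaml = `apiVersion: v1
kind: Service
metadata:
  name: postgres-name
spec:
  clusterIP: None
`

func main() {
    // Decode the YAML into a runtime.Object; the returned GVK tells us what was decoded.
    obj, gvk, err := scheme.Codecs.UniversalDeserializer().Decode([]byte(serviceYaml), nil, nil)
    if err != nil {
        panic(err)
    }
    // The concrete type is *core.Service because the client-go scheme knows "v1/Service".
    service := obj.(*core.Service)
    fmt.Println(gvk.Kind, service.Name, service.Spec.ClusterIP)
}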
addResourcesCountSpecEnforcers
The first thing Reconcile cares about is whether the count of each resource matches the Spec. If it does not, reconciliation is needed, and that logic is encapsulated in the Enforcer dedicated to each resource. In the end the four enforcers, BaseConfigMapCount, StatefulSetCount, ServicesCount and BackupCronJobCount, are all gathered into ResourcesCountSpecEnforcer.

func addResourcesCountSpecEnforcers(rc *ResourcesContext) {
    ...
    rc.StatefulSetCountSpecEnforcer = resources_count_spec.CreateStatefulSetCountSpecEnforcer(rc.PrimaryDbCountSpecEnforcer, rc.ReplicaDbCountSpecEnforcer)
    rc.BaseConfigMapCountSpecEnforcer = resources_count_spec.CreateBaseConfigMapCountSpecEnforcer(rc.KubegresContext, rc.ResourcesStates, rc.ResourcesCreatorFromTemplate, rc.BlockingOperation)
    rc.ServicesCountSpecEnforcer = resources_count_spec.CreateServicesCountSpecEnforcer(rc.KubegresContext, rc.ResourcesStates, rc.ResourcesCreatorFromTemplate)
    rc.BackUpCronJobCountSpecEnforcer = resources_count_spec.CreateBackUpCronJobCountSpecEnforcer(rc.KubegresContext, rc.ResourcesStates, rc.ResourcesCreatorFromTemplate)

    rc.ResourcesCountSpecEnforcer = resources_count_spec.ResourcesCountSpecEnforcer{}
    rc.ResourcesCountSpecEnforcer.AddSpecEnforcer(&rc.BaseConfigMapCountSpecEnforcer)
    rc.ResourcesCountSpecEnforcer.AddSpecEnforcer(&rc.StatefulSetCountSpecEnforcer)
    rc.ResourcesCountSpecEnforcer.AddSpecEnforcer(&rc.ServicesCountSpecEnforcer)
    rc.ResourcesCountSpecEnforcer.AddSpecEnforcer(&rc.BackUpCronJobCountSpecEnforcer)
}
addStatefulSetSpecEnforcers
Once the resource counts are handled, the next step drills into the individual properties of each StatefulSet that runs a Postgres instance, setting up an Enforcer for each one: image, port, StorageClass, custom config, affinity, and so on.

func addStatefulSetSpecEnforcers(rc *ResourcesContext) {

    imageSpecEnforcer := statefulset_spec.CreateImageSpecEnforcer(rc.KubegresContext)
    portSpecEnforcer := statefulset_spec.CreatePortSpecEnforcer(rc.KubegresContext, rc.ResourcesStates)
    storageClassSizeSpecEnforcer := statefulset_spec.CreateStorageClassSizeSpecEnforcer(rc.KubegresContext, rc.ResourcesStates)
    customConfigSpecEnforcer := statefulset_spec.CreateCustomConfigSpecEnforcer(rc.CustomConfigSpecHelper)
    affinitySpecEnforcer := statefulset_spec.CreateAffinitySpecEnforcer(rc.KubegresContext)
    tolerationsSpecEnforcer := statefulset_spec.CreateTolerationsSpecEnforcer(rc.KubegresContext)
    resourcesSpecEnforcer := statefulset_spec.CreateResourcesSpecEnforcer(rc.KubegresContext)
    volumeSpecEnforcer := statefulset_spec.CreateVolumeSpecEnforcer(rc.KubegresContext)
    securityContextSpecEnforcer := statefulset_spec.CreateSecurityContextSpecEnforcer(rc.KubegresContext)
    livenessProbeSpecEnforcer := statefulset_spec.CreateLivenessProbeSpecEnforcer(rc.KubegresContext)
    readinessProbeSpecEnforcer := statefulset_spec.CreateReadinessProbeSpecEnforcer(rc.KubegresContext)
    ...
}
addBlockingOperationConfigs
Here we can see exactly which operations Kubegres defines as blocking operations: creating the base ConfigMap, deploying the primary DB, the waiting period before a failover, the failover itself, deploying and undeploying replica DBs, updating a StatefulSet spec, updating a StatefulSet's Pod, and finally waiting on a Pod that is stuck during an operation.

func addBlockingOperationConfigs(rc *ResourcesContext) {
    rc.BlockingOperation.AddConfig(rc.BaseConfigMapCountSpecEnforcer.CreateOperationConfig())
    rc.BlockingOperation.AddConfig(rc.PrimaryDbCountSpecEnforcer.CreateOperationConfigForPrimaryDbDeploying())
    rc.BlockingOperation.AddConfig(rc.PrimaryToReplicaFailOver.CreateOperationConfigWaitingBeforeForFailingOver())
    rc.BlockingOperation.AddConfig(rc.PrimaryToReplicaFailOver.CreateOperationConfigForFailingOver())
    rc.BlockingOperation.AddConfig(rc.ReplicaDbCountSpecEnforcer.CreateOperationConfigForReplicaDbDeploying())
    rc.BlockingOperation.AddConfig(rc.ReplicaDbCountSpecEnforcer.CreateOperationConfigForReplicaDbUndeploying())
    rc.BlockingOperation.AddConfig(rc.AllStatefulSetsSpecEnforcer.CreateOperationConfigForStatefulSetSpecUpdating())
    rc.BlockingOperation.AddConfig(rc.AllStatefulSetsSpecEnforcer.CreateOperationConfigForStatefulSetSpecPodUpdating())
    rc.BlockingOperation.AddConfig(rc.AllStatefulSetsSpecEnforcer.CreateOperationConfigForStatefulSetWaitingOnStuckPod())
}

This completes the walkthrough of CreateResourcesContext. To summarize: CreateResourcesContext creates the default StorageClass, fills undefined Spec fields with default values, initializes an empty blocking operation, collects the current state of the various resources in the cluster, creates the SpecChecker, creates the ResourcesCreator so resource settings can be loaded from templates, and registers the various Enforcers that will compare State with Spec in the next step and drive the reconciliation.
LoadActiveOperation
LoadActiveOperation — more precisely it should be called LoadActiveOperationStates — mainly collects the execution state of the BlockingOperation, makes a few decisions and updates the States. If a blocking operation has not yet completed and has not reached its timeout, the Reconcile is re-queued to fire again after the timeout; this shows that a BlockingOperation effectively blocks the current Reconcile loop until the operation finishes. Because the operation lives in Status, and a changed Status must be pushed with an Update, returnn wraps the logic of detecting a Status change and automatically issuing the Update.

// Load the active blocking operation
nbreSecondsLeftBeforeTimeOut := resourcesContext.BlockingOperation.LoadActiveOperation()

// Log the resourcesContext blocking operation
resourcesContext.BlockingOperationLogger.Log()

// Log the resourcesContext resources states
resourcesContext.ResourcesStatesLogger.Log()

// A requeue is needed here: Reconcile is called again after nbreSecondsLeftBeforeTimeOut
if nbreSecondsLeftBeforeTimeOut > 0 {
    resultWithRequeue := ctrl.Result{
        Requeue:      true,
        RequeueAfter: time.Duration(nbreSecondsLeftBeforeTimeOut) * time.Second,
    }
    // Call the wrapped return
    return r.returnn(resultWithRequeue, nil, resourcesContext)
}
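returnn itself is not shown in this post; as a minimal sketch, assuming a hypothetical helper that compares the in-memory Status with the stored one and pushes it if needed, it might look roughly like this (the real Kubegres implementation may differ):

// Sketch only: UpdateStatusIfChanged is a hypothetical helper, not the actual Kubegres API.
func (r *KubegresReconciler) returnn(result ctrl.Result, err error, resourcesContext *resources.ResourcesContext) (ctrl.Result, error) {
    // If any Status field was modified during this Reconcile loop,
    // push the change to the API server before returning.
    if errStatusUpdate := resourcesContext.KubegresContext.Status.UpdateStatusIfChanged(); errStatusUpdate != nil && err == nil { // hypothetical helper
        err = errStatusUpdate
    }
    return result, err
}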
LoadActiveOperation
GetBlockingOperation returns Status.BlockingOperation; the BlockingOperation itself is set during the Enforce phase.

func (r *BlockingOperation) LoadActiveOperation() int64 {

    r.activeOperation = r.kubegresContext.Status.GetBlockingOperation()
    r.previouslyActiveOperation = r.kubegresContext.Status.GetPreviousBlockingOperation()
    r.removeOperationIfNotActive()

    nbreSecondsLeftBeforeTimeOut := r.GetNbreSecondsLeftBeforeTimeOut()
    if nbreSecondsLeftBeforeTimeOut > 20 {
        return 20
    }
    return nbreSecondsLeftBeforeTimeOut
}
removeOperationIfNotActive
A non-empty OperationId means the operation is active; if the StepId equals TransitionOperationStepId, an operation is in the Transition state. To understand what a Transition step is, see 'TransitionOperationStepId = "Transition step: waiting either for the next step to start or for the operation to be removed ..."'. If TimeOutEpocInSeconds - time.Now().Unix() <= 0, the operation has timed out. If it has timed out and a CompletionChecker is configured, only a log entry is written recording the current OperationId and StepId. If it has timed out and there is no CompletionChecker, the behaviour depends on operationConfig.AfterCompletionMoveToTransitionStep: if true, activeOperation is copied into previouslyActiveOperation, activeOperation.StepId is set to TransitionOperationStepId, and the related records in Status are updated (without an Update call); if false, activeOperation is likewise copied into previouslyActiveOperation, but activeOperation is cleared. If the operation has not timed out, a CompletionChecker is set and the checker reports completion, activeOperation and previouslyActiveOperation are set in the same way depending on whether Transition is enabled, and the values in Status are updated. In short: with Transition enabled, activeOperation.StepId becomes TransitionOperationStepId; otherwise activeOperation becomes empty.

type BlockingOperation struct {
    kubegresContext           ctx.KubegresContext
    configs                   map[string]BlockingOperationConfig
    activeOperation           v1.KubegresBlockingOperation
    previouslyActiveOperation v1.KubegresBlockingOperation
}

func (r *BlockingOperation) removeOperationIfNotActive() {

    if !r.isThereActiveOperation() || r.isOperationInTransition() {
        return
    }

    hasOperationTimedOut := r.hasOperationTimedOut()

    if hasOperationTimedOut {
        r.activeOperation.HasTimedOut = true

        if !r.hasCompletionChecker() {
            if r.shouldOperationBeInTransition() {
                r.setActiveOperationInTransition(hasOperationTimedOut)
            } else {
                r.removeActiveOperation(hasOperationTimedOut)
            }
        } else {
            r.kubegresContext.Log.InfoEvent("BlockingOperationTimedOut", "Blocking-Operation timed-out.",
                "OperationId", r.activeOperation.OperationId,
                "StepId", r.activeOperation.StepId)
        }

    } else if r.isOperationCompletionConditionReached() {
        r.kubegresContext.Log.InfoEvent("BlockingOperationCompleted", "Blocking-Operation is successfully completed.",
            "OperationId", r.activeOperation.OperationId,
            "StepId", r.activeOperation.StepId)

        if r.shouldOperationBeInTransition() {
            r.setActiveOperationInTransition(hasOperationTimedOut)
        } else {
            r.removeActiveOperation(hasOperationTimedOut)
        }
    }
}
GetNbreSecondsLeftBeforeTimeOut
Whether the operation has timed out is determined from activeOperation.TimeOutEpocInSeconds.
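The method itself is not quoted in the post; a plausible reconstruction based on the fields referenced above would be:

// Sketch only: reconstructed from the fields mentioned above, not copied from Kubegres.
func (r *BlockingOperation) GetNbreSecondsLeftBeforeTimeOut() int64 {
    if !r.isThereActiveOperation() {
        return 0
    }
    // TimeOutEpocInSeconds is the epoch second at which the active operation expires.
    secondsLeft := r.activeOperation.TimeOutEpocInSeconds - time.Now().Unix()
    if secondsLeft < 0 {
        return 0
    }
    return secondsLeft
}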
CheckSpec

Mainly checks whether the Spec contains a fatal error that makes further reconciliation impossible. First, once the primary Postgres has started and become Ready, some of its storage-related properties become immutable, otherwise data consistency would be at risk. Second, if the referenced StorageClass, ConfigMap or environment variables are not deployed or not set, a fatal error is raised and reconciliation cannot continue. Finally, custom PVCs, PVs and mount points must not use the names already reserved by Kubegres.

// Check the difference between the current state and the Spec
specCheckResult, err := resourcesContext.SpecChecker.CheckSpec()

if err != nil {
    return r.returnn(ctrl.Result{}, err, resourcesContext)

} else if specCheckResult.HasSpecFatalError {
    // A fatal spec error occurred: stop the Reconcile loop triggered by the current event
    return r.returnn(ctrl.Result{}, nil, resourcesContext)
}
CheckSpec
Judging from the return values, SpecChecker mainly checks whether the Spec contains a fatal error. The first thing it looks at is the primary node: spec holds the desired values, while primaryStatefulSetSpec is the current state.

type SpecCheckResult struct {
    HasSpecFatalError bool
    FatalErrorMessage string
}

func (r *SpecChecker) CheckSpec() (SpecCheckResult, error) {

    specCheckResult := SpecCheckResult{}

    spec := &r.kubegresContext.Kubegres.Spec
    primaryStatefulSet := r.getPrimaryStatefulSet()
    primaryStatefulSetSpec := primaryStatefulSet.StatefulSet.Spec
    const emptyStr = ""

Ready and Health are determined the same way, both via healthz.Ping; IsReady is a bool storing the probe result. The first method to look at is createErrMsgSpecCannotBeChanged: once the Pods have been created, certain values in the Spec can no longer be changed automatically by the Reconcile loop to match the desired Spec — they either stay as they are or must be changed manually.
func (r *SpecChecker) createErrMsgSpecCannotBeChanged(specName, currentValue, newValue, reason string) string {
    errorMsg := "In the Resources Spec the value of '" + specName + "' cannot be changed from '" + currentValue + "' to '" + newValue + "' after Pods were created. " +
        reason + " " +
        "We roll-backed Kubegres spec to the currently working value '" + currentValue + "'. " +
        "If you know what you are doing, you can manually update that spec in every StatefulSet of your PostgreSql cluster and then Kubegres will automatically update itself."
    return r.logSpecErrMsg(errorMsg)
}

If the primary StatefulSet is already Ready, then the mount path of the PVC it is bound to, the StorageClass, the storage size (Kubernetes currently does not support expanding a StatefulSet's PVC) and indeed the PVC itself are all immutable; otherwise specCheckResult records a fatal error and returns with the corresponding error message.
if primaryStatefulSet.Pod.IsReady {

    primaryVolumeMount := primaryStatefulSetSpec.Template.Spec.Containers[0].VolumeMounts[0].MountPath
    if spec.Database.VolumeMount != primaryVolumeMount {
        specCheckResult.HasSpecFatalError = true
        specCheckResult.FatalErrorMessage = r.createErrMsgSpecCannotBeChanged("spec.database.volumeMount",
            primaryVolumeMount, spec.Database.VolumeMount,
            "Otherwise, the cluster of PostgreSql servers risk of being inconsistent.")

        r.kubegresContext.Kubegres.Spec.Database.VolumeMount = primaryVolumeMount
        r.updateKubegresSpec("spec.database.volumeMount", primaryVolumeMount)
    }

    primaryStorageClassName := primaryStatefulSetSpec.VolumeClaimTemplates[0].Spec.StorageClassName
    if *spec.Database.StorageClassName != *primaryStorageClassName {
        ...
        r.kubegresContext.Kubegres.Spec.Database.StorageClassName = primaryStorageClassName
        r.updateKubegresSpec("spec.database.storageClassName", *primaryStorageClassName)
    }

    primaryStorageSizeQuantity := primaryStatefulSetSpec.VolumeClaimTemplates[0].Spec.Resources.Requests[v1.ResourceStorage]
    primaryStorageSize := primaryStorageSizeQuantity.String()

    if spec.Database.Size != primaryStorageSize && !r.doesStorageClassAllowVolumeExpansion() {
        ...
        spec.Database.Size = primaryStorageSize
        r.updateKubegresSpec("spec.database.size", primaryStorageSize)

        // TODO: condition to remove when Kubernetes allows updating storage size in StatefulSet
        // (see https://github.com/kubernetes/enhancements/pull/2842)
    } else if spec.Database.Size != primaryStorageSize {
        ...
        spec.Database.Size = primaryStorageSize
        r.updateKubegresSpec("spec.database.size", primaryStorageSize)
    }

    if r.hasCustomVolumeClaimTemplatesChanged(primaryStatefulSetSpec) {
        specCheckResult.HasSpecFatalError = true
        specCheckResult.FatalErrorMessage = r.logSpecErrMsg("In the Resources Spec, the array 'spec.Volume.VolumeClaimTemplates' " +
            "has changed. Kubernetes does not allow to update that field in StatefulSet specification. Please rollback your changes in the YAML.")
    }

    if specCheckResult.HasSpecFatalError {
        return specCheckResult, nil
    }
}
The following cases are all recorded as fatal errors: the StorageClass is not deployed, the database storage size is left empty, the custom ConfigMap is not deployed, the Postgres superuser or replication passwords are not set as environment variables, no Postgres instances are requested, or no Postgres Docker image is specified. If backups are configured but the backup mount path or PVC name is missing, or the referenced PVC is not deployed, that is also a fatal error.
if !r.dbStorageClassDeployed() { ...
if spec.Database.Size == emptyStr { ...
if r.isCustomConfigNotDeployed(spec) { ...
if !r.doesEnvVarExist(ctx.EnvVarNameOfPostgresSuperUserPsw) { ...
if !r.doesEnvVarExist(ctx.EnvVarNameOfPostgresReplicationUserPsw) { ...
if *spec.Replicas <= 0 { ...
if spec.Image == "" { ...

if r.isBackUpConfigured(spec) {
    if spec.Backup.VolumeMount == emptyStr { ...
    if spec.Backup.PvcName == emptyStr { ...
    if spec.Backup.PvcName != emptyStr && !r.isBackUpPvcDeployed() { ...
}

Newly defined PVCs, PVs and mount points must not use the reserved names; the IsReservedVolumeName() method performs that check.
func (r *KubegresContext) IsReservedVolumeName(volumeName string) bool {
    // postgres-db
    return volumeName == DatabaseVolumeName ||
        // base-config
        volumeName == BaseConfigMapVolumeName ||
        // custom-config
        volumeName == CustomConfigMapVolumeName ||
        strings.Contains(volumeName, "kube-api")
}

reservedVolumeName := r.doCustomVolumeClaimTemplatesHaveReservedName()
if reservedVolumeName != "" { ...

reservedVolumeName = r.doCustomVolumesHaveReservedName()
if reservedVolumeName != "" { ...

reservedVolumeName = r.doCustomVolumeMountsHaveReservedName()
if reservedVolumeName != "" { ...

if r.doCustomVolumeMountsHaveReservedPath() { ...

That concludes the CheckSpec call.
enforceSpec
The States have been collected and invalid Spec values ruled out, so the actual reconciliation can begin. It does two things: first, enforce the resource counts so they match the numbers specified in the Spec; second, reconcile the StatefulSets that actually run Postgres.

return r.returnn(ctrl.Result{}, r.enforceSpec(resourcesContext), resourcesContext)

func (r *KubegresReconciler) enforceSpec(resourcesContext *resources.ResourcesContext) error {
    err := r.enforceResourcesCountSpec(resourcesContext)
    if err != nil {
        return err
    }
    return r.enforceAllStatefulSetsSpec(resourcesContext)
}
enforceResourcesCountSpec
ResourcesCountSpecEnforcer holds the count enforcers for four kinds of resources: BaseConfigMap, StatefulSet, Service and BackUpCronJob.

func (r *KubegresReconciler) enforceResourcesCountSpec(resourcesContext *resources.ResourcesContext) error {
    return resourcesContext.ResourcesCountSpecEnforcer.EnforceSpec()
}

rc.ResourcesCountSpecEnforcer = resources_count_spec.ResourcesCountSpecEnforcer{}
rc.ResourcesCountSpecEnforcer.AddSpecEnforcer(&rc.BaseConfigMapCountSpecEnforcer)
rc.ResourcesCountSpecEnforcer.AddSpecEnforcer(&rc.StatefulSetCountSpecEnforcer)
rc.ResourcesCountSpecEnforcer.AddSpecEnforcer(&rc.ServicesCountSpecEnforcer)
rc.ResourcesCountSpecEnforcer.AddSpecEnforcer(&rc.BackUpCronJobCountSpecEnforcer)
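ResourcesCountSpecEnforcer is essentially a small composite: AddSpecEnforcer registers an enforcer and EnforceSpec runs them in registration order. A sketch of that shape (the interface name used here is an assumption, not quoted from the source):

// Sketch: reconstructed composite; the interface name is an assumption.
type ResourcesCountSpec interface {
    EnforceSpec() error
}

type ResourcesCountSpecEnforcer struct {
    registeredSpecEnforcers []ResourcesCountSpec
}

func (r *ResourcesCountSpecEnforcer) AddSpecEnforcer(specEnforcer ResourcesCountSpec) {
    r.registeredSpecEnforcers = append(r.registeredSpecEnforcers, specEnforcer)
}

// EnforceSpec runs every registered enforcer in registration order and stops at the first error.
func (r *ResourcesCountSpecEnforcer) EnforceSpec() error {
    for _, specEnforcer := range r.registeredSpecEnforcers {
        if err := specEnforcer.EnforceSpec(); err != nil {
            return err
        }
    }
    return nil
}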
BaseConfigMapCount
EnforceSpec is a blocking operation: if a different blocking operation is already running, it stops. If the previous base ConfigMap operation timed out, it checks whether the base ConfigMap ended up deployed after all: if it did, the corresponding entry is removed from the blocking operation and the log records "Base ConfigMap is available again. We can safely re-enable all features of Kubegres."; otherwise a timeout is logged and Kubegres cannot provide service until the base ConfigMap is ready. If the previous attempt did not time out and the base ConfigMap is already deployed, it returns immediately, since the base ConfigMap only needs to be deployed once.

func (r *BaseConfigMapCountSpecEnforcer) EnforceSpec() error {

    if r.blockingOperation.IsActiveOperationIdDifferentOf(operation.OperationIdBaseConfigCountSpecEnforcement) {
        return nil
    }

    if r.hasLastAttemptTimedOut() {
        if r.isBaseConfigDeployed() {
            r.blockingOperation.RemoveActiveOperation()
            r.logKubegresFeaturesAreReEnabled()
        } else {
            r.logTimedOut()
            return nil
        }
    }

    if r.isBaseConfigDeployed() {
        return nil
    }

    baseConfigMap, err := r.resourcesCreator.CreateBaseConfigMap()
    if err != nil {
        r.kubegresContext.Log.ErrorEvent("BaseConfigMapTemplateErr", err,
            "Unable to create a Base ConfigMap object from template.",
            "Based ConfigMap name", ctx.BaseConfigMapName)
        return err
    }

    return r.deployBaseConfigMap(baseConfigMap)
}

Otherwise, the base ConfigMap template is loaded, a base ConfigMap is created, a BaseConfigCountSpecEnforcement BlockingOperation is activated and Create is called to deploy it. If Create returns an error, the BlockingOperation is removed; if it succeeds, the BlockingOperation logic carries over into the next Reconcile loop. To stress it again: a BlockingOperation does not block any code execution, it blocks the logic of the Reconcile loop. The actual work is carried out by Kubernetes, and Reconcile only calls the API, so nothing actually blocks.
func (r *ResourcesCreatorFromTemplate) CreateBaseConfigMap() (core.ConfigMap, error) {

    baseConfigMap, err := r.templateFromFiles.LoadBaseConfigMap()
    if err != nil {
        return core.ConfigMap{}, err
    }

    baseConfigMap.Namespace = r.kubegresContext.Kubegres.Namespace
    //baseConfigMap.OwnerReferences = r.getOwnerReference()

    return baseConfigMap, nil
}

func (r *BaseConfigMapCountSpecEnforcer) deployBaseConfigMap(configMap core.ConfigMap) error {

    r.kubegresContext.Log.Info("Deploying Base ConfigMap", "name", configMap.Name)

    if err := r.activateBlockingOperationForDeployment(); err != nil {
        r.kubegresContext.Log.ErrorEvent("BaseConfigMapDeploymentOperationActivationErr", err,
            "Error while activating blocking operation for the deployment of a Base ConfigMap.",
            "ConfigMap name", configMap.Name)
        return err
    }

    if err := r.kubegresContext.Client.Create(r.kubegresContext.Ctx, &configMap); err != nil {
        r.kubegresContext.Log.ErrorEvent("BaseConfigMapDeploymentErr", err,
            "Unable to deploy Base ConfigMap.",
            "ConfigMap name", configMap.Name)
        r.blockingOperation.RemoveActiveOperation()
        return err
    }

    ...
}
StatefulSetCount
StatefulSetCount has two parts: the number of primary DBs and the number of replica DBs.

type StatefulSetCountSpecEnforcer struct {
    primaryDbCountSpecEnforcer statefulset.PrimaryDbCountSpecEnforcer
    replicaDbCountSpecEnforcer statefulset.ReplicaDbCountSpecEnforcer
}

func (r *StatefulSetCountSpecEnforcer) EnforceSpec() error {
    if err := r.enforcePrimaryDbInstance(); err != nil {
        return err
    }
    return r.enforceReplicaDbInstances()
}

func (r *StatefulSetCountSpecEnforcer) enforcePrimaryDbInstance() error {
    return r.primaryDbCountSpecEnforcer.Enforce()
}

func (r *StatefulSetCountSpecEnforcer) enforceReplicaDbInstances() error {
    return r.replicaDbCountSpecEnforcer.Enforce()
}
primaryDbCountSpecEnforcer
If EnforcedReplicas is not yet set in Status and the number of deployed replicas equals the number in the Spec, EnforcedReplicas is set to that deployed count. If a different blocking operation is currently active, return. If the primary DB is ready and the previous PrimaryCountSpecEnforcement operation timed out, the BlockingOperation can be removed and a log records that Kubegres can provide service again. If no primary is deployed but replicas are, and either a manual failover was requested or the primary disappeared and a new one is needed, FailOver() is performed. If neither the primary nor any replica is deployed, a brand-new primary StatefulSet is deployed directly.

func (r *PrimaryDbCountSpecEnforcer) Enforce() error {

    r.initialiseStatusEnforcedReplicas()

    if r.blockingOperation.IsActiveOperationIdDifferentOf(operation.OperationIdPrimaryDbCountSpecEnforcement) {
        return nil
    }

    if r.isPrimaryDbReady() && r.hasLastPrimaryCountSpecEnforcementAttemptTimedOut() {
        r.blockingOperation.RemoveActiveOperation()
        r.logKubegresFeaturesAreReEnabled()
    }

    if r.primaryToReplicaFailOver.ShouldWeFailOver() {
        return r.primaryToReplicaFailOver.FailOver()

    } else if r.shouldWeDeployNewPrimaryDb() {
        return r.deployNewPrimaryStatefulSet()
    }

    return nil
}
Failover
Likewise, the currently active blocking operation must be PrimaryDbCountSpecEnforcement and the failover attempt must not have timed out. A replica is then selected for promotion: if one was specified manually, that one is used; otherwise the first Ready replica in creation order is chosen. If the current blocking operation is in its Transition phase, i.e. waiting for the Kubernetes state to change, the primary StatefulSet is deleted directly and we wait for the promotion to finish; otherwise the promotion is performed.

func (r *PrimaryToReplicaFailOver) FailOver() error {

    if r.blockingOperation.IsActiveOperationIdDifferentOf(operation.OperationIdPrimaryDbCountSpecEnforcement) {
        return nil
    }

    if r.hasLastFailOverAttemptTimedOut() {
        r.logFailoverTimedOut()
        return nil
    }

    var newPrimary, err = r.selectReplicaToPromote()
    if err != nil {
        return err
    }

    if !r.isWaitingBeforeStartingFailOver() {
        return r.waitBeforePromotingReplicaToPrimary(newPrimary)
    } else {
        return r.promoteReplicaToPrimary(newPrimary)
    }
}
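selectReplicaToPromote is not quoted in the post. A rough sketch of the selection described above (manually requested Pod first, otherwise the first Ready replica in creation order); the helper getReadyReplicasSortedByCreation and the field Spec.Failover.PromotePod are assumptions, not quotes from the source:

// Sketch only: reconstructed from the prose above, not the actual Kubegres code.
func (r *PrimaryToReplicaFailOver) selectReplicaToPromote() (statefulset.StatefulSetWrapper, error) {
    requestedPod := r.kubegresContext.Kubegres.Spec.Failover.PromotePod // manual failover target, if any (assumed field name)

    for _, replica := range r.getReadyReplicasSortedByCreation() { // hypothetical helper: Ready replicas in creation order
        // With a manually requested Pod only that replica qualifies; otherwise the first Ready one wins.
        if requestedPod == "" || replica.StatefulSet.Name == requestedPod {
            return replica, nil
        }
    }

    return statefulset.StatefulSetWrapper{}, errors.New("no Replica available to promote to Primary")
}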
promoteReplicaToPrimary
The promote logic lives here: set the selected replica's label "replicationRole"="primary", mount the ConfigMap into initContainers[0] under /tmp, record this StatefulSet in the active failover operation, then call Update. Once the kubelet eventually receives the change, it performs the mount and runs promote_replica_to_primary.sh to complete the promotion.

func (r *PrimaryToReplicaFailOver) promoteReplicaToPrimary(newPrimary statefulset.StatefulSetWrapper) error {

    newPrimary.StatefulSet.Labels["replicationRole"] = ctx.PrimaryRoleName
    newPrimary.StatefulSet.Spec.Template.Labels["replicationRole"] = ctx.PrimaryRoleName

    volumeMount := core.VolumeMount{
        Name:      "base-config",
        MountPath: "/tmp/promote_replica_to_primary.sh",
        SubPath:   "promote_replica_to_primary.sh",
    }
    initContainer := &newPrimary.StatefulSet.Spec.Template.Spec.InitContainers[0]
    initContainer.VolumeMounts = append(initContainer.VolumeMounts, volumeMount)
    initContainer.Command = []string{"sh", "-c", "/tmp/promote_replica_to_primary.sh"}

    err := r.activateOperationFailingOver(newPrimary)
    if err != nil {
        r.kubegresContext.Log.ErrorEvent("FailOverOperationActivationErr", err,
            "Error while activating a blocking operation for the FailOver of a Primary DB.",
            "InstanceIndex", newPrimary.InstanceIndex)
        return err
    }

    r.kubegresContext.Log.InfoEvent("FailOver", "FailOver: Promoting Replica to Primary.",
        "Replica to promote", newPrimary.StatefulSet.Name)

    err2 := r.kubegresContext.Client.Update(r.kubegresContext.Ctx, &newPrimary.StatefulSet)
    if err2 != nil {
        r.kubegresContext.Log.ErrorEvent("FailOverErr", err2, "FailOver: Unable to promote Replica to Primary.",
            "Replica to promote", newPrimary.StatefulSet.Name)
        r.blockingOperation.RemoveActiveOperation()
        return err
    }

    return nil
}
deployNewPrimaryStatefulSet
If the previous PrimaryCountSpec operation timed out without completing, wait for it and return immediately. Then obtain the instance index for the primary to deploy: if this primary comes from a failover or a primary has never been deployed, 0 is returned; if the bound primary PVC no longer exists, 0 is also returned; otherwise the index of the previously created primary is returned. If the resulting instance index is 0, it is set to the last created instance index + 1. The blocking operation is then activated, the primary StatefulSet is loaded from the template and Create is called to create the resource; finally Status is updated and a log is written.

func (r *PrimaryDbCountSpecEnforcer) deployNewPrimaryStatefulSet() error {

    if r.hasLastPrimaryCountSpecEnforcementAttemptTimedOut() {
        r.logDeploymentTimedOut()
        return nil
    }

    instanceIndex := r.getInstanceIndexIfPrimaryNeedsToBeRecreatedAndThereIsNoReplicaSetUp()
    if instanceIndex == 0 {
        instanceIndex = r.kubegresContext.Status.GetLastCreatedInstanceIndex() + 1
    }

    if err := r.activateBlockingOperation(instanceIndex); err != nil {
        r.kubegresContext.Log.ErrorEvent("PrimaryStatefulSetOperationActivationErr", err,
            "Error while activating blocking operation for the deployment of a Primary StatefulSet.",
            "InstanceIndex", instanceIndex)
        return err
    }

    primaryStatefulSet, err := r.resourcesCreator.CreatePrimaryStatefulSet(instanceIndex)
    if err != nil {
        r.kubegresContext.Log.ErrorEvent("PrimaryStatefulSetTemplateErr", err,
            "Error while creating a Primary StatefulSet object from template.",
            "InstanceIndex", instanceIndex)
        r.blockingOperation.RemoveActiveOperation()
        return err
    }

    if err = r.kubegresContext.Client.Create(r.kubegresContext.Ctx, &primaryStatefulSet); err != nil {
        r.kubegresContext.Log.ErrorEvent("PrimaryStatefulSetDeploymentErr", err,
            "Unable to deploy Primary StatefulSet.",
            "Primary name", primaryStatefulSet.Name)
        r.blockingOperation.RemoveActiveOperation()
        return err
    }

    r.kubegresContext.Status.SetEnforcedReplicas(r.kubegresContext.Kubegres.Status.EnforcedReplicas + 1)
    if r.kubegresContext.Status.GetLastCreatedInstanceIndex() == 0 {
        r.kubegresContext.Status.SetLastCreatedInstanceIndex(1)
    }

    r.kubegresContext.Log.InfoEvent("PrimaryStatefulSetDeployment", "Deployed Primary StatefulSet.",
        "Primary name", primaryStatefulSet.Name)

    return nil
}
ReplicaDbCountSpecEnforcer
If the currently active blocking operation is not ReplicaDbCount, stop. If the previous ReplicaDbCount operation timed out, check whether the replica created last time succeeded or failed: activeOperation.StatefulSetOperation.InstanceIndex gives the index of that StatefulSet, and the replica StatefulSet at that index is inspected; if it is not deployed at all or is already Ready, the next steps may proceed, otherwise a timeout requiring manual intervention is logged and execution stops. If the primary DB is not Ready, stop. If a failover happened in the previous step, spec.failover.promotePod is reset here. If a ReplicaDbCount operation is currently in progress, stop as well. What follows compares the number of deployed replicas with the number expected by the Spec: scale down if there are too many, scale up if there are too few, and when the numbers match, check readiness and undeploy any replica that is not Ready.

func (r *ReplicaDbCountSpecEnforcer) Enforce() error {

    if r.blockingOperation.IsActiveOperationIdDifferentOf(operation.OperationIdReplicaDbCountSpecEnforcement) {
        return nil
    }

    if r.hasLastAttemptTimedOut() {
        if r.isPreviouslyFailedAttemptOnReplicaDbFixed() {
            r.blockingOperation.RemoveActiveOperation()
            r.logKubegresFeaturesAreReEnabled()
        } else {
            r.logTimedOut()
            return nil
        }
    }

    if !r.isPrimaryDbReady() {
        return nil
    }

    isManualFailoverRequested := r.isManualFailoverRequested()
    if isManualFailoverRequested {
        r.resetInSpecManualFailover()
    }

    if r.isReplicaOperationInProgress() {
        return nil
    }

    // Check if the number of deployed replicas == spec if not then deploy one
    nbreNewReplicaToDeploy := r.getExpectedNbreReplicasToDeploy() - r.getNbreDeployedReplicas()

    if nbreNewReplicaToDeploy > 0 {
        if r.isAutomaticFailoverDisabled() && !isManualFailoverRequested && !r.doesSpecRequireTheDeploymentOfAdditionalReplicas() {
            r.logAutomaticFailoverIsDisabled()
            return nil
        }
        return r.deployReplicaStatefulSet()

    } else if nbreNewReplicaToDeploy < 0 {
        replicaToUndeploy := r.getReplicaToUndeploy()
        return r.undeployReplicaStatefulSets(replicaToUndeploy)

    } else if nbreNewReplicaToDeploy == 0 {
        for _, replicaStatefulSet := range r.getDeployedReplicas() {
            if !replicaStatefulSet.IsReady {
                return r.undeployReplicaStatefulSets(replicaStatefulSet)
            }
        }
    }

    return nil
}
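getExpectedNbreReplicasToDeploy is not quoted; assuming that spec.Replicas counts the primary instance as well, a sketch would be:

// Sketch: assumes spec.Replicas includes the primary instance, so the number of
// replica StatefulSets to run is spec.Replicas - 1 (never negative).
func (r *ReplicaDbCountSpecEnforcer) getExpectedNbreReplicasToDeploy() int32 {
    specReplicas := *r.kubegresContext.Kubegres.Spec.Replicas
    if specReplicas <= 1 {
        return 0
    }
    return specReplicas - 1
}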
ServicesCount
Services are comparatively simple, since there are only two of them: primary and replica.

func (r *ServicesCountSpecEnforcer) EnforceSpec() error {

    if !r.isPrimaryServiceDeployed() && r.isPrimaryDbReady() {
        err := r.deployPrimaryService()
        if err != nil {
            return err
        }
    }

    if !r.isReplicaServiceDeployed() && r.isThereReadyReplica() {
        err := r.deployReplicaService()
        if err != nil {
            return err
        }
    }

    return nil
}

The deployment logic is implemented in deployService: load the template string into a core.Service, then call Create to create the Service.
func (r *ServicesCountSpecEnforcer) deployService(isPrimary bool) error {

    primaryOrReplicaTxt := r.createLogLabel(isPrimary)

    service, err := r.createServiceResource(isPrimary)
    ...

    if err := r.kubegresContext.Client.Create(r.kubegresContext.Ctx, &service); err != nil {
        ...
    }
BackUpCronJobCount
If the CronJob is already deployed and nothing has changed, return immediately; otherwise delete the old CronJob. If no backup schedule ("* */1 * * * ...") is configured, also return. Otherwise a batch.CronJob is loaded from the template, using the base config by default.

func (r *BackUpCronJobCountSpecEnforcer) EnforceSpec() error {

    if r.isCronJobDeployed() {
        if !r.hasSpecChanged() {
            return nil
        }
        err := r.deleteCronJob()
        if err != nil {
            return err
        }
    }

    if !r.isBackUpConfigured() {
        return nil
    }

    configMapNameForBackUp := r.getConfigMapNameForBackUp(r.resourcesStates.Config)
    cronJob, err := r.resourcesCreator.CreateBackUpCronJob(configMapNameForBackUp)
    if err != nil {
        r.kubegresContext.Log.ErrorEvent("BackUpCronJobTemplateErr", err, "Unable to create a BackUp CronJob object from template.")
        return err
    }

    return r.deployCronJob(cronJob)
}

When creating the backup CronJob, the backup source is the primary if there is only one node; otherwise a replica is used.
func (r *ResourcesCreatorFromTemplate) CreateBackUpCronJob(configMapNameForBackUp string) (batch.CronJob, error) {
    ...
    backSourceDbHostName := r.kubegresContext.GetServiceResourceName(false)
    if *postgres.Spec.Replicas == 1 {
        backSourceDbHostName = r.kubegresContext.GetServiceResourceName(true)
    }
    backUpCronJobContainer.Env[3].Value = backSourceDbHostName

    return backUpCronJob, nil
}

The final deployCronJob simply calls Create to create the CronJob, so we skip it.
enforceAllStatefulSetsSpec
Unlike the count enforcement, reconciling the StatefulSets requires two preconditions: the primary DB is ready, and the currently active blocking operation is StatefulSetSpecEnforcing. The objects being processed are the replica StatefulSets and the primary StatefulSet, ordered by InstanceIndex. Step one: compute the differences between each StatefulSet's current state and the desired Spec. Step two: if the previous spec-update operation timed out and the pending changes are the same ones that failed, log the error and hand over to manual intervention; otherwise remove the current BlockingOperation so Kubegres becomes operable again. Step three: if the state differs from the Spec, run enforceSpec; if there is no difference, the previous spec update may have failed or may still be in progress, so check whether the Pods were actually updated according to the enforced spec, and return an error if the check fails or the Pod update did not succeed. There is a lot of branching here; instead of expanding it all, let us focus on the enforceSpec logic for each property.

func (r *AllStatefulSetsSpecEnforcer) EnforceSpec() error {

    if !r.isPrimaryDbReady() {
        return nil
    }

    if r.blockingOperation.IsActiveOperationIdDifferentOf(operation.OperationIdStatefulSetSpecEnforcing) {
        return nil
    }

    for _, statefulSetWrapper := range r.getAllReverseSortedByInstanceIndex() {

        statefulSet := statefulSetWrapper.StatefulSet
        statefulSetInstanceIndex := statefulSetWrapper.InstanceIndex
        specDifferences := r.specsEnforcers.CheckForSpecDifferences(&statefulSet)

        if r.hasLastSpecUpdateAttemptTimedOut(statefulSetInstanceIndex) {
            if r.areNewSpecChangesSameAsFailingSpecChanges(specDifferences) {
                r.logSpecEnforcementTimedOut()
                return nil
            } else {
                r.blockingOperation.RemoveActiveOperation()
                r.logKubegresFeaturesAreReEnabled()
            }
        }

        if specDifferences.IsThereDifference() {
            return r.enforceSpec(statefulSet, statefulSetInstanceIndex, specDifferences)

        } else if r.isStatefulSetSpecUpdating(statefulSetInstanceIndex) {
            isPodReadyAndSpecUpdated, err := r.verifySpecEnforcementIsAppliedToPod(statefulSetWrapper, specDifferences)
            if err != nil || !isPodReadyAndSpecUpdated {
                return err
            }
        }
    }

    return nil
}

The StatefulSet properties being reconciled — image, port, StorageClass size, ConfigMap, affinity, tolerations, resources, volume, security context, liveness probe and readiness probe — each have their own EnforceSpec, executed one after another.
func addStatefulSetSpecEnforcers(rc *ResourcesContext) {

    imageSpecEnforcer := statefulset_spec.CreateImageSpecEnforcer(rc.KubegresContext)
    portSpecEnforcer := statefulset_spec.CreatePortSpecEnforcer(rc.KubegresContext, rc.ResourcesStates)
    storageClassSizeSpecEnforcer := statefulset_spec.CreateStorageClassSizeSpecEnforcer(rc.KubegresContext, rc.ResourcesStates)
    customConfigSpecEnforcer := statefulset_spec.CreateCustomConfigSpecEnforcer(rc.CustomConfigSpecHelper)
    affinitySpecEnforcer := statefulset_spec.CreateAffinitySpecEnforcer(rc.KubegresContext)
    tolerationsSpecEnforcer := statefulset_spec.CreateTolerationsSpecEnforcer(rc.KubegresContext)
    resourcesSpecEnforcer := statefulset_spec.CreateResourcesSpecEnforcer(rc.KubegresContext)
    volumeSpecEnforcer := statefulset_spec.CreateVolumeSpecEnforcer(rc.KubegresContext)
    securityContextSpecEnforcer := statefulset_spec.CreateSecurityContextSpecEnforcer(rc.KubegresContext)
    livenessProbeSpecEnforcer := statefulset_spec.CreateLivenessProbeSpecEnforcer(rc.KubegresContext)
    readinessProbeSpecEnforcer := statefulset_spec.CreateReadinessProbeSpecEnforcer(rc.KubegresContext)

    rc.StatefulSetsSpecsEnforcer = statefulset_spec.CreateStatefulSetsSpecsEnforcer(rc.KubegresContext)
    rc.StatefulSetsSpecsEnforcer.AddSpecEnforcer(&imageSpecEnforcer)
    rc.StatefulSetsSpecsEnforcer.AddSpecEnforcer(&portSpecEnforcer)
    rc.StatefulSetsSpecsEnforcer.AddSpecEnforcer(&storageClassSizeSpecEnforcer)
    rc.StatefulSetsSpecsEnforcer.AddSpecEnforcer(&customConfigSpecEnforcer)
    rc.StatefulSetsSpecsEnforcer.AddSpecEnforcer(&affinitySpecEnforcer)
    rc.StatefulSetsSpecsEnforcer.AddSpecEnforcer(&tolerationsSpecEnforcer)
    rc.StatefulSetsSpecsEnforcer.AddSpecEnforcer(&resourcesSpecEnforcer)
    rc.StatefulSetsSpecsEnforcer.AddSpecEnforcer(&volumeSpecEnforcer)
    rc.StatefulSetsSpecsEnforcer.AddSpecEnforcer(&securityContextSpecEnforcer)
    rc.StatefulSetsSpecsEnforcer.AddSpecEnforcer(&livenessProbeSpecEnforcer)
    rc.StatefulSetsSpecsEnforcer.AddSpecEnforcer(&readinessProbeSpecEnforcer)

    rc.AllStatefulSetsSpecEnforcer = statefulset_spec.CreateAllStatefulSetsSpecEnforcer(rc.KubegresContext, rc.ResourcesStates, rc.BlockingOperation, rc.StatefulSetsSpecsEnforcer)
}
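StatefulSetsSpecsEnforcer is another composite: every registered enforcer is applied to the same in-memory StatefulSet, and the caller then issues a single Update. The following is only a sketch inferred from the enforcer signatures quoted in the sections below; the interface name and the loop are assumptions:

// Sketch: minimal composite inferred from the EnforceSpec signatures shown below;
// not the actual Kubegres implementation.
type statefulSetSpecEnforcer interface {
    EnforceSpec(statefulSet *apps.StatefulSet) (wasSpecUpdated bool, err error)
}

type StatefulSetsSpecsEnforcer struct {
    registeredEnforcers []statefulSetSpecEnforcer
}

func (r *StatefulSetsSpecsEnforcer) AddSpecEnforcer(enforcer statefulSetSpecEnforcer) {
    r.registeredEnforcers = append(r.registeredEnforcers, enforcer)
}

// EnforceSpec applies every registered enforcer to the same StatefulSet object in memory;
// the caller is then responsible for a single Client.Update with the accumulated changes.
func (r *StatefulSetsSpecsEnforcer) EnforceSpec(statefulSet *apps.StatefulSet) (bool, error) {
    anyUpdate := false
    for _, enforcer := range r.registeredEnforcers {
        wasUpdated, err := enforcer.EnforceSpec(statefulSet)
        if err != nil {
            return anyUpdate, err
        }
        anyUpdate = anyUpdate || wasUpdated
    }
    return anyUpdate, nil
}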
ImageSpecEnforcer
Sets the Postgres image. Since each StatefulSet corresponds to one Pod and that Pod runs a single container, only Containers[0] is set. InitContainers[0], which mounts the ConfigMap and runs the promote script, uses the same image.

func (r *ImageSpecEnforcer) EnforceSpec(statefulSet *apps.StatefulSet) (wasSpecUpdated bool, err error) {

    statefulSet.Spec.Template.Spec.Containers[0].Image = r.kubegresContext.Kubegres.Spec.Image

    if len(statefulSet.Spec.Template.Spec.InitContainers) > 0 {
        statefulSet.Spec.Template.Spec.InitContainers[0].Image = r.kubegresContext.Kubegres.Spec.Image
    }

    return true, nil
}
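Each enforcer also reports whether the live StatefulSet diverges from the Spec — that is what specsEnforcers.CheckForSpecDifferences aggregates in AllStatefulSetsSpecEnforcer above. The per-enforcer check is not quoted in this post; for the image it plausibly looks like the sketch below, where the name and fields of the difference struct are assumptions:

// Sketch only: the difference type and its field names are assumed for illustration.
type StatefulSetSpecDifference struct {
    SpecName string
    Current  string
    Expected string
}

func (r *ImageSpecEnforcer) CheckForSpecDifference(statefulSet *apps.StatefulSet) StatefulSetSpecDifference {
    currentImage := statefulSet.Spec.Template.Spec.Containers[0].Image
    expectedImage := r.kubegresContext.Kubegres.Spec.Image

    if currentImage != expectedImage {
        // A non-empty difference is later reported by IsThereDifference() and triggers enforceSpec.
        return StatefulSetSpecDifference{SpecName: "Image", Current: currentImage, Expected: expectedImage}
    }
    return StatefulSetSpecDifference{}
}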
portSpecEnforcer
Sets Containers[0].Ports[0].ContainerPort to the port specified in the Spec.

func (r *PortSpecEnforcer) EnforceSpec(statefulSet *apps.StatefulSet) (wasSpecUpdated bool, err error) {
    statefulSet.Spec.Template.Spec.Containers[0].Ports[0].ContainerPort = r.kubegresContext.Kubegres.Spec.Port
    return true, nil
}
storageClassSizeSpecEnforcer
Kubernetes does not allow the storage size to be changed directly in a StatefulSet, so only the PVC bound to the StatefulSet can be modified. After the PVC is updated, statefulSet.Spec.Template.ObjectMeta.Labels["sizeChangedCounter"] = "1" is set to mark that the StatefulSet must be restarted, i.e. its Pod recreated.

func (r *StorageClassSizeSpecEnforcer) EnforceSpec(statefulSet *apps.StatefulSet) (wasSpecUpdated bool, err error) {

    persistentVolumeClaimName, err := r.getPersistentVolumeClaimName(statefulSet)
    ...
    persistentVolumeClaim, err := r.getPersistentVolumeClaim(persistentVolumeClaimName)
    ...
    newSize := r.kubegresContext.Kubegres.Spec.Database.Size
    ...
    persistentVolumeClaim.Spec.Resources.Requests = core.ResourceList{core.ResourceStorage: resource.MustParse(newSize)}

    err = r.kubegresContext.Client.Update(r.kubegresContext.Ctx, persistentVolumeClaim)
    ...
    r.updateStatefulSetToForceToRestart(statefulSet)

    return true, nil
}
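updateStatefulSetToForceToRestart is not quoted; based on the "sizeChangedCounter" label described above, a sketch could be:

// Sketch: reconstructed from the label described above, not the actual Kubegres code.
func (r *StorageClassSizeSpecEnforcer) updateStatefulSetToForceToRestart(statefulSet *apps.StatefulSet) {
    if statefulSet.Spec.Template.ObjectMeta.Labels == nil {
        statefulSet.Spec.Template.ObjectMeta.Labels = map[string]string{}
    }
    // Changing a label in the Pod template changes the template hash,
    // which makes the StatefulSet controller recreate the Pod.
    statefulSet.Spec.Template.ObjectMeta.Labels["sizeChangedCounter"] = "1"
}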
customConfigSpecEnforcer
Checks and records the differences between the user's custom PostgresConf, PrimaryInitScript and PgHbaConf and the defaults "postgres.conf", "primary_init_script.sh" and "pg_hba.conf"; what is actually compared are the paths and file names, not the file contents. Note that configMap is the ConfigMap state currently in the cluster, while customConfigMapVolume is the custom ConfigMap volume found in the StatefulSet spec. If the custom ConfigMap is not deployed (a ConfigMap resource may have been created but is not meant to be mounted into any StatefulSet) while customConfigMapVolume is not nil, the custom ConfigMap volume is removed from the StatefulSet spec, since it will certainly no longer be used.

func (r *CustomConfigSpecEnforcer) EnforceSpec(statefulSet *apps.StatefulSet) (wasSpecUpdated bool, err error) {
    wasSpecUpdated, _ = r.customConfigSpecHelper.ConfigureStatefulSet(statefulSet)
    return wasSpecUpdated, nil
}

func (r *CustomConfigSpecHelper) ConfigureStatefulSet(statefulSet *v1.StatefulSet) (hasStatefulSetChanged bool, differenceDetails string) {

    configMap := r.resourcesStates.Config

    if r.updateVolumeMountNameIfChanged(configMap.ConfigLocations.PostgreConf, states.ConfigMapDataKeyPostgresConf, statefulSet) { ...
    if r.updateVolumeMountNameIfChanged(configMap.ConfigLocations.PrimaryInitScript, states.ConfigMapDataKeyPrimaryInitScript, statefulSet) { ...
    if r.updateVolumeMountNameIfChanged(configMap.ConfigLocations.PgHbaConf, states.ConfigMapDataKeyPgHbaConf, statefulSet) { ...

    statefulSetTemplateSpec := &statefulSet.Spec.Template.Spec
    customConfigMapVolume := r.getCustomConfigMapVolume(statefulSetTemplateSpec.Volumes)

    if configMap.IsCustomConfigDeployed {
        ...
    } else if customConfigMapVolume != nil {
        r.deleteCustomConfigMapVolumeIfExist(statefulSetTemplateSpec)
        ...
    }
affinitySpecEnforcer
Updates the affinity directly.

func (r *AffinitySpecEnforcer) EnforceSpec(statefulSet *apps.StatefulSet) (wasSpecUpdated bool, err error) {
    statefulSet.Spec.Template.Spec.Affinity = r.kubegresContext.Kubegres.Spec.Scheduler.Affinity
    return true, nil
}