The Kubernetes controller-manager bundles the controllers for all of Kubernetes' built-in resource objects; writing a controller for a CRD you have created is, in essence, implementing an Operator.
Operator is an application-specific controller pattern developed by CoreOS to extend the Kubernetes API. It can create, configure, and manage complex stateful applications such as databases, caches, and monitoring systems. An Operator builds on Kubernetes' resource and controller concepts while encoding application-specific domain knowledge. The keys to creating an Operator are the design of the CRD and the implementation of its controller.
The Operator concept can be summed up as: design a CRD, then write a controller for that CRD.
Kubernetes' built-in controllers are not a cure-all, but Kubernetes' highly extensible design lets you solve your own problems with an Operator.
1. Preparation
After creating the CRD in the cluster, we can start writing the controller. The standard Clientset only supports Kubernetes' built-in resource objects, and the same goes for Informers, so we need to generate client-go client code for our own CRD. The directory structure of the canary controller project is as follows.
·The manifests directory holds the CRD definition YAML files.
·pkg has 4 subdirectories:
■The apis subdirectory holds the code we prepare by hand, from which the code generation tool produces the client code.
■The controllers subdirectory holds the controller's core code.
■The generated and signals subdirectories hold the client code produced with the https://github.com/kubernetes/code-generator code generation tool.
·main.go is the controller's entry point.
·go.mod declares the dependencies.
2. Writing the CRD's API code
Let's look at the code under the apis directory first, starting with /pkg/apis/canarycontroller/register.go, shown below. Fill the CRD's group name into the GroupName constant.
package canarycontroller

const (
    GroupName = "canarycontroller.tech.com"
)
The group name also appears in the package doc file, /pkg/apis/canarycontroller/v1alpha1/doc.go, shown below. Pay particular attention to the types file: all of the CRD's concrete fields are defined there, at /pkg/apis/canarycontroller/v1alpha1/types.go.
//+k8s:deepcopy-gen=package
//+groupName=canarycontroller.tech.com

package v1alpha1 // import "canary-controller/pkg/apis/canarycontroller/v1alpha1"
The code of the types file is as follows.
package v1alpha1

import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

//+genclient
//+k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// Canary is the custom resource managed by the canary controller.
type Canary struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`

    Spec   CanarySpec   `json:"spec"`
    Status CanaryStatus `json:"status"`
}

// CanarySpec records the canary release strategy.
type CanarySpec struct {
    Info map[string]string `json:"info"`
}

// CanaryStatus records the release state during a canary rollout.
type CanaryStatus struct {
    Info map[string]string `json:"info"`
}

//+k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// CanaryList is a list of Canary resources.
type CanaryList struct {
    metav1.TypeMeta `json:",inline"`
    metav1.ListMeta `json:"metadata"`

    Items []Canary `json:"items"`
}
·The types file defines the struct for the Canary resource object; like Kubernetes' built-in resources, it carries metadata, spec, and status fields.
·Spec defines an Info field that records the canary release strategy. Because it is a map[string]string, fields can be added or removed during development simply by adding or removing keys, with no need to regenerate the client code (see the sketch after this list).
·Status defines an Info field of the same map[string]string type, used to record the release state during a canary rollout.
·CanaryList is the list structure for the Canary resource object; its Items field holds the individual Canary objects.
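To make the map-based spec concrete, here is a minimal sketch of a Canary object built in Go. The key names (type, currentBatch, totalBatches, pauseType, newDeploymentYaml, oldDeploymentYaml) are the ones the controller code below actually reads; the object name and the values are illustrative only.

package main

import (
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

    canaryv1alpha1 "canary-controller/pkg/apis/canarycontroller/v1alpha1"
)

func main() {
    // A hypothetical Canary object; the keys match what the controller reads later.
    canary := &canaryv1alpha1.Canary{
        ObjectMeta: metav1.ObjectMeta{Name: "demo-canary", Namespace: "default"},
        Spec: canaryv1alpha1.CanarySpec{
            Info: map[string]string{
                "type":         "CanaryDeploy", // or "NormalDeploy" / "CanaryRollback"
                "currentBatch": "1",
                "totalBatches": "3",
                "pauseType":    "First", // only the first batch pauses
                // The new and old Deployments travel inside the spec as JSON strings:
                "newDeploymentYaml": `{"metadata":{"name":"canary-demo-deployment-v2"}}`,
                "oldDeploymentYaml": `{"metadata":{"name":"canary-demo-deployment-v1"}}`,
            },
        },
    }
    fmt.Println(canary.Spec.Info["type"])
}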
The registration code lives in register.go, at /pkg/apis/canarycontroller/v1alpha1/register.go, shown below.
package v1alpha1

import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/runtime/schema"

    canarycontroller "canary-controller/pkg/apis/canarycontroller"
)

// SchemeGroupVersion is the group version used to register these objects.
var SchemeGroupVersion = schema.GroupVersion{Group: canarycontroller.GroupName, Version: "v1alpha1"}

// Kind takes an unqualified kind and returns back a Group qualified GroupKind.
func Kind(kind string) schema.GroupKind {
    return SchemeGroupVersion.WithKind(kind).GroupKind()
}

// Resource takes an unqualified resource and returns a Group qualified GroupResource.
func Resource(resource string) schema.GroupResource {
    return SchemeGroupVersion.WithResource(resource).GroupResource()
}

var (
    SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
    AddToScheme   = SchemeBuilder.AddToScheme
)

// addKnownTypes adds the types defined in types.go to the given scheme.
func addKnownTypes(scheme *runtime.Scheme) error {
    scheme.AddKnownTypes(SchemeGroupVersion,
        &Canary{},
        &CanaryList{},
    )
    metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
    return nil
}
Here, simply add pointers to the types defined in types.go inside the addKnownTypes() function.
3. Generating the CRD's client-go client code
On a Linux system, download the code generation tool code-generator:
git clone https://github.com/kubernetes/code-generator
Upload the canary-controller project directory, including the apis directory, to the Linux server. After uploading, the directory structure on Linux looks like this:
[root@golang src]# tree
.
└── canary-controller
└── pkg
└── apis
└── canarycontroller
├── register.go
└── v1alpha1
├── doc.go
├── register.go
└── types.go
Download the dependencies, enter the code-generator directory, and run the generate-groups.sh script to generate the client code:
cd $GOPATH/src \
&& go get -u k8s.io/apimachinery/pkg/apis/meta/v1 \
&& cd $GOPATH/src/k8s.io/code-generator \
&& ./generate-groups.sh all \
canary-controller/pkg/client \
canary-controller/pkg/apis \
canarycontroller:v1alpha1
After the script succeeds, the newly generated code is as follows.
File: /pkg/apis/canarycontroller/v1alpha1/zz_generated.deepcopy.go (generated alongside doc.go).
Directory: /pkg/client. Rename the /pkg/client directory to /pkg/generated.
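As a quick sanity check that generation worked, the generated typed client can be used just like the built-in clientsets. A minimal sketch, assuming a kubeconfig at a placeholder path and a client-go ≥ 0.18 style API:

package main

import (
    "context"
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/tools/clientcmd"

    clientset "canary-controller/pkg/generated/clientset/versioned"
)

func main() {
    // Placeholder kubeconfig path; adjust for your environment.
    cfg, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
    if err != nil {
        panic(err)
    }
    canaryClient, err := clientset.NewForConfig(cfg)
    if err != nil {
        panic(err)
    }
    // List Canary objects in the default namespace through the generated typed client.
    canaries, err := canaryClient.CanarycontrollerV1alpha1().Canaries("default").List(context.TODO(), metav1.ListOptions{})
    if err != nil {
        panic(err)
    }
    for _, c := range canaries.Items {
        fmt.Printf("%s/%s type=%s\n", c.Namespace, c.Name, c.Spec.Info["type"])
    }
}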
4. Writing the canary controller's main logic
The controller's overall flow and logic are fairly involved, so we will first walk through what each method does, then piece together the end-to-end execution flow of the functions and methods.
First come the imports:
package controllers

import (
    "context"
    "encoding/json"
    "fmt"
    "math"
    "strconv"
    "strings"
    "time"

    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/types"
    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/apimachinery/pkg/util/wait"
    appsinformers "k8s.io/client-go/informers/apps/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/kubernetes/scheme"
    typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
    appslisters "k8s.io/client-go/listers/apps/v1"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/record"
    "k8s.io/client-go/util/workqueue"
    "k8s.io/klog"

    canaryv1alpha1 "canary-controller/pkg/apis/canarycontroller/v1alpha1"
    clientset "canary-controller/pkg/generated/clientset/versioned"
    canaryscheme "canary-controller/pkg/generated/clientset/versioned/scheme"
    informers "canary-controller/pkg/generated/informers/externalversions/canarycontroller/v1alpha1"
    listers "canary-controller/pkg/generated/listers/canarycontroller/v1alpha1"
)
Next, define the controller's event constants and the Controller struct; the events recorded here are what you see when running kubectl describe against a resource object:
const controllerAgentName = "canary-controller"

const (
    // SuccessSynced is one of the registered event reasons, shown when a Canary syncs successfully.
    SuccessSynced = "Synced"
    // ErrResourceExists is one of the registered event reasons, shown when a Canary
    // fails to sync because the Deployment already exists.
    ErrResourceExists = "ErrResourceExists"
    // MessageResourceExists is the event message for a resource not managed by canary-controller.
    //MessageResourceExists = "Resource %q already exists and is not managed by Canary"
    // MessageResourceSynced is the registered event message shown when a Canary syncs successfully.
    MessageResourceSynced = "Canary synced successfully"
)

// Controller is the canary controller structure.
type Controller struct {
    // kubeclientset is the regular Kubernetes Clientset.
    kubeclientset kubernetes.Interface
    // canaryclientset is the Clientset for the Canary CRD.
    canaryclientset clientset.Interface

    deploymentsLister appslisters.DeploymentLister
    deploymentsSynced cache.InformerSynced
    canariesLister    listers.CanaryLister
    canariesSynced    cache.InformerSynced

    // workqueue is a rate-limited work queue. It ensures that, under concurrency, an item
    // is processed by only one worker at a time, and it supports re-enqueueing with back-off.
    workqueue workqueue.RateLimitingInterface
    // recorder records events on resource objects.
    recorder record.EventRecorder
}
The NewController function looks like this.
// NewController builds the canary-controller object.
func NewController(
    kubeclientset kubernetes.Interface,
    canaryclientset clientset.Interface,
    deploymentInformer appsinformers.DeploymentInformer,
    canaryInformer informers.CanaryInformer) *Controller {

    // Create the event broadcaster and add canary-controller's types to the default
    // Kubernetes scheme, so that event logs can print canary-controller's types.
    utilruntime.Must(canaryscheme.AddToScheme(scheme.Scheme))
    klog.V(4).Info("Creating event broadcaster")
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartLogging(klog.Infof)
    eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
    recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})

    controller := &Controller{
        kubeclientset:     kubeclientset,
        canaryclientset:   canaryclientset,
        deploymentsLister: deploymentInformer.Lister(),
        deploymentsSynced: deploymentInformer.Informer().HasSynced,
        canariesLister:    canaryInformer.Lister(),
        canariesSynced:    canaryInformer.Informer().HasSynced,
        workqueue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Canarys"),
        recorder:          recorder,
    }

    klog.Info("Setting up event handlers")
    // Set up the event callbacks for Canary changes.
    canaryInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: controller.enqueueCanary,
        UpdateFunc: func(old, new interface{}) {
            controller.enqueueCanary(new)
        },
    })
    // Set up the event handlers for Deployment changes. An owner reference could be used
    // here so that only Deployments managed by a Canary are pushed into the Canary work
    // queue; we do not filter here. For details see:
    // https://github.com/kubernetes/community/blob/8cafef897a22026d42f5e5bb3f104febe7e29830/contributors/devel/controllers.md
    deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: controller.handleObject,
        UpdateFunc: func(old, new interface{}) {
            newDepl := new.(*appsv1.Deployment)
            oldDepl := old.(*appsv1.Deployment)
            // The informer resends the latest Deployment state at the resync interval.
            // If the resourceVersion is unchanged, nothing happened.
            if newDepl.ResourceVersion == oldDepl.ResourceVersion {
                return
            }
            controller.handleObject(new)
        },
        DeleteFunc: controller.handleObject,
    })
    return controller
}
NewController mainly does the following.
·Wires up the Kubernetes and Canary clientsets together with the Deployment and Canary listers.
·Subscribes to Canary create and update events; when one fires, the affected Canary object is put onto the workqueue by the controller.enqueueCanary method.
·Subscribes to Deployment add, update, and delete events; when one fires, the affected Deployment object is handed to the handleObject method.
The Run() method that starts the Controller is shown below.
// Run waits for the informer caches of the types we care about to sync, then starts
// runWorker goroutines; it blocks until stopCh is closed.
// On shutdown it stops the workqueue and waits for the workers to finish their items.
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
    defer utilruntime.HandleCrash()
    defer c.workqueue.ShutDown()

    klog.Info("Starting Canary controller")
    klog.Info("Waiting for informer caches to sync")
    // Note that deploymentsSynced and canariesSynced passed into Run() both come from a
    // SharedInformerFactory, so other informers created from the same Clientset share
    // one cache, which improves efficiency.
    if ok := cache.WaitForCacheSync(stopCh, c.deploymentsSynced, c.canariesSynced); !ok {
        return fmt.Errorf("failed to wait for caches to sync")
    }

    klog.Info("Starting workers")
    // Start threadiness goroutines to process Canary objects concurrently.
    for i := 0; i < threadiness; i++ {
        go wait.Until(c.runWorker, time.Second, stopCh)
    }

    klog.Info("Started workers")
    <-stopCh
    klog.Info("Shutting down workers")

    return nil
}

// runWorker keeps calling processNextWorkItem, continuously reading from the workqueue.
func (c *Controller) runWorker() {
    for c.processNextWorkItem() {
    }
}
·Run() starts threadiness goroutines that execute the runWorker() method concurrently.
·runWorker() simply loops over c.processNextWorkItem, continuously reading items from the workqueue. (main.go, which ultimately calls Run(), is sketched below.)
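The entry point main.go is not reproduced in this section. The following is a minimal sketch of the usual wiring, modeled on kubernetes/sample-controller; the flag handling, the 30s resync period, and the exact pkg/signals API are assumptions:

package main

import (
    "flag"
    "time"

    kubeinformers "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/klog"

    "canary-controller/pkg/controllers"
    clientset "canary-controller/pkg/generated/clientset/versioned"
    informers "canary-controller/pkg/generated/informers/externalversions"
    "canary-controller/pkg/signals"
)

var kubeconfig string

func main() {
    flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig; empty for in-cluster config.")
    flag.Parse()

    // stopCh closes on SIGTERM/SIGINT so the controller can shut down cleanly
    // (assuming pkg/signals follows the sample-controller convention).
    stopCh := signals.SetupSignalHandler()

    cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
    if err != nil {
        klog.Fatalf("Error building kubeconfig: %s", err)
    }
    kubeClient, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        klog.Fatalf("Error building kubernetes clientset: %s", err)
    }
    canaryClient, err := clientset.NewForConfig(cfg)
    if err != nil {
        klog.Fatalf("Error building canary clientset: %s", err)
    }

    // Shared informer factories; the 30s resync period is an assumption.
    kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
    canaryInformerFactory := informers.NewSharedInformerFactory(canaryClient, time.Second*30)

    controller := controllers.NewController(kubeClient, canaryClient,
        kubeInformerFactory.Apps().V1().Deployments(),
        canaryInformerFactory.Canarycontroller().V1alpha1().Canaries())

    kubeInformerFactory.Start(stopCh)
    canaryInformerFactory.Start(stopCh)

    // Run two workers until stopCh is closed.
    if err = controller.Run(2, stopCh); err != nil {
        klog.Fatalf("Error running controller: %s", err)
    }
}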
Now let's look at the concrete logic of processNextWorkItem:
// processNextWorkItem reads a single item off the workqueue and attempts to
// process it by calling syncHandler.
func (c *Controller) processNextWorkItem() bool {
    startTimeNano := time.Now().UnixNano()
    obj, shutdown := c.workqueue.Get()
    if shutdown {
        return false
    }

    // Wrap this in a func so we can defer c.workqueue.Done.
    err := func(obj interface{}) error {
        // workqueue.Done notifies the queue that this item has been processed. If you do
        // not want the item re-queued, remember to call workqueue.Forget; otherwise the
        // item re-enters the queue and comes up again after the next back-off period.
        defer c.workqueue.Done(obj)
        var key string
        var ok bool
        // Take the string out of obj; it is namespace/name, which lets us look up
        // more information from the informer's cache.
        if key, ok = obj.(string); !ok {
            // Discard invalid items, otherwise we would keep retrying them in a loop.
            c.workqueue.Forget(obj)
            utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
            return nil
        }
        // Call syncHandler, passing the Canary's namespace/name.
        if err := c.syncHandler(key); err != nil {
            // On failure, put the item back onto the workqueue with rate limiting.
            c.workqueue.AddRateLimited(key)
            return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
        }
        // On success, call Forget to remove the item from the queue.
        c.workqueue.Forget(obj)
        endTimeNano := time.Now().UnixNano()
        costTimeNano := (endTimeNano - startTimeNano) / 1e6
        klog.Infof("Successfully synced '%s'\ncostTime:%v\nthe workQueue len:%d\n", key, costTimeNano, c.workqueue.Len())
        return nil
    }(obj)

    if err != nil {
        utilruntime.HandleError(err)
        return true
    }
    return true
}
processNextWorkItem mainly does the following.
·Takes an obj item off the workqueue.
·Extracts the resource key from the obj and calls syncHandler to run the state-convergence logic.
·On success, calls Forget() to remove the obj from the workqueue.
·On failure, puts the obj back onto the workqueue with rate limiting.
Next, the definition of syncHandler:
// syncHandler compares the actual state with the desired state, attempts to converge
// to the desired state, and then updates the Canary's actual status accordingly.
func (c *Controller) syncHandler(key string) error {
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
        return nil
    }

    // Get the Canary by namespace/name.
    canary, err := c.canariesLister.Canaries(namespace).Get(name)
    if err != nil {
        // If the Canary no longer exists, stop processing and return.
        if errors.IsNotFound(err) {
            utilruntime.HandleError(fmt.Errorf("canary '%s' in work queue no longer exists", key))
            return nil
        }
        return err
    }

    newDeployment, err := c.unmarshalDeployment(canary, "newDeploymentYaml")
    if err != nil {
        return err
    }
    klog.Infof("Synchandle deployment is %s/%s\n", newDeployment.Namespace, newDeployment.Name)
    if newDeployment.Name == "" {
        // Swallow this error instead of re-queueing.
        utilruntime.HandleError(fmt.Errorf("%s: deployment name must be specified", key))
        return nil
    }

    var deployment *appsv1.Deployment
    canaryType := canary.Spec.Info["type"]
    switch canaryType {
    case "NormalDeploy":
        klog.Info("NormalDeploy")
        deployment, err = c.normalDeploy(canary, newDeployment)
        if err != nil {
            return err
        }
    case "CanaryDeploy":
        if canary.Spec.Info["currentBatch"] == "1" {
            klog.Info("canaryDeploy")
            deployment, err = c.firstCanaryDeploy(canary, newDeployment)
            if err != nil {
                return err
            }
        } else {
            deployment, err = c.notFirstCanaryDeploy(canary, newDeployment)
            if err != nil {
                return err
            }
        }
    case "CanaryRollback":
        klog.Info("RollBack")
        deployment, err = c.canaryRollback(canary, newDeployment)
        if err != nil {
            return err
        }
    default:
        klog.Info("canaryType not match!")
        return nil
    }

    err = c.updateCanaryStatus(canary, deployment)
    if err != nil {
        return err
    }
    c.recorder.Event(canary, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
    return nil
}
This is the core control logic of the canary rollout and is closely tied to the business. An Operator builds on Kubernetes' resource and controller concepts while encoding application-specific domain knowledge.
The canary controller we developed supports two release modes.
·NormalDeploy: the native Deployment rolling update.
·CanaryDeploy: a canary release that splits the application's Pod replicas into batches (the batch count may not exceed the total replica count). The first batch always pauses; for the remaining batches, the user chooses between triggering each release manually and letting the controller continue automatically on their behalf.
enqueueCanary() is called when a Canary object is created or updated; it extracts the resource key from the object via MetaNamespaceKeyFunc and puts it onto the workqueue:
// enqueueCanary extracts the namespace/name key from a Canary object and adds it to the workqueue.
func (c *Controller) enqueueCanary(obj interface{}) {
    var key string
    var err error
    if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
        utilruntime.HandleError(err)
        return
    }
    c.workqueue.Add(key)
}
handleObject() is the handler invoked for Deployment events:
// handleObject handles Deployment events and enqueues the owning Canary, if any.
func (c *Controller) handleObject(obj interface{}) {
    var object metav1.Object
    var ok bool
    if object, ok = obj.(metav1.Object); !ok {
        tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
        if !ok {
            utilruntime.HandleError(fmt.Errorf("error decoding object, invalid type"))
            return
        }
        object, ok = tombstone.Obj.(metav1.Object)
        if !ok {
            utilruntime.HandleError(fmt.Errorf("error decoding object tombstone, invalid type"))
            return
        }
        klog.Infof("Recovered deleted object '%s' from tombstone", object.GetName())
    }
    klog.Infof("Processing object: %s\n", object.GetName())
    if strings.HasPrefix(object.GetName(), "canary-") {
        lastIndex := strings.LastIndex(object.GetName(), "-deployment-")
        canaryName := object.GetName()[7:lastIndex] + "-canary"
        canary, err := c.canariesLister.Canaries(object.GetNamespace()).Get(canaryName)
        if err != nil {
            klog.Infof("Canary get error, ignoring the object: %s", object.GetName())
            klog.Infof("canaryName is %s, error:%s", canaryName, err)
            return
        }
        c.enqueueCanary(canary)
    } else {
        klog.Infof("Not prefix canary-, ignore %s/%s\n", object.GetNamespace(), object.GetName())
    }

    //if ownerRef := metav1.GetControllerOf(object); ownerRef != nil {
    //    if ownerRef.Kind != "Canary" {
    //        return
    //    }
    //
    //    canary, err := c.canariesLister.Canaries(object.GetNamespace()).Get(ownerRef.Name)
    //    if err != nil {
    //        klog.Infof("ignoring orphaned object '%s' of foo '%s'", object.GetSelfLink(), ownerRef.Name)
    //        return
    //    }
    //
    //    c.enqueueCanary(canary)
    //    return
    //}
}
The main flow of handleObject() is as follows.
1) Get the resource object's name and check whether it starts with canary-, the name prefix of the Deployments we generate from a Canary. Admittedly this is not a great way to identify them; the correct approach is the ownerRef check in the commented-out code, which determines whether a Deployment is controlled by a Canary. The prefix check is historical: before the Canary resource went live, many Deployments were not created through a Canary. Once every Deployment has been released through a Canary at least once, the code switches to ownerRef to decide whether a resource is managed by the canary controller.
2) If the check passes, call enqueueCanary() to put it onto the workqueue.
The next three methods implement the handling logic for the type field in the canary YAML. First, the native rolling update:
// normalDeploy implements the native Deployment rolling update.
func (c *Controller) normalDeploy(canary *canaryv1alpha1.Canary, newDeployment *appsv1.Deployment) (*appsv1.Deployment, error) {
    deployment, err := c.deploymentsLister.Deployments(canary.Namespace).Get(newDeployment.Name)
    // If the Deployment does not exist, create it.
    if errors.IsNotFound(err) {
        deployment, err = c.kubeclientset.AppsV1().Deployments(canary.Namespace).Create(context.TODO(), newDeploymentWithOwner(canary, newDeployment), metav1.CreateOptions{})
        klog.Infof("initDeploy return deployment: %s\n", deployment)
        return deployment, err
    }
    // If Get/Create failed for another reason, such as a network error, return the
    // error so the obj is re-queued and retried later.
    if err != nil {
        return deployment, err
    }

    if deployment.Spec.Template.Spec.Containers[0].Image != newDeployment.Spec.Template.Spec.Containers[0].Image ||
        *deployment.Spec.Replicas != *newDeployment.Spec.Replicas ||
        deployment.Spec.Template.Labels["random"] != newDeployment.Spec.Template.Labels["random"] ||
        deployment.Spec.Template.Spec.Containers[0].Resources.Limits.Cpu().String() != newDeployment.Spec.Template.Spec.Containers[0].Resources.Limits.Cpu().String() ||
        deployment.Spec.Template.Spec.Containers[0].Resources.Limits.Memory().String() != newDeployment.Spec.Template.Spec.Containers[0].Resources.Limits.Memory().String() {
        klog.Infof("image:%s, %s, replicas:%d, %d\n",
            deployment.Spec.Template.Spec.Containers[0].Image, newDeployment.Spec.Template.Spec.Containers[0].Image,
            *deployment.Spec.Replicas, *newDeployment.Spec.Replicas)
        deployment, err = c.kubeclientset.AppsV1().Deployments(canary.Namespace).Update(context.TODO(), newDeploymentWithOwner(canary, newDeployment), metav1.UpdateOptions{})
        klog.Info("image or replicas not equal update!")
    }
    // If Update failed, such as on a network error, return the error so the obj
    // is re-queued and retried later.
    if err != nil {
        return deployment, err
    }
    return deployment, nil
}
The main flow of normalDeploy() is as follows.
1) Check whether the Deployment exists in the cluster; if not, create it.
2) If it exists, check whether the current Deployment's replica count, image, CPU, and memory settings match what the Canary defines; if they do not, converge and update to the state defined in the Canary.
3) If anything fails along the way, return the error; the caller rate-limits the item and puts it back onto the workqueue.
The definition of firstCanaryDeploy() follows.
// firstCanaryDeploy executes the first batch of a canary release.
func (c *Controller) firstCanaryDeploy(canary *canaryv1alpha1.Canary, newDeployment *appsv1.Deployment) (*appsv1.Deployment, error) {
    oldDeployment, err := c.unmarshalDeployment(canary, "oldDeploymentYaml")
    if err != nil {
        return oldDeployment, err
    }
    newDeploymentDesiredReplicas := *newDeployment.Spec.Replicas
    *newDeployment.Spec.Replicas = 1

    deployment, err := c.deploymentsLister.Deployments(canary.Namespace).Get(newDeployment.Name)
    // If the Deployment does not exist, create it.
    if errors.IsNotFound(err) {
        deployment, err = c.kubeclientset.AppsV1().Deployments(canary.Namespace).Create(context.TODO(), newDeploymentWithOwner(canary, newDeployment), metav1.CreateOptions{})
        klog.Infof("canaryDeploy return deployment: %s\n", deployment)
        return deployment, err
    }
    if err != nil {
        return deployment, err
    }

    if *deployment.Spec.Replicas != 1 ||
        deployment.Spec.Template.Spec.Containers[0].Image != newDeployment.Spec.Template.Spec.Containers[0].Image {
        deployment, err = c.kubeclientset.AppsV1().Deployments(canary.Namespace).Update(context.TODO(), newDeploymentWithOwner(canary, newDeployment), metav1.UpdateOptions{})
        klog.Info("firstCanaryDeploy: image or replicas not equal so update!")
        if err != nil {
            return deployment, err
        }
    }

    // Once the single new-version replica is available, shrink the old Deployment so
    // that new replicas + old replicas still equal the application's desired total.
    if deployment.Status.AvailableReplicas == 1 {
        *oldDeployment.Spec.Replicas = newDeploymentDesiredReplicas - 1
        oldDeploymentJson, _ := json.Marshal(oldDeployment)
        oldDeployment, err = c.kubeclientset.AppsV1().Deployments(oldDeployment.Namespace).Patch(context.TODO(), oldDeployment.Name, types.MergePatchType, oldDeploymentJson, metav1.PatchOptions{})
        if err != nil {
            return oldDeployment, err
        }
    }
    return deployment, err
}
firstCanaryDeploy() executes the first batch of a canary release. Its main flow is as follows.
1) Check whether the Deployment exists in the cluster; if not, create it.
2) If it exists, check whether the new-version Deployment currently runs exactly 1 replica; if not, converge it.
3) Check the old-version Deployment's replica count and converge both sides so that new-version replicas + old-version replicas = the application's defined total.
4) Update the status field in real time to report the state of the first batch.
notFirstCanaryDeploy() handles every canary batch after the first:
// notFirstCanaryDeploy handles the non-first batches of a canary release.
func (c *Controller) notFirstCanaryDeploy(canary *canaryv1alpha1.Canary, newDeployment *appsv1.Deployment) (*appsv1.Deployment, error) {
    oldDeployment, err := c.unmarshalDeployment(canary, "oldDeploymentYaml")
    if err != nil {
        return oldDeployment, err
    }
    newDeploymentDesiredReplicas := *newDeployment.Spec.Replicas
    intCurrentBatch, err := strconv.Atoi(canary.Spec.Info["currentBatch"])
    if err != nil {
        return oldDeployment, err
    }
    intTotalBatches, err := strconv.Atoi(canary.Spec.Info["totalBatches"])
    if err != nil {
        return oldDeployment, err
    }
    currentBatch := int32(intCurrentBatch)
    totalBatches := int32(intTotalBatches)
    everyAddReplicas := int32(math.Floor(float64(newDeploymentDesiredReplicas)/float64(totalBatches) + 0.5))

    // If only the first batch pauses, bump currentBatch and return; the next sync handles it.
    if canary.Spec.Info["pauseType"] == "First" &&
        canary.Status.Info["batch"+canary.Spec.Info["currentBatch"]+"Status"] == "Finished" &&
        canary.Spec.Info["currentBatch"] != canary.Spec.Info["totalBatches"] {
        klog.Info("pauseType First add currentBatch")
        canaryCopy := canary.DeepCopy()
        canaryCopy.Spec.Info["currentBatch"] = strconv.Itoa(int(currentBatch + 1))
        _, err = c.canaryclientset.CanarycontrollerV1alpha1().Canaries(canaryCopy.Namespace).Update(context.TODO(), canaryCopy, metav1.UpdateOptions{})
        if err != nil {
            klog.Infof("Update canary failed: %s", err)
            return oldDeployment, err
        }
        deployment, err := c.deploymentsLister.Deployments(canary.Namespace).Get(newDeployment.Name)
        return deployment, err
    }

    if totalBatches == currentBatch {
        *newDeployment.Spec.Replicas = newDeploymentDesiredReplicas
    } else {
        *newDeployment.Spec.Replicas = 1 + (currentBatch-1)*everyAddReplicas
    }

    deployment, err := c.deploymentsLister.Deployments(canary.Namespace).Get(newDeployment.Name)
    if err != nil {
        return deployment, err
    }

    if *deployment.Spec.Replicas != *newDeployment.Spec.Replicas ||
        deployment.Spec.Template.Spec.Containers[0].Image != newDeployment.Spec.Template.Spec.Containers[0].Image ||
        deployment.Spec.Template.Labels["random"] != newDeployment.Spec.Template.Labels["random"] ||
        deployment.Spec.Template.Spec.Containers[0].Resources.Limits.Cpu().String() != newDeployment.Spec.Template.Spec.Containers[0].Resources.Limits.Cpu().String() ||
        deployment.Spec.Template.Spec.Containers[0].Resources.Limits.Memory().String() != newDeployment.Spec.Template.Spec.Containers[0].Resources.Limits.Memory().String() {
        deployment, err = c.kubeclientset.AppsV1().Deployments(canary.Namespace).Update(context.TODO(), newDeploymentWithOwner(canary, newDeployment), metav1.UpdateOptions{})
        klog.Info("notFirstCanaryDeployFirstPause: image or replicas not equal so update!")
        if err != nil {
            return deployment, err
        }
    }

    // Shrink the old Deployment so that old replicas = desired total - available new replicas.
    *oldDeployment.Spec.Replicas = newDeploymentDesiredReplicas - deployment.Status.AvailableReplicas
    if *oldDeployment.Spec.Replicas == 0 {
        _, err := c.deploymentsLister.Deployments(oldDeployment.Namespace).Get(oldDeployment.Name)
        // If it has already been deleted, do nothing.
        if errors.IsNotFound(err) {
            return deployment, nil
        }
        if err != nil {
            return deployment, err
        }
        if err := c.kubeclientset.AppsV1().Deployments(oldDeployment.Namespace).Delete(context.TODO(), oldDeployment.Name, metav1.DeleteOptions{}); err != nil {
            return deployment, err
        }
    } else {
        oldDeploymentJson, _ := json.Marshal(oldDeployment)
        oldDeployment, err = c.kubeclientset.AppsV1().Deployments(oldDeployment.Namespace).Patch(context.TODO(), oldDeployment.Name, types.MergePatchType, oldDeploymentJson, metav1.PatchOptions{})
        if err != nil {
            return deployment, err
        }
    }
    return deployment, err
}
notFirstCanaryDeploy() has the most complex logic in the whole canary controller.
1) Get the replica counts of the new and old versions currently in the cluster.
2) Get the release batches and pause strategy defined in the Canary.
3) Dynamically compute, in real time, the available replicas of the new and old Deployments in the cluster, then grow the new version's replicas and shrink the old version's according to the batches and strategy the Canary defines (see the worked example after this list).
4) Update the status field in real time to report the release state.
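To make the batch arithmetic concrete, here is a small worked example of the replica calculation used above, with an arbitrary 10 replicas split into 3 batches:

package main

import (
    "fmt"
    "math"
)

func main() {
    var total, batches int32 = 10, 3
    // Same rounding as the controller: floor(total/batches + 0.5).
    everyAddReplicas := int32(math.Floor(float64(total)/float64(batches) + 0.5)) // 3

    for batch := int32(1); batch <= batches; batch++ {
        var newReplicas int32
        if batch == batches {
            newReplicas = total // the last batch always converges to the full total
        } else {
            newReplicas = 1 + (batch-1)*everyAddReplicas
        }
        // The old version shrinks so that new + old always equals the desired total.
        fmt.Printf("batch %d: new=%d old=%d\n", batch, newReplicas, total-newReplicas)
    }
    // Output:
    // batch 1: new=1 old=9
    // batch 2: new=4 old=6
    // batch 3: new=10 old=0
}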
canaryRollback() is the rollback method for canary releases:
// canaryRollback rolls back a canary release.
func (c *Controller) canaryRollback(canary *canaryv1alpha1.Canary, newDeployment *appsv1.Deployment) (*appsv1.Deployment, error) {
    oldDeployment, err := c.unmarshalDeployment(canary, "oldDeploymentYaml")
    if err != nil {
        return oldDeployment, err
    }
    newDeploymentDesiredReplicas := *newDeployment.Spec.Replicas

    deployment, err := c.deploymentsLister.Deployments(canary.Namespace).Get(newDeployment.Name)
    // If the Deployment does not exist, create it.
    if errors.IsNotFound(err) {
        deployment, err = c.kubeclientset.AppsV1().Deployments(canary.Namespace).Create(context.TODO(), newDeploymentWithOwner(canary, newDeployment), metav1.CreateOptions{})
        klog.Infof("canaryDeploy return deployment: %s\n", deployment)
        return deployment, err
    }
    if err != nil {
        return deployment, err
    }

    if *deployment.Spec.Replicas != *newDeployment.Spec.Replicas ||
        deployment.Spec.Template.Spec.Containers[0].Image != newDeployment.Spec.Template.Spec.Containers[0].Image ||
        deployment.Spec.Template.Labels["random"] != newDeployment.Spec.Template.Labels["random"] ||
        deployment.Spec.Template.Spec.Containers[0].Resources.Limits.Cpu().String() != newDeployment.Spec.Template.Spec.Containers[0].Resources.Limits.Cpu().String() ||
        deployment.Spec.Template.Spec.Containers[0].Resources.Limits.Memory().String() != newDeployment.Spec.Template.Spec.Containers[0].Resources.Limits.Memory().String() {
        deployment, err = c.kubeclientset.AppsV1().Deployments(canary.Namespace).Update(context.TODO(), newDeploymentWithOwner(canary, newDeployment), metav1.UpdateOptions{})
        klog.Info("canaryRollback: image or replicas not equal so update!")
        if err != nil {
            return deployment, err
        }
    }

    // If old and new names match, this is the rollback of a plain rolling update;
    // there is only one Deployment, so do nothing more.
    if oldDeployment.Name == newDeployment.Name {
        klog.Infof("oldDeployment.Name %s == newDeployment.Name %s, not delete oldDeployment", oldDeployment.Name, newDeployment.Name)
        return deployment, nil
    }

    *oldDeployment.Spec.Replicas = newDeploymentDesiredReplicas - deployment.Status.AvailableReplicas
    if *oldDeployment.Spec.Replicas == 0 {
        _, err := c.deploymentsLister.Deployments(oldDeployment.Namespace).Get(oldDeployment.Name)
        // If it has already been deleted, do nothing.
        if errors.IsNotFound(err) {
            return deployment, nil
        }
        if err != nil {
            return deployment, err
        }
        if err := c.kubeclientset.AppsV1().Deployments(oldDeployment.Namespace).Delete(context.TODO(), oldDeployment.Name, metav1.DeleteOptions{}); err != nil {
            return deployment, err
        }
    } else {
        oldDeploymentJson, _ := json.Marshal(oldDeployment)
        oldDeployment, err = c.kubeclientset.AppsV1().Deployments(oldDeployment.Namespace).Patch(context.TODO(), oldDeployment.Name, types.MergePatchType, oldDeploymentJson, metav1.PatchOptions{})
        if err != nil {
            return deployment, err
        }
    }
    return deployment, err
}
canaryRollback() has to consider how to roll back under various circumstances. Its main logic is as follows.
If the release has already completed, the rollback needs to create the old-version Deployment and set its replicas field to the real replica count. Two cases then arise.
·For a normal rolling update, nothing more is done: a rolling update never produces 2 Deployments, so it is enough to roll the Deployment in the newDeploymentYaml field back to the old version.
·For a canary release, the controller keeps computing the available replicas of the old version (the version being rolled back to) throughout, and dynamically shrinks the replicas of the new version (the version no longer needed).
It also updates the status field in real time to report the rollback state.
Before syncHandler() returns, it calls updateCanaryStatus() to report the current release state:
// updateCanaryStatus reports the current release state on the Canary's status.
func (c *Controller) updateCanaryStatus(canary *canaryv1alpha1.Canary, deployment *appsv1.Deployment) error {
    // Never mutate a resource obtained from an informer: it is a read-only local cache.
    // Work on a deep copy instead.
    canaryCopy := canary.DeepCopy()
    canaryCopy.Status.Info = make(map[string]string)
    canaryType := canary.Spec.Info["type"]
    switch canaryType {
    case "CanaryDeploy":
        if *deployment.Spec.Replicas == deployment.Status.AvailableReplicas {
            if canary.Spec.Info["totalBatches"] != canary.Spec.Info["currentBatch"] {
                canaryCopy.Status.Info["batch"+canary.Spec.Info["currentBatch"]+"Status"] = "Finished"
            } else {
                oldDeployment, err := c.unmarshalDeployment(canary, "oldDeploymentYaml")
                if err != nil {
                    return err
                }
                _, err = c.deploymentsLister.Deployments(oldDeployment.Namespace).Get(oldDeployment.Name)
                // If the old Deployment has been deleted, mark the batch as finished.
                if err != nil {
                    if errors.IsNotFound(err) {
                        canaryCopy.Status.Info["batch"+canary.Spec.Info["currentBatch"]+"Status"] = "Finished"
                    } else {
                        return err
                    }
                } else {
                    canaryCopy.Status.Info["batch"+canary.Spec.Info["currentBatch"]+"Status"] = "Ing"
                }
            }
        } else {
            canaryCopy.Status.Info["batch"+canary.Spec.Info["currentBatch"]+"Status"] = "Ing"
        }
    case "CanaryRollback":
        oldDeployment, err := c.unmarshalDeployment(canary, "oldDeploymentYaml")
        if err != nil {
            return err
        }
        newDeployment, err := c.unmarshalDeployment(canary, "newDeploymentYaml")
        if err != nil {
            return err
        }
        _, err = c.deploymentsLister.Deployments(oldDeployment.Namespace).Get(oldDeployment.Name)
        // If the old Deployment has been deleted, mark the rollback as finished.
        if err != nil {
            if errors.IsNotFound(err) {
                canaryCopy.Status.Info["rollbackStatus"] = "Finished"
            } else {
                return err
            }
        } else {
            if oldDeployment.Name == newDeployment.Name {
                canaryCopy.Status.Info["rollbackStatus"] = "Finished"
            } else {
                canaryCopy.Status.Info["rollbackStatus"] = "Ing"
            }
        }
    case "NormalDeploy":
        return nil
    default:
        klog.Info("canary.Spec.Info type not match!")
        return nil
    }
    canaryCopy.Status.Info["availableReplicas"] = strconv.Itoa(int(deployment.Status.AvailableReplicas))
    // Persist the new status through the Canary clientset.
    _, err := c.canaryclientset.CanarycontrollerV1alpha1().Canaries(canary.Namespace).Update(context.TODO(), canaryCopy, metav1.UpdateOptions{})
    return err
}
updateCanaryStatus() reports status for the following 3 cases.
·For a canary release, it fetches the current and available replica counts of the new and old versions in real time, and uses the release batches and strategy to decide whether the release has finished; if so, it sets the status to finished. The status key carries the batch number (see the example after this list).
·For a rollback, it checks whether the no-longer-needed version has been deleted and whether the version being restored is back at the application's defined replica count.
·For a rolling update, it does nothing.
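For reference, this is roughly what Status.Info ends up holding for a CanaryDeploy after batch 2 of 3 finishes (a hedged example; the availableReplicas value depends on the actual cluster state):

package main

import "fmt"

func main() {
    // Illustrative Status.Info after batch 2 of 3 finishes in a CanaryDeploy.
    statusInfo := map[string]string{
        "batch2Status":      "Finished",
        "availableReplicas": "4",
    }
    // A CanaryRollback would instead carry "rollbackStatus": "Ing" or "Finished".
    fmt.Println(statusInfo)
}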
Finally, the definitions of unmarshalDeployment() and newDeploymentWithOwner():
// unmarshalDeployment extracts a Deployment from the Canary's spec.
func (c *Controller) unmarshalDeployment(canary *canaryv1alpha1.Canary, newOrOld string) (*appsv1.Deployment, error) {
    deployment := &appsv1.Deployment{}
    deploymentInfo := []byte(canary.Spec.Info[newOrOld])
    if err := json.Unmarshal(deploymentInfo, deployment); err != nil {
        klog.Infof("%s/%s unmarshal failed: ", canary.Namespace, canary.Name)
        klog.Info(err)
        return deployment, err
    }
    return deployment, nil
}

// newDeploymentWithOwner marks the Deployment as owned by the Canary via OwnerReferences.
func newDeploymentWithOwner(canary *canaryv1alpha1.Canary, deployment *appsv1.Deployment) *appsv1.Deployment {
    deployment.ObjectMeta.OwnerReferences = []metav1.OwnerReference{
        *metav1.NewControllerRef(canary, canaryv1alpha1.SchemeGroupVersion.WithKind("Canary")),
    }
    return deployment
}
unmarshalDeployment() converts the string taken out of the Canary into a Deployment struct.
newDeploymentWithOwner() adds the OwnerReferences attribute to the Deployment being created, marking the Deployment as managed by the canary controller.
The canary controller's code is fairly complex overall: you need to know the usage scenarios well and precisely control the rollout by computing replica counts, available replica counts, and the release type and strategy. This kind of highly customized requirement is exactly why Kubernetes does not ship a built-in canary release feature and leaves it for users to build themselves.