首页 > 其他分享 >Operator

Operator

时间:2023-02-26 14:35:25浏览次数:49  
标签:return nil err canary deployment Operator Spec

Kubernetes的Controller-manager集成了Kubernetes内置所有资源对象的控制器,而创建的CRD编写一个控制器的过程就是实现一个Operator。

Operator是CoreOS开发的、特定的应用程序控制器,用来扩展Kubernetes API。它可以创建、配置和管理复杂的有状态应用,如数据库、缓存和监控系统。Operator基于Kubernetes的资源和控制器之上构建,同时又包含了应用程序特定的领域知识。创建Operator的关键是CRD的设计和控制器的编写。

Operator的概念可以这样描述:设计一个CRD,并且为这个CRD编写控制器的过程。

虽然Kubernetes的控制器并不是万能的,但Kubernetes设计的高扩展特性Operator让自己去解决问题。

1.准备工作

在集群中创建CRD之后,开始编写控制器,Clientset只支持Kubernetes内部的资源对象操作,Informer也是如此,这时需要自己编写的CRD生成client-go客户端代码,金丝雀控制器项目的目录结构如下所示。

·manifests目录存放CRD的定义YAML文件。

·pkg有4个目录

■apis子目录下是我们需要准备的代码,待使用代码生成工具生成客户端代码。

■controllers子目录存放控制器的核心代码。

■generated和signals子目录下是使用https://github.com/kubernetes/code-generator代码生成工具生成的客户端代码。

·main.go是控制器入口。

·go.mod是依赖包。

先来看apis目录下的代码,代码路径为/pkg/apis/canarycontroller/register.go,示例如下。在GroupName字段填写CRD的分组名,代码路径为/pkg/apis/canarycontroller/v1alpha1/doc.go。

package canarycontroller
const (
    GroupName = "canarycontroller.tech.com"
)

填写CRD分组名时重点注意types文件,定义的CRD具体字段都在types文件中,代码路径为/pkg/apis/canarycontroller/types.go,示例如下。

//+k8s:deepcopy-gen=package
//+groupName=canarycontroller.tech.com
package v1alpha1 //import "canary-controller/pkg/apis/canarycontroller/v1alpha1"

types文件代码如下。

package v1alpha1
import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
//+genclient
//+k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type Canary struct {
    metav1.TypeMeta    `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`
    Spec    CanarySpec    `json:"spec"`
    Status CanaryStatus `json:"status"`
}
type CanarySpec struct {
    Info map[string]string `json:"info"`
}
type CanaryStatus struct {
    Info map[string]string `json:"info"`
}
//+k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type CanaryList struct {
    metav1.TypeMeta `json:",inline"`
    metav1.ListMeta `json:"metadata"`
    Items []Canary `json:"items"`
}

·在types中,定义了Canary这个资源对象的实体结构,和Kubernetes内置资源一样,拥有metadata、spec、status字段。

·在Spec下定义了Info字段,用来记录金丝雀发布策略,这个字段是一个map[string]string类型,可以通过添加和减少key的方式,在开发过程中添加和减少字段,无须重新用代码生成器生成客户端代码。

·在Status下定义了Info字段,用来记录金丝雀发布过程中的发布状态,这个字段同样是一个map[string]string类型。

·Items是Canary这个资源对象的列表结构体。

register.go注册代码的路径为/pkg/apis/canarycontroller/register.go,示例如下。

package v1alpha1
import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/runtime/schema"
    canarycontroller "canary-controller/pkg/apis/canarycontroller"
)
var SchemeGroupVersion = schema.GroupVersion{Group: canarycontroller.GroupName, 
    Version: "v1alpha1"}
func Kind(kind string) schema.GroupKind {
    return SchemeGroupVersion.WithKind(kind).GroupKind()
}
func Resource(resource string) schema.GroupResource {
    return SchemeGroupVersion.WithResource(resource).GroupResource()
}
var (
    SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
    AddToScheme = SchemeBuilder.AddToScheme
)
func addKnownTypes(scheme *runtime.Scheme) error {
    scheme.AddKnownTypes(SchemeGroupVersion,
        &Canary{},
        &CanaryList{},
    )
    metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
    return nil
}

这里在addKnowTypes()方法中添加在types.go中定义类型的指针即可。

3.生成CRD的client-go客户端代码

在Linux系统下下载代码生成工具code-generator,示例如下。

git clone https://github.com/kubernetes/code-generator

将项目canary-controller的目录以及目录apis上传到Linux服务器。上传后,Linux下目录结构示例如下。

[root@golang src]# tree
.
└── canary-controller
    └── pkg
        └── apis
            └── canarycontroller
                ├── register.go
                └── v1alpha1
                    ├── doc.go
                    ├── register.go
                    └── types.go

下载依赖,进入code-generator目录,运行generate-groups.sh脚本生成客户端代码如下。

cd $GOPATH/src \
&& go get -u k8s.io/apimachinery/pkg/apis/meta/v1 \
&& cd $GOPATH/src/k8s.io/code-generator \
&& ./generate-groups.sh all \
canary-controller/pkg/client \
canary-controller/pkg/apis \
canarycontroller:v1alpha1

执行成功后,新生成的代码如下。

文件:/pkg/apis/canarycontroller/v1alpha1/doc.go/zz_generated.deepcopy.go。

目录:/pkg/client。将目录/pkg/client改名为/pkg/generated即可。

4.编写金丝雀控制器的主逻辑

控制器的主要流程和逻辑比较复杂,我们先把每个方法的作用介绍一遍,再梳理整体的函数和方法执行流程。

首先是引入pkg,示例如下。

package controllers
import (
        "context"
        "encoding/json"
        "fmt"
        "math"
        "strconv"
        "strings"
        "time"
        appsv1 "k8s.io/api/apps/v1"
        corev1 "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/api/errors"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/types"
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
        "k8s.io/apimachinery/pkg/util/wait"
        appsinformers "k8s.io/client-go/informers/apps/v1"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/kubernetes/scheme"
        typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
        appslisters "k8s.io/client-go/listers/apps/v1"
        "k8s.io/client-go/tools/cache"
        "k8s.io/client-go/tools/record"
        "k8s.io/client-go/util/workqueue"
        "k8s.io/klog"
        canaryv1alpha1 "canary-controller/pkg/apis/canarycontroller/v1alpha1"
        clientset "canary-controller/pkg/generated/clientset/versioned"
        canaryscheme "canary-controller/pkg/generated/clientset/versioned/scheme"
        informers "canary-controller/pkg/generated/informers/externalversions/
            canarycontroller/v1alpha1"
        listers "canary-controller/pkg/generated/listers/canarycontroller/v1alpha1"
)

定义eventBroadcaster常量和controller结构体,其中eventBroadcaster就是我们通过kubectl describe资源对象获取的事件,示例如下。

const controllerAgentName = "canary-controller"
const (
        //注册SuccessSynced为reason事件之一,当canary同步成功时,回调显示
        SuccessSynced = "Synced"
        //注册ErrResourceExists为reason事件之一,由于 Deployment已经存在,导致
        //Canary执行sync失败,回调显示
        ErrResourceExists = "ErrResourceExists"资源不属于canary-controller管理的消息
        //MessageResourceExists = "Resource %q already exists and is not 
        //managed by Canary"
        //注册MessageResourceSynced为事件消息,当Canary执行sync成功时,回调显示
        MessageResourceSynced = "Canary synced successfully"
)
//Canary控制器结构
type Controller struct {
        //常规Kubernetes的Clientset
        kubeclientset kubernetes.Interface
        //Canary的Clientset
        canaryclientset clientset.Interface
        deploymentsLister appslisters.DeploymentLister
        deploymentsSynced cache.InformerSynced
        canariesLister    listers.CanaryLister
        canariesSynced    cache.InformerSynced
        //workqueue是一个限速工作队列,确保并发下同一时间一个项只被一个工作协程处理,有重新
        //进入队列并降速的功能
        workqueue workqueue.RateLimitingInterface
        //事件记录者,记录资源对象的事件
        recorder record.EventRecorder
}

 

NewController函数代码示例如下。

//生成 canary-controller对象
func NewController(
        kubeclientset kubernetes.Interface,
        canaryclientset clientset.Interface,
        deploymentInformer appsinformers.DeploymentInformer,
        canaryInformer informers.CanaryInformer) *Controller {
        //创建事件广播,添加 canary-controller的类型到默认的Kubernetes scheme
        //这样事件日志可以打印 canary-controller的类型
        utilruntime.Must(canaryscheme.AddToScheme(scheme.Scheme))
        klog.V(4).Info("Creating event broadcaster")
        eventBroadcaster := record.NewBroadcaster()
        eventBroadcaster.StartLogging(klog.Infof)
        eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl
            {Interface: kubeclientset.CoreV1().Events("")})
        recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.
            EventSource{Component: controllerAgentName})
        controller := &Controller{
                kubeclientset:     kubeclientset,
                canaryclientset:   canaryclientset,
                deploymentsLister: deploymentInformer.Lister(),
                deploymentsSynced: deploymentInformer.Informer().HasSynced,
                canariesLister:    canaryInformer.Lister(),
                canariesSynced:    canaryInformer.Informer().HasSynced,
                workqueue:         workqueue.NewNamedRateLimitingQueue(workque
                    ue.DefaultControllerRateLimiter(), "Canarys"),
                recorder:          recorder,
        }
        klog.Info("Setting up event handlers")
        //设置Canary变更的事件回调方法
        canaryInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
                AddFunc: controller.enqueueCanary,
                UpdateFunc: func(old, new interface{}) {
                        controller.enqueueCanary(new)
                },
        })
        //设置Deployment变更的事件处理方法,可以设置owner,确保只关心由Canary所管理的Deployment
        //送入Canary的工作队列,我们这里不做过滤,详情查看:
        //https://github.com/kubernetes/community/blob/8cafef897a22026d42f5e5
        //bb3f104febe7e29830/contributors/devel/controllers.md
        deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
                AddFunc: controller.handleObject,
                UpdateFunc: func(old, new interface{}) {
                        newDepl := new.(*appsv1.Deployment)
                        oldDepl := old.(*appsv1.Deployment)
                        //事件会按resync设置的时间周期发送最新的Deployment状态
                        //如果resourceVersion相同,说明没有变更
                        if newDepl.ResourceVersion == oldDepl.ResourceVersion {
                                return
                        }
                        controller.handleObject(new)
                },
                DeleteFunc: controller.handleObject,
        })
        return controller
}

 

NewController主要定义如下。

·生成Canary和Deployment的Clientset。

·订阅Canary的创建和更新事件,如果事件发生,将发生事件的Canary对象使用controller.enqueueCanary方法存入workqueue中。

·订阅Deployment的增删改事件,如果事件发生,将发生事件的Deployment对象交给handleObject方法处理。

Run()方法启动Controller的代码如下。

//可以设置关心的类型事件, informer caches会同步,并且启动 runWorker方法处理,stopCh关
//闭才会退出
//关闭时会停止 workerqueue,并且等待worker将任务处理完成

func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
        defer utilruntime.HandleCrash()
        defer c.workqueue.ShutDown()
        klog.Info("Starting Canary controller")
        klog.Info("Waiting for informer caches to sync")
        //注意Run()方法传入的 deploymentSynced和canariesSynced均为 
        //SharedInformerFactory
        //这样引入其他同一Clientset下的Informer时可以共享一份缓存,提高效率
        if ok := cache.WaitForCacheSync(stopCh, c.deploymentsSynced, 
            c.canariesSynced); !ok {
                return fmt.Errorf("failed to wait for caches to sync")
        }
        klog.Info("Starting workers")
        //启动threadiness个协程并发处理Canary
        for i := 0; i < threadiness; i++ {
                go wait.Until(c.runWorker, time.Second, stopCh)
        }
        klog.Info("Started workers")
        <-stopCh
        klog.Info("Shutting down workers")
        return nil
}
//循环调用 processNextWorkItem,持续在workqueue中读取消息并处理
func (c *Controller) runWorker() {
        for c.processNextWorkItem() {
        }
}

 

·Run()方法会启动threadiness个协程并发的执行runWorker()方法。

·runWorker()方法实际就是循环调用c.processNextWorkItem持续在workqueue中读取消息。

下面来看processNextWorkItem的具体逻辑,示例如下。

//从workqueue中读取单个项,并且通过调用syncHandler尝试处理
func (c *Controller) processNextWorkItem() bool {
        startTimeNano := time.Now().UnixNano()
        obj, shutdown := c.workqueue.Get()
        if shutdown {
                return false
        }
        //这里为了调用defer c.workqueue.Done封装一个函数
        err := func(obj interface{}) error {
                //workqueue.Done通知这个项已经处理完成,如果不想让这个项重新进入队列,一
                //定要记得调用 workqueue.Forget,否则这个项会重新进入队列并且在下一个
                //back-off的周期继续进入队列
                defer c.workqueue.Done(obj)
                var key string
                var ok bool
                //这里取出obj对象的string,string是namespace/name,这样可以通过 
                //informer的cache取出更多的信息
                if key, ok = obj.(string); !ok {
                        //丢弃无效的项,否则会循环尝试处理无效的项
                        c.workqueue.Forget(obj)
                        utilruntime.HandleError(fmt.Errorf("expected string in 
                            workqueue but got %#v", obj))
                        return nil
                }
                //调用syncHandler,传递Canary的namespace/name
                if err := c.syncHandler(key); err != nil {
                        //如果处理错误,重新放回队列并限速
                        c.workqueue.AddRateLimited(key)
                        return fmt.Errorf("error syncing '%s': %s, requeuing", 
                            key, err.Error())
                }
                //处理成功,则调用Forget方法从队列中移除项
                c.workqueue.Forget(obj)
                endTimeNano := time.Now().UnixNano()
                costTimeNano := (endTimeNano - startTimeNano) / 1e6
                klog.Infof("Successfully synced '%s'\ncostTime:%v\nthe workQueue 
                    len:%d\n", key, costTimeNano, c.workqueue.Len())
                return nil
        }(obj)
        if err != nil {
                utilruntime.HandleError(err)
                return true
        }
        return true
}

 

processNextWorkItem主要做了如下事情。

·从workqueue中取出obj对象。

·从取出的obj对象中取出资源名,调用syncHandler进行状态收敛逻辑。

·处理成功,调用Forget()方法将obj对象从workqueue中移除。

·处理失败,将obj对象放回workqueue并限速。

下面我们看syncHandler的代码定义,示例如下。

//对比实际状态和期望状态,尝试收敛到期望状态并同步更新Canary的实际状态
func (c *Controller) syncHandler(key string) error {
        namespace, name, err := cache.SplitMetaNamespaceKey(key)
        if err != nil {
                utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
                return nil
        }
        //通过namespace/name获取Canary
        canary, err := c.canariesLister.Canaries(namespace).Get(name)
        if err != nil {
                //如果Canary不存在,停止处理并返回
                if errors.IsNotFound(err) {
                        utilruntime.HandleError(fmt.Errorf("canary '%s' in 
                            work queue no longer exists", key))
                        return nil
                }
                return err
        }
        newDeployment, err := c.unmarshalDeployment(canary, "newDeploymentYaml")
        if err != nil {
                return err
        }
        klog.Infof("Synchandle deployment is %s/%s\n", newDeployment.Namespace, 
            newDeployment.Name)
        if newDeployment.Name == "" {
                //消化本次错误,不再进入队列
                utilruntime.HandleError(fmt.Errorf("%s: deployment name must 
                    be specified", key))
                return nil
        }
        var deployment *appsv1.Deployment
        canaryType := canary.Spec.Info["type"]
        switch canaryType {
        case "NormalDeploy":
                klog.Info("NormalDeploy")
                deployment, err = c.normalDeploy(canary, newDeployment)
                if err != nil {
                        return err
                }
        case "CanaryDeploy":
                if canary.Spec.Info["currentBatch"] == "1" {
                        klog.Info("canaryDeploy")
                        deployment, err = c.firstCanaryDeploy(canary, newDeployment)
                        if err != nil {
                                return err
                        }
                } else {
                        deployment, err = c.notFirstCanaryDeploy(canary, 
                            newDeployment)
                        if err != nil {
                                return err
                        }

                }
        case "CanaryRollback":
                klog.Info("RollBack")
                deployment, err = c.canaryRollback(canary, newDeployment)
                if err != nil {
                        return err
                }
        default:
                klog.Info("canaryType not match!")
                return nil
        }
        err = c.updateCanaryStatus(canary, deployment)
        if err != nil {
                return err
        }
        c.recorder.Event(canary, corev1.EventTypeNormal, SuccessSynced, 
            MessageResourceSynced)
        return nil
}

这是金丝雀的核心控制逻辑,跟业务息息相关。Operator基于Kubernetes的资源和控制器概念之上进行构建,同时又包含了应用程序特定的领域知识。

我们开发的金丝雀控制器包含两种发布方式。

·NormalDeploy:原生的Deployment滚动升级。

·CanaryDeployment:金丝雀发布,将应用的Pod实例分成特定的批次,批次不大于总实例数,第一批次一定会暂停,剩下批次的用户可以决定是手动点击发布,还是控制器代替用户自动发布。

enqueueCanary()是Canary对象发生创建和更新事件后调用的方法,将对象通过Meta-NamespaceKeyFunc取出资源名并放入workqueue,示例如下。

func (c *Controller) enqueueCanary(obj interface{}) {
        var key string
        var err error
        if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
                utilruntime.HandleError(err)
                return
        }
        c.workqueue.Add(key)
}

handlerObject()是Deployment事件触发后的处理方法,示例如下。

func (c *Controller) handleObject(obj interface{}) {
        var object metav1.Object
        var ok bool
        if object, ok = obj.(metav1.Object); !ok {
                tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
                if !ok {
                        utilruntime.HandleError(fmt.Errorf("error decoding 
                            object, invalid type"))
                        return
                }
                object, ok = tombstone.Obj.(metav1.Object)
                if !ok {
                        utilruntime.HandleError(fmt.Errorf("error decoding 
                            object tombstone, invalid type"))
                        return
                }
                klog.Infof("Recovered deleted object '%s' from tombstone", 
                    object.GetName())
        }
        klog.Infof("Processing object: %s\n", object.GetName())
        if strings.HasPrefix(object.GetName(), "canary-") {
                lastIndex := strings.LastIndex(object.GetName(), "-deployment-")
                canaryName := object.GetName()[7:lastIndex] + "-canary"
                canary, err := c.canariesLister.Canaries(object.GetNamespace()).
                    Get(canaryName)
                if err != nil {
                        klog.Infof("Canary get error Ignoreing the object: %s", 
                            object.GetName())
                        klog.Infof("canaryName is %s, error:%s", canaryName, err)
                        return
                }
                c.enqueueCanary(canary)
        } else {
                klog.Infof("Not prefix canary-, ignore %s/%s\n", object.
                    GetNamespace(), object.GetName())
        }
        //if ownerRef := metav1.GetControllerOf(object); ownerRef != nil {
        //       if ownerRef.Kind != "Canary" {
        //               return
        //       }
        //
        //       canary, err := c.canariesLister.Canaries(object.GetNamespace()).
           Get(ownerRef.Name)
        //       if err != nil {
        //               klog.Infof("ignoring orphaned object '%s' of foo 
                   '%s'", object.GetSelfLink(), ownerRef.Name)
        //               return
        //       }
        //
        //       c.enqueueCanary(canary)
        //       return
        //}
}

handlerObject()方法的主要流程如下。

1)获取资源对象的名称,验证是否是canary开头,这是我们通过Canary生成的Deployment的名字前缀。其实这不是一个很好的鉴别方式,正确的方法是使用注释中的ownerRef来判断这个Deployment是否由Canary管控。这是一个历史原因,在上线金丝雀资源之前,很多Deployment并不是通过Canary创建的。当然,所有Deployment都发布过一次后,就改为使用ownerRef来鉴别资源是否由Canary控制器管控。

2)验证通过后,调用enqueueCanary()方法存入workqueue。

下面3个方法是金丝雀YAML文件中type字段的处理逻辑,首先是原生的滚动升级,示例如下。

func (c *Controller) normalDeploy(canary *canaryv1alpha1.Canary, newDeployment 
    *appsv1.Deployment) (*appsv1.Deployment, error) {
        deployment, err := c.deploymentsLister.Deployments(canary.Namespace).
            Get(newDeployment.Name)
        //如果不存在,则创建
        if errors.IsNotFound(err) {
                deployment, err = c.kubeclientset.AppsV1().Deployments(canary.
                    Namespace).Create(newDeploymentWithOwner(canary, newDeployment))
                klog.Infof("initDeploy return deployment: %s\n", deployment)
                return deployment, err
        }
        //如果Get/Create()方法出错,比如发生网络错误或其他错误,那么返回错误,将obj对象
        //重新放入队列,稍后再处理
        if err != nil {
                return deployment, err
        }
        if deployment.Spec.Template.Spec.Containers[0].Image != newDeployment.
            Spec.Template.Spec.Containers[0].Image ||
                *deployment.Spec.Replicas != *newDeployment.Spec.Replicas || 
                    deployment.Spec.Template.Labels["random"] != newDeployment.
                    Spec.Template.Labels["random"] ||
                deployment.Spec.Template.Spec.Containers[0].Resources.Limits.
                    Cpu().String() != newDeployment.Spec.Template.Spec.Containers[0].
                    Resources.Limits.Cpu().String() ||
                deployment.Spec.Template.Spec.Containers[0].Resources.Limits.
                    Memory().String() != newDeployment.Spec.Template.Spec.
                    Containers[0].Resources.Limits.Memory().String() {
                klog.Infof("image:%s, %s, replicas:%d, %d\n", newDeployment.
                    Spec.Template.Spec.Containers[0].Image, newDeployment.
                    Spec.Template.Spec.Containers[0].Image,
                        deployment.Spec.Replicas, newDeployment.Spec.Replicas)
                deployment, err = c.kubeclientset.AppsV1().Deployments(canary.
                    Namespace).Update(newDeploymentWithOwner(canary, newDeployment))
                klog.Info("image or replicas not equal update!")
        }
        //如果 Get/Create方法出错,比如网络错误或其他错误,那么返回错误,将obj对象重新放
        //入队列,稍后再处理
        if err != nil {
                return deployment, err
        }
        return deployment, nil
}

normalDeploy()方法的主要流程如下。

1)判断集群中的Deployment是否存在,如果不存在则创建。

2)如果存在,查看当前Deployment的实例、镜像、CPU、内存配置是否和Canary中定义的相符,如果不符合,则按照Canary中定义的状态收敛并更新。

3)中间如果处理失败则返回错误,由上一层方法限速并放回workqueue。

firstCanaryDeploy()方法的定义如下。

func (c *Controller) firstCanaryDeploy(canary *canaryv1alpha1.Canary, newDeployment 
    *appsv1.Deployment) (*appsv1.Deployment, error) {
        oldDeployment, err := c.unmarshalDeployment(canary, "oldDeploymentYaml")
        if err != nil {
                return oldDeployment, err
        }
        newDeploymentDesiredReplicas := *newDeployment.Spec.Replicas
        *newDeployment.Spec.Replicas = 1
        deployment, err := c.deploymentsLister.Deployments(canary.Namespace).
            Get(newDeployment.Name)
        //如果不存在,则创建
        if errors.IsNotFound(err) {
                deployment, err = c.kubeclientset.AppsV1().Deployments(canary.
                    Namespace).Create(newDeploymentWithOwner(canary, newDeployment))
                klog.Infof("canaryDeploy return deployment: %s\n", deployment)

                return deployment, err
        }
        if err != nil {
                return deployment, err
        }
        if *deployment.Spec.Replicas != 1 || deployment.Spec.Template.Spec.Containers[0].
            Image != newDeployment.Spec.Template.Spec.Containers[0].Image {
                deployment, err = c.kubeclientset.AppsV1().Deployments(canary.
                    Namespace).Update(newDeploymentWithOwner(canary, newDeployment))
                klog.Info("firstCanaryDeploy: image or replicas not equal so update!")
                if err != nil {
                        return deployment, err
                }
        }
        if deployment.Status.AvailableReplicas == 1 {
                *oldDeployment.Spec.Replicas = newDeploymentDesiredReplicas - 1
                oldDeploymentJson, _ := json.Marshal(oldDeployment)
                oldDeployment, err = c.kubeclientset.AppsV1().Deployments
                    (oldDeployment.Namespace).Patch(oldDeployment.Name, types.
                    MergePatchType, oldDeploymentJson)
                if err != nil {
                        return oldDeployment, err
                }
        }
        return deployment, err
}

firstCanaryDeploy()是金丝雀第一次批次发版的执行方法,主要流程如下。

1)判断集群中的Deployment是否存在,如果不存在则创建。

2)如果存在,则判断当前运行中的新版本Deployment实例是否为1,如果不为1,则进行状态收敛。

3)查看旧版本Deployment的实例数,并收敛新旧版本实例数,旧版本可用实例数+老版本可用实例数=应用正常定义的总实例数。

4)实时更新status字段,上报第一次发布的状态。

notFirstCanaryDeploy()是金丝雀非首次发布批次的处理方法,示例如下。

//金丝雀非首次发布
func (c *Controller) notFirstCanaryDeploy(canary *canaryv1alpha1.Canary, 
    newDeployment *appsv1.Deployment) (*appsv1.Deployment, error) {
        oldDeployment, err := c.unmarshalDeployment(canary, "oldDeploymentYaml")
        if err != nil {
                return oldDeployment, err
        }
        newDeploymentDesiredReplicas := *newDeployment.Spec.Replicas
        intCurrentBatch, err := strconv.Atoi(canary.Spec.Info["currentBatch"])
        if err != nil {
                return oldDeployment, err
        }
        intTotalBatches, err := strconv.Atoi(canary.Spec.Info["totalBatches"])
        if err != nil {
                return oldDeployment, err
        }
        currentBatch := int32(intCurrentBatch)
        totalBatches := int32(intTotalBatches)
        everyAddReplicas := int32(math.Floor(float64(newDeploymentDesiredRepli
            cas)/float64(totalBatches) + 0.5))
         //如果只有第一批暂停,那么增加currentCount后退出,等待下次处理
        if canary.Spec.Info["pauseType"] == "First" && canary.Status.Info
            ["batch"+canary.Spec.Info["currentBatch"]+"Status"] == "Finished" &&
                canary.Spec.Info["currentBatch"] != canary.Spec.
                    Info["totalBatches"] {
                klog.Info("pauseType First add currentBatch")
                canaryCopy := canary.DeepCopy()
                canaryCopy.Spec.Info["currentBatch"] = strconv.Itoa
                    (int(currentBatch + 1))
                _, err = c.canaryclientset.CanarycontrollerV1alpha1().Canaries
                    (canaryCopy.Namespace).Update(context.TODO(), canaryCopy, 
                    metav1.UpdateOptions{})
                if err != nil {
                        klog.Infof("Update canary failed: %s", err)
                        return oldDeployment, err
                }
                deployment, err := c.deploymentsLister.Deployments(canary.
                    Namespace).Get(newDeployment.Name)
                return deployment, err
        }
        if totalBatches == currentBatch {
                *newDeployment.Spec.Replicas = newDeploymentDesiredReplicas
        } else {
                *newDeployment.Spec.Replicas = 1 + (currentBatch-1)*everyAddReplicas
        }
        deployment, err := c.deploymentsLister.Deployments(canary.Namespace).
            Get(newDeployment.Name)
        if err != nil {
                return deployment, err
        }
        if *deployment.Spec.Replicas != *newDeployment.Spec.Replicas ||
                deployment.Spec.Template.Spec.Containers[0].Image != 
                    newDeployment.Spec.Template.Spec.Containers[0].Image || 
                    deployment.Spec.Template.Labels["random"] != newDeployment.
                    Spec.Template.Labels["random"] ||
                deployment.Spec.Template.Spec.Containers[0].Resources.Limits.
                    Cpu().String() != newDeployment.Spec.Template.Spec.
                    Containers[0].Resources.Limits.Cpu().String() ||
                deployment.Spec.Template.Spec.Containers[0].Resources.Limits.
                    Memory().String() != newDeployment.Spec.Template.Spec.
                    Containers[0].Resources.Limits.Memory().String() {
                deployment, err = c.kubeclientset.AppsV1().Deployments(canary.
                    Namespace).Update(newDeploymentWithOwner(canary, newDeployment))
                klog.Info("notFirstCanaryDeployFirstPause: image or replicas 
                    not equal so update!")
                if err != nil {
                        return deployment, err
                }
        }
        *oldDeployment.Spec.Replicas = newDeploymentDesiredReplicas - deployment.
            Status.AvailableReplicas
        if *oldDeployment.Spec.Replicas == 0 {
                _, err := c.deploymentsLister.Deployments(oldDeployment.
                    Namespace).Get(oldDeployment.Name)
                //如果已经删除,则不处理
                if errors.IsNotFound(err) {
                        return deployment, nil
                }
                if err != nil {
                        return deployment, err
                }
                if err := c.kubeclientset.AppsV1().Deployments(oldDeployment.
                    Namespace).Delete(oldDeployment.Name, &metav1.DeleteOptions{}); 
                    err != nil {
                        return deployment, err
                }
        } else {
                oldDeploymentJson, _ := json.Marshal(oldDeployment)
                oldDeployment, err = c.kubeclientset.AppsV1().Deployments
                    (oldDeployment.Namespace).Patch(oldDeployment.Name, types.
                    MergePatchType, oldDeploymentJson)
                if err != nil {
                        return deployment, err
                }
        }
        return deployment, err
}

notFirstCanaryDeploy()方法的逻辑是整个金丝雀控制器中最复杂的。

1)获取当前集群中新版本和老版本的实例数。

2)获取当前集群中Canary定义的发布批次和暂停策略。

3)动态、实时计算集群中新旧版本Deployment可用的实例数,按照Canary定义的发布批次和策略,动态增加新版本的实例数,减少老版本的实例数。

4)实时更新status字段,上报发布的状态。

canaryRollback()是金丝雀发布的回滚方法,示例如下。

func (c *Controller) canaryRollback(canary *canaryv1alpha1.Canary, 
    newDeployment *appsv1.Deployment) (*appsv1.Deployment, error) {
        oldDeployment, err := c.unmarshalDeployment(canary, "oldDeploymentYaml")
        if err != nil {
                return oldDeployment, err
        }
        newDeploymentDesiredReplicas := *newDeployment.Spec.Replicas
        deployment, err := c.deploymentsLister.Deployments(canary.Namespace).
            Get(newDeployment.Name)
        //如果不存在,则创建
        if errors.IsNotFound(err) {
                deployment, err = c.kubeclientset.AppsV1().Deployments(canary.
                    Namespace).Create(newDeploymentWithOwner(canary, newDeployment))
                klog.Infof("canaryDeploy return deployment: %s\n", deployment)

                return deployment, err
        }
        if err != nil {
                return deployment, err
        }
        if *deployment.Spec.Replicas != *newDeployment.Spec.Replicas ||
                deployment.Spec.Template.Spec.Containers[0].Image != 
                    newDeployment.Spec.Template.Spec.Containers[0].Image || 
                    deployment.Spec.Template.Labels["random"] != newDeployment.
                    Spec.Template.Labels["random"] ||
                deployment.Spec.Template.Spec.Containers[0].Resources.Limits.
                    Cpu().String() != newDeployment.Spec.Template.Spec.
                    Containers[0].Resources.Limits.Cpu().String() ||
                deployment.Spec.Template.Spec.Containers[0].Resources.Limits.
                    Memory().String() != newDeployment.Spec.Template.Spec.
                    Containers[0].Resources.Limits.Memory().String() {
                deployment, err = c.kubeclientset.AppsV1().Deployments(canary.
                    Namespace).Update(newDeploymentWithOwner(canary, newDeployment))
                klog.Info("canaryRollback: image or replicas not equal so update!")
                if err != nil {
                        return deployment, err
                }
        }
        //如果是分批发布的回滚,则不处理
        if oldDeployment.Name == newDeployment.Name {
                klog.Infof("oldDeployment.Name %s == newDeployment.Name %s, 
                    not delete oldDeployment", oldDeployment.Name, newDeployment.Name)
                return deployment, nil
        }
        *oldDeployment.Spec.Replicas = newDeploymentDesiredReplicas - 
            deployment.Status.AvailableReplicas
        if *oldDeployment.Spec.Replicas == 0 {
                _, err := c.deploymentsLister.Deployments(oldDeployment.
                    Namespace).Get(oldDeployment.Name)
                //如果已经删除,则不处理
                if errors.IsNotFound(err) {
                        return deployment, nil
                }
                if err != nil {
                        return deployment, err
                }
                if err := c.kubeclientset.AppsV1().Deployments(oldDeployment.
                    Namespace).Delete(oldDeployment.Name, &metav1.
                    DeleteOptions{}); err != nil {
                        return deployment, err
                }
        } else {
                oldDeploymentJson, _ := json.Marshal(oldDeployment)
                oldDeployment, err = c.kubeclientset.AppsV1().Deployments
                    (oldDeployment.Namespace).Patch(oldDeployment.Name, types.
                    MergePatchType, oldDeploymentJson)
                if err != nil {
                        return deployment, err
                }
        }
        return deployment, err
}

canaryRollback()方法需要考虑在各种情况下应该如何回滚,主要逻辑如下。

如果已经发布完成,回滚时需要创建老版本的Deployment,并设置replicas字段为真实实例数,这时会发生如下2种情况。

·如果是正常的滚动升级,则不做处理。因为滚动升级并不需要生成2个Deployment,只需要将newDeploymentYaml字段的Deployment回滚为老版本即可。

·如果是金丝雀发布,那么需要全程计算老版本(要回滚的版本)可用实例数,动态减少新版本(不再需要的版本)的实例数。

实时更新status字段,报告回滚状态。

syncHandler()方法在结束之前,会执行updateCanaryStatus()方法,上报当前发布的状态,示例如下。

func (c *Controller) updateCanaryStatus(canary *canaryv1alpha1.Canary, 
    deployment *appsv1.Deployment) error {
        //不要修改informer获取的资源,它是只读的本地缓存
        //使用深拷贝
        canaryCopy := canary.DeepCopy()
        canaryCopy.Status.Info = make(map[string]string)
        canaryType := canary.Spec.Info["type"]
        switch canaryType {
        case "CanaryDeploy":
                if *deployment.Spec.Replicas == deployment.Status.AvailableReplicas {
                        if canary.Spec.Info["totalBatches"] != canary.Spec.
                            Info["currentBatch"] {

                                canaryCopy.Status.Info["batch"+canary.Spec.Info
                                    ["currentBatch"]+"Status"] = "Finished"
                        } else {
                                oldDeployment, err := c.unmarshalDeployment
                                    (canary, "oldDeploymentYaml")
                                if err != nil {
                                        return err
                                }
                                _, err = c.deploymentsLister.Deployments
                                    (oldDeployment.Namespace).Get
                                    (oldDeployment.Name)
                                //如果旧的Deployment已经删除,则更新状态为完成
                                if err != nil {
                                        if errors.IsNotFound(err) {

                                                canaryCopy.Status.Info["batch"+
                                                    canary.Spec.Info
                                                    ["currentBatch"]+"Status"] = 
                                                    "Finished"
                                        } else {
                                                return err
                                        }
                                } else {
                                        canaryCopy.Status.Info["batch"+canary.
                                            Spec.Info["currentBatch"]+"Status"] 
                                            = "Ing"
                                }
                        }
                } else {
                        canaryCopy.Status.Info["batch"+canary.Spec.Info
                            ["currentBatch"]+"Status"] = "Ing"
                }
        case "CanaryRollback":
                oldDeployment, err := c.unmarshalDeployment(canary, 
                    "oldDeploymentYaml")
                if err != nil {
                        return err
                }
                newDeployment, err := c.unmarshalDeployment(canary, 
                    "newDeploymentYaml")
                if err != nil {
                        return err
                }
                _, err = c.deploymentsLister.Deployments(oldDeployment.
                    Namespace).Get(oldDeployment.Name)
                //如果旧的Deployment已经删除,则更新状态为完成
                if err != nil {
                        if errors.IsNotFound(err) {
                                canaryCopy.Status.Info["rollbackStatus"] = "Finished"
                        } else {
                                return err
                        }
                } else {
                        if oldDeployment.Name == newDeployment.Name {
                                canaryCopy.Status.Info["rollbackStatus"] = "Finished"
                        } else {
                                canaryCopy.Status.Info["rollbackStatus"] = "Ing"
                        }
                }
        case "NormalDeploy":
                return nil
        default:
                klog.Info("canary.Spec.Info type not match!")
                return nil
        }
        canaryCopy.Status.Info["availableReplicas"] = strconv.Itoa(int
            (deployment.Status.AvailableReplicas))
        return err
}

updateCanaryStatus()方法针对以下3种情况进行状态上报。

·如果是金丝雀发布,实时获取当前新老版本的实例数与可用实例数,并通过发布批次和策略判断计算是否发布完成,如果发布完成,将状态更新为完成。字段的key携带批次。

·如果是回滚发布,那么判断不需要的版本是否已经删除,需要回滚的版本是否已经恢复到应用定义的实例数。

·如果是滚动发布,则不进行任何处理。

unmarshalDeployment()和newDeploymentWithOwner()方法的定义如下。

//获取canary中的Deployment
func (c *Controller) unmarshalDeployment(canary *canaryv1alpha1.Canary, 
    newOrOld string) (*appsv1.Deployment, error) {
        deployment := &appsv1.Deployment{}
        deploymentInfo := []byte(canary.Spec.Info[newOrOld])
        if err := json.Unmarshal(deploymentInfo, deployment); err != nil {
                klog.Infof("%s/%s unmarshal failed: ", canary.Namespace, 
                    canary.Name)
                klog.Info(err)
                return deployment, err
        }
        return deployment, nil
}
func newDeploymentWithOwner(canary *canaryv1alpha1.Canary, deployment *appsv1.
    Deployment) *appsv1.Deployment {
        deployment.ObjectMeta.OwnerReferences = []metav1.OwnerReference{
                *metav1.NewControllerRef(canary, canaryv1alpha1.SchemeGroupVersion.
                    WithKind("Canary")),
        }
        return deployment

unmarshalDeployment()方法负责将从Canary取出来的string转换为Deployment的结构体。

newDeploymentWithOwner()方法是给创建的Deployment对象添加Ow

nerReferences属性,代表这个Deployment归金丝雀控制器管控。

整个金丝雀控制器的代码较为复杂,需要我们熟悉使用场景,对发布过程中的实例数、可用实例数、发布的类型和策略进行计算后精准控制,这种高度自定义的需求也是Kubernetes中没有内置金丝雀发布的功能而是留给用户自己去研发的原因。

标签:return,nil,err,canary,deployment,Operator,Spec
From: https://www.cnblogs.com/muzinan110/p/17156619.html

相关文章