首页 > 其他分享 >Kubebuilder实现一个定时扩缩容的功能

Kubebuilder实现一个定时扩缩容的功能

时间:2024-05-14 10:40:50浏览次数:12  
标签:Status return scaler err ctx 扩缩容 json Kubebuilder 定时

参考b站大佬 https://www.bilibili.com/video/BV1jJ4m1j7gK/?spm_id_from=333.788&vd_source=7e624c7a17d4407088aae9cb33e5e0aa

开始

mkdir deploy-scaler  
cd deploy-scaler  
go mod init deploy-scaler
kubebuilder init -domain scaler.com
kubebuilder create api -kind Scaler --version vl -group api

编写 api/v1/scaler_types.go

package v1

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
	SCALED   = "Scaled"
	FAILED   = "Failed"
	PENDING  = "Pending"
	RESTORED = "Restored"
)

// EDIT THIS FILE!  THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required.  Any new fields you add must have json tags for the fields to be serialized.
type DeploymentInfo struct {
	Replicas  int32  `json:"replicas"`
	Namespace string `json:"namespace"`
}

type NamespacedName struct {
	Name      string `json:"name"`
	Namespace string `json:"namespace"`
}

// ScalerSpec defines the desired state of Scaler
type ScalerSpec struct {
	// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
	// Important: Run "make" to regenerate code after modifying this file

	// +kubebuilder:validation:Maximum=23
	// +kubebuilder:validation:Minimum=0
	// +kubebuilder:validation:Required
	Start int `json:"start"`
	// +kubebuilder:validation:Maximum=24
	// +kubebuilder:validation:Minimum=0
	// +kubebuilder:validation:Required
	End         int              `json:"end"`
	Replicas    int32            `json:"replicas"`
	Deployments []NamespacedName `json:"deployments"`
}

// ScalerStatus defines the observed state of Scaler
type ScalerStatus struct {
	// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
	// Important: Run "make" to regenerate code after modifying this file
	Status string `json:"status"`
}

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
//+kubebuilder:printcolumn:name="Status",type=string,JSONPath=`.status.status`
//+kubebuilder:printcolumn:name="Age",type="date",JSONPath=`.metadata.creationTimestamp`

// Scaler is the Schema for the scalers API
type Scaler struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   ScalerSpec   `json:"spec,omitempty"`
	Status ScalerStatus `json:"status,omitempty"`
}

//+kubebuilder:object:root=true

// ScalerList contains a list of Scaler
type ScalerList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata,omitempty"`
	Items           []Scaler `json:"items"`
}

func init() {
	SchemeBuilder.Register(&Scaler{}, &ScalerList{})
}

编写 controller

package controllers

import (
	"context"
	"fmt"
	v1 "k8s.io/api/apps/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/json"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
	"sigs.k8s.io/controller-runtime/pkg/log"
	"time"

	apiv1alpha1 "github.com/jack410/deploy-scaler/api/v1alpha1"
	"k8s.io/apimachinery/pkg/runtime"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

const finalizer = "scalers.api.scaler.com/finalizer"

var logger = log.Log.WithName("scaler_controller")

var originalDeploymentInfo = make(map[string]apiv1alpha1.DeploymentInfo)
var annotations = make(map[string]string)

// ScalerReconciler reconciles a Scaler object
type ScalerReconciler struct {
	client.Client
	Scheme *runtime.Scheme
}

//+kubebuilder:rbac:groups=api.scaler.com,resources=scalers,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=api.scaler.com,resources=scalers/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=api.scaler.com,resources=scalers/finalizers,verbs=update

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
// the Scaler object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
func (r *ScalerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	log := logger.WithValues("Request.Namespace", req.Namespace, "Request.Name", req.Name)
	log.Info("Reconcile called")

	//创建一个scaler实例
	scaler := &apiv1alpha1.Scaler{}
	err := r.Get(ctx, req.NamespacedName, scaler)
	if err != nil {
		//如果没有发现这个scaler实例,我们就用 client.IgnoreNotFound(err)来忽略错误,让进程不中断
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	if scaler.ObjectMeta.DeletionTimestamp.IsZero() {
		if !controllerutil.ContainsFinalizer(scaler, finalizer) {
			controllerutil.AddFinalizer(scaler, finalizer)
			log.Info("add finalizer.")
			err := r.Update(ctx, scaler)
			if err != nil {
				return ctrl.Result{}, client.IgnoreNotFound(err)
			}
		}
		if scaler.Status.Status == "" {
			scaler.Status.Status = apiv1alpha1.PENDING
			err := r.Status().Update(ctx, scaler)
			if err != nil {
				return ctrl.Result{}, client.IgnoreNotFound(err)
			}

			//将scaler中管理的deployments的副本数和namespace名称加到annotations里
			if err := addAnnotations(scaler, r, ctx); err != nil {
				return ctrl.Result{}, client.IgnoreNotFound(err)
			}
		}

		//开始执行scaler的逻辑
		startTime := scaler.Spec.Start
		endTime := scaler.Spec.End
		replicas := scaler.Spec.Replicas

		currenHour := time.Now().Local().Hour()
		log.Info(fmt.Sprintf("currentTIme: %d", currenHour))

		//从startime开始endtime为止
		if currenHour >= startTime && currenHour < endTime {
			if scaler.Status.Status != apiv1alpha1.SCALED {
				log.Info("starting to call scaleDeployment func.")
				err := scaleDeployment(scaler, r, ctx, replicas)
				if err != nil {
					return ctrl.Result{}, err
				}
			}
		} else {
			if scaler.Status.Status == apiv1alpha1.SCALED {
				restoreDeployment(scaler, r, ctx)
			}
		}
	} else {
		log.Info("starting deletion flow.")
		if scaler.Status.Status == apiv1alpha1.SCALED {
			log.Info("restore Deployment.")
			err := restoreDeployment(scaler, r, ctx)
			if err != nil {
				return ctrl.Result{}, err
			}
			log.Info("remove finalizer.")
			controllerutil.RemoveFinalizer(scaler, finalizer)
			err = r.Update(ctx, scaler)
			if err != nil {
				return ctrl.Result{}, err
			}
			log.Info("finalizer removed.")
		} else {
			controllerutil.RemoveFinalizer(scaler, finalizer)
			err = r.Update(ctx, scaler)
			if err != nil {
				return ctrl.Result{}, err
			}
			log.Info("finalizer removed.")
		}
		log.Info("scaler removed.")
	}

	return ctrl.Result{RequeueAfter: time.Duration(10 * time.Second)}, nil
}

func restoreDeployment(scaler *apiv1alpha1.Scaler, r *ScalerReconciler, ctx context.Context) error {
	logger.Info("starting to return to the original state")
	for name, originalDeployInfo := range originalDeploymentInfo {
		deployment := &v1.Deployment{}
		if err := r.Get(ctx, types.NamespacedName{
			Name:      name,
			Namespace: originalDeployInfo.Namespace,
		}, deployment); err != nil {
			return err
		}

		if deployment.Spec.Replicas != &originalDeployInfo.Replicas {
			deployment.Spec.Replicas = &originalDeployInfo.Replicas
			if err := r.Update(ctx, deployment); err != nil {
				return err
			}
		}
	}

	scaler.Status.Status = apiv1alpha1.RESTORED
	err := r.Status().Update(ctx, scaler)
	if err != nil {
		return err
	}

	return nil
}

func scaleDeployment(scaler *apiv1alpha1.Scaler, r *ScalerReconciler, ctx context.Context, replicas int32) error {
	//从scaler实例中遍历deployments
	for _, deploy := range scaler.Spec.Deployments {
		//创建一个新的deployment实例
		deployment := &v1.Deployment{}
		err := r.Get(ctx, types.NamespacedName{
			Name:      deploy.Name,
			Namespace: deploy.Namespace,
		}, deployment)
		if err != nil {
			return err
		}

		//判断当前k8s集群中的deployment副本数是否等于scaler中指定的副本数
		if deployment.Spec.Replicas != &replicas {
			deployment.Spec.Replicas = &replicas
			err := r.Update(ctx, deployment)
			if err != nil {
				scaler.Status.Status = apiv1alpha1.FAILED
				r.Status().Update(ctx, scaler)
				return err
			}

			scaler.Status.Status = apiv1alpha1.SCALED
			r.Status().Update(ctx, scaler)
		}
	}
	return nil
}

func addAnnotations(scaler *apiv1alpha1.Scaler, r *ScalerReconciler, ctx context.Context) error {
	//记录deployments的原始副本数和namespace名称
	for _, deploy := range scaler.Spec.Deployments {
		deployment := &v1.Deployment{}
		if err := r.Get(ctx, types.NamespacedName{
			Name:      deploy.Name,
			Namespace: deploy.Namespace,
		}, deployment); err != nil {
			return err
		}

		//开始记录
		if *deployment.Spec.Replicas != scaler.Spec.Replicas {
			logger.Info("add original state to originalDeploymentInfo map.")
			originalDeploymentInfo[deployment.Name] = apiv1alpha1.DeploymentInfo{
				Namespace: deployment.Namespace,
				Replicas:  *deployment.Spec.Replicas,
			}
		}
	}

	//将原始记录加到annotations里
	for deploymentName, info := range originalDeploymentInfo {
		//将info转换为json
		infoJson, err := json.Marshal(info)
		if err != nil {
			return err
		}

		//将infoJson存到annotations map里
		annotations[deploymentName] = string(infoJson)
	}

	//更新scaler的annotations
	scaler.ObjectMeta.Annotations = annotations
	err := r.Update(ctx, scaler)
	if err != nil {
		return err
	}

	return nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *ScalerReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&apiv1alpha1.Scaler{}).
		Complete(r)
}

执行 make manifests

修改 config/samples/下的系统默认的yaml,增加spec的修改

name: deploy1 就是要扩缩容的deployment,根据自己实际情况

apiVersion: api.scaler.com/v1
kind: Scaler
metadata:
  name: scaler-sample
spec:
  start: 20
  end: 21
  replicas: 2
  deployments:
    - name: deploy1 
      namespace: default
    - name: deploy2
      namespace: default

安装crd

make install

可以用 kubectl get api-resources 查询

运行

make run

另一个窗口运行

kubectl apply -f config/samples/

标签:Status,return,scaler,err,ctx,扩缩容,json,Kubebuilder,定时
From: https://www.cnblogs.com/qcy-blog/p/18190823

相关文章

  • requestAnimationFrame模拟定时器
    const{ myInterval,clearMyInterval}=(function(){ //存放系统中的定时器id lettimerIdMap={num:0} functionmyInterval(callback,interval){ //每设置一次定时器,num++代表系统中有num个自定义的定时器 ......
  • kubebuilder的简单入门
    确保kubeconfig文件~/.kube/config存在,并且内容正确如果是k3s参考我之前的文章https://www.cnblogs.com/qcy-blog/p/181888651.Operator是什么?Operator是使用自定义资源(CR,CustomResource)管理应用及其组件的自定义控制器(Controller)ControlPlane的控制器实施控制循环......
  • 在PLC的扫描特性下,FOR循环里面套IF或者定时器会发生什么?
    1.为什么会提出这个题目在PLC指令的执行过程中,FOR循环会在每个扫描周期执行完一个循环,然后程序才会接着往下扫描在PLC指令的执行过程中,IF判断会在每次扫描中判断一次条件,满足则进入执行体,不满足则跳出IF把IF放在FOR循环里面,会导致循环体执行时间过长,或者其他问题吗?(答案是不会......
  • 如何定时打开网站
    首先,需要用到的这个工具:度娘网盘提取码:qwu2蓝奏云提取码:2r1z1、打开工具按下Ctrl+3,切换到定时器模块,左侧右键,选择新建 2、标题叫百度,等下就让它打开百度,间隔1分钟,每次重复执行1个 3、在建好的右侧,右键,添加  4、选择链接,输入百度网址,点击确定  5、这样做好......
  • LwRB - 一款适用嵌入式系统的轻量级 RingBuffer+MultiTimer - 超精简的纯软件定时器驱
    1、MicroMagic发布世界上最快的64-bitRISC-V核近日,一家位于美国加州森尼维尔的小型电子设计公司MicroMagic宣称设计、生产出了全世界最快的64位RISC-V内核,比苹果的M1芯片和ArmCortex-A9表现还要出色。消息源: http://www.micromagic.com/news/RISCv-Fastest_PR.pdf这......
  • Docker容器定时备份MySQL数据库
    1.系统环境mysql8、centos7.92.创建mysql_backup.sh文件#!/bin/bash#获取容器idcontainer_id=`/usr/bin/dockerps-aqf"name=mysql-8.0"`echo"mysql的镜像IDis$container_id"#登录用户名mysql_user="xxx"#登录密码(注意如果密码包含特殊符号前面要用'......
  • .NET有哪些好用的定时任务调度框架
    .NET有哪些好用的定时任务调度框架前言定时任务调度的相关业务在日常工作开发中是一个十分常见的需求,经常有小伙伴们在技术群提问:有什么好用的定时任务调度框架推荐的?今天大姚给大家分享5个.NET开源、简单、易用、免费的任务调度框架,帮助大家在做定时任务调度框架技术选型的时候......
  • 为什么要用setTimout来做定时器?
    Q:再js中定时任务我们为什么要用setTimeout模拟,而不直接用setIntervalA:以下为详细答案精确控制时间间隔:使用setTimeout可以更精确地控制每次任务执行的时间间隔。因为在任务执行完成之后,我们可以根据需要再次设置下一个任务执行的时间,从而避免了可能因任务执行时间过长而......
  • Oracle数据库 定时备份
    说明学习了如何Oracle如何备份数据库,实际开发过程中数据库应该每隔一段时间就要备份一次,所以我们就需要一个定时执行这个代码的功能,同时备份的文件可能进行一些处理,比如压缩。步骤建一个文本文件,添加以下内容,后缀名修改为.bat::代码页更改为Unicode(UTF-8)chcp65001@......
  • .NET有哪些好用的定时任务调度框架
    前言定时任务调度的相关业务在日常工作开发中是一个十分常见的需求,经常有小伙伴们在技术群提问:有什么好用的定时任务调度框架推荐的?今天大姚给大家分享5个.NET开源、简单、易用、免费的任务调度框架,帮助大家在做定时任务调度框架技术选型的时候有一个参考。以下开源任务调度收......