首页 > 其他分享 >k8s controller选主

k8s controller选主

时间:2023-09-16 11:13:24浏览次数:51  
标签:选主 github controller io go indirect k8s com

controller选主代码实现

controller多实例可能状态
1 抢锁成功,作为Leader跑业务
2 抢锁失败等待
3 释放锁,结束

k8s官方例子
go.mod和主流程

module controller-by-leader-election

go 1.19

require (
	github.com/google/uuid v1.3.0
	k8s.io/apimachinery v0.28.2
	k8s.io/client-go v0.28.2
	k8s.io/klog/v2 v2.100.1
)

require (
	github.com/davecgh/go-spew v1.1.1 // indirect
	github.com/emicklei/go-restful/v3 v3.9.0 // indirect
	github.com/go-logr/logr v1.2.4 // indirect
	github.com/go-openapi/jsonpointer v0.19.6 // indirect
	github.com/go-openapi/jsonreference v0.20.2 // indirect
	github.com/go-openapi/swag v0.22.3 // indirect
	github.com/gogo/protobuf v1.3.2 // indirect
	github.com/golang/protobuf v1.5.3 // indirect
	github.com/google/gnostic-models v0.6.8 // indirect
	github.com/google/go-cmp v0.5.9 // indirect
	github.com/google/gofuzz v1.2.0 // indirect
	github.com/imdario/mergo v0.3.6 // indirect
	github.com/josharian/intern v1.0.0 // indirect
	github.com/json-iterator/go v1.1.12 // indirect
	github.com/mailru/easyjson v0.7.7 // indirect
	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
	github.com/modern-go/reflect2 v1.0.2 // indirect
	github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
	github.com/spf13/pflag v1.0.5 // indirect
	golang.org/x/net v0.13.0 // indirect
	golang.org/x/oauth2 v0.8.0 // indirect
	golang.org/x/sys v0.10.0 // indirect
	golang.org/x/term v0.10.0 // indirect
	golang.org/x/text v0.11.0 // indirect
	golang.org/x/time v0.3.0 // indirect
	google.golang.org/appengine v1.6.7 // indirect
	google.golang.org/protobuf v1.31.0 // indirect
	gopkg.in/inf.v0 v0.9.1 // indirect
	gopkg.in/yaml.v2 v2.4.0 // indirect
	gopkg.in/yaml.v3 v3.0.1 // indirect
	k8s.io/api v0.28.2 // indirect
	k8s.io/kube-openapi v0.0.0-20230905202853-d090da108d2f // indirect
	k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect
	sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
	sigs.k8s.io/structured-merge-diff/v4 v4.3.0 // indirect
	sigs.k8s.io/yaml v1.3.0 // indirect
)

// k8s.io/api => k8s.io/api v0.0.0-20230915221828-1cac0b1ef7e3
replace k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20230915221524-64708d3e9048
package main

import (
	"context"
	"flag"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/google/uuid"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/klog/v2"
)

func buildConfig(kubeconfig string) (*rest.Config, error) {
	if kubeconfig != "" {
		cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
		if err != nil {
			return nil, err
		}
		return cfg, nil
	}

	cfg, err := rest.InClusterConfig()
	if err != nil {
		return nil, err
	}
	return cfg, nil
}

func main() {
	klog.InitFlags(nil)

	var kubeconfig string
	var leaseLockName string
	var leaseLockNamespace string
	var id string

	// kubeconfig 指定了kubernetes集群的配置文件路径
	flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")
	// 锁的拥有者的ID,如果没有传参数进来,就随机生成一个
	flag.StringVar(&id, "id", uuid.New().String(), "the holder identity name")
	// 锁的ID,对应kubernetes中资源的name
	flag.StringVar(&leaseLockName, "lease-lock-name", "", "the lease lock resource name")
	// 锁的命名空间
	flag.StringVar(&leaseLockNamespace, "lease-lock-namespace", "", "the lease lock resource namespace")
	// 解析命令行参数
	flag.Parse()

	if leaseLockName == "" {
		klog.Fatal("unable to get lease lock resource name (missing lease-lock-name flag).")
	}
	if leaseLockNamespace == "" {
		klog.Fatal("unable to get lease lock resource namespace (missing lease-lock-namespace flag).")
	}

	// leader election uses the Kubernetes API by writing to a
	// lock object, which can be a LeaseLock object (preferred),
	// a ConfigMap, or an Endpoints (deprecated) object.
	// Conflicting writes are detected and each client handles those actions
	// independently.
	config, err := buildConfig(kubeconfig)
	if err != nil {
		klog.Fatal(err)
	}
	// 获取kubernetes集群的客户端,如果获取不到,就抛异常退出
	client := clientset.NewForConfigOrDie(config)

	// 模拟Controller的逻辑代码
	run := func(ctx context.Context) {
		// complete your controller loop here
		klog.Info("Controller loop...")

		// 不退出
		select {}
	}

	// use a Go context so we can tell the leaderelection code when we
	// want to step down
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// listen for interrupts or the Linux SIGTERM signal and cancel
	// our context, which the leader election code will observe and
	// step down
	// 处理系统的系统,收到SIGTERM信号后,会退出进程
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
	go func() {
		<-ch
		klog.Info("Received termination, signaling shutdown")
		cancel()
	}()

	// we use the Lease lock type since edits to Leases are less common
	// and fewer objects in the cluster watch "all Leases".

	// 根据参数,生成锁。这里使用的Lease这种类型资源作为锁
	lock := &resourcelock.LeaseLock{
		LeaseMeta: metav1.ObjectMeta{
			Name:      leaseLockName,
			Namespace: leaseLockNamespace,
		},
		// 跟kubernetes集群关联起来
		Client: client.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{
			Identity: id,
		},
	}

	// start the leader election code loop

	// 注意,选举逻辑启动时候,会传入ctx参数,如果ctx对应的cancel函数被调用,那么选举也会结束
	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		// 选举使用的锁
		Lock: lock,
		// IMPORTANT: you MUST ensure that any code you have that
		// is protected by the lease must terminate **before**
		// you call cancel. Otherwise, you could have a background
		// loop still running and another process could
		// get elected before your background loop finished, violating
		// the stated goal of the lease.
		//主动放弃leader,当ctx canceled的时候
		ReleaseOnCancel: true,
		LeaseDuration:   60 * time.Second, // 选举的任期,60s一个任期,如果在60s后没有renew,那么leader就会释放锁,重新选举
		RenewDeadline:   15 * time.Second, // renew的请求的超时时间
		RetryPeriod:     5 * time.Second,  // leader获取到锁后,renew leadership的间隔。非leader,抢锁成为leader的间隔

		// 回调函数的注册
		Callbacks: leaderelection.LeaderCallbacks{
			// 成为leader的回调
			OnStartedLeading: func(ctx context.Context) {
				// we're notified when we start - this is where you would
				// usually put your code
				// 运行controller的逻辑
				run(ctx)
			},
			OnStoppedLeading: func() {
				// we can do cleanup here
				// 退出leader的
				klog.Infof("leader lost: %s", id)
				os.Exit(0)
			},
			OnNewLeader: func(identity string) {
				// 有新的leader当选
				// we're notified when new leader elected
				if identity == id {
					// I just got the lock
					return
				}
				klog.Infof("new leader elected: %s", identity)
			},
		},
	})
}

运行2个controller实例,lease-lock-name都是c,lease-lock-namespace都是default,id不同

该controller获得了锁
./controller-by-leader-election --kubeconfig=/root/.kube/config -logtostderr=true -lease-lock-name=c -lease-lock-namespace=default -id=1

该controller没有获得锁
./controller-by-leader-election --kubeconfig=/root/.kube/config -logtostderr=true -lease-lock-name=c -lease-lock-namespace=default -id=2

在default命名空间下增加了lease资源c

查看lease内容

在id=1 controller退出后id=2 controller获得了锁

锁的实现

k8s提供了Lease/Configmap/Endpoint作为锁的底层。

update时对比resourceVersion。

func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
  now := metav1.Now()
  leaderElectionRecord := rl.LeaderElectionRecord{
    HolderIdentity:       le.config.Lock.Identity(),
    LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
    RenewTime:            now,
    AcquireTime:          now,
  }

  // 检查锁有没有
  oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
  if err != nil {
    // 没有锁的资源,就创建一个
    if !errors.IsNotFound(err) {
      klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
      return false
    }
    if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil {
      klog.Errorf("error initially creating leader election record: %v", err)
      return false
    }
    // 对外宣称自己是leader
    le.setObservedRecord(&leaderElectionRecord)
​
    return true
  }
​
  // 2. Record obtained, check the Identity & Time
  if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
    // leader不断renew,oldLeaderElectionRawRecord一直变化,更新le.observedTime
    le.setObservedRecord(oldLeaderElectionRecord)
    le.observedRawRecord = oldLeaderElectionRawRecord
  }
  
  // 未超时且不是leader,return
  if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
    le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
    !le.IsLeader() {
    klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
    return false
  }

  // leader续约锁
  if le.IsLeader() {
    leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
    leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
  } else {
    // 不是leader,尝试成为leader
    leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
  }
  
  // 更新锁
  if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
    klog.Errorf("Failed to update lock: %v", err)
    return false
  }
​
  le.setObservedRecord(&leaderElectionRecord)
  return true
}

参考资料

https://juejin.cn/post/7157648925078323207

标签:选主,github,controller,io,go,indirect,k8s,com
From: https://www.cnblogs.com/WJQ2017/p/17706453.html

相关文章

  • 【Kubernets】K8s群集安装部署详细教程-(3)安装过程中的错误解决
    k8s常用命令集合k8s常用命令集合:#查看当前集群的所有的节点kubectlgetnode#显示Node的详细信息(一般用不着)kubectldescribenodenode1#查看所有的podkubectlgetpod--all-namespaces#查看pod的详细信息kubectlgetpods-owide--all-namespaces#查看所有创......
  • 基于k8s的statefulset+pv安装mysql5.7主从集群
    前提假设:1.已安装k8s环境;2.因为我使用nfs作为pv存储介质,所以需要预先配置好nfs服务nfs安装可参考:https://blog.csdn.net/wudinaniya/article/details/81068518 步骤:1.规划mysql持久化文件在nfs中的存储路径;2.创建mysqlpv;3.创建mysql configmap;4.创建mysql service;5.......
  • K8S服务发布
    1.nodePort对外发布服务[root@master~]#vimmysvc1.yaml[root@master~]#vimmysvc1.yaml---kind:ServiceapiVersion:v1metadata:name:mysvc1spec:type:NodePort#服务类型selector:app:webports:-protocol:TCPport:80n......
  • iOS开发实战-仿小红书App开发-2-项目总体设计,TabBarController,启动页,深色模式
    1.新建一个LittlePink项目 完成一些配置. 2.在Main中新增一个TabBarController.把箭头给TbaBarController. 除了原有的一个ViewController外,再拉两个ViewController,使Main中共有5个ViewController. 将主界面Ctrl加拖拽到其他三个ViewController中,选择ViewCon......
  • Kubernetes初探[1]:部署您的第一个ASP.NET Core应用到k8s集群
    原文:https://www.cnblogs.com/wl-blog/p/16936019.htmlKubernetes简介Kubernetes是Google基于Borg开源的容器编排调度引擎,作为CNCF(CloudNativeComputingFoundation)最重要的组件之一,它的目标不仅仅是一个编排系统,而是提供一个规范,可以让你来描述集群的架构,定义服务的最终状态,K......
  • k8s 基础理论汇总
    1.k8s有哪些常用组件,他们功能是什么 etcd保存了整个集群的状态;apiserver提供了资源操作的唯一入口,并提供认证、授权、访问控制、API注册和发现等机制;controllermanager负责维护集群的状态,比如故障检测、自动扩展、滚动更新等;scheduler负责资源的调度,按照预定的调度......
  • rancher 导入k8s集群
     cat  rancher.shdockerrun-d  --privileged  --restart=unless-stopped-p81:80-p1443:443rancher/rancher:stableRancher登录地址:https://172.22.0.11:1443/ 然后输入集群名字点创建: 在安装k8s集群机器上执行如下命令进行导入如果有报错,按照提示执......
  • k8s之affinity and anti-affinity
    背景介绍:在k8s环境中,通常情况下,Pod分配到哪些Node是不需要管理员操心的,这个过程会由scheduler调度实现,合理的分配到最优的节点上。但在实际项目中,我们可能需要指定一些调度的限制,例如某些应用需要跑在具有SSD存储或带gpu的节点上,或某些需进行大量计算解析且耗费很多cpu等资源的应......
  • controller方法入参出参加日志打印
    importcom.alibaba.fastjson.JSON;importcom.alibaba.fastjson.serializer.SerializerFeature;importlombok.extern.slf4j.Slf4j;importorg.aspectj.lang.ProceedingJoinPoint;importorg.aspectj.lang.annotation.Around;importorg.aspectj.lang.annotation.Aspect......
  • k8s安装kube-promethues(0.7版本)
    k8s安装kube-promethues(0.7版本)一.检查本地k8s版本,下载对应安装包kubectlversion如图可见是1.19版本进入kube-promethus下载地址,查找自己的k8s版本适合哪一个kube-promethues版本。然后下载自己合适的版本#还可以通过如下地址,在服务器上直接下已经打包好的包。或者复......