
ETCD Source Code Reading (Part 6)

Date: 2023-04-05 21:35:23
Tags: reading, lessor, lease, source code, LeaseID, error, ETCD, Lease

DAY5: ETCD's Lease Mechanism

Introduction to the Lease Mechanism

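Besides the distributed locks covered in earlier articles, the lease mechanism is widely used for service registration and discovery, where it works together with the watch mechanism. This article analyzes the Lease part of the source code.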

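A lease can be attached to one or more keys in the ETCD cluster. When the lease expires or is revoked, the attached keys are deleted automatically. Lease code exists on both the client and the server side and covers operations such as the client obtaining a lease, the client actively renewing it, the server revoking it, and the server granting it.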

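Note that lease and leasing are not the same thing. Leasing is the process of using leases in ETCD to allocate and manage resources: when a client wants to use a resource, it asks ETCD for a lease and holds control over that resource while the lease is valid. During that time the client must keep sending heartbeats to keep the lease alive, and it keeps control only as long as no other client has acquired a lease on the same resource; otherwise it has to request a new lease.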

This article focuses on lease.

1. Lease client-side source overview

etcd/client/v3/

# client
lease.go
example_lease_test.go

Lease RPC client

etcd/api/etcdserverpb/rpc.pb.go

// LeaseClient is the client API for Lease service.
type LeaseClient interface {
	// LeaseGrant creates a lease which expires if the server does not receive a keepAlive
	// within a given time to live period. All keys attached to the lease will be expired and
	// deleted if the lease expires. Each expired key generates a delete event in the event history.
	LeaseGrant(ctx context.Context, in *LeaseGrantRequest, opts ...grpc.CallOption) (*LeaseGrantResponse, error)
	// LeaseRevoke revokes a lease. All keys attached to the lease will expire and be deleted.
	LeaseRevoke(ctx context.Context, in *LeaseRevokeRequest, opts ...grpc.CallOption) (*LeaseRevokeResponse, error)
	// LeaseKeepAlive keeps the lease alive by streaming keep alive requests from the client
	// to the server and streaming keep alive responses from the server to the client.
	LeaseKeepAlive(ctx context.Context, opts ...grpc.CallOption) (Lease_LeaseKeepAliveClient, error)
	// LeaseTimeToLive retrieves lease information.
	LeaseTimeToLive(ctx context.Context, in *LeaseTimeToLiveRequest, opts ...grpc.CallOption) (*LeaseTimeToLiveResponse, error)
	// LeaseLeases lists all existing leases.
	LeaseLeases(ctx context.Context, in *LeaseLeasesRequest, opts ...grpc.CallOption) (*LeaseLeasesResponse, error)
}

The comments above already explain the RPC interfaces of the Lease service clearly.

lease.go

The Lease interface definition:

type Lease interface {
	// Grant creates a new lease.
	Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)

	// Revoke revokes the given lease.
	Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)

	// TimeToLive retrieves the lease information of the given lease ID.
	TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)

	// Leases retrieves all leases.
	Leases(ctx context.Context) (*LeaseLeasesResponse, error)

	KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)
	KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)

	// Close releases all resources Lease keeps for efficient communication
	// with the etcd server.
	Close() error
}

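The most important method here is KeepAlive, which by default keeps renewing the lease automatically forever. Each keepalive response received is sent into a channel; if no consumer reads from that channel promptly, it fills up quickly. The client then keeps sending keepalive requests to the server but drops every response until the channel has buffer space again.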

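If the client's keepalive loop has already halted with an error, KeepAlive returns an error (ErrKeepAliveHalted). If the loop hits an unexpected error, or the context passed in is canceled or times out, the returned *LeaseKeepAliveResponse channel is closed.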
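The drop-when-full behavior described above comes down to a non-blocking send on a buffered channel. The following is a minimal, stdlib-only sketch of that idea; `response` is a hypothetical stand-in for `LeaseKeepAliveResponse`, and the buffer size is arbitrary rather than etcd's actual `LeaseResponseChSize`.

```go
package main

import "fmt"

// response is a hypothetical stand-in for LeaseKeepAliveResponse;
// only the TTL field matters for this sketch.
type response struct {
	TTL int64
}

// trySend performs a non-blocking send: if the buffered channel is full,
// the response is dropped instead of blocking the receiving loop.
func trySend(ch chan<- *response, r *response) bool {
	select {
	case ch <- r:
		return true
	default:
		return false
	}
}

// deliver pushes n responses into ch and reports how many were dropped.
func deliver(ch chan<- *response, n int) (dropped int) {
	for i := 0; i < n; i++ {
		if !trySend(ch, &response{TTL: int64(i)}) {
			dropped++
		}
	}
	return dropped
}

func main() {
	// Buffer size 4 is arbitrary for the demo.
	ch := make(chan *response, 4)
	dropped := deliver(ch, 10) // nobody is reading from ch yet
	fmt.Println("buffered:", len(ch), "dropped:", dropped) // buffered: 4 dropped: 6

	// Once the consumer drains the channel, new responses fit again.
	for len(ch) > 0 {
		<-ch
	}
	fmt.Println("dropped after drain:", deliver(ch, 2)) // dropped after drain: 0
}
```

Note that it is the newest responses that get dropped: the buffer keeps the oldest unread ones until the consumer catches up.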

The lessor struct implements the Lease interface; only some key fields are shown here:

type lessor struct {
	remote pb.LeaseClient

	keepAlives map[LeaseID]*keepAlive
	// firstKeepAliveOnce ensures stream starts after first KeepAlive call.
	firstKeepAliveOnce sync.Once
}

remote is the generated LeaseClient, which provides the RPC interface. keepAlives is the interesting part: a map from LeaseID to *keepAlive.

// keepAlive multiplexes a keepalive for a lease over multiple channels
type keepAlive struct {
	chs  []chan<- *LeaseKeepAliveResponse
	ctxs []context.Context
	// deadline is the time the keep alive channels close if no response
	deadline time.Time
	// nextKeepAlive is when to send the next keep alive message
	nextKeepAlive time.Time
	// donec is closed on lease revoke, expiration, or cancel.
	donec chan struct{}
}

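keepAlive multiplexes the keepalive of a single lease over multiple channels: each LeaseKeepAliveResponse is sent to all of them. Each channel is the one returned by a KeepAlive call, and what to do with the responses is up to the caller, for example simply printing them: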

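	ch, kaerr := cli.KeepAlive(context.TODO(), resp.ID)
	if kaerr != nil {
		log.Fatal(kaerr)
	}
	// block until the first keepalive response arrives
	ka := <-ch
	if ka != nil {
		fmt.Println("ttl:", ka.TTL)
	} else {
		// nil means the channel was closed (ctx canceled, lease gone, or the loop halted)
		fmt.Println("Unexpected NULL")
	}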

Now let's look at the KeepAlive method:

func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
	ch := make(chan *LeaseKeepAliveResponse, LeaseResponseChSize)

	l.mu.Lock()
	// ensure that recvKeepAliveLoop is still running
	select {
	case <-l.donec:
		err := l.loopErr
		l.mu.Unlock()
		close(ch)
		return ch, ErrKeepAliveHalted{Reason: err}
	default:
	}
	ka, ok := l.keepAlives[id]
	if !ok {
		// create fresh keep alive
		ka = &keepAlive{
			chs:           []chan<- *LeaseKeepAliveResponse{ch},
			ctxs:          []context.Context{ctx},
			deadline:      time.Now().Add(l.firstKeepAliveTimeout),
			nextKeepAlive: time.Now(),
			donec:         make(chan struct{}),
		}
		l.keepAlives[id] = ka
	} else {
		// add channel and context to existing keep alive
		ka.ctxs = append(ka.ctxs, ctx)
		ka.chs = append(ka.chs, ch)
	}
	l.mu.Unlock()

	go l.keepAliveCtxCloser(ctx, id, ka.donec)

	l.firstKeepAliveOnce.Do(func() {
		go l.recvKeepAliveLoop()
		go l.deadlineLoop()
	})

	return ch, nil
}
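  1. Create a buffered *LeaseKeepAliveResponse channel that will receive the server's responses.
  2. Take the lessor's lock and check donec with a select. If donec is closed, the keepalive loop has stopped, so the method closes ch and returns it together with an ErrKeepAliveHalted error.
  3. Check whether the keepAlives map already has an entry for this ID. If not, create a new keepAlive object holding ch and ctx and store it in the map; if it does, append ch and ctx to that keepAlive's chs and ctxs slices.
  4. Release the lessor's lock and start a keepAliveCtxCloser goroutine, which closes ch and cleans up the corresponding entry in the keepAlives map when ctx is canceled, and exits once the lease expires or is revoked.
  5. On the first call only (guarded by firstKeepAliveOnce), start the recvKeepAliveLoop and deadlineLoop goroutines: the former processes KeepAlive responses, the latter checks whether leases have expired and cleans them up.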
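To make the bookkeeping in steps 3 and 5 concrete, here is a trimmed-down sketch of the lease-to-channels map and the one-time start of the background loops. The `registry` type, the `loops` counter and the use of plain `int64` TTL values as responses are illustrative assumptions; the context handling from step 4 (keepAliveCtxCloser) and the real gRPC stream are left out.

```go
package main

import (
	"fmt"
	"sync"
)

type LeaseID int64

// keepAlive mirrors the shape of the client-side struct: one entry per lease,
// fanning out to every channel registered for that lease.
type keepAlive struct {
	chs []chan int64 // simplified: a response is just its TTL
}

// registry is a hypothetical, trimmed-down stand-in for the client lessor.
type registry struct {
	mu         sync.Mutex
	keepAlives map[LeaseID]*keepAlive
	startOnce  sync.Once
	loops      int // how many times the background loops were "started"
}

func newRegistry() *registry {
	return &registry{keepAlives: make(map[LeaseID]*keepAlive)}
}

// KeepAlive registers a new buffered channel for id, reusing the existing
// keepAlive entry when the lease is already being kept alive.
func (r *registry) KeepAlive(id LeaseID) <-chan int64 {
	ch := make(chan int64, 1)

	r.mu.Lock()
	ka, ok := r.keepAlives[id]
	if !ok {
		ka = &keepAlive{}
		r.keepAlives[id] = ka
	}
	ka.chs = append(ka.chs, ch)
	r.mu.Unlock()

	// The real code starts recvKeepAliveLoop and deadlineLoop here;
	// this sketch only records that the one-time start happened.
	r.startOnce.Do(func() { r.loops++ })
	return ch
}

// broadcast sends ttl to every channel registered for id, dropping on full.
func (r *registry) broadcast(id LeaseID, ttl int64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if ka, ok := r.keepAlives[id]; ok {
		for _, ch := range ka.chs {
			select {
			case ch <- ttl:
			default:
			}
		}
	}
}

func main() {
	r := newRegistry()
	a := r.KeepAlive(1)
	b := r.KeepAlive(1) // same lease: appended to the same keepAlive
	r.KeepAlive(2)

	r.broadcast(1, 30)
	fmt.Println(<-a, <-b, len(r.keepAlives), r.loops) // 30 30 2 1
}
```

Keeping one entry per lease means the client sends a single stream of keepalive requests per lease no matter how many callers are listening.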

recvKeepAliveLoop processes the LeaseKeepAliveResponse messages sent by the server:

func (l *lessor) recvKeepAliveLoop() (gerr error) {
	// ...
	for {
		stream, err := l.resetRecv()
		if err != nil {
			// ...
		} else {
			for {
				resp, err := stream.Recv()
				if err != nil {
					// ...
				}
				l.recvKeepAlive(resp)
			}
		}
		select {
		case <-time.After(retryConnWait):
		case <-l.stopCtx.Done():
			return l.stopCtx.Err()
		}
	}
}

func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
	karesp := &LeaseKeepAliveResponse{
		ResponseHeader: resp.GetHeader(),
		ID:             LeaseID(resp.ID),
		TTL:            resp.TTL,
	}
	// ...
	if karesp.TTL <= 0 {
		// lease expired; close all keep alive channels
		delete(l.keepAlives, karesp.ID)
		ka.close()
		return
	}

	// send update to all channels
	nextKeepAlive := time.Now().Add((time.Duration(karesp.TTL) * time.Second) / 3.0)
	ka.deadline = time.Now().Add(time.Duration(karesp.TTL) * time.Second)
	for _, ch := range ka.chs {
		select {
		case ch <- karesp:
		default:
			if l.lg != nil {
				l.lg.Warn("lease keepalive response queue is full; dropping response send",
					zap.Int("queue-size", len(ch)),
					zap.Int("queue-capacity", cap(ch)),
				)
			}
		}
		// still advance in order to rate-limit keep-alive sends
		ka.nextKeepAlive = nextKeepAlive
	}
}

recvKeepAliveLoop:

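  1. Call l.resetRecv() to (re)establish the keepalive stream to the server and obtain a lease stream.
  2. Pass every response read from the stream to recvKeepAlive.
  3. If the stream cannot be set up or breaks, wait retryConnWait and try again, unless the lessor's stopCtx is done, in which case the loop returns.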

recvKeepAlive:

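  1. Convert resp into a client-side LeaseKeepAliveResponse, karesp, and use it to update the lease's keepalive state.
  2. If karesp.TTL <= 0, the lease has expired: delete the keepAlive for that LeaseID and close all of its channels. If TTL > 0, the lease is still valid, so update the keepAlive's deadline (now + TTL) and nextKeepAlive (now + TTL/3).
  3. Finally, try to send karesp to each of the keepAlive's channels; if a channel is full, the response is dropped for that channel. nextKeepAlive is advanced regardless, which rate-limits keepalive requests.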

2. Lease server-side source code

# server
lease/
├── doc.go
├── lease_queue.go
├── lease_queue_test.go
├── leasehttp
│   ├── doc.go
│   ├── http.go
│   └── http_test.go
├── leasepb
│   ├── lease.pb.go
│   └── lease.proto
├── lessor.go
├── lessor_bench_test.go
├── lessor_test.go
└── metrics.go

The Lease struct is defined as follows:

type Lease struct {
	ID           LeaseID
	ttl          int64 // time to live of the lease in seconds
	remainingTTL int64 // remaining time to live in seconds, if zero valued it is considered unset and the full ttl should be used
	// expiryMu protects concurrent accesses to expiry
	expiryMu sync.RWMutex
	// expiry is time when lease should expire. no expiration when expiry.IsZero() is true
	expiry time.Time

	// mu protects concurrent accesses to itemSet
	mu      sync.RWMutex
	itemSet map[LeaseItem]struct{}
	revokec chan struct{}
}
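// Main methods:

// Keys returns all keys attached to the lease.
func (l *Lease) Keys() []string

// Remaining returns the time left before the lease expires.
func (l *Lease) Remaining() time.Duration

// TTL returns the lease's TTL.
func (l *Lease) TTL() int64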

Concretely, ETCD manages multiple leases through a lessor. Only the main part of the code is copied here:

type Lessor interface {
    // ...
	// Grant grants a lease that expires at least after TTL seconds.
	Grant(id LeaseID, ttl int64) (*Lease, error)
	// Revoke revokes a lease with given ID. The item attached to the
	// given lease will be removed. If the ID does not exist, an error
	// will be returned.
	Revoke(id LeaseID) error

	// Attach attaches given leaseItem to the lease with given LeaseID.
	// If the lease does not exist, an error will be returned.
	Attach(id LeaseID, items []LeaseItem) error

	// GetLease returns LeaseID for given item.
	// If no lease found, NoLease value will be returned.
	GetLease(item LeaseItem) LeaseID

	// Detach detaches given leaseItem from the lease with given LeaseID.
	// If the lease does not exist, an error will be returned.
	Detach(id LeaseID, items []LeaseItem) error

	// Renew renews a lease with given ID. It returns the renewed TTL. If the ID does not exist,
	// an error will be returned.
	Renew(id LeaseID) (int64, error)
}
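The Attach/Detach/GetLease/Revoke contract implies two maps that must stay in sync: each lease's item set and a reverse map from item to LeaseID (the `itemMap` field of lessor shown below). The sketch below is a hypothetical single-goroutine model of that bookkeeping; `miniLessor`, a `Grant` without a TTL, and the `kv` map standing in for the key-value store are all simplifications, and real revocation goes through Raft and the storage backend rather than a map delete.

```go
package main

import (
	"errors"
	"fmt"
)

type LeaseID int64

// LeaseItem identifies a key attached to a lease.
type LeaseItem struct{ Key string }

// NoLease and ErrLeaseNotFound mimic names used by etcd's lease package.
const NoLease LeaseID = 0

var ErrLeaseNotFound = errors.New("lease not found")

// miniLessor is a hypothetical, single-goroutine model of the lessor's bookkeeping.
type miniLessor struct {
	leaseMap map[LeaseID]map[LeaseItem]struct{} // lease -> attached items (like Lease.itemSet)
	itemMap  map[LeaseItem]LeaseID              // item -> owning lease
	kv       map[string]string                  // stand-in for the key-value store
}

func newMiniLessor() *miniLessor {
	return &miniLessor{
		leaseMap: make(map[LeaseID]map[LeaseItem]struct{}),
		itemMap:  make(map[LeaseItem]LeaseID),
		kv:       make(map[string]string),
	}
}

// Grant registers an empty lease (TTL handling is omitted here).
func (le *miniLessor) Grant(id LeaseID) { le.leaseMap[id] = make(map[LeaseItem]struct{}) }

// Attach records items on both sides: in the lease's set and in itemMap.
func (le *miniLessor) Attach(id LeaseID, items []LeaseItem) error {
	set, ok := le.leaseMap[id]
	if !ok {
		return ErrLeaseNotFound
	}
	for _, it := range items {
		set[it] = struct{}{}
		le.itemMap[it] = id
	}
	return nil
}

// Detach removes items from both maps without deleting the keys themselves.
func (le *miniLessor) Detach(id LeaseID, items []LeaseItem) error {
	set, ok := le.leaseMap[id]
	if !ok {
		return ErrLeaseNotFound
	}
	for _, it := range items {
		delete(set, it)
		delete(le.itemMap, it)
	}
	return nil
}

// GetLease answers "which lease owns this item?" via the reverse map.
func (le *miniLessor) GetLease(item LeaseItem) LeaseID {
	if id, ok := le.itemMap[item]; ok {
		return id
	}
	return NoLease
}

// Revoke deletes every attached key, then forgets the lease.
func (le *miniLessor) Revoke(id LeaseID) error {
	set, ok := le.leaseMap[id]
	if !ok {
		return ErrLeaseNotFound
	}
	for it := range set {
		delete(le.kv, it.Key)
		delete(le.itemMap, it)
	}
	delete(le.leaseMap, id)
	return nil
}

func main() {
	le := newMiniLessor()
	le.kv["/svc/a"] = "10.0.0.1"
	le.kv["/svc/b"] = "10.0.0.2"
	le.Grant(100)
	_ = le.Attach(100, []LeaseItem{{Key: "/svc/a"}, {Key: "/svc/b"}})
	fmt.Println(le.GetLease(LeaseItem{Key: "/svc/a"})) // 100

	_ = le.Revoke(100)
	fmt.Println(len(le.kv), le.GetLease(LeaseItem{Key: "/svc/a"})) // 0 0
	fmt.Println(le.Attach(100, nil))                               // lease not found
}
```

The `/svc/...` keys in the usage example illustrate the service-registration pattern from the introduction: when the lease is revoked, both registered endpoints disappear together.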

lessor implements the Lessor interface; again, only the main part of the code is shown:

type lessor struct {
	// ...
	leaseMap             map[LeaseID]*Lease
	leaseExpiredNotifier *LeaseExpiredNotifier
	leaseCheckpointHeap  LeaseQueue
	itemMap              map[LeaseItem]LeaseID
}

The server-side lease logic largely mirrors the client side, so only Renew is covered here and the rest is not repeated.

func (le *lessor) Renew(id LeaseID) (int64, error) {
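	// ... (elided; le.mu is read-locked at this point, as the RUnlock calls below show)
	if !le.isPrimary() {
		// forward renew request to primary instead of returning error.
		le.mu.RUnlock()
		return -1, ErrNotPrimary
	}

	// demotec is closed when this lessor is demoted from primary
	demotec := le.demotec

	l := le.leaseMap[id]
	if l == nil {
		le.mu.RUnlock()
		return -1, ErrLeaseNotFound
	}
	// Clear remaining TTL when we renew if it is set
	clearRemainingTTL := le.cp != nil && l.remainingTTL > 0
	// release the read lock before blocking in the select below and before le.mu.Lock()
	le.mu.RUnlock()

	if l.expired() {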
		select {
		case <-l.revokec:
			return -1, ErrLeaseNotFound
		case <-demotec:
			return -1, ErrNotPrimary
		case <-le.stopC:
			return -1, ErrNotPrimary
		}
	}
	if clearRemainingTTL {
		le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: []*pb.LeaseCheckpoint{{ID: int64(l.ID), Remaining_TTL: 0}}})
	}

	le.mu.Lock()
	l.refresh(0)
	item := &LeaseWithTime{id: l.ID, time: l.expiry}
	le.leaseExpiredNotifier.RegisterOrUpdate(item)
	le.mu.Unlock()

	leaseRenewed.Inc()
	return l.ttl, nil
}
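  1. If this ETCD node is not the primary lessor, Renew returns ErrNotPrimary so that the renew request can be forwarded to the primary.
  2. Look up the lease with the given ID (l := le.leaseMap[id]); if it does not exist, return ErrLeaseNotFound.
  3. If the lease exists but has already expired, wait in a select:
    • If the lease's revoke channel (revokec) is closed, the lease has been deleted, so return ErrLeaseNotFound. If it is not closed yet, the lease is still being revoked, and Renew waits for the deletion to finish instead of reviving it.
    • If this node is demoted from primary (demotec) or the lessor is stopped (stopC), return ErrNotPrimary.
  4. If checkpointing is enabled (le.cp != nil) and the lease has a remaining TTL set, clear it with a checkpoint of Remaining_TTL: 0 so that the renewal uses the full TTL. Then refresh the lease's expiry time and register (or update) the lease in leaseExpiredNotifier.
  5. Because Raft leadership can change, leases are checkpointed (their remaining TTL is recorded through le.cp) so that they can be restored after a leader change.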

LeaseQueue

// LeaseWithTime contains lease object with a time.
// For the lessor's lease heap, time identifies the lease expiration time.
// For the lessor's lease checkpoint heap, the time identifies the next lease checkpoint time.
type LeaseWithTime struct {
	id    LeaseID
	time  time.Time
	index int
}

type LeaseQueue []*LeaseWithTime
// LeaseQueue implements heap.Interface from Go's standard container/heap package

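LeaseQueue is a min-heap keyed on time: the lease expiry time for the lease heap, or the next checkpoint time for the checkpoint heap. The lessor repeatedly checks whether the lease at the top of the heap has expired and, if so, pops it.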

LeaseExpiredNotifier: the queue that notifies the lessor to revoke expired leases

// LeaseExpiredNotifier is a queue used to notify lessor to revoke expired lease.
// Only save one item for a lease, `Register` will update time of the corresponding lease.
type LeaseExpiredNotifier struct {
	m     map[LeaseID]*LeaseWithTime
	queue LeaseQueue
}

The lessor's expireExists method peeks at the top of the heap to see whether an expired lease exists, and revokeExpiredLeases then revokes expired leases in batches:

item := le.leaseExpiredNotifier.Peek()

From: https://www.cnblogs.com/chnjm/p/17290968.html

  • kubegres 源码解析(二) kubebuilder简介
    摘要Kubegres完全使用KubebuilderV3版本开发,在对Kubegres进行代码解析前,首先了解一下Kubebuilder,本文尝试理清几个问题:如何使用Kubebuilder生成Controller/Operator项目?项目结构是什么,每个文件的作用是什么?具体到最重要的几个文件,代码如何组织,功......
  • Zookeeper Session源码
    我们说客户端与服务端建立连接交互的时候会创建一个Session与之对应,那假设客户端请求来了,服务端是如何处理的?Session又是如何创建出来的?我们先来看第一个问题:服务端如何处理客户端发来的请求?一、如何处理请求所谓的请求全称是网络请求,涉及到网络就少不了Socket通信,ZooKeep......