首页 > 其他分享 >etcd watch 实现原理

etcd watch 实现原理

时间:2024-06-10 14:21:37浏览次数:22  
标签:func key watch 观察者 ws etcd 原理 sws

介绍

在 etcd 中,watch 是一个非常重要的特性,它可以让客户端监控 etcd 中的 key 或者一组 key,当 key 发生变化时,etcd 会通知客户端。本文将介绍 etcd watch 的实现原理。

etcdctl watch /test
# 当 /test 的值发生变化时,会输出如下信息
PUT
/test
a
PUT
/test
b
DELETE
/test

watch 的 api

etcd watch api 是由 grpc stream 实现的,客户端通过 grpc stream 发送 watch 请求,etcd 会将 key 的变化通过 stream 返回给客户端。

rpc Watch(stream WatchRequest) returns (stream WatchResponse) {
      option (google.api.http) = {
        post: "/v3/watch"
        body: "*"
    };
}

api 实现

func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
	sws := serverWatchStream{
		lg: ws.lg,

		clusterID: ws.clusterID,
		memberID:  ws.memberID,

		maxRequestBytes: ws.maxRequestBytes,

		sg:        ws.sg,
		watchable: ws.watchable,
		ag:        ws.ag,

		gRPCStream:  stream,
		watchStream: ws.watchable.NewWatchStream(),
		// chan for sending control response like watcher created and canceled.
		ctrlStream: make(chan *pb.WatchResponse, ctrlStreamBufLen),

		progress: make(map[mvcc.WatchID]bool),
		prevKV:   make(map[mvcc.WatchID]bool),
		fragment: make(map[mvcc.WatchID]bool),

		closec: make(chan struct{}),
	}

	sws.wg.Add(1)
	go func() {
        // 开启一个 goroutine 处理新的 event 然后发送给客户端
		sws.sendLoop()
		sws.wg.Done()
	}()

	errc := make(chan error, 1)
	
	go func() {
        // 开启一个 goroutine 处理客户端发送的 watch 请求
		if rerr := sws.recvLoop(); rerr != nil {
			if isClientCtxErr(stream.Context().Err(), rerr) {
				sws.lg.Debug("failed to receive watch request from gRPC stream", zap.Error(rerr))
			} else {
				sws.lg.Warn("failed to receive watch request from gRPC stream", zap.Error(rerr))
				streamFailures.WithLabelValues("receive", "watch").Inc()
			}
			errc <- rerr
		}
	}()

	// 处理结束
	select {
	case err = <-errc:
		if err == context.Canceled {
			err = rpctypes.ErrGRPCWatchCanceled
		}
		close(sws.ctrlStream)
	case <-stream.Context().Done():
		err = stream.Context().Err()
		if err == context.Canceled {
			err = rpctypes.ErrGRPCWatchCanceled
		}
	}

	sws.close()
	return err
}

这里 主要的逻辑是开启两个 goroutine,一个用于处理客户端发送的 watch 请求,另一个用于处理新的 event 然后发送给客户端。

sendLoop

func (sws *serverWatchStream) sendLoop() {
	// watch ids that are currently active
	ids := make(map[mvcc.WatchID]struct{})
	// watch responses pending on a watch id creation message
	pending := make(map[mvcc.WatchID][]*pb.WatchResponse)

	interval := GetProgressReportInterval()
	progressTicker := time.NewTicker(interval)

	defer func() {
		progressTicker.Stop()
		// 清空chan ,清理待处理 event
		for ws := range sws.watchStream.Chan() {
			mvcc.ReportEventReceived(len(ws.Events))
		}
		for _, wrs := range pending {
			for _, ws := range wrs {
				mvcc.ReportEventReceived(len(ws.Events))
			}
		}
	}()

	for {
		select {
		case wresp, ok := <-sws.watchStream.Chan():
            // 从 watchStream.Chan() 中获取 event
            // 然后发送给客户端 
			if !ok {
				return
			}

			evs := wresp.Events
			events := make([]*mvccpb.Event, len(evs))
			sws.mu.RLock()
			needPrevKV := sws.prevKV[wresp.WatchID]
			sws.mu.RUnlock()
			for i := range evs {
				events[i] = &evs[i]
				if needPrevKV && !IsCreateEvent(evs[i]) {
					opt := mvcc.RangeOptions{Rev: evs[i].Kv.ModRevision - 1}
					r, err := sws.watchable.Range(context.TODO(), evs[i].Kv.Key, nil, opt)
					if err == nil && len(r.KVs) != 0 {
						events[i].PrevKv = &(r.KVs[0])
					}
				}
			}

			canceled := wresp.CompactRevision != 0
			wr := &pb.WatchResponse{
				Header:          sws.newResponseHeader(wresp.Revision),
				WatchId:         int64(wresp.WatchID),
				Events:          events,
				CompactRevision: wresp.CompactRevision,
				Canceled:        canceled,
			}

			// Progress notifications can have WatchID -1
			// if they announce on behalf of multiple watchers
			if wresp.WatchID != clientv3.InvalidWatchID {
				if _, okID := ids[wresp.WatchID]; !okID {
					// buffer if id not yet announced
					wrs := append(pending[wresp.WatchID], wr)
					pending[wresp.WatchID] = wrs
					continue
				}
			}

			mvcc.ReportEventReceived(len(evs))

			sws.mu.RLock()
			fragmented, ok := sws.fragment[wresp.WatchID]
			sws.mu.RUnlock()

			var serr error
			// gofail: var beforeSendWatchResponse struct{}
			if !fragmented && !ok {
				serr = sws.gRPCStream.Send(wr)
			} else {
				serr = sendFragments(wr, sws.maxRequestBytes, sws.gRPCStream.Send)
			}

			if serr != nil {
				if isClientCtxErr(sws.gRPCStream.Context().Err(), serr) {
					sws.lg.Debug("failed to send watch response to gRPC stream", zap.Error(serr))
				} else {
					sws.lg.Warn("failed to send watch response to gRPC stream", zap.Error(serr))
					streamFailures.WithLabelValues("send", "watch").Inc()
				}
				return
			}

			sws.mu.Lock()
			if len(evs) > 0 && sws.progress[wresp.WatchID] {
				// elide next progress update if sent a key update
				sws.progress[wresp.WatchID] = false
			}
			sws.mu.Unlock()

		case c, ok := <-sws.ctrlStream:
            // 处理客户端发送的 watch 请求
			if !ok {
				return
			}

			if err := sws.gRPCStream.Send(c); err != nil {
				if isClientCtxErr(sws.gRPCStream.Context().Err(), err) {
					sws.lg.Debug("failed to send watch control response to gRPC stream", zap.Error(err))
				} else {
					sws.lg.Warn("failed to send watch control response to gRPC stream", zap.Error(err))
					streamFailures.WithLabelValues("send", "watch").Inc()
				}
				return
			}

			// track id creation
			wid := mvcc.WatchID(c.WatchId)

			verify.Assert(!(c.Canceled && c.Created) || wid == clientv3.InvalidWatchID, "unexpected watchId: %d, wanted: %d, since both 'Canceled' and 'Created' are true", wid, clientv3.InvalidWatchID)

			if c.Canceled && wid != clientv3.InvalidWatchID {
				delete(ids, wid)
				continue
			}
			if c.Created {
				// flush buffered events
				ids[wid] = struct{}{}
				for _, v := range pending[wid] {
					mvcc.ReportEventReceived(len(v.Events))
					if err := sws.gRPCStream.Send(v); err != nil {
						if isClientCtxErr(sws.gRPCStream.Context().Err(), err) {
							sws.lg.Debug("failed to send pending watch response to gRPC stream", zap.Error(err))
						} else {
							sws.lg.Warn("failed to send pending watch response to gRPC stream", zap.Error(err))
							streamFailures.WithLabelValues("send", "watch").Inc()
						}
						return
					}
				}
				delete(pending, wid)
			}

		case <-progressTicker.C:
			sws.mu.Lock()
			for id, ok := range sws.progress {
				if ok {
					sws.watchStream.RequestProgress(id)
				}
				sws.progress[id] = true
			}
			sws.mu.Unlock()

		case <-sws.closec:
			return
		}
	}
}

这里使用了 for select 循环:

  1. 从 watchStream.Chan() 中获取 event 然后发送给客户端。
  2. 处理客户端发送的 watch 请求。
  3. dispatch progress 事件。
  4. 处理结束。

recvLoop

func (sws *serverWatchStream) recvLoop() error {
	for {
		req, err := sws.gRPCStream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}

		switch uv := req.RequestUnion.(type) {
		case *pb.WatchRequest_CreateRequest:
			if uv.CreateRequest == nil {
				break
			}

			creq := uv.CreateRequest
			if len(creq.Key) == 0 {
				// \x00 is the smallest key
				creq.Key = []byte{0}
			}
			if len(creq.RangeEnd) == 0 {
				// force nil since watchstream.Watch distinguishes
				// between nil and []byte{} for single key / >=
				creq.RangeEnd = nil
			}
			if len(creq.RangeEnd) == 1 && creq.RangeEnd[0] == 0 {
				// support  >= key queries
				creq.RangeEnd = []byte{}
			}

			err := sws.isWatchPermitted(creq)
			if err != nil {
				var cancelReason string
				switch err {
				case auth.ErrInvalidAuthToken:
					cancelReason = rpctypes.ErrGRPCInvalidAuthToken.Error()
				case auth.ErrAuthOldRevision:
					cancelReason = rpctypes.ErrGRPCAuthOldRevision.Error()
				case auth.ErrUserEmpty:
					cancelReason = rpctypes.ErrGRPCUserEmpty.Error()
				default:
					if err != auth.ErrPermissionDenied {
						sws.lg.Error("unexpected error code", zap.Error(err))
					}
					cancelReason = rpctypes.ErrGRPCPermissionDenied.Error()
				}

				wr := &pb.WatchResponse{
					Header:       sws.newResponseHeader(sws.watchStream.Rev()),
					WatchId:      clientv3.InvalidWatchID,
					Canceled:     true,
					Created:      true,
					CancelReason: cancelReason,
				}

				select {
				case sws.ctrlStream <- wr:
					continue
				case <-sws.closec:
					return nil
				}
			}

			filters := FiltersFromRequest(creq)

			wsrev := sws.watchStream.Rev()
			rev := creq.StartRevision
			if rev == 0 {
				rev = wsrev + 1
			}
			id, err := sws.watchStream.Watch(mvcc.WatchID(creq.WatchId), creq.Key, creq.RangeEnd, rev, filters...)
			if err == nil {
				sws.mu.Lock()
				if creq.ProgressNotify {
					sws.progress[id] = true
				}
				if creq.PrevKv {
					sws.prevKV[id] = true
				}
				if creq.Fragment {
					sws.fragment[id] = true
				}
				sws.mu.Unlock()
			} else {
				id = clientv3.InvalidWatchID
			}

			wr := &pb.WatchResponse{
				Header:   sws.newResponseHeader(wsrev),
				WatchId:  int64(id),
				Created:  true,
				Canceled: err != nil,
			}
			if err != nil {
				wr.CancelReason = err.Error()
			}
			select {
			case sws.ctrlStream <- wr:
			case <-sws.closec:
				return nil
			}

		case *pb.WatchRequest_CancelRequest:
			if uv.CancelRequest != nil {
				id := uv.CancelRequest.WatchId
				err := sws.watchStream.Cancel(mvcc.WatchID(id))
				if err == nil {
					sws.ctrlStream <- &pb.WatchResponse{
						Header:   sws.newResponseHeader(sws.watchStream.Rev()),
						WatchId:  id,
						Canceled: true,
					}
					sws.mu.Lock()
					delete(sws.progress, mvcc.WatchID(id))
					delete(sws.prevKV, mvcc.WatchID(id))
					delete(sws.fragment, mvcc.WatchID(id))
					sws.mu.Unlock()
				}
			}
		case *pb.WatchRequest_ProgressRequest:
			if uv.ProgressRequest != nil {
				sws.mu.Lock()
				sws.watchStream.RequestProgressAll()
				sws.mu.Unlock()
			}
		default:
			// we probably should not shutdown the entire stream when
			// receive an invalid command.
			// so just do nothing instead.
			sws.lg.Sugar().Infof("invalid watch request type %T received in gRPC stream", uv)
			continue
		}
	}
}

这里主要处理客户端发送的 watch 请求,然后发送给 ctrlStream。sendLoop 会从 ctrlStream 中获取 event 然后发送给客户端。

WatchStream

这个 inferface 才是处理 watch 的主要逻辑

// WatchStream 是一个接口,定义了一个流式处理watch请求的机制
type WatchStream interface {
	// Watch 创建一个观察者。观察者会监听在给定的键或范围 [key, end) 上发生的事件或已发生的事件。
	//
	// 整个事件历史都可以被观察到,除非被压缩。
	// 如果 "startRev" <= 0,watch 将观察在当前修订版本之后的事件。
	//
	// 返回的 "id" 是这个观察者的ID。它作为 WatchID 出现在通过 stream 通道发送到创建的观察者的事件中。
	// 当 WatchID 不等于 AutoWatchID 时,使用指定的 WatchID,否则返回自动生成的 WatchID。
	Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error)

	// Chan 返回一个通道。所有的watch响应将被发送到这个返回的通道。
	Chan() <-chan WatchResponse

	// RequestProgress 请求给定ID的观察者的进度。响应只有在观察者当前同步时才会被发送。
	// 响应将通过与此流关联的 WatchResponse 通道发送,以确保正确的顺序。
	// 响应不包含事件。响应中的修订版本是观察者自同步以来的进度。
	RequestProgress(id WatchID)

	// RequestProgressAll 请求所有共享此流的观察者的进度通知。
	// 如果所有观察者都已同步,将向此流的任意观察者发送带有watch ID -1的进度通知,并返回 true。
	RequestProgressAll() bool

	// Cancel 通过给定ID取消观察者。如果观察者不存在,将返回错误。
	Cancel(id WatchID) error

	// Close 关闭通道并释放所有相关资源。
	Close()

	// Rev 返回流上观察到的KV的当前修订版本。
	Rev() int64
}

// WatchResponse 表示一个watch操作的响应。
type WatchResponse struct {
	// WatchID 是发送此响应的观察者的ID。
	WatchID WatchID

	// Events 包含所有需要发送的事件。
	Events []mvccpb.Event

	// Revision 是创建watch响应时KV的修订版本。
	// 对于正常响应,修订版本应该与Events中最后一个修改的修订版本相同。
	// 对于延迟响应的未同步观察者,修订版本大于Events中最后一个修改的修订版本。
	Revision int64

	// CompactRevision 在观察者由于压缩而被取消时设置。
	CompactRevision int64
}

// 实现了 WatchStream
// watchStream 包含共享一个流通道发送被观察事件和其他控制事件的观察者集合。
type watchStream struct {
	// 可观察对象(例如KV存储)
	watchable watchable
	// 用于发送watch响应的通道
	ch        chan WatchResponse

	// 互斥锁,保护以下字段
	mu sync.Mutex 
	// nextID 是为此流中下一个新观察者预分配的ID
	nextID   WatchID
	// 标志流是否已关闭
	closed   bool
	// 取消函数的映射,用于取消特定的观察者
	cancels  map[WatchID]cancelFunc
	// 观察者的映射,根据观察者ID索引
	watchers map[WatchID]*watcher
}

// Watch 在流中创建一个新的观察者并返回其 WatchID。
func (ws *watchStream) Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error) {
	// 防止键 >= 结束键(按字典顺序)的错误范围
	// 带有 'WithFromKey' 的watch请求具有空字节范围结束
	if len(end) != 0 && bytes.Compare(key, end) != -1 {
		return -1, ErrEmptyWatcherRange
	}

	// 获取互斥锁
	ws.mu.Lock()
	defer ws.mu.Unlock()
	// 如果流已关闭,返回错误
	if ws.closed {
		return -1, ErrEmptyWatcherRange
	}

	// 自动生成 WatchID
	if id == clientv3.AutoWatchID {
		for ws.watchers[ws.nextID] != nil {
			ws.nextID++
		}
		id = ws.nextID
		ws.nextID++
	} else if _, ok := ws.watchers[id]; ok {
		return -1, ErrWatcherDuplicateID
	}

	// 创建新的观察者
	w, c := ws.watchable.watch(key, end, startRev, id, ws.ch, fcs...)

	// 保存取消函数和观察者
	ws.cancels[id] = c
	ws.watchers[id] = w
	return id, nil
}
// Chan 返回用于接收watch响应的通道。
func (ws *watchStream) Chan() <-chan WatchResponse {
	return ws.ch
}
// Cancel 取消具有给定ID的观察者。
func (ws *watchStream) Cancel(id WatchID) error {
	// 获取互斥锁
	ws.mu.Lock()
	cancel, ok := ws.cancels[id]
	w := ws.watchers[id]
	ok = ok && !ws.closed
	ws.mu.Unlock()

	// 如果观察者不存在或流已关闭,返回错误
	if !ok {
		return ErrWatcherNotExist
	}
	cancel()

	// 获取互斥锁
	ws.mu.Lock()
	// 在取消之前不删除观察者,以确保 Close() 调用时等待取消
	if ww := ws.watchers[id]; ww == w {
		delete(ws.cancels, id)
		delete(ws.watchers, id)
	}
	ws.mu.Unlock()

	return nil
}
// Close 关闭通道并释放所有相关资源。
func (ws *watchStream) Close() {
	// 获取互斥锁
	ws.mu.Lock()
	defer ws.mu.Unlock()

	// 取消所有观察者
	for _, cancel := range ws.cancels {
		cancel()
	}
	// 标记流已关闭并关闭通道
	ws.closed = true
	close(ws.ch)
	watchStreamGauge.Dec()
}
// Rev 返回流上观察到的KV的当前修订版本。
func (ws *watchStream) Rev() int64 {
	// 获取互斥锁
	ws.mu.Lock()
	defer ws.mu.Unlock()
	return ws.watchable.rev()
}
// RequestProgress 请求给定ID的观察者的进度。
func (ws *watchStream) RequestProgress(id WatchID) {
	// 获取互斥锁
	ws.mu.Lock()
	w, ok := ws.watchers[id]
	ws.mu.Unlock()
	// 如果观察者不存在,直接返回
	if !ok {
		return
	}
	// 请求进度
	ws.watchable.progress(w)
}
// RequestProgressAll 请求所有观察者的进度通知。
func (ws *watchStream) RequestProgressAll() bool {
	// 获取互斥锁
	ws.mu.Lock()
	defer ws.mu.Unlock()
	return ws.watchable.progressAll(ws.watchers)
}
  1. Watch 方法:创建一个新的观察者,如果指定的范围不正确或观察者ID重复,则返回错误。否则,创建观察者并保存取消函数和观察者实例。
  2. Chan 方法:返回用于接收watch响应的通道。
  3. Cancel 方法:取消给定ID的观察者,删除相关的取消函数和观察者实例。
  4. Close 方法:关闭所有观察者并释放资源。
  5. Rev 方法:返回当前观察到的KV修订版本。
  6. RequestProgress 方法:请求特定观察者的进度。
  7. RequestProgressAll 方法:请求所有观察者的进度通知。

可以可到 当调用 Watch 的时候 每个 watchId 都会调用 watchable.watch 并把自己 ch 放入进去

watchable

// watchable 接口定义了可观察对象的行为
type watchable interface {
    // watch 创建一个新的观察者,用于监听指定键或范围[startRev, end)上的事件。
    // 返回观察者指针和取消函数。
    watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc)
    
    // progress 通知特定观察者当前的进度。
    progress(w *watcher)
    
    // progressAll 通知所有观察者当前的进度。
    // 如果所有观察者都已同步,则返回 true。
    progressAll(watchers map[WatchID]*watcher) bool
    
    // rev 返回当前观察到的修订版本。
    rev() int64
}


// watchableStore 是一个实现了 watchable 接口的结构体,代表一个可观察的存储
type watchableStore struct {
    // store 是一个指向基础存储的指针
    *store

    // mu 保护观察者组和批次。为了避免死锁,在锁定 store.mu 之前不应锁定 mu。
    mu sync.RWMutex

    // victims 是在 watch 通道上被阻塞的观察者批次
    victims []watcherBatch
    victimc chan struct{}

    // unsynced 包含所有需要同步已经发生的事件的未同步观察者
    unsynced watcherGroup

    // synced 包含所有与存储进度同步的观察者
    // 映射的键是观察者监听的键
    synced watcherGroup

    // stopc 是一个用于停止操作的通道
    stopc chan struct{}

    // wg 用于等待所有 goroutine 完成
    wg sync.WaitGroup
}

func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) {
	// 创建一个新的观察者
	wa := &watcher{
		key:    key,
		end:    end,
		minRev: startRev,
		id:     id,
		ch:     ch,
		fcs:    fcs,
	}

	// 锁定 watchableStore 的互斥锁
	s.mu.Lock()
	// 锁定 store 的读写锁用于获取当前修订版本
	s.revMu.RLock()
	// 判断观察者是否与当前存储修订版本同步
	synced := startRev > s.store.currentRev || startRev == 0
	if synced {
		// 如果同步,设置最小修订版本为当前修订版本的下一个版本
		wa.minRev = s.store.currentRev + 1
		if startRev > wa.minRev {
			wa.minRev = startRev
		}
		// 将观察者添加到同步观察者组中
		s.synced.add(wa)
	} else {
		// 如果未同步,增加慢速观察者计数器
		slowWatcherGauge.Inc()
		// 将观察者添加到未同步观察者组中
		s.unsynced.add(wa)
	}
	// 解锁 store 的读写锁
	s.revMu.RUnlock()
	// 解锁 watchableStore 的互斥锁
	s.mu.Unlock()

	// 增加观察者计数器
	watcherGauge.Inc()

	// 返回观察者和取消函数
	return wa, func() { s.cancelWatcher(wa) }
}

newWatchableStore

func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *watchableStore {
	if lg == nil {
		lg = zap.NewNop()
	}
	s := &watchableStore{
		store:    NewStore(lg, b, le, cfg),
		victimc:  make(chan struct{}, 1),
		unsynced: newWatcherGroup(),
		synced:   newWatcherGroup(),
		stopc:    make(chan struct{}),
	}
	s.store.ReadView = &readView{s}
	s.store.WriteView = &writeView{s}
	if s.le != nil {
		// use this store as the deleter so revokes trigger watch events
		s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write(traceutil.TODO()) })
	}
	s.wg.Add(2)
	go s.syncWatchersLoop()
	go s.syncVictimsLoop()
	return s
}

syncWatchersLoop

// syncWatchersLoop 每100毫秒同步一次unsynced集合中的观察者。
func (s *watchableStore) syncWatchersLoop() {
	defer s.wg.Done()

	// 设置等待时间为100毫秒
	waitDuration := 100 * time.Millisecond
	delayTicker := time.NewTicker(waitDuration)
	defer delayTicker.Stop()

	for {
		// 锁定以获取未同步观察者的数量
		s.mu.RLock()
		st := time.Now()
		lastUnsyncedWatchers := s.unsynced.size()
		s.mu.RUnlock()

		unsyncedWatchers := 0
		// 如果有未同步观察者,同步这些观察者
		if lastUnsyncedWatchers > 0 {
			unsyncedWatchers = s.syncWatchers()
		}
		syncDuration := time.Since(st)

		// 重置定时器
		delayTicker.Reset(waitDuration)
		// 检查是否有更多待处理的工作
		if unsyncedWatchers != 0 && lastUnsyncedWatchers > unsyncedWatchers {
			// 公平对待其他存储操作,通过延长时间来避免占用太多资源
			delayTicker.Reset(syncDuration)
		}

		// 等待定时器或停止信号
		select {
		case <-delayTicker.C:
		case <-s.stopc:
			return
		}
	}
}

// syncWatchers 通过以下步骤同步未同步的观察者:
//  1. 从未同步观察者组中选择一组观察者
//  2. 迭代该组以获取最小修订版本并移除压缩的观察者
//  3. 使用最小修订版本获取所有键值对,并将这些事件发送给观察者
//  4. 从未同步组中移除已同步的观察者,并移动到同步组中
func (s *watchableStore) syncWatchers() int {
	// 锁定
	s.mu.Lock()
	defer s.mu.Unlock()

	// 如果没有未同步观察者,返回0
	if s.unsynced.size() == 0 {
		return 0
	}

	// 锁定存储的读写锁
	s.store.revMu.RLock()
	defer s.store.revMu.RUnlock()

	// 为了从未同步观察者中找到键值对,我们需要找到最小修订版本
	curRev := s.store.currentRev
	compactionRev := s.store.compactMainRev

	// 选择一组观察者
	wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)
	minBytes, maxBytes := NewRevBytes(), NewRevBytes()
	minBytes = RevToBytes(Revision{Main: minRev}, minBytes)
	maxBytes = RevToBytes(Revision{Main: curRev + 1}, maxBytes)

	// UnsafeRange 返回键和值。在boltdb中,键是修订版本,值是实际的键值对。
	tx := s.store.b.ReadTx()
	tx.RLock()
	revs, vs := tx.UnsafeRange(schema.Key, minBytes, maxBytes, 0)
	evs := kvsToEvents(s.store.lg, wg, revs, vs)
	// 必须在kvsToEvents之后解锁,因为vs(来自boltdb内存)不是深拷贝。
	// 我们只能在Unmarshal之后解锁,这将进行深拷贝。
	// 否则我们将在boltdb重新mmap期间触发SIGSEGV。
	tx.RUnlock()

	// 创建一个新的观察者批次
	victims := make(watcherBatch)
	wb := newWatcherBatch(wg, evs)
	for w := range wg.watchers {
		if w.minRev < compactionRev {
			// 跳过因压缩而无法发送响应的观察者
			continue
		}
		w.minRev = curRev + 1

		eb, ok := wb[w]
		if !ok {
			// 将未通知的观察者移至同步
			s.synced.add(w)
			s.unsynced.delete(w)
			continue
		}

		if eb.moreRev != 0 {
			w.minRev = eb.moreRev
		}

		// 发送响应
		if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev}) {
			pendingEventsGauge.Add(float64(len(eb.evs)))
		} else {
			w.victim = true
		}

		// 处理受害者观察者
		if w.victim {
			victims[w] = eb
		} else {
			if eb.moreRev != 0 {
				// 保持未同步状态;还有更多要读取
				continue
			}
			s.synced.add(w)
		}
		s.unsynced.delete(w)
	}
	s.addVictim(victims)

	// 更新慢速观察者计数器
	vsz := 0
	for _, v := range s.victims {
		vsz += len(v)
	}
	slowWatcherGauge.Set(float64(s.unsynced.size() + vsz))

	return s.unsynced.size()
}

watcher & send

type watcher struct {
	// the watcher key
	key []byte
	// end indicates the end of the range to watch.
	// If end is set, the watcher is on a range.
	end []byte

	// victim is set when ch is blocked and undergoing victim processing
	victim bool

	// compacted is set when the watcher is removed because of compaction
	compacted bool

	// restore is true when the watcher is being restored from leader snapshot
	// which means that this watcher has just been moved from "synced" to "unsynced"
	// watcher group, possibly with a future revision when it was first added
	// to the synced watcher
	// "unsynced" watcher revision must always be <= current revision,
	// except when the watcher were to be moved from "synced" watcher group
	restore bool

	// minRev is the minimum revision update the watcher will accept
	minRev int64
	id     WatchID

	fcs []FilterFunc
	// a chan to send out the watch response.
	// The chan might be shared with other watchers.
	ch chan<- WatchResponse
}

func (w *watcher) send(wr WatchResponse) bool {
	progressEvent := len(wr.Events) == 0

	if len(w.fcs) != 0 {
		ne := make([]mvccpb.Event, 0, len(wr.Events))
		for i := range wr.Events {
			filtered := false
			for _, filter := range w.fcs {
				if filter(wr.Events[i]) {
					filtered = true
					break
				}
			}
			if !filtered {
				ne = append(ne, wr.Events[i])
			}
		}
		wr.Events = ne
	}

	// if all events are filtered out, we should send nothing.
	if !progressEvent && len(wr.Events) == 0 {
		return true
	}
	select {
	case w.ch <- wr:
		return true
	default:
		return false
	}
}

syncVictimsLoop

// syncVictimsLoop 尝试将预先计算的观察者响应写入被阻塞的观察者通道
func (s *watchableStore) syncVictimsLoop() {
	defer s.wg.Done()

	for {
		// 尝试更新所有受害者观察者
		for s.moveVictims() != 0 {
			// 持续更新,直到所有受害者观察者都处理完毕
		}

		// 检查是否有受害者观察者
		s.mu.RLock()
		isEmpty := len(s.victims) == 0
		s.mu.RUnlock()

		var tickc <-chan time.Time
		if !isEmpty {
			tickc = time.After(10 * time.Millisecond)
		}

		// 等待10毫秒或收到新的受害者通知或停止信号
		select {
		case <-tickc:
		case <-s.victimc:
		case <-s.stopc:
			return
		}
	}
}

// moveVictims 尝试使用已存在的事件数据更新观察者
func (s *watchableStore) moveVictims() (moved int) {
	s.mu.Lock()
	victims := s.victims
	s.victims = nil
	s.mu.Unlock()

	var newVictim watcherBatch
	for _, wb := range victims {
		// 再次尝试发送响应
		for w, eb := range wb {
			// 观察者已观察到存储,直到但不包括 w.minRev
			rev := w.minRev - 1
			if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
				pendingEventsGauge.Add(float64(len(eb.evs)))
			} else {
				if newVictim == nil {
					newVictim = make(watcherBatch)
				}
				newVictim[w] = eb
				continue
			}
			moved++
		}

		// 将完成的受害者观察者分配到未同步/同步组
		s.mu.Lock()
		s.store.revMu.RLock()
		curRev := s.store.currentRev
		for w, eb := range wb {
			if newVictim != nil && newVictim[w] != nil {
				// 无法发送watch响应,仍然是受害者
				continue
			}
			w.victim = false
			if eb.moreRev != 0 {
				w.minRev = eb.moreRev
			}
			if w.minRev <= curRev {
				s.unsynced.add(w)
			} else {
				slowWatcherGauge.Dec()
				s.synced.add(w)
			}
		}
		s.store.revMu.RUnlock()
		s.mu.Unlock()
	}

	// 如果仍然有未处理的受害者,重新添加到受害者列表中
	if len(newVictim) > 0 {
		s.mu.Lock()
		s.victims = append(s.victims, newVictim)
		s.mu.Unlock()
	}

	return moved
}

Reference

标签:func,key,watch,观察者,ws,etcd,原理,sws
From: https://www.cnblogs.com/daemon365/p/18240641

相关文章

  • 纯理论容器实现的原理
    近期在复习容器的原理,希望这篇文章可以帮助到大家。一、什么是容器?    容器本质上就是主机上的一个进程。这个进程拥有自己的用户空间并且和主机共享内核空间。        容器内的进程可以通过系统调用与内核进行交互,使用内核提供的各种功能和资源。   ......
  • Socket编程权威指南(四)彻底解密 Epoll 原理
    在上一篇文章中,我们优化了基于Socket的网络服务器,从最初的select/poll模型进化到了高效的epoll。很多读者对epoll的惊人性能表示极大的兴趣,对它的工作原理也充满了好奇。今天,就让我们一起揭开epoll神秘的面纱,深入剖析其内部运作机制,进一步提升你的Linux网络编程......
  • 计算机组成原理 第六章 计算机的运算方法 Part4 浮点数的四则运算、IEEE754标准与ALU
    1.浮点数介绍基本格式首先需要明确的是浮点数的组成,浮点数由阶码和尾数两部分组成其中阶码又分为阶符和数值部分,阶码J和阶码的位数m共同反应浮点数的表示范围以及小数点的实际位置;尾数由数符和数值部分组成,数符代表浮点数的符号,尾数的数值部分反映浮点数的精度例:阶码E反......
  • 深入剖析C++多态的实现与原理-详解
    目录多态基础虚函数虚函数的继承虚类/虚基类重写/覆盖条件:概念:多态的条件其他的多态行为多态中子类可以不写virtual协变代码举例继承遗留问题解决析构函数具体解决方式:题目1答案:解析:题目2答案:C++11override和finalfinal功能1:禁用继承使用场景:功能2:禁用重写使用场景overr......
  • 计算机组成原理-cache详解
    一、Cache的概念和原理1、cache原理2、cache性能分析一道例题3、cache和主存数据交换的单位每次访问到的主存块会立即放入cache中小结二、cache和主存之间的映射关系全相联映射全相联访存过程直接映射组相联映射小结三、cache替换算法在直接映射中,每......
  • 计算机组成原理之指令寻址
    一、顺序寻址1、定长指令字结构2、变长指令字结构二、跳跃寻址三、数据寻址1、直接寻址2、间接寻址3、寄存器寻址寄存器间接寻址4、隐含寻址5、立即寻址6、偏移寻址1、基址寻址2、变址寻址3、相对寻址......
  • 【三级指针、二级指针、一级指针、指针值】原理+超直白说明/总结(当然,还是会有点涩~)
    看不懂的原理及区别在计算机编程中,指针是一个变量,其值为另一个变量的地址。指针的级别是根据它们所指向的对象类型来定义的。当我们谈论“一级指针”、“二级指针”和“三级指针”时,我们实际上是在讨论指针所指向的对象的类型。1.一级指针一级指针(或称为普通指针)是最常......
  • 【很全】IPv6/ICMPv6/DHCPv6/SLACC基本原理(2024最新)
    目录1IPv6报文1.1IPv6基本包头1.2IPv6扩展包头 2IPv6地址2.1IPv6地址组成2.2IPv6地址分类2.2.1单播地址(GUA/ULA/LLA)2.2.2组播地址2.2.3任播地址2.3IPv6地址配置 2.3.1无状态自动配置(SLAAC)2.3.2有状态自动分配(StatefulDHCPv6)2.3.3无状态自动分配(St......
  • 2024计算机组成原理复习——第一章
    计算机组成原理复习——第一章一、计算机系统概括本笔记不用于商业用途,内容参考《2025年计算机组成原理——考研复习指导》以及其对应的b站免费视频课(图文信息主要来自于此)(一)计算机系统结构层次1.计算机系统的基本组成硬件:有形的物理设备,计算机系统中实际物理装置的总称......
  • Android 系统架构 详解(原理和四个层次以及启动流程)
    Android系统架构详解(原理和四个层次以及启动流程)Android系统架构是指Android操作系统的整体结构和组织方式,包括不同层次的软件组件和其相互之间的关系,Android系统架构是一个分层的体系结构,它包括多个层次,每个层次都有特定的功能和责任。一、背景Android系统架构......