首页 > 编程语言 >ETCD源码阅读(三)

ETCD源码阅读(三)

时间:2023-03-31 19:14:25浏览次数:41  
标签:Raft ETCD goroutine 源码 阅读 rc raft 节点 channel

DAY2 :阅读raftexample: etcd/contrib/raftexample

serveChannels()

func (rc *raftNode) serveChannels() {
	snap, err := rc.raftStorage.Snapshot()
	if err != nil {
		panic(err)
	}
	rc.confState = snap.Metadata.ConfState
	rc.snapshotIndex = snap.Metadata.Index
	rc.appliedIndex = snap.Metadata.Index

	defer rc.wal.Close()

	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	// send proposals over raft
	go func() {
		confChangeCount := uint64(0)

		for rc.proposeC != nil && rc.confChangeC != nil {
			select {
			case prop, ok := <-rc.proposeC:
				if !ok {
					rc.proposeC = nil
				} else {
					// blocks until accepted by raft state machine
					rc.node.Propose(context.TODO(), []byte(prop))
				}

			case cc, ok := <-rc.confChangeC:
				if !ok {
					rc.confChangeC = nil
				} else {
					confChangeCount++
					cc.ID = confChangeCount
					rc.node.ProposeConfChange(context.TODO(), cc)
				}
			}
		}
		// client closed channel; shutdown raft if not already
		close(rc.stopc)
	}()

	// event loop on raft state machine updates
	for {
		select {
		case <-ticker.C:
			rc.node.Tick()

		// store raft entries to wal, then publish over commit channel
		case rd := <-rc.node.Ready():
			rc.wal.Save(rd.HardState, rd.Entries)
			if !raft.IsEmptySnap(rd.Snapshot) {
				rc.saveSnap(rd.Snapshot)
				rc.raftStorage.ApplySnapshot(rd.Snapshot)
				rc.publishSnapshot(rd.Snapshot)
			}
			rc.raftStorage.Append(rd.Entries)
			rc.transport.Send(rd.Messages)
			applyDoneC, ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries))
			if !ok {
				rc.stop()
				return
			}
			rc.maybeTriggerSnapshot(applyDoneC)
			rc.node.Advance()

		case err := <-rc.transport.ErrorC:
			rc.writeError(err)
			return

		case <-rc.stopc:
			rc.stop()
			return
		}
	}
}

serveChannels函数,是 raftexample 中用于处理 Raft 消息的主要事件循环。

  1. 通过调用 rc.raftStorage.Snapshot() 方法获取 etcd 当前的快照数据,并将其中的集群配置状态(ConfState)、快照索引(snapshotIndex)和已提交的索引(appliedIndex)等元数据信息记录到当前 raftNode 实例(即 rc 变量)中。
  2. 创建一个定时器 ticker,每 100 毫秒触发一次,用于驱动 Raft 状态机的 Tick() 方法。这个Tick()方法使Raft状态机的内部逻辑时钟加一,选举超时和心跳超时都以“tick”的单位来计算。选举超时:当一个节点在经过一段时间(通常是几个 tick)没有收到心跳和选举请求时,就会从follower状态转变成candidate状态,发起一次选举。心跳超时:只有leader拥有这个计时器,超时了就给follower发送心跳包。follower收到后就会把选举超市计数器置零。
  3. 创建一个新的 goroutine,用于将处理KVStore发送的proposal,HTTP API Server对集群配置的更改请求(如添加或删除节点)。
  4. 创建一个处理Raft状态机更新的Goroutine,不断处理以下事件:
    • ticker事件:参考上一段

    • rc.node.Ready():

      case rd := <-rc.node.Ready():
          rc.wal.Save(rd.HardState, rd.Entries)
          if !raft.IsEmptySnap(rd.Snapshot) {
              rc.saveSnap(rd.Snapshot)
              rc.raftStorage.ApplySnapshot(rd.Snapshot)
              rc.publishSnapshot(rd.Snapshot)
          }
          rc.raftStorage.Append(rd.Entries)
          rc.transport.Send(rd.Messages)
          applyDoneC, ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries))
          if !ok {
              rc.stop()
              return
          }
          rc.maybeTriggerSnapshot(applyDoneC)
          // Advance notifies the Node that the application has 
          // saved progress up to the last Ready. 
          // It prepares the node to return the next available Ready.
          // The application should generally call Advance after
          // it applies the entries in last Ready.
          rc.node.Advance()
      

      获取aft节点已经准备好的一批Entry和当前的HardState(给客户端发送状态更新前的状态),并保存到 WAL 中。接着判断是否有快照数据,如果有就更新快照。

      将Entry添加到raft节点的raftStorage中。然后调用 rc.transport.Send() 方法将 Messages 发送给其他节点。

      最后调用 rc.entriesToApply() 方法获取需要apply的 committed entries,通过调用 rc.publishEntries() 方法将这些 committed entries 应用到状态机中,并返回一个 channel 用于通知应用完成。

    • rc.transport.ErrorC 和 rc.stopc:出现错误需要关闭Raft Node

raft.go

这段代码经过前面的解读,已经理解的差不多了,我们来看看对应的单元测试吧。选了一个比较有意思的。

// TestProposeOnCommit starts three nodes and feeds commits back into the proposal
// channel. The intent is to ensure blocking on a proposal won't block raft progress.
func TestProposeOnCommit(t *testing.T) {
	clus := newCluster(3)
	defer clus.closeNoErrors(t)

	donec := make(chan struct{})
	for i := range clus.peers {
		// feedback for "n" committed entries, then update donec
		go func(pC chan<- string, cC <-chan *commit, eC <-chan error) {
			for n := 0; n < 100; n++ {
				c, ok := <-cC
				if !ok {
					pC = nil
				}
				select {
				case pC <- c.data[0]:
					continue
				case err := <-eC:
					t.Errorf("eC message (%v)", err)
				}
			}
			donec <- struct{}{}
			for range cC {
				// acknowledge the commits from other nodes so
				// raft continues to make progress
			}
		}(clus.proposeC[i], clus.commitC[i], clus.errorC[i])

		// one message feedback per node
		go func(i int) { clus.proposeC[i] <- "foo" }(i)
	}

	for range clus.peers {
		<-donec
	}
}

先mock了一个三节点的集群(其实就是将对应的channel连接上),然后在每个节点上启动了两个 goroutine。其中,一个 goroutine 持续地从该节点的 commit channel 中读取消息,然后将第一条消息反馈给该节点的 propose channel。另一个 goroutine 仅向该节点的 propose channel 中发送一条消息。

这样就会发生不断地从 propose channel 中读取并将消息发送到 commit channel 中的过程,然后在一段时间后结束这个循环并向 donec channel 发送一个空结构体来指示完成。最后测试会阻塞直到从所有的节点都接收到了 donec channel 中的空结构体,以确保测试的所有 goroutine 都已结束。

这个测试是为了验证:即使在等待反馈消息的时候,也不会阻塞 raft 的进展。

标签:Raft,ETCD,goroutine,源码,阅读,rc,raft,节点,channel
From: https://www.cnblogs.com/chnjm/p/17277227.html

相关文章

  • ETCD源码阅读(二)
    DAY1:阅读raftexample:etcd/contrib/raftexampleraftexample包括三个组件:一个基于raft的kvstore、一个RESTAPIServer、一个基于etcdraft实现的RaftNode。其中RaftNode也拥有一个Httpserver,用于与peer节点进行通信。RESTAPIServer则是这个RaftNode的client。kvs......
  • ETCD源码阅读(一)
    DAY0:ETCD架构下图中展示了etcd如何处理一个客户端请求涉及到的模块和流程。图中淡紫色的矩阵表示etcd,它包括如下几个模块:etcdserver:对外接受客户端的请求,请求etcd代码中的etcdserver目录,其中还有一个raft.go的模块与etcdraft库进行通信。etcdserver中与......
  • vue+leaflet示例:克里金插值渲染显示(附源码下载)
    demo源码运行环境以及配置运行环境:依赖Node安装环境,demo本地Node版本:14.19.1。运行工具:vscode或者其他工具。配置方式:下载demo源码,vscode打开,然后顺序执行以下命令:(1)下载demo环境依赖包命令:npmi(2)启动demo命令:npmrundev(3)打包demo命令:npmrunbuild:release示例效果......
  • 直播网站源码,Android中点击图片放大的简单方法
    直播网站源码,Android中点击图片放大的简单方法简单的思路就是把要放大的图片显示在一个对话框中显示出来 Java代码: publicvoidonThumbnailClick(Viewv){//finalAlertDialogdialog=newAlertDialog.Builder(this).create();//ImageViewimgView=getView();//di......
  • ChatGPT 微信接入 C#完整源码
    1.无需搭建服务器,操作极其简单。  2.winform运行程序扫码进行微信登录,勾上自动回复,就可以充当机器人调用chatGPT可实现自动回复,可以申请小号操作。  3.可以识别会话消息和群聊消息,拉入群聊@机器人可以进行群聊的消息回复,可以得到@自己的回复消息。4.代码是完整的也......
  • C#上位机开发源码 上位机项目源代码 采用基于RS485通讯总线的ModbusRtu协议
    C#上位机开发源码上位机项目源代码采用基于RS485通讯总线的ModbusRtu协议,支持用户权限管理、sqlite数据库、实时曲线、历史曲线、历史报表、导出Excel、主界面布局可调带记忆等功能YID:81150611746679046......
  • 构建之法阅读笔记2
    首先,软件会产生的原因是什么?没错,就是人们为了解决生活中遇到的问题。那么我们作为软件的开发者,就要最大程度上去把握用户们的需求,进而制作出用户们满意的产品。那么我们如何去把握用户们的需求呢,接下来我们一一道来。1、获取与引导需求,就是我们要找到软件产品的相关者,获取他......
  • 构建之法阅读笔记3
    创新是新时代所提倡的,但是有一些观点也随之而来(迷思):1、顿悟的传说,比如:牛顿被苹果砸中,发明了万有引力。我们都在想着什么时候灵光一闪,就能够改变世界,那是不切实际的,只有持续创新才能有成果。2、大家都喜欢创新。3、好的想法会赢,但是在现实中却是,好的想法不一定会赢......
  • 《程序员修炼之道:从小工到专家》阅读笔记七
    二十三、断言式编程在自责中有一种满足感,当我们责备自己时,会觉得再没人有权责备我们。--王尔德不要有“这绝不会发生...”的自我欺骗。如果它不可能发生,用断言确保它不会发生。对于算法操作,有时断言也是有用的检查。二十四、何时使用异常将异常用于异常问题异常表示即时的、......
  • 构建之法阅读笔记2
    第四部分是陈述,它是程序主体的基本组成单元,它高于变量。这部分主要描述语句的组织结构,如线性类型、循环控制、条件控制表驱动和其他常见方法,如条件循环,在大多数情况下并不常见。我应该对程序逻辑有高度的概括和灵活性。这仅在编写编译器课程实践代码时使用第五部分是代码改进。......