Lab2A:Raft
文章目录
- Lab2A:Raft
Lab1:MapReduce
Lab2:Raft 实验解读
Lab2B:日志复制
我的代码实现(附带详细代码注释)
直达gitee链接
https://gitee.com/zhou-xiujun/xun-go/tree/master/com/xun/Mit-6.824-xun
前言
这是根据另一个大佬的代码一步步实现的,并且根据自己的理解加入了注释解释,为用于学习记录下这个笔记,如有错误之处,大佬勿喷,感谢。
Part 2A: [leader election](中等难度)
实验指南
实现Raft领导者选举和心跳机制(仅包含没有日志条目的 AppendEntries RPCs)。第2A部分的目标是选出一个单一的领导者,如果没有故障发生,领导者应保持其领导地位;如果旧的领导者失败或 to/from 旧领导者的消息包丢失,则由新的领导者接替。运行 go test -run 2A
来测试你的第2A部分代码。
提示
- 不能直接运行Raft实现,而应通过测试程序运行,即
go test -run 2A
。运行go test
命令时,Go测试工具会查找并运行当前目录及其子目录中所有以_test.go
结尾的文件中的测试函数。 - 按照论文中的图2进行操作。此时需要关注发送和接收RequestVote RPC、与选举相关的服务器规则以及与领导者选举相关的状态。
- 将图2中领导者选举的状态添加到
raft.go
文件中的Raft结构中。同时需要定义一个结构体以保存每个日志条目的信息。 - 填写
RequestVoteArgs
和RequestVoteReply
结构体。修改Make()
函数以创建一个后台协程,在一段时间内没有从其他节点接收到消息时,通过发送RequestVote RPC来定期启动领导者选举。这样,如果已有领导者存在,节点将知道谁是领导者,或者节点本身将成为领导者。实现RequestVote()
RPC处理程序,以便服务器之间可以相互投票选出另一个领导者。 - 为了实现心跳机制,定义一个
AppendEntries
RPC结构体(尽管可能不需要所有参数),并让领导者定期发送心跳消息。编写一个AppendEntries
RPC处理方法,以重置选举超时时间,这样在已有领导者被选出时,其他服务器不会竞选领导者。 - 确保不同节点的选举超时不会总是同时触发,否则所有节点将只为自己投票,导致无人当选为领导者。(原因:所有节点会在同一时间成为候选者,并向其他节点发送RequestVote RPC。这将导致每个节点只为自己投票,最终导致选举冲突,无法选出领导者。)
- 测试程序要求领导者发送心跳RPC的频率不超过每秒10次。
- 测试程序要求在旧领导者失效后五秒内选出新领导者(如果大多数节点仍能通信)。然而,选举可能需要多轮投票以应对票数平分的情况(例如,数据包丢失或候选者选择了相同的随机退避时间)。需要选择足够短的选举超时时间(及心跳间隔),以确保选举在不到五秒内完成,即使需要多轮。
- 论文第5.2节提到的选举超时时间在150到300毫秒之间。这一范围仅在领导者发送心跳频率远高于每150毫秒一次时才合理。由于测试程序限制心跳频率为每秒10次,选举超时需要比论文的150到300毫秒更长,但不能太长,以避免在五秒内未能选出领导者。
- 可以使用Go的
rand
(链接)包。 - 需要编写代码以定期或在延迟一段时间后执行操作。最简单的方法是在一个循环中调用
time.Sleep()
的协程(参见Make()
创建的ticker()
协程)。不要使用Go的time.Timer
或time.Ticker
,因为它们难以正确地使用。 - 参考指导页面中的开发和调试代码的提示。如果代码难以通过测试,请再次阅读论文的图2;领导者选举的完整逻辑分布在图的多个部分。
- 不要忘记实现
GetState()
。 - 测试程序在永久关闭实例时会调用
rf.Kill()
。可以通过rf.killed()
检查Kill()
是否已被调用。建议在所有循环中执行此检查,以避免已关闭的Raft实例打印混乱的消息。 - Go RPC只发送字段名以大写字母开头的结构体字段。子结构体也必须有大写字母开头的字段名(例如,数组中的日志记录字段)。
labgob
包会对此发出警告,请不要忽略这些警告。
确保在提交第 2A 部分之前通过 2A 测试,这样你会看到类似这样的输出:
$ go test -run 2A
Test (2A): initial election ...
... Passed -- 3.5 3 58 16840 0
Test (2A): election after network failure ...
... Passed -- 5.4 3 118 25269 0
Test (2A): multiple elections ...
... Passed -- 7.3 7 624 138014 0
PASS
ok 6.824/raft 16.265s
$
每一行“Passed”包含五个数字;这些数字分别是测试所用的时间(以秒为单位)、Raft 节点的数量、测试期间发送的 RPC 数量、RPC 消息的总字节数以及 Raft 报告已提交的日志条目的数量。每个人的数字可能会与这里显示的不同。可以忽略这些数字,但它们可能有助于检查自己的实现发送的 RPC 数量是否合理。对于所有的第 2、3 和 4 部分,如果所有测试(使用 go test
)耗时超过 600 秒,或者任何单个测试耗时超过 120 秒,评分脚本将判定你的解决方案不合格。
在评分时,将运行没有 -race
标志的测试,但应该确保代码在带有 -race
标志的情况下也能稳定地通过测试。
错误:
- 测试时间超时
panic: test timed out after 10m0s
running tests:
TestBatch2A (9m29s)
goroutine 99721 [running]:
testing.(*M).startAlarm.func1()
/usr/local/go/src/testing/testing.go:2259 +0x3b9
created by time.goFunc
/usr/local/go/src/time/sleep.go:176 +0x2d
解决:
go test -run 2A -timeout 60m
我的实验结果:
实现细节(包含对于方法的解释 如有错误 大佬勿喷)
// ApplyMsg
// 当每个Raft节点意识到连续的日志条目被提交时,该节点应该通过传递给 Make() 的 applyCh 向同一服务器上的服务(或测试器)发送一个ApplyMsg。
// 将 CommandValid 设置为true以表明 ApplyMsg 包含一个新提交的日志条目。
type ApplyMsg struct {
CommandValid bool // 表示Command字段是否包含一个新提交的日志条目
Command interface{} // 包含新提交的日志条目
CommandIndex int // 新提交日志条目的索引
}
// Raft
// 一个实现单个 Raft 节点的 Go 对象示例
type Raft struct {
mu sync.Mutex // 保护该节点状态的共享访问的锁 Lock to protect shared access to this peer's state
peers []*labrpc.ClientEnd // 所有节点的 RPC 端点 RPC end points of all peers
persister *Persister // 保存节点持久化状态的对象 Object to hold this peer's persisted state
me int // 当前节点在 peers 数组中的索引 this peer's index into peers[]
dead int32 // 通过 Kill() 设置,用于标记节点是否已停止 set by Kill()
// 2A
role RoleType // 记录节点目前状态
currentTerm int // 节点当前任期
votedFor int // follower把票投给了哪个candidate
voteCount int // 记录所获选票的个数
appendEntriesChan chan AppendEntriesReply // 心跳channel
LeaderMsgChan chan struct{} // 当选Leader时发送
VoteMsgChan chan struct{} // 收到选举信号时重置一下计时器,不然会出现覆盖term后计时器超时又突然自增。
}
// AppendEntriesArgs 是附加日志条目(包括心跳)的 RPC 请求参数结构。
type AppendEntriesArgs struct {
Term int // 领导者的当前任期
LeaderId int // 领导者的 ID
PrevLogIndex int // 新日志条目之前的日志条目的索引
PrevLogTerm int // 新日志条目之前的日志条目的任期
LeaderCommit int // 领导者的 commitIndex
}
// AppendEntriesReply 是附加日志条目(包括心跳)的 RPC 响应结构。
type AppendEntriesReply struct {
Term int // 当前任期,用于领导者更新自己的任期
Success bool // 如果跟随者包含了匹配的日志条目且日志条目已成功存储,则为 true
}
// RequestVoteArgs 是 RequestVote RPC 请求的参数结构体。
// 字段名称必须以大写字母开头,以便在序列化时正确处理。
type RequestVoteArgs struct {
Term int // 候选人的当前任期
CandidateId int // 请求投票的候选人的 ID
LastLogIndex int // 候选人最后一个日志条目的索引(5.4节)
LastLogTerm int // 候选人最后一个日志条目的任期(5.4节)
}
// RequestVoteReply 是 RequestVote RPC 响应的结构体。
// 字段名称必须以大写字母开头,以便在序列化时正确处理。
type RequestVoteReply struct {
Term int // 当前任期,用于候选人更新自身的任期
VoteGranted bool // 表示是否投票给了候选人,true 表示候选人获得了投票。
}
// GetState 返回当前的任期和该服务器是否认为自己是领导者。
// 该方法用于外部查询 Raft 服务器的当前状态。
// 返回值:
// - int:当前的任期(term)。
// - bool:该服务器是否认为自己是领导者(isleader)。
func (rf *Raft) GetState() (int, bool) {
var term int
var isleader bool
// Your code here (2A).
term = rf.currentTerm
isleader = rf.role == Leader
return term, isleader
}
// sendAllRequestVote 向所有其他 Raft 节点发送请求投票的 RPC。
// 该函数首先构建 RequestVoteArgs,然后向所有其他节点并行发送请求投票的 RPC。
func (rf *Raft) sendAllRequestVote() {
rf.mu.Lock()
defer rf.mu.Unlock()
// 构建请求投票的参数
args := &RequestVoteArgs{
Term: rf.currentTerm, // 当前任期
CandidateId: rf.me, // 候选人 ID
LastLogIndex: 0, // 候选人最后一个日志条目的索引(暂时设置为 0)
LastLogTerm: 0, // 候选人最后一个日志条目的任期(暂时设置为 0)
}
// 向所有其他节点发送请求投票的 RPC
for i := range rf.peers {
// 对于非当前节点的其他节点发送,并且本身为候选者状态
if i != rf.me && rf.role == Candidate {
// 并行发送请求投票的 RPC
go func(id int) {
// 接收返回参数
ret := &RequestVoteReply{
Term: 0,
VoteGranted: false,
}
rf.sendRequestVote(id, args, ret)
}(i)
}
}
}
// RequestVote RPC handler.
// 处理RequestVote RPC请求的处理函数
//任期比较:
// 如果请求者的任期大于当前节点的任期,说明请求者的信息更新,当前节点需要更新其任期并转换为Follower角色。
// 如果请求者的任期小于当前节点的任期,则当前节点拒绝投票,因为其任期更大,更“新”。
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
// Your code here (2A, 2B).
// 锁定当前Raft实例,以保证并发安全。
rf.mu.Lock()
defer rf.mu.Unlock()
// 设置返回的任期,投票默认拒绝 返回 false
reply.Term = rf.currentTerm
// 如果请求者的任期大于当前节点的任期,则更新当前节点的任期并转换为Follower角色。
// 这里不加return 因为一个candidate一轮只发送一次选举。Follower收到了修改自己任期即可。后面可以继续参与投票。
if args.Term > rf.currentTerm {
// 当前节点的任期比请求节点(候选者)的任期小,则转为追随者
// 将当前节点转换为Follower角色,并更新任期为请求者的任期。
rf.ConvertToFollower(args.Term)
// 向VoteMsgChan通道发送消息,通知其他部分有投票事件发生。
rf.VoteMsgChan <- struct{}{}
}
// 如果请求者的任期小于当前节点的任期,或者在同一任期内已经投给了其他候选人,则直接拒绝投票并返回当前任期与投票结果。
if args.Term < rf.currentTerm || (args.Term == rf.currentTerm && rf.votedFor != -1 && rf.votedFor != args.CandidateId) {
reply.Term, reply.VoteGranted = rf.currentTerm, false
return
}
// 如果当前节点是Follower,并且当前节点尚未投票或已投票给该候选人,
// todo: 并且候选人的日志至少和当前节点的日志一样新,则同意投票。(2B的实现中还需要加入对日志的比较(根据论文的5.4节))
if rf.role == Follower && (rf.votedFor == noVoted || rf.votedFor == args.CandidateId) {
// 更新投票给的候选人ID。
rf.votedFor = args.CandidateId
// 同意投票。
reply.VoteGranted = true
reply.Term = args.Term
// 向VoteMsgChan通道发送消息,通知其他部分有投票事件发生。
rf.VoteMsgChan <- struct{}{}
}
}
// sendRequestVote 发送 RequestVote RPC 给服务器的示例代码。
// server 是目标服务器在 rf.peers[] 中的索引。
// args 中包含 RPC 参数。
// *reply 中填充 RPC 回复,因此调用者应传递 &reply。
// 传递给 Call() 的 args 和 reply 类型必须与处理函数中声明的参数类型相同
// (包括它们是否是指针)。
//
// labrpc 包模拟了一个丢包网络,其中服务器可能无法访问,
// 请求和回复可能会丢失。Call() 发送一个请求并等待回复。
// 如果在超时间隔内收到回复,Call() 返回 true;否则返回 false。
// 因此 Call() 可能不会立即返回。
// 返回 false 可能是由服务器宕机、服务器无法访问、请求丢失或回复丢失引起的。
//
// Call() 保证会返回(可能会有延迟),*除非*服务器端的处理函数没有返回。
// 因此,不需要在 Call() 周围实现你自己的超时机制。
//
// 查看 ../labrpc/labrpc.go 中的注释以了解更多详情。
//
// 如果你在让 RPC 工作时遇到困难,请检查你是否将传递给 RPC 的结构体中的所有字段名都大写,
// 并且调用者传递的是 reply 结构体的地址(&),而不是结构体本身。
func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
// server下标节点调用RequestVote RPC处理程序
ok := rf.peers[server].Call("Raft.RequestVote", args, reply)
// 发送失败直接返回
if !ok {
return false
}
// 如果该节点已经不是候选者或者该节点请求时的任期与当前任期不一致,直接返回 true,无须继续拉票
if rf.role != Candidate || args.Term != rf.currentTerm {
// 已经成为 Leader 或者重置为候选者或跟随者了
return true
}
// 如果收到的回复中的任期比当前节点的任期大,遇到了任期比自己大的节点,转换为跟随者 follower
if reply.Term > rf.currentTerm {
rf.ConvertToFollower(reply.Term)
rf.VoteMsgChan <- struct{}{}
return true
}
// 如果投票被授予,并且当前节点仍是候选者,增加投票计数
if reply.VoteGranted && rf.role == Candidate {
rf.voteCount++
// 如果获得超过半数的票数,并且仍是候选者,转换为领导者
if 2*rf.voteCount > len(rf.peers) && rf.role == Candidate {
rf.ConvertToLeader()
// 超半数票 直接当选,当选为领导者后,通知 LeaderMsgChan
rf.LeaderMsgChan <- struct{}{}
}
}
return true
}
// Kill 测试器在每次测试后不会停止 Raft 创建的 goroutine,
// 但它确实会调用 Kill() 方法。你的代码可以使用 killed() 来
// 检查是否已调用 Kill()。使用 atomic 避免了对锁的需求。
//
// 问题在于长时间运行的 goroutine 会使用内存并可能消耗 CPU 时间,
// 这可能导致后续测试失败并生成令人困惑的调试输出。
// 任何具有长时间运行循环的 goroutine 都应该调用 killed() 以检查它是否应该停止。
// Kill 将当前 Raft 实例标记为已终止。
func (rf *Raft) Kill() {
// 使用 atomic.StoreInt32(&rf.dead, 1) 设置 rf.dead 的值为 1。
atomic.StoreInt32(&rf.dead, 1)
// Your code here, if desired.
}
// killed 检查当前 Raft 实例是否已被终止。
// 使用 atomic.LoadInt32(&rf.dead) 来获取 rf.dead 的值。
// 如果值为 1,则表示该实例已被终止。
func (rf *Raft) killed() bool {
z := atomic.LoadInt32(&rf.dead)
return z == 1
}
通过 TestManyElections2A() 测试的关键点在于候选者收到更大任期的leader的心跳信息或者日志复制信息后,需要转为follower。
// ticker 协程在近期没有收到心跳的情况下启动新的选举。
func (rf *Raft) ticker() {
// dead置为1后为true,则退出运行
for rf.killed() == false {
// Your code here to check if a leader election should
// be started and to randomize sleeping time using
// time.Sleep().
// 在这里添加代码以检查是否应该启动领导者选举
// 并使用 time.Sleep() 随机化休眠时间。
switch rf.role {
case Candidate:
go rf.sendAllRequestVote()
select {
case <-rf.VoteMsgChan:
continue
case resp := <-rf.appendEntriesChan:
if resp.Term >= rf.currentTerm {
// 关键点:候选者收到更大任期的leader的心跳信息或者日志复制信息后,需要转为follower
rf.ConvertToFollower(resp.Term)
continue
}
case <-time.After(electionTimeout + time.Duration(rand.Int31()%300)*time.Millisecond):
// 选举超时 重置选举状态
rf.ConvertToCandidate()
continue
case <-rf.LeaderMsgChan:
}
case Leader:
// Leader 定期发送心跳和同步日志
rf.SendAllAppendEntries()
select {
case resp := <-rf.appendEntriesChan:
// 处理跟随者的响应,如发现更高的任期则转为Follower
if resp.Term > rf.currentTerm {
rf.ConvertToFollower(resp.Term)
continue
}
case <-time.After(HeartBeatInterval):
// 超时后继续发送心跳
continue
}
// 等待心跳间隔时间
//time.Sleep(HeartBeatInterval)
case Follower:
// 如果是跟随者,等待不同的事件发生
select {
case <-rf.VoteMsgChan:
// 收到投票消息,继续等待
continue
case resp := <-rf.appendEntriesChan:
// 收到附加日志条目消息,继续等待
if resp.Term > rf.currentTerm {
rf.ConvertToFollower(resp.Term)
continue
}
case <-time.After(appendEntriesTimeout + time.Duration(rand.Int31()%300)*time.Millisecond):
// 附加日志条目超时,转换为候选人,发起选举
// 增加扰动避免多个Candidate同时进入选举
rf.ConvertToCandidate()
}
}
}
}
// Make 创建服务或测试者想要创建的一个 Raft 服务器。
// 所有 Raft 服务器的端口(包括这个)都在 peers[] 数组中。
// 这个服务器的端口是 peers[me]。所有服务器的 peers[] 数组顺序相同。
// persister 是这个服务器保存其持久状态的地方,并且最初持有最近保存的状态(如果有的话)。
// applyCh 是一个通道,测试者或服务期望 Raft 在此通道上发送 ApplyMsg 消息。
// Make() 必须快速返回,因此它应该为任何长期运行的工作启动协程。
func Make(peers []*labrpc.ClientEnd, me int, persister *Persister, applyCh chan ApplyMsg) *Raft {
rf := &Raft{
mu: sync.Mutex{}, // 互斥锁,保护共享访问这个节点的状态
peers: peers, // 所有节点的 RPC 端点
persister: persister, // 持久化对象,用于保存这个节点的持久状态
me: me, // 这个节点在 peers[] 数组中的索引
dead: 0, // 用于标记节点是否已终止
}
// 2A
rf.role = Follower // 初始状态为 Follower
rf.currentTerm = 0 // 当前任期
rf.votedFor = noVoted // 当前任期内投票的候选人 ID
rf.voteCount = 0 // 当前选举中的投票计数
rf.appendEntriesChan = make(chan AppendEntriesReply) // 用于心跳信号的通道
rf.LeaderMsgChan = make(chan struct{}, chanLen) // 用于领导者选举信号的通道
rf.VoteMsgChan = make(chan struct{}, chanLen) // 用于投票信号的通道
// initialize from state persisted before a crash
// 从崩溃前保存的状态进行初始化
rf.readPersist(persister.ReadRaftState())
// start ticker goroutine to start elections
// 启动 ticker 协程开始选举
go rf.ticker()
return rf
}
// SendAllAppendEntries 由Leader向其他所有节点调用来复制日志条目;也用作heartbeat
// 用于领导者向其他所有节点发送附加日志条目(或心跳)请求。在领导者周期性地发送心跳或需要复制日志条目到所有节点时使用
func (rf *Raft) SendAllAppendEntries() {
rf.mu.Lock()
defer rf.mu.Unlock()
for server := range rf.peers {
// 对于每个不是当前节点的节点,leader 启动一个新的 goroutine 来发送 AppendEntries 请求
if server != rf.me && rf.role == Leader {
go func(id int) {
args := &AppendEntriesArgs{
Term: rf.currentTerm,
LeaderId: rf.me,
PrevLogIndex: 0,
PrevLogTerm: 0,
LeaderCommit: 0,
}
reply := &AppendEntriesReply{
Term: 0,
Success: false,
}
rf.SendAppendEntries(id, args, reply)
}(server)
}
}
}
// SendAppendEntries 向指定的节点发送 AppendEntries RPC 请求。
// 发送具体的 AppendEntries 请求,并处理响应
func (rf *Raft) SendAppendEntries(id int, args *AppendEntriesArgs, reply *AppendEntriesReply) {
// 调用指定节点的 AppendEntriesHandler 方法,并传递请求和响应结构
rf.peers[id].Call("Raft.AppendEntriesHandler", args, reply)
// 如果当前节点不再是领导者,则直接返回
if rf.role != Leader {
return
}
// 如果响应中的任期大于当前任期,当前节点会转换为跟随者
if reply.Term > rf.currentTerm {
rf.ConvertToFollower(reply.Term)
return
}
}
// AppendEntriesHandler 由Leader向每个其余节点发送
// 处理来自领导者的 AppendEntries RPC 请求
// 处理接收到的 AppendEntries 请求,包括心跳和日志条目的复制
func (rf *Raft) AppendEntriesHandler(args *AppendEntriesArgs, reply *AppendEntriesReply) {
// 传一个空结构体表示接收到了Leader的请求。
// 初始化响应的任期为当前任期
reply.Term = rf.currentTerm
// 收到Leader更高的任期时,更新自己的任期,转为 leader 的追随者
if rf.currentTerm < args.Term {
rf.ConvertToFollower(args.Term)
return
}
// 向 appendEntriesChan 发送一个空结构体,以表示接收到了领导者的请求
//rf.appendEntriesChan <- struct {}{}
// 发送心跳或日志条目后
rf.appendEntriesChan <- AppendEntriesReply{Term: rf.currentTerm, Success: true}
}
我的代码实现(附带详细代码注释)
直达gitee链接
https://gitee.com/zhou-xiujun/xun-go/tree/master/com/xun/Mit-6.824-xun
相关链接
标签:--,任期,RPC,rf,election,分布式系统,Raft,日志,节点 From: https://blog.csdn.net/weixin_44615954/article/details/140231874