lab2 说明:
https://pdos.csail.mit.edu/6.824/labs/lab-raft.html
参考博客:
https://zhuanlan.zhihu.com/p/514512060
实现内容:
实现 Raft 领导选举和心跳(AppendEntries RPCs 没有日志条目)。第 2A 部分的目标是选举单个领导者,如果没有失败,领导者仍然是领导者,如果旧领导者失败或发往/来自旧领导者的数据包被新领导者接管,则丢失的。
根据raft论文图二实现raft结构
type Raft struct { mu sync.Mutex // Lock to protect shared access to this peer's state peers []*labrpc.ClientEnd // RPC end points of all peers persister *Persister // Object to hold this peer's persisted state me int // this peer's index into peers[] dead int32 // set by Kill() // Your data here (2A, 2B, 2C). status SERVER_STATUS //当前服务器身份 electionTime time.Time //超时时间 //所有服务器持久存在 currentTerm int //任期号 votedFor int //获得本服务器选票的候选人id,没有为-1 log logEntry //日志条目集合 //所有服务器上经常改变的 commitIndex uint64 //已知的最大已经被提交的日志条目索引值 lastApplied uint64 //被应用到状态机的日志条目索引值 //leader中经常改变的 nextIndex []uint64 //对于每一个服务器需要发送给他的下一个日志索引值,这个是期望 matchIndex []uint64 //对于一个服务器,已经复制给他的日志最高索引值,这个是实际复制的 // Look at the paper's Figure 2 for a description of what // state a Raft server must maintain. }
按照mit 提供的框架,首先要完成Make函数
Make函数用于初始化服务器。
在论文中
- 刚开始所有节点都是 Follower。
2.Follwer 一段时间没接收到消息即选举超时,发起新选举。
从这里我们知道,初始化服务器时候status 要设置为Follower,需要开启一个监听函数用于超时选举,在mit框架中是tick函数
1 func Make(peers []*labrpc.ClientEnd, me int, 2 persister *Persister, applyCh chan ApplyMsg) *Raft { 3 rf := &Raft{} 4 rf.peers = peers 5 rf.persister = persister 6 rf.me = me 7 rf.status = Followers 8 rf.nextIndex = make([]uint64,len(rf.peers)) 9 rf.currentTerm = 0 10 11 // Your initialization code here (2A, 2B, 2C). 12 13 // initialize from state persisted before a crash 14 rf.readPersist(persister.ReadRaftState()) 15 16 // start ticker goroutine to start elections 17 rf.setElectionTime() 18 go rf.ticker() 19 20 21 return rf 22 }
第七行 设置每个服务器初始化为Follwer,
setElectionTime() 设置了超时时间,
go rf.ticker() 监听超时时间。
func (rf *Raft) ticker() { for rf.killed() == false { // Your code here to check if a leader election should // be started and to randomize sleeping time using // time.Sleep(). rf.tikc() ms := 50 time.Sleep(time.Duration(ms) * time.Millisecond) }
监听超时时间,每隔50毫秒就检查一遍当前时间是否超过设定好的超时时间
if time.Now().After(rf.electionTime) { rf.setElectionTime() rf.startElection() }
超时了,开始选举。
先重置自身超时时间 rf.setElectionTime() ,因为选举可能失败再来一次
开始选举。Follower 自增 term
(任期号)并转为 Candidate,并行向其他节点发送 RV RPC 等待给自己投票。
func (fr *Raft) startElection () { fr.currentTerm += 1 fr.status = Candidates DDPrintf("%v: 开启选举 term:%v\n",fr.me,fr.currentTerm) fr.requestVotes() }
请求投票requestVotes()
这个函数接受者实现有个细节,接受者是 Followers,并且投票成功要重置自身超时时间保持Followers
接受者是Leader 只要符合日志至少一样新和未投票,就会变为Followers。
不管接受者是何种身份,只要term > currentTerm,就重置自身为Followers并投票。
以下为接受者投票处理函数
1 func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) { 2 // Your code here (2A, 2B). 3 rf.mu.Lock() 4 defer rf.mu.Unlock() 5 6 if args.Term > rf.currentTerm { 7 DDPrintf("s:%v term:%v 身份:"+translate(rf.status)+" 转变为"+translate(Followers),rf.me,rf.currentTerm) 8 rf.newTerm(args.Term) 9 } 10 blimit := args.PrevLogTerm == rf.log.lastLogTerm() && args.PreLogIndex >= rf.log.lastIndex() || args.PrevLogTerm > rf.log.lastLogTerm() 11 12 if args.Term < rf.currentTerm { 13 reply.VoteGranted = false 14 } else if (rf.votedFor == -1 || rf.votedFor == args.CandidateId) && blimit { 15 reply.VoteGranted = true 16 rf.votedFor = args.CandidateId 17 //写入磁盘 18 rf.status = Followers 19 rf.setElectionTime() 20 DDPrintf("s:%v term:%v 投票给 s:%v term:%v",rf.me,rf.currentTerm,args.CandidateId,args.Term) 21 } else { 22 reply.VoteGranted = false 23 } 24 reply.Term = rf.currentTerm 25 }
blimit 是5.4选举限制。
14行只要自身未投票或者已经投票给当前请求(重发) 并且符合选举限制就投票。
请求选票函数要注意加锁的位置,在mit6.824课程中特意强调不用加锁给RPC请求,而是获得RPC返回结果后再加锁
1 func (rf *Raft) requestVoteL (peer int,args RequestVoteArgs,vote *int) { 2 DDPrintf("%v: 向 %v 请求投票",rf.me,peer) 3 var reply RequestVoteReply 4 ok := rf.peers[peer].Call("Raft.RequestVote", &args, &reply) 5 6 if ok { 7 rf.mu.Lock() 8 defer rf.mu.Unlock() 9 10 //返回term较大,则变为follower 11 if reply.Term > rf.currentTerm { 12 rf.newTerm(reply.Term) 13 } 14 15 //获得选票 16 if reply.VoteGranted == true { 17 *vote ++ 18 if rf.status == Leader { 19 return 20 } 21 if *vote > len(rf.peers) / 2 { 22 DDPrintf("s:%v term:%v : 成为leader\n",rf.me,rf.currentTerm) 23 if rf.currentTerm == args.Term { 24 rf.becomeLeader() 25 rf.requestAppends() 26 } 27 } 28 } 29 } 30 }
第18行,因为是并发发送RPC请求,所以可能已经成为leader 但是后几条RPC请求刚刚返回,所以没必要多次执行下面成为leader的代码了。
因为是2A ,所以还没完全实现心跳响应,发送的是空包,并且只是简单的重置了自身超时时间,这部分代码后面我还要进行大改动。
//心跳响应 func (rf *Raft) AppendEntries (args *RequestAppendArgs, reply *ResponseAppendArgs) { rf.mu.Lock() defer rf.mu.Unlock() if rf.currentTerm > args.Term { reply.Term = rf.currentTerm return } if rf.status != Followers && rf.currentTerm <= args.Term { rf.newTerm(args.Term) } DDPrintf("s:%v term:%v 接受心跳 来自 s:%v term:%v\n", rf.me, rf.currentTerm, args.Leaderld, args.Term) rf.setElectionTime() }
标签:Raft,6.824,args,rf,超时,currentTerm,reply,raft,lab2A From: https://www.cnblogs.com/thotf/p/16596312.html