首页 > 其他分享 >mit 6.824 lab2A ,raft 领导人选举实现

mit 6.824 lab2A ,raft 领导人选举实现

时间:2022-08-18 11:26:15浏览次数:54  
标签:Raft 6.824 args rf 超时 currentTerm reply raft lab2A

lab2 说明:

https://pdos.csail.mit.edu/6.824/labs/lab-raft.html

参考博客:

https://zhuanlan.zhihu.com/p/514512060

实现内容:

实现 Raft 领导选举和心跳(AppendEntries RPCs 没有日志条目)。第 2A 部分的目标是选举单个领导者,如果没有失败,领导者仍然是领导者,如果旧领导者失败或发往/来自旧领导者的数据包被新领导者接管,则丢失的。

 

 

根据raft论文图二实现raft结构

  

type Raft struct {
	mu        sync.Mutex          // Lock to protect shared access to this peer's state
	peers     []*labrpc.ClientEnd // RPC end points of all peers
	persister *Persister          // Object to hold this peer's persisted state
	me        int                 // this peer's index into peers[]
	dead      int32               // set by Kill()

	// Your data here (2A, 2B, 2C).
	status	  SERVER_STATUS        //当前服务器身份
	electionTime time.Time         //超时时间
	//所有服务器持久存在
	currentTerm int      			//任期号
	votedFor	int					//获得本服务器选票的候选人id,没有为-1
	log			logEntry				//日志条目集合

	//所有服务器上经常改变的
	commitIndex	uint64				//已知的最大已经被提交的日志条目索引值
	lastApplied	uint64				//被应用到状态机的日志条目索引值

	//leader中经常改变的
	nextIndex	[]uint64			//对于每一个服务器需要发送给他的下一个日志索引值,这个是期望
	matchIndex	[]uint64			//对于一个服务器,已经复制给他的日志最高索引值,这个是实际复制的


	// Look at the paper's Figure 2 for a description of what
	// state a Raft server must maintain.

}

  

按照mit 提供的框架,首先要完成Make函数

Make函数用于初始化服务器。

在论文中

  1. 刚开始所有节点都是 Follower。

    2.Follwer 一段时间没接收到消息即选举超时,发起新选举。

从这里我们知道,初始化服务器时候status 要设置为Follower,需要开启一个监听函数用于超时选举,在mit框架中是tick函数

 1 func Make(peers []*labrpc.ClientEnd, me int,
 2     persister *Persister, applyCh chan ApplyMsg) *Raft {
 3     rf := &Raft{}
 4     rf.peers = peers
 5     rf.persister = persister
 6     rf.me = me
 7     rf.status = Followers
 8     rf.nextIndex = make([]uint64,len(rf.peers))
 9     rf.currentTerm = 0
10 
11     // Your initialization code here (2A, 2B, 2C).
12 
13     // initialize from state persisted before a crash
14     rf.readPersist(persister.ReadRaftState())
15 
16     // start ticker goroutine to start elections
17     rf.setElectionTime()
18     go rf.ticker()
19 
20 
21     return rf
22 }

 

  第七行 设置每个服务器初始化为Follwer,

setElectionTime() 设置了超时时间,

go rf.ticker() 监听超时时间。

func (rf *Raft) ticker() {
    for rf.killed() == false {
        // Your code here to check if a leader election should
        // be started and to randomize sleeping time using
        // time.Sleep().
        rf.tikc()
        ms := 50
        time.Sleep(time.Duration(ms) * time.Millisecond)
    }

监听超时时间,每隔50毫秒就检查一遍当前时间是否超过设定好的超时时间

if time.Now().After(rf.electionTime) {
        rf.setElectionTime()
        rf.startElection()
    }

 

 

超时了,开始选举。

先重置自身超时时间 rf.setElectionTime() ,因为选举可能失败再来一次

开始选举。Follower 自增 term(任期号)并转为 Candidate,并行向其他节点发送 RV RPC 等待给自己投票。

func (fr *Raft) startElection () {
    fr.currentTerm += 1
    fr.status = Candidates
    DDPrintf("%v: 开启选举 term:%v\n",fr.me,fr.currentTerm)
    fr.requestVotes()
}


请求投票requestVotes()

 


 

这个函数接受者实现有个细节,接受者是 Followers,并且投票成功要重置自身超时时间保持Followers
                接受者是Leader 只要符合日志至少一样新和未投票,就会变为Followers。 
              不管接受者是何种身份,只要term > currentTerm,就重置自身为Followers并投票。
以下为接受者投票处理函数

 1 func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
 2     // Your code here (2A, 2B).
 3     rf.mu.Lock()
 4     defer rf.mu.Unlock()
 5 
 6     if args.Term > rf.currentTerm {
 7         DDPrintf("s:%v term:%v 身份:"+translate(rf.status)+" 转变为"+translate(Followers),rf.me,rf.currentTerm)
 8         rf.newTerm(args.Term)
 9     }
10     blimit := args.PrevLogTerm == rf.log.lastLogTerm() && args.PreLogIndex >= rf.log.lastIndex() || args.PrevLogTerm > rf.log.lastLogTerm()
11 
12     if args.Term < rf.currentTerm {
13         reply.VoteGranted = false
14     } else if (rf.votedFor == -1 || rf.votedFor == args.CandidateId) && blimit {
15         reply.VoteGranted = true
16         rf.votedFor = args.CandidateId
17         //写入磁盘
18         rf.status = Followers
19         rf.setElectionTime()
20         DDPrintf("s:%v term:%v 投票给 s:%v term:%v",rf.me,rf.currentTerm,args.CandidateId,args.Term)
21     } else {
22         reply.VoteGranted = false
23     }
24     reply.Term = rf.currentTerm
25 }

blimit 是5.4选举限制。
14行只要自身未投票或者已经投票给当前请求(重发) 并且符合选举限制就投票。

请求选票函数要注意加锁的位置,在mit6.824课程中特意强调不用加锁给RPC请求,而是获得RPC返回结果后再加锁
 1 func (rf *Raft) requestVoteL (peer int,args RequestVoteArgs,vote *int) {
 2     DDPrintf("%v: 向 %v 请求投票",rf.me,peer)
 3     var reply RequestVoteReply
 4     ok := rf.peers[peer].Call("Raft.RequestVote", &args, &reply)
 5 
 6     if ok {
 7         rf.mu.Lock()
 8         defer rf.mu.Unlock()
 9 
10         //返回term较大,则变为follower
11         if reply.Term > rf.currentTerm {
12             rf.newTerm(reply.Term)
13         }
14 
15         //获得选票
16         if reply.VoteGranted == true {
17             *vote ++
18             if rf.status == Leader {
19                 return
20             }
21             if *vote > len(rf.peers) / 2 {
22                 DDPrintf("s:%v term:%v : 成为leader\n",rf.me,rf.currentTerm)
23                 if rf.currentTerm == args.Term {
24                     rf.becomeLeader()
25                     rf.requestAppends()
26                 }
27             }
28         }
29     }
30 }

 

 第18行,因为是并发发送RPC请求,所以可能已经成为leader 但是后几条RPC请求刚刚返回,所以没必要多次执行下面成为leader的代码了。

 

因为是2A ,所以还没完全实现心跳响应,发送的是空包,并且只是简单的重置了自身超时时间,这部分代码后面我还要进行大改动。

//心跳响应
func (rf *Raft) AppendEntries (args *RequestAppendArgs, reply *ResponseAppendArgs) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	if rf.currentTerm > args.Term {
		reply.Term = rf.currentTerm
		return
	}
	if rf.status != Followers && rf.currentTerm <= args.Term {
		rf.newTerm(args.Term)
	}

	DDPrintf("s:%v term:%v 接受心跳  来自 s:%v  term:%v\n", rf.me, rf.currentTerm, args.Leaderld, args.Term)
	rf.setElectionTime()

}

  





  

 

 

 

 

 

 

 

  

  

 

标签:Raft,6.824,args,rf,超时,currentTerm,reply,raft,lab2A
From: https://www.cnblogs.com/thotf/p/16596312.html

相关文章