常量定义
根据论文中的内容,Raft中server节点共有三种状态,分别是 Follower
、Candidate
和 Leader
,因此代码中将三种状态定义如下:
const (
FOLLOWER = iota
CANDIDATE
LEADER
)
此外,根据大量测试所得经验,我分别定义以下时钟常量:选举超时间隔200-500ms、心跳间隔50ms。
const (
HeartBeatTimeout = 50
ElectionTimeoutMin = 200
ElectionTimeoutMax = 500
)
数据结构
日志信息
首先,根据论文中的要求,定义了 LogEntry
结构体表示单个日志信息:
type LogEntry struct {
Index int
Term int
Command interface{}
}
Raft结构体
然后,根据论文中Figure 2,我在代码框架的基础上,定义了如下 Raft
结构体:
// Persistent state on all servers
currentTerm int // implementing a single Raft peer.
votedFor int // candidateId that received vote in current term
log []LogEntry // first index is 1
// Volatile state on all servers
commitIndex int // index of highest log entry known to be committed
lastApplied int // index of highest log entry applied to state machine
// Volatile state on leaders:
nextIndex []int // for each server, index of the next log entry to send to that server
matchIndex []int // for each server, index of highest log entry known to be replicated on server
除了上述Figure 2中给定的内容外,我又在 Raft
结构体中定义了以下内容辅助实现实验功能:
role int
timer *time.Timer
voteCount int
successCount int
applyCh chan ApplyMsg
其中,role
表示当前该节点的状态,timer
用于选举和心跳计时,voteCount
表示候选者当前轮获得的投票数, successCount
表示领导者某次发送心跳获得的成功回应数。
RequestVote相关定义
根据Figure 2,定义如下结构体分别表示投票请求与投票回复:
type RequestVoteArgs struct {
Term int // candidate’s term
CandidateId int // candidate's id
LastLogIndex int // index of candidate’s last log entry
LastLogTerm int // term of candidate’s last log entry
}
type RequestVoteReply struct {
Term int // currentTerm, for candidate to update itself
VoteGranted bool // true means candidate received vote
}
AppendEntries相关定义
同样根据Figure 2,定义了以下两个结构体。但为了能使 leader
中日志更快地回退,在 AppendEntriesReply
中添加了 RevisedIndex
,用于告诉 leader
下次发送的日志该回退到哪个下标。
type AppendEntriesArgs struct {
Term int // leader’s term
LeaderId int // leader's id
PrevLogIndex int // index of log entry immediately preceding new ones
PrevLogTerm int // term of prevLogIndex entry
Entries []LogEntry // log entries to store (empty for heartbeat;
LeaderCommit int // leader’s commitIndex
}
type AppendEntriesReply struct {
Term int // currentTerm, for leader to update itself
Success bool // true if follower contained entry matching prevLogIndex and prevLogTerm
RevisedIndex int // nextindex to send after revising
}
Raft协议实现
Raft节点初始化
在 Make
函数中,我初始化了 Raft
中的成员变量,其中论文中要求 log
的下标是从1开始,因此我在初始化 log
时在开头添加了一个空 LogEntry
。
func Make(peers []*labrpc.ClientEnd, me int,
persister *Persister, applyCh chan ApplyMsg) *Raft {
rf := &Raft{}
rf.peers = peers
rf.persister = persister
rf.me = me
// Your initialization code here.
rf.currentTerm = 0
rf.votedFor = -1
rf.log = make([]LogEntry, 1)
var n = len(peers)
rf.nextIndex = make([]int, n)
rf.matchIndex = make([]int, n)
rf.role = FOLLOWER
rf.timer = time.NewTimer(randtime())
rf.applyCh = applyCh
// initialize from state persisted before a crash
rf.readPersist(persister.ReadRaftState())
go rf.mainLoop()
go rf.applier()
return rf
}
在 Make
中我加入了 mainLoop
和 applier
两个协程,其中 mainLoop
实现了节点的选举、发送心跳日志等一系列功能,applier
实现了定时将该节点已经 commit
的日志运用于状态机。
mainLoop实现
mainLoop
函数实现如下,外层一个大循环。每当计时器到时,若状态为 Follwer
,则进入 Candidate
状态,并发起选举;若状态为 Leader
,则发送心跳给其他节点。
func (rf *Raft) mainLoop() {
for {
<-rf.timer.C
if rf.role == FOLLOWER {
rf.mu.Lock()
rf.role = CANDIDATE
rf.timer.Reset(0)
rf.mu.Unlock()
} else if rf.role == CANDIDATE {
rf.Election()
} else { // LEADER
rf.heartBeat()
}
}
}
Candidate发起选举
候选者将当前任期加一,随机重置计时器,投票给自己,并发送投票请求给其他所有节点。经过一段时间等待,若当前节点状态仍为 Candidate
且获取投票数超过全部节点数的一半,则该节点进入 Leader
状态。代码实现如下:
func (rf *Raft) Election() {
n := len(rf.peers)
rf.mu.Lock()
rf.currentTerm++
rf.voteCount = 1
rf.votedFor = rf.me
rf.persist()
rf.timer.Reset(randtime())
m := len(rf.log)
voteFlag := make(chan bool, n-1)
args := RequestVoteArgs{rf.currentTerm, rf.me, rf.log[m-1].Index, rf.log[m-1].Term}
for i := range rf.peers {
if i == rf.me {
continue
}
go rf.voteResult(i, args, voteFlag)
}
rf.mu.Unlock()
rf.wait(voteFlag)
rf.mu.Lock()
if rf.role == CANDIDATE && rf.voteCount > n/2 {
rf.role = LEADER
for i := range rf.peers {
rf.nextIndex[i] = m
}
rf.matchIndex[rf.me] = m - 1
rf.timer.Reset(0)
rf.persist()
}
rf.mu.Unlock()
}
其中,对于其他每个节点的投票,我使用 voteResult
函数获取其结果。若获取回复的任期大于自身的任期,则转为 Follower
状态并随机重置计时器;若获取的投票结果为 true
,则将获取投票计数加一。代码实现如下:
func (rf *Raft) voteResult(id int, args RequestVoteArgs, voteFlag chan bool) {
reply := &RequestVoteReply{}
ok := rf.sendRequestVote(id, args, reply)
rf.mu.Lock()
if rf.role != CANDIDATE {
rf.mu.Unlock()
return
}
if ok {
if reply.Term > rf.currentTerm {
rf.currentTerm = reply.Term
rf.votedFor = -1
rf.role = FOLLOWER
rf.voteCount = 0
rf.persist()
rf.timer.Reset(randtime())
}
if reply.VoteGranted {
rf.voteCount++
}
voteFlag <- reply.VoteGranted
}
rf.mu.Unlock()
}
接收者回复选举
对于接收到的选举请求,若请求的任期小于自身任期,则拒绝请求;若请求任期大于自身任期,无论是否投赞成票,都更新自身任期、转为 Follower
状态并将本轮的投赞成票对象清空。若当前轮还没有投赞成票对象或本轮之前投赞成票的对象和这次相同,且请求的 LastLogTerm
与 LastLogIndex
皆不小于自身,则为其投赞成票并随机重置计时器,否则投反对票。
代码实现如下:
func (rf *Raft) RequestVote(args RequestVoteArgs, reply *RequestVoteReply) {
// Your code here.
rf.mu.Lock()
defer rf.mu.Unlock()
reply.Term = rf.currentTerm
if args.Term < rf.currentTerm {
reply.VoteGranted = false
return
}
if args.Term > rf.currentTerm {
rf.role = FOLLOWER
rf.currentTerm = args.Term
rf.votedFor = -1
}
if rf.votedFor != -1 && rf.votedFor != args.CandidateId {
reply.VoteGranted = false
} else {
if args.LastLogTerm > rf.log[len(rf.log)-1].Term ||
args.LastLogTerm == rf.log[len(rf.log)-1].Term &&
args.LastLogIndex >= rf.log[len(rf.log)-1].Index {
reply.VoteGranted = true
rf.votedFor = args.CandidateId
rf.persist()
rf.timer.Reset(randtime())
} else {
reply.VoteGranted = false
}
}
}
Leader发送心跳
每当心跳计时超时,Leader
重置心跳计时器,并将成功回应数 successCount
设为1(表示自己成功回应自己心跳),接下来将心跳发送给其他所有节点,根据 nextIndex
数组,分别确定发送给每个节点心跳中的日志内容。
经过一段时间等待,若当前节点状态仍为 Leader
且获得的成功回应数大于总节点数的一半,似乎即可更新 commitIndex
为该节点最后一个日志的下标 m-1
(因为超过一半节点的 matchIndex[i] >= m-1
)。但为了解决论文中Figure 8的情况,这里额外增加了 rf.currentTerm == rf.log[m-1].Term
的限制,即只会提交当前任期下产生的日志。
func (rf *Raft) heartBeat() {
n := len(rf.peers)
rf.mu.Lock()
rf.timer.Reset(time.Duration(HeartBeatTimeout) * time.Millisecond)
m := len(rf.log)
rf.successCount = 1
flag := make(chan bool, n-1)
for i := range rf.peers {
if i == rf.me {
continue
}
args := AppendEntriesArgs{rf.currentTerm, rf.me, rf.log[rf.nextIndex[i]-1].Index,
rf.log[rf.nextIndex[i]-1].Term, nil, rf.commitIndex}
if rf.nextIndex[i] < m {
args.Entries = append([]LogEntry{}, rf.log[rf.nextIndex[i]:]...)
}
go rf.entriesResult(i, args, flag)
}
rf.mu.Unlock()
rf.wait(flag)
rf.mu.Lock()
if rf.role == LEADER && rf.successCount > n/2 {
if m-1 > rf.commitIndex && rf.currentTerm == rf.log[m-1].Term {
rf.commitIndex = m - 1
}
}
rf.mu.Unlock()
}
其中,对于其他每个节点的心跳回复,我使用 entriesResult
函数获取其结果。若获取心跳回复的任期大于自身的任期,则转为 Follower
状态并随机重置计时器。若获取心跳回复结果为 true
,则将 successCount
加一并更新回复节点的 nextIndex[id]
与 matchIndex[id]
的值;否则则根据心跳回复的信息 reply.RevisedIndex
,回退到下次发送给该节点的日志起始下标。
func (rf *Raft) entriesResult(id int, args AppendEntriesArgs, flag chan bool) {
reply := &AppendEntriesReply{}
ok := rf.sendAppendEntries(id, args, reply)
rf.mu.Lock()
if rf.role != LEADER {
rf.mu.Unlock()
return
}
if ok {
if reply.Term > rf.currentTerm {
rf.currentTerm = reply.Term
rf.votedFor = -1
rf.role = FOLLOWER
rf.successCount = 0
rf.timer.Reset(randtime())
rf.persist()
}
if rf.role == LEADER {
if reply.Success {
rf.nextIndex[id] = args.PrevLogIndex + len(args.Entries) + 1
rf.matchIndex[id] = rf.nextIndex[id] - 1
rf.successCount++
} else {
if reply.RevisedIndex > 0 {
rf.nextIndex[id] = reply.RevisedIndex
} else {
rf.nextIndex[id] = args.PrevLogIndex
for rf.nextIndex[id] > 0 && rf.log[rf.nextIndex[id]].Term >= args.PrevLogTerm {
rf.nextIndex[id]--
}
rf.nextIndex[id]++
}
}
flag <- reply.Success
}
}
rf.mu.Unlock()
}
接收者回复心跳
对于接收到的心跳,若请求的任期小于自身任期,则回复拒绝并直接返回。若心跳任期大于自身任期,无论是否回复赞成,都更新自身任期、转为 Follower
状态。若本节点不包含下标为 prevLogIndex
的日志或下标为 prevLogIndex
的日志的任期不等于 prevLogTerm
,则回复拒绝与需要到的回退到的日志下标,并随机重置计时器;否则更新日志与 commitIndex
,并随机重置计时器。
func (rf *Raft) AppendEntries(args AppendEntriesArgs, reply *AppendEntriesReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
reply.Term = rf.currentTerm
if args.Term < rf.currentTerm {
reply.Success = false
return
}
if args.Term > rf.currentTerm {
rf.currentTerm = args.Term
rf.votedFor = -1
rf.role = FOLLOWER
rf.persist()
}
if args.PrevLogIndex >= len(rf.log) || args.PrevLogTerm != rf.log[args.PrevLogIndex].Term {
reply.Success = false
if args.PrevLogIndex >= len(rf.log) {
reply.RevisedIndex = len(rf.log)
} else {
reply.RevisedIndex = -1
}
} else {
if args.Entries != nil {
rf.log = append(rf.log[:args.PrevLogIndex+1], args.Entries...)
rf.persist()
}
if args.LeaderCommit > rf.commitIndex {
rf.commitIndex = min(args.LeaderCommit, len(rf.log)-1)
}
reply.Success = true
}
rf.timer.Reset(randtime())
}
应用于状态机
每个节点每隔一段时间便检测有没有新的已经提交的日志,若有,则将这些新提交的日志应用于状态机。
func (rf *Raft) applier() {
for {
time.Sleep(10 * time.Millisecond)
rf.mu.Lock()
for rf.lastApplied < rf.commitIndex {
rf.lastApplied++
rf.applyCh <- ApplyMsg{rf.lastApplied, rf.log[rf.lastApplied].Command, false, nil}
}
rf.mu.Unlock()
}
}
持久化
根据Figure 2,每当有服务器上的持久状态(currentTerm
、voteFor
、log[]
)更新时,则调用 persist
函数进行将上述三个变量进行编码储存。
persist
与 readPersist
函数实现如下:
func (rf *Raft) persist() {
w := new(bytes.Buffer)
e := gob.NewEncoder(w)
e.Encode(rf.currentTerm)
e.Encode(rf.votedFor)
e.Encode(rf.log)
data := w.Bytes()
rf.persister.SaveRaftState(data)
}
func (rf *Raft) readPersist(data []byte) {
r := bytes.NewBuffer(data)
d := gob.NewDecoder(r)
d.Decode(&rf.currentTerm)
d.Decode(&rf.votedFor)
d.Decode(&rf.log)
}
实验演示
虽然PPT中只要求通过 "persist" 之前的测试用例,但我还是实现了所有功能,可以通过所有的测试用例。
一次通过可能会有一定的偶然性,因此我实现了一个脚本,循环测试了上百次所有的测试用例,除了 Figure8Unreliable
有一定概率无法通过外,其他测试用例皆可以每次通过。
至于为何 Figure8Unreliable
有一定概率无法通过,可能是因为在各个节点日志变化很快的情况下,因为通信不可靠,所以不同节点间日志不一致的情况很严重,无法在短时间内达成一致。或许可以增加选举超时的时间,避免各节点的 currentTerm
快速增加,如此相对来说各节点的日志可以更容易达成一致。
总结
本次的Raft实验具有一定的难度,在结合PPT、过程动画以及论文的情况下,我终于在一定程度上完成了这个实验。同时,为了完成这次实验,我简单学习了Go语言的基础语法,也是收获颇多。
标签:Term,log,报告,int,args,rf,实验,分布式系统,reply From: https://www.cnblogs.com/Mobius-strip/p/17949561