任务
完成log replication,论文5.3
基于lab2A
逻辑
这个网站有动画演示
大致过程
- 命令发送给leader,leader复制到本地
- leader将日志发给followers
- follower复制到本地,返回成功消息
- leader收到大多数的回复消息,提交日志,再向follower发消息
- follower提交日志
上面过程的是有非常多细节的,之后展开
实现
首先是发送命令给leader,这个是通过start
实现的,具体看注释
func (rf *Raft) Start(command interface{}) (int, int, bool) {
// Your code here (2B).
index := -1
term := -1
if rf.state != Leader {
return index, term, false
}
rf.mu.Lock()
defer rf.mu.Unlock()
index = len(rf.log)
term = rf.currentTerm
//首先将日志复制到本地
rf.log = append(rf.log, Entry{Term: term, Command: command, Index: index})
return index, term, true
}
在论文中说接到命令之后立刻发送AppendEntries
,但是此处实现需要新建一个函数,功能和2A中的boardcastheartbeat
是相似的,代码会有冗余。当然也可以通过传参来决定是运行一次还是循环运行。最后选择了让boardcastheartbeat
循环执行的时候带上发送,这样会牺牲一些性能
下一步是将消息发送folloers,这个发送的时候我门需要带上log信息,此时引出了新的问题,我门要将哪部分log传过去,现在就需要nextIndex
matchIndex
登场了
前者记录每个foller下一条日志的位置,但是当一个节点刚成为leader时候,并不知道其他节点的日志情况,所以nextIndex在成为leader时初始化为自己日志的长度
func (rf *Raft) convertLeader() {
rf.state = Leader
for i := range rf.peers {
//initialized to leader last log index+1
rf.nextIndex[i] = len(rf.log)
}
}
matchIndex是在我们实际匹配过程中探测到匹配的日志,Make初始化为0
我们需要传送的日志就是nextIndex[i]之后的内容
args := AppendEntriesArgs{
Term: rf.currentTerm,
LeaderId: rf.me,
PrevLogIndex: rf.nextIndex[i] - 1,
LedaerCommit: rf.commitIndex,
Entries: make([]Entry, 0),
}
if args.PrevLogIndex >= 0 {
args.PrevLogTerm = rf.log[args.PrevLogIndex].Term
}
if rf.nextIndex[i] >= 0 {
args.Entries = append(args.Entries, rf.log[rf.nextIndex[i]:]...)
}
接下来考虑怎么处理返回结果
- 如果Success=false,代表着我们log的因为某种原因匹配失败了,我们需要减少next[i]的值,在论文中有表述
- 如果Success=true,代表复制成功,修改
nextIndex
matchIndex
,按照论文的意思我们需要收集到多数的确认之后,提交日志。但是日志的数量可能很多,节点也多,维护计数器会变的复杂。参考大佬的思想,我们将所有节点的matchIndex排序,在中间位置上的index就是“多数”
// sync success
if reply.Success {
rf.matchIndex[i] = args.PrevLogIndex + len(args.Entries)
rf.nextIndex[i] = rf.matchIndex[i] + 1
//更新commiindex
arr := make([]int, len(rf.matchIndex))
copy(arr, rf.matchIndex)
//添加leader节点的
arr[rf.me] = len(rf.log) - 1
sort.Ints(arr)
newCIndex := arr[len(rf.peers)/2]
if rf.state == Leader && rf.log[newCIndex].Term == rf.currentTerm && newCIndex > rf.commitIndex {
rf.commitIndex = newCIndex
go rf.applyLog()
}
} else { //sync fail
rf.nextIndex[I]--
}
其中的applyLog很重要,他的作用是通知客户端提交成功了,也是我们测试要用的
func (rf *Raft) applyLog() {
if len(rf.log) == 0 {
return
}
for rf.commitIndex > rf.lastApplied {
rf.lastApplied++
entry := rf.log[rf.lastApplied]
msg := ApplyMsg{
CommandValid: true,
Command: entry.Command,
CommandIndex: entry.Index,
}
rf.applyCh <- msg
}
}
applyCh是raft结构体中需要心添加的字段,记得Make进行初始化
func Make(peers []*labrpc.ClientEnd, me int,
persister *Persister, applyCh chan ApplyMsg) *Raft {
rf := &Raft{}
rf.peers = peers
rf.persister = persister
rf.me = me
//......
rf.applyCh = applyCh //这个chan从参数传过来的
//......
}
broadcastHeartBeat代码
//发起心跳广播
func (rf *Raft) broadcastHeartBeat() {
for !rf.killed() && rf.state == Leader {
for i := range rf.peers {
//跳过自己
if i == rf.me {
continue
}
go func(i int) {
reply := AppendEntriesReply{}
args := AppendEntriesArgs{
Term: rf.currentTerm,
LeaderId: rf.me,
PrevLogIndex: rf.nextIndex[i] - 1,
LedaerCommit: rf.commitIndex,
Entries: make([]Entry, 0),
}
if args.PrevLogIndex >= 0 {
args.PrevLogTerm = rf.log[args.PrevLogIndex].Term
}
if rf.nextIndex[i] >= 0 {
args.Entries = append(args.Entries, rf.log[rf.nextIndex[i]:]...)
}
//send RPC
f := rf.sendAppendEntries(i, &args, &reply)
//链接失败
if !f {
return
}
if reply.Term > rf.currentTerm {
rf.currentTerm = reply.Term
rf.convertFollower()
return
}
//发送的是心跳,接下来的返回消息可以不用处理
if len(args.Entries) == 0 {
return
}
// sync success
if reply.Success {
rf.matchIndex[i] = args.PrevLogIndex + len(args.Entries)
rf.nextIndex[i] = rf.matchIndex[i] + 1
//更新commiindex
arr := make([]int, len(rf.matchIndex))
copy(arr, rf.matchIndex)
//添加leader节点的
arr[rf.me] = len(rf.log) - 1
sort.Ints(arr)
newCIndex := arr[len(rf.peers)/2]
if rf.state == Leader && rf.log[newCIndex].Term == rf.currentTerm && newCIndex > rf.commitIndex {
rf.commitIndex = newCIndex
go rf.applyLog()
}
} else { //sync fail
rf.nextIndex[i]--
}
}(i)
}
time.Sleep(150 * time.Millisecond)
}
}
接下来是AppendEntries
需要将失配的日志删除,然后复制新的日志
还需要处理日志的提交
//AppendEntries RPC handler.
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
// Your code here (2A, 2B).
reply.Term = rf.currentTerm
reply.Success = false
//拒绝响应
if rf.currentTerm > args.Term {
return
}
rf.resetTimeout()
//if recetive term >currentTerm
if rf.currentTerm < args.Term {
rf.currentTerm = args.Term
rf.convertFollower()
}
//本地日志缺少
if len(rf.log)-1 < args.PrevLogIndex {
return
}
//日志term不匹配
if args.PrevLogIndex > 0 && rf.log[args.PrevLogIndex].Term != args.PrevLogTerm {
return
}
for i, entry := range args.Entries {
logIndex := args.PrevLogIndex + i + 1
if logIndex >= len(rf.log)-1 {
rf.log = append(rf.log, entry)
} else {
if rf.log[logIndex].Term != entry.Term {
rf.log = rf.log[:logIndex] //删除失配内容
rf.log = append(rf.log, entry)
}
}
}
//更新commitdex
if args.LedaerCommit > rf.commitIndex {
rf.commitIndex = args.LedaerCommit
if rf.commitIndex > len(rf.log)-1 {
rf.commitIndex = len(rf.log) - 1
}
}
go rf.applyLog()
reply.Success = true
}
注意
以上代码在测试的时候会出现新的问题,我们代码log是从下标0开始的,而论文中是从1,我们的测试函数也是从1开始,所以直接测试有几个一定无法通过
需要修改测试函数,或者修改代码实现,可以在Make
的时候添加一条日志用来占位
休要修改的测试函数
TestBasicAgree2B
TestFailNoAgree2B
无法保证100%通过测试,有时会无法通过
标签:Term,6.824,log,args,len,lab2,rf,nextIndex From: https://www.cnblogs.com/beifangcc/p/16846300.html