参考链接
课程地址
如何Debug:没有它可怎么活,几万行的日志怎么看
Students' Guide to Raft
raft算法可视化:直观展示
raft可视化简单入门
raft讲解视频:强烈推荐
感想
- 感觉理论+实践来学一个东西才学的深刻,特别是对于我这样对抽象理解不太行的,每次见识了一个算法或系统真正如何运行或者代码如何实现才能理解,否则总一头雾水或者忘得快。但是又导致学的越来越多,越来越底层,越来越难,把握不好这个度,浪费了时间精力,消磨了信心。所以感谢有这样好的实验课。
- 实验建议先过一遍论文,看不懂没关系,然后边看边做,后来做完试验后,我再看论文,发现论文都比较清晰了。
- 代码实现我觉得要始终记住一点:在任何执行RPC通信的函数中,要时刻想着这个通信可能有延时,可能网络导致接收不到消息,可能节点发送或接收前后它的状态(follower leader candidate)都变了。我遇到的很多问题都是这些情况处理不好
下面以raft原论文:In Search of an Understandable Consensus Algorithm为基础,以自顶向下的方式记录总结lab2和lab3的内容(下面的代码都是伪代码,仅为了说明思路)
基于Raft的KV存储系统
课程提供的系统结构图
模仿它做的一个示意图,它的大致功能是:提供一个基于Raft的容错的KV存储的服务
多个client会不断发出三种请求:put添加一个键值对,get得到一个键的值,append更新一个键的值。每个请求会通过clerk发送到leader的service。clerk如何找到leader?随机访问一个节点,如果不是leader,该节点会告诉clerk真正的leader是谁(通过心跳会知道),如果该节点故障没有回复,则询问下一个。
service通过start将请求封装成log项,通过Raft协议在大多数节点达成一致后,通过applyCh告诉service。然后service才执行命令,修改自己的状态(即自己存储的键值对数据)。最后通知client执行的结果
读操作(get)处理
get操作并不会修改状态机,那么是不是get操作可以不用经过raft一致性协议,直接读取当前键值对数据返回即可?lab这里要保证强一致性,所以不可以。
要保证线性一致性读:读到的结果要是读请求发起时已经完成提交的结果。但是如果发生分区,client访问的是老leader,可能读不到最新的结果。
解决:将读操作也作为log来达成共识后,再返回结果
实现
client端只需要实现get put append三个接口功能。实现找leader时偷懒,采用循环遍历每个service,直到找到leader。
service端get和putappend都采用如下流程实现
func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
// 调用start添加log
index, _, is_leader := kv.rf.Start(*args)
// 不是leader,添加失败,返回错误
if !is_leader {
reply.Err = ErrWrongLeader
return
}
// 添加成功,这里使用通道(也可以采用其它方式判断结果是否返回)等待raft对该log达成一致
ch := kv.getCh(index)
select {
case result := <-ch:
reply.Err = result.Err
reply.Value = result.Value
return
// 超时了还没有得到raft的结果返回错误
case <-time.After(time.Millisecond * 500):
reply.Err = ErrTimeout
return
}
}
然后service主要实现两个功能:
一是等待raft的结果(即从applyCh中得到raft传来的结果),并传给index对应的ch通道。传来的结果有两种,一种是达成一致的命令,service得到后,通过apply执行命令就好,一种是leader节点发来的快照(可能由于该follower落后的日志在leader中已经删去,生成了快照,所以通过sendsnapshot发送,在日志复制和压缩部分解释),收到后,直接解析快照(主要是KV数据),替换现在的KV键值对。
执行了命令(如执行get,就查询get的key对应的value作为结果),修改了状态机状态后,通过notify通知客户端执行的结果即可(即传给index对应的ch通道)。
特殊情况讨论:
-
notify时
- follower的服务器只用执行命令,不需要通知客户端
- 不是自己term的命令,不需要通知客户端
-
apply时:可能一个客户端请求一个命令,实际已经执行,但是由于网络等原因,没有收到,它可能重新请求命令,会导致命令执行两次。
对于get没有影响,但是对于putappend产生影响(比如put(a,1)put(a,2)本来两个命令依次正确执行完a对应2。但如果第一次put后a对应1,但是客户端没收到以为执行失败,然后客户端执行第二个命令a变成了2,客户端又重试第1个命令,a对应为1,出错)
所以在客户端请求时要加一个递增的序号,并且保存每个客户端执行的最后一个命令的序号,在apply时只有这个命令的序号大于已执行的最后一个命令的序号,才被执行,否则忽略
二是监视log大小(在日志压缩部分解释,不断积累的log会占满状态机的内存,在满足一定条件后,在lab中的条件就是log的大小超过了设置的大小,就把当前log都写入快照snapshot),然后调用snapshot让raft生成快照,删除多余log
快照除了包含当前的kv键值对数据,还要加一个每个客户端执行的最后一个命令的序号,便于服务崩溃恢复后命令的执行
raft总览
复制状态机:多个节点从相同的初始状态开始,执行相同的命令,最后会产生相同的最终状态,共识算法就是为了实现这样的复制状态机
raft中,leader将client的请求(put get append)等封装为一个个log项,然后将log复制给所有的follower,所有的节点按相同顺序执行(应用)这些请求(log),会得到相同的状态(如果是一个kvserver,状态就是保存的键值对,即最后每个节点保存的键值对都是一样的)
每个raft节点只能处在三种状态之一:leader,follower,candidate。他们之间的状态转化
raft将时间分为连续任期term,用连续整数表示
raft节点间使用rpc进行通信
实现
所以我设计每个节点实际跑这样一个循环,根据角色进入不同逻辑。每次变更为follower,都重置以下超时时间,在asFollower中就是不断判断是否超时时间内没收到心跳,要开始选举进入candidate状态。在asCandidate中主要就是发起投票,统计票数,看是否能进入leader状态。在asLeader中主要是不断发送心跳和日志(我统一写成一个函数sendTicker)和判断是否可以提交执行命令commitlog函数
func (rf *Raft) run() {
for !rf.killed() {
switch {
case rf.phase == follower:
rf.deadline = time.Now().Add(time.Duration(rand.Int63()%300+150) * time.Millisecond)
rf.asFollower()
case rf.phase == candidate:
rf.asCandidate()
case rf.phase == leader:
rf.asLeader()
default:
Debug(dError, "S%d execept phase Term:%d", rf.me, rf.currentTerm)
}
}
}
领导者选举
leader会周期向所有follower发送心跳,如果follower在超时时间内没有收到心跳,就要开始选举:
- 首先增加自己的term号,然后变为candidate状态,然后投票给自己,然后向其它所有节点发送投票请求
- 如果自己赢得超过半数的投票,那就转为leader状态,发送心跳
- 如果因为其它节点成为了leader而收到了它的心跳,且它的term大于等于自己的term,就转为follower状态
- 如果一段超时时间过去了还没有任何leader,就增加term号,重新发送投票请求
是否会给对方投票需要依次判断:
- 对方的term是否比自己大
- 自己是否已经投票给别人了(没投或投给自己可以继续判断)
- 对方的log是否比自己的“新”(保证选出的leader一定包含了所有被提交的日志,在日志提交部分解释)
如何判断“新”:
- 如果两个节点最后日志的term不同,那么term大的更新
- 如果两个节点最后日志的term相同,那么index大的更新
可以通过下面两个图(有颜色的是已经执行的log,方格中的数字是添加这个log的term)得知,只有更新的当选leader才能阻止错误
特殊情况讨论:
- 一个处于candidate(已经给自己投了票)的节点收到了别的candidate请求,如果对方的term比自己大,自己会转为follower状态,所以之前给自己投的票作废,是可以给别人投票的
- 一个处于candidate(已经给自己投了票)的节点收到了别的candidate请求,如果对方的term和自己一样大,但是log比自己的多或新,而且之前票是投给自己的,是可以投给对方的,投给自己的作废
实现
请求投票的判断逻辑
func (rf *Raft) asCandidate() {
for !rf.killed() && rf.phase == candidate {
loopagain:
box := make(chan bool)
request := RequestVoteArgument{
Term: rf.currentTerm,
Candidate: rf.me,
LastLogIndex //这两个字段主要为了判断log是否“新”
LastLogTerm
}
// 对其它节点发起请求
for i := 0; i < len(rf.peers); i++ {
if i == rf.me {
continue
}
response := RequestVoteResult{
Term: -1,
VoteGranted: false,
}
go func(i int) {
result := rf.RequestVote(i, &request, &response)
box <- result
}(i)
}
// 统计票数
tickets := 0
for {
select {
case result := <-box:
if rf.phase != candidate {
return
}
if result {
tickets++
}
if tickets+1 > len(rf.peers)/2 {
rf.phase = leader // 得票过半,当选
return
}
case <-time.After(time.Millisecond * time.Duration(rand.Int63()%300+200)):
if rf.phase != candidate {
return
}
rf.currentTerm++
goto loopagain // 超时,重新开始选举
}
}
}
}
投票的判断逻辑
func (rf *Raft) Vote(args *RequestVoteArgument, reply *RequestVoteResult) {
// 判断条件1,对方的term是否比自己大
if args.Term < rf.currentTerm {
reply.VoteGranted = false
reply.Term = rf.currentTerm
return
} else if args.Term > rf.currentTerm {
// 如果收到term比自己大,自己的状态就要变为follower
rf.currentTerm = args.Term
rf.phase = follower
}
// 判断条件2,自己是否已经投票给别人了(没投或投给自己可以继续判断)
if rf.voteFor == -1 || rf.voteFor == rf.me {
// 判断条件3,对方的log是否比自己的“新”(保证选出的leader一定包含了所有被提交的日志)
if haveLastLogTerm > args.LastLogTerm {
// term大的更新
reply.VoteGranted = false
reply.Term = rf.currentTerm
return
} else if haveLastLogTerm == args.LastLogTerm {
// term相同,那么index大的更新
if haveLastLogIndex > args.LastLogIndex {
reply.VoteGranted = false
reply.Term = rf.currentTerm
return
}
}
// 过关,投票给它
rf.voteFor = args.Candidate
reply.VoteGranted = true
} else {
reply.VoteGranted = false
return
}
}
日志复制
log项是什么?主要是service传来需要达成一致的执行的命令。
leader收到客户端的指令,会将其包装为log项,追加到自己的log中。然后leader将log发给其它所有节点,follower同样追加到自己的log中。如果有超过半数的节点复制后,leader就在本地执行(apply)命令,最后把结果返回给客户端
特殊情况讨论:
-
leader没有收到follower的回复:
- 原因:follower故障,网络故障follower已经复制但回复发送不成功或收不到
- 对应机制:leader会重复发送这个log
-
日志不一致:
- 原因1:follower崩溃后恢复,leader发送的log项的前面的(可能没有持久化)log项缺失。如下图所示,2会认为3下一个要复制的log是index4
- 原因2:leader复制的一个log项,还没来得及发送(或者发送了,还没有在本地执行)就崩溃了,一个没有复制这个log项的节点A成为新leader(比如5个节点,1个E崩溃,只有1个D复制了这个log,B和C节点都是可能投票给A的,A可以成为新leader)。如果老leader又活了,成为了follower。新leader复制新log项,老leader收到这个log项后就会有冲突
- 对应机制:leader会发送给follower,它认为follower的前一个log项的term和index,如果冲突follower会拒绝,被拒绝后leader会发送前一个log项,直到被接受(follower中原有冲突的log项会被覆盖,这并不违反一致性,因为被覆盖的log项一定没有被执行,一定没有被复制到大多数节点,因为如果被复制到大多数节点后,在投票阶段,通过对方的log是否比自己的“新”这一判断条件,大多数节点都不投票给它,已经阻止了没有复制到大多数节点这个log项的节点当选为leader)
- 优化:follower会可以直接告诉leader自己冲突的log项的index
实现
log项
type LogEntry struct {
LogTerm int
LogIndex int
Command interface{}
}
我把发送日志和发送心跳杂糅在一起来,感觉不太好。发送日志时不要直接发送原来日志的切片,否则有问题
func (rf *Raft) SendTicker() {
for i := range rf.peers {
if i == rf.me {
continue
}
// 日志压缩部分解释。如果follower落后的log被清理,leader直接发送快照
if rf.nextIndex[i] <= rf.lastSnapshotIndex {
go rf.SendSnapshot(i)
continue
}
args := AppendEntriesRequest{
Term int
LeaderID int
PreLogIndex int // 我leader认为你follower应该有的前一个log的index和term
PrevLogTerm int
Entries []LogEntry
LeaderCommit int // 我leader已经commit的日志的index,commit是指leader知道这个log已经被复制到大多数
}
// nextIndex[i]保存我leader认为你i号follower复制的下一个log
// 发送是发送之后的所有日志
// log的开头第一个永远是index和term都为0命令为空的项
args.Entries = make([]LogEntry, 0)
args.Entries = append(args.Entries, rf.log[rf.nextIndex[i]-rf.lastSnapshotIndex:]...)
go func(i int) {
// 发送
result := make(chan bool)
go func() {
ok := rf.peers[i].Call("Raft.ReceiveTicker", &args, &reply)
result <- ok
}()
for {
select {
case ok := <-result:
if rf.phase != leader {
return
}
if reply.Success {
// 复制日志成功
if args.PreLogIndex+len(args.Entries) < rf.matchIndex[i] {
// 过滤掉过时的回复,可能由于网络等原因,先前的回复才回来,就忽略
return
}
// 更新这个follower下一个要复试的log的index(nextIndex)和已经确认该follower复制了的log的index(matchIndex)
rf.matchIndex[i] = args.PreLogIndex + len(args.Entries)
rf.nextIndex[i] = rf.matchIndex[i] + 1
} else {
// 复制日志失败
if reply.Term > rf.currentTerm {
// follower 的term比自己大,自己要转为follower
rf.currentTerm = reply.Term
rf.phase = follower
} else {
// 日志不一致,这里我采用优化的方法,follower会直接通过ConflictIndex告知不一致的日志序号,下次leader按照这个发就可
rf.nextIndex[i] = reply.ConflictIndex
}
}
return
case <-time.After(50 * time.Millisecond):
// 超时
return
}
}
}(i)
}
}
follower收到log的操作
func (rf *Raft) ReceiveTicker(args *AppendEntriesRequest, reply *AppendEntriesReply) {
// leader的term比我小,拒绝
if args.Term < rf.currentTerm {
reply.Success = false
reply.Term = rf.currentTerm
return
}
// 因为我将日志和心跳杂糅在一起,收到后也要将自己的超时时间等更新
rf.currentTerm = args.Term
rf.voteFor = -1
if rf.phase == follower {
rf.deadline = time.Now().Add(time.Duration(rand.Int63()%300+150) * time.Millisecond)
}
rf.phase = follower
// 日志不一致,log缺失了
if rf.lastSnapshotIndex+len(rf.log)-1 < args.PreLogIndex {
reply.ConflictIndex = rf.lastSnapshotIndex + len(rf.log) // 直接给最后一个log的index
reply.Success = false
return
}
// follower落后的log被清理,leader应该直接发送快照
if args.PreLogIndex < rf.lastSnapshotIndex {
reply.Success = false
reply.ConflictIndex = rf.lastSnapshotIndex + 1
return
}
// 日志不一致,term不对
if myprevLogTerm != args.PrevLogTerm {
reply.Success = false
reply.Term = rf.currentTer reply.ConflictIndex = args.PreLogIndex
// 直接给该term的第一个log的index
//...
} else {
reply.Success = true
// 不是心跳
if len(args.Entries) != 0 {
// 有过时的复制日志的请求,可能之前的日志比如复制日志index 1, 2已经复制了,但leader没收到回复,又发送复制日志index 1, 2,但这个命令在leader发送复制日志index3后才来,就忽略
//...
// 复制日志
rf.log = rf.log[:args.PreLogIndex-rf.lastSnapshotIndex+1]
rf.log = append(rf.log, args.Entries...)
}
// leader通过LeaderCommit通知,这个index之前的log已经被复制到大多数节点了,可以通知你的service执行(提交)了,这部分将在日志提交解释
if args.LeaderCommit > rf.commitIndex {
var lastLogIndex int
if len(rf.log) == 1 {
lastLogIndex = rf.lastSnapshotIndex
} else {
lastLogIndex = rf.log[len(rf.log)-1].LogIndex
}
if rf.commitIndex > lastLogIndex {
rf.commitIndex = lastLogIndex
} else {
rf.commitIndex = args.LeaderCommit
}
// 通知follower的service执行(提交)
go rf.ApplyLog()
}
}
}
日志提交(执行命令)
如果有超过半数的节点复制后,leader就在本地执行(apply)命令,最后把结果返回给客户端
follower如何执行命令?leader会告诉follower已经超过半数的节点复制的log项的index,follower就会根据这个也在本地执行命令
特殊情况讨论:
有颜色的是已经执行的log,方格中的数字是添加这个log的term
解决机制:leader还没有提交某个log项就崩溃了,新leader不会直接提交不是自己term的log,只会完成该log的复制(可能还有少于半数的节点没有复制)。那么这个log如何提交?当新的log项来了,完成复制后,新leader会提交新log项,这样会提交老log
优化:新leader当选后,立即追加一个空的log项,以达成快速提交的目地
实现
在日志复制中的follower代码中提到follower如何提交,并使用ApplyLog函数提交(apply)
follower能不能提交(apply)是leader通过LeaderCommit告诉的
leader通过如下逻辑判断
func (rf *Raft) CommitLog() {
for index := len(rf.log) - 1; rf.log[index].LogIndex > rf.commitIndex; index-- {
// 从后往前遍历log,只要这个index的log能提交,前面的也能提交
if rf.phase != leader {
return
}
// 只提交自己当前term的log
if rf.log[index].LogTerm != rf.currentTerm {
return
}
// 通过matchIndex判断各个节点复制了这个index的log的数量
count := 0
for i := range rf.peers {
if i != rf.me && rf.matchIndex[i] >= rf.log[index].LogIndex {
count++
}
}
// 如果超过半数,就要apply(通知service去执行)
if count+1 > len(rf.peers)/2 {
Debug(dInfo, "S%d commit log rf.commitindex %d %s", rf.me, rf.commitIndex, rf.log[index])
rf.commitIndex = rf.log[index].LogIndex
go rf.ApplyLog()
return
}
}
}
就是将lastApplied最后一次通知的index到commitIndex现在确认能通知的index之间所有log发送到applyCh
func (rf *Raft) ApplyLog() {
for i := rf.lastApplied + 1; i <= rf.commitIndex; i++ {
rf.applyCh <- ApplyMsg{
CommandValid: true,
CommandIndex: rf.log[i-rf.lastSnapshotIndex].LogIndex,
Command: rf.log[i-rf.lastSnapshotIndex].Command,
CommandTerm: rf.log[i-rf.lastSnapshotIndex].LogTerm,
}
}
rf.lastApplied = rf.commitIndex
rf.applyCh <- notify
}
日志压缩
不断积累的log会占满状态机的内存。在满足一定条件后,就把当前log都写入快照snapshot,然后删除已经写入快照snapshot的log。如果follower落后的log被清理,leader直接发送快照
实现
每个节点的service都是独立的调用snapshot函数进行日志压缩的,leader只在follower缺失的日志被删除时,发送快照
当service发现log过大,通过此函数进行压缩
// 丢弃index及之前的所有log,snapshot为service传来的状态机的快照
func (rf *Raft) Snapshot(index int, snapshot []byte) {
// 删除的log不能没有执行
if index > rf.lastApplied {
return
}
// 删除的log不能已经删除
if index <= rf.lastSnapshotIndex {
return
}
// 删除log
//...
// 保存快照,这部分在持久化解释
rf.persist(rf.EncodeState(), snapshot)
}
在日志复制时已经提到如果follower落后的log被清理,leader直接发送快照(通过SendSnapshot调用rpc接口InstallSnapshot,让follower保存快照)
func (rf *Raft) InstallSnapshot(arg *InstallSnapshotArgument, reply *InstallSnapshotResult) {
// 相当于一次心跳
// ...
// 传来的快照已经存在
if rf.lastSnapshotIndex >= arg.LastIncludedIndex || rf.lastSnapshotTerm > arg.LastIncludedTerm {
reply.Term = -1
return
}
// 快照传来,该删除的log都要删除
//...
//...
// 快照也要发送给service让他更新状态
go func() {
rf.applyCh <- ApplyMsg{
CommandValid: false,
SnapshotValid: true,
Snapshot: rf.persister.ReadSnapshot(),
SnapshotIndex: rf.lastSnapshotIndex,
SnapshotTerm: rf.lastSnapshotTerm,
}
}()
}
持久化
当崩溃重启时不会丢失原有状态,需要及时持久化,比如说log,votefor,快照等这些在修改后要及时持久化保存,在重启后可以直接读取恢复状态机状态。像commitIndex,matchIndex,lastApplied这些都不需要持久化,都能根据日志交互逐步算出来
集群节点变更
采用两阶段的方法:
- 一阶段:leader追加\(C_{old, new}\)日志,整个集群进入联合一致状态。在这个状态中,日志复制和选举的操作不仅要满足所有节点的大多数,
还要在原来的旧节点中满足大多数
若新增节点,新增的节点先设置为只读,只有等日志追加完成后,leader才开始集群变更 - 二阶段:leader追加\(C_{new}\)日志,整个集群进入新配置阶段。和原来一样,在这个状态中,日志复制和选举的操作要满足所有节点的大多数
只要节点复制了上面阶段中任何一个日志,就进入这个阶段,无论这个日志是否被提交
特殊情况讨论:
- leader在\(C_{old, new}\)未提交时宕机:只可能旧节点中的当选,如果这个旧节点没有复制\(C_{old, new}\),那么集群成员变更失败,如果这个
- 节点复制了,虽然无法提交这个log,但可以继续之后的阶段,完成集群变更
- leader追加\(C_{old, new}\)已提交但\(C_{new}\)未发起时宕机:选举限制决定了新的leader一定具有\(C_{old, new}\)
- leader在\(C_{new}\)已发起时宕机:只可能复制了\(C_{new}\)的节点当选
优化:单节点集群变更:每次只增减一个节点
标签:MIT6.5840,log,记录,follower,节点,lab2,rf,日志,leader From: https://www.cnblogs.com/Qi-Lin/p/17518003.html