Raft分布式一致性研究
年前有点时间,决定把Raft分布式一致性协议实现一下,加深理解和认识,发现这件事真的是“纸上得来终觉浅,须知此事要躬行”。照着协议上的规则来写,就短短的几条规则,代码可以很快的完成,但是要想正确运行,通过所有的测试案例,那真是要磨掉一层皮,刚刚过年没回老家,在上海有时间调试了几天,终于是跑通所有的测试用例。同时也是第一次使用go语言,然后,发现经过简单的磨合之后,除了变量和函数的定义语法有点不同,其他很多和c语言差不多,很快就编写自如了,还挺喜欢这个语言的,如果说三个最先想到的特点:1. 简洁,2. 多线程的使用非常方便,3. -race竞态条件检查功能简直帅呆了,检查多线程的安全问题太方便了。最早的时候,没有启用这个选项,不知道这个用途,后面看课程说明里面说本地调试阶段要加上这个参数,就试了一下,发现一堆的竞态条件冲突,真是惊出一身的冷汗。要是没有这个检查功能,要是用的是Java,那多危险啊。自认为对于多线程的处理还是很有经验的,还是会犯很多的错误,所以,-race从语言层面进行了支持,无疑是非常优秀的,用上这个参数之后对于多线程的处理更有信心了。
6.824课程
项目是mit大学的6.824课程: http://nil.csail.mit.edu/6.824/2022/labs/lab-raft.html
把代码仓库克隆到本地,使用自己熟悉的IDE打开项目,我是直接在Windows的WSL里面安装的go,使用linux的命令行编译和执行,不太熟悉gdb,也没有使用debug,主要是通过日志来定位问题,多看看日志之后,就熟悉了协议的交互过程。
参考文档
这个是官方的Raft论文,简直是字字珠玑,Figure2一页基本上就把整个协议的精髓描述出来了,值得学习:
https://raft.github.io/raft.pdf
另外,就是官方https://raft.github.io/ 首页的这个动画可以多看看,大概就知道选举的过程了,timeout在动画中非常直观。
然后,就是http://thesecretlivesofdata.com/raft/ 这个交互式的动画展示,把Raft的选举和日志复制的过程讲得非常清晰,我是反复看了这个动画很多遍,每一个状态的变化都有深意,多看几次就能明白了。
我的实现
package raft
//
// this is an outline of the API that raft must expose to
// the service (or tester). see comments below for
// each of these functions for more details.
//
// rf = Make(...)
// create a new Raft server.
// rf.Start(command interface{}) (index, term, isleader)
// start agreement on a new log entry
// rf.GetState() (term, isLeader)
// ask a Raft for its current term, and whether it thinks it is leader
// ApplyMsg
// each time a new entry is committed to the log, each Raft peer
// should send an ApplyMsg to the service (or tester)
// in the same server.
//
import (
"6.824/labgob"
"bytes"
"fmt"
"math/rand"
"sort"
// "bytes"
"sync"
"sync/atomic"
"time"
// "6.824/labgob"
"6.824/labrpc"
)
const (
FOLLOWER = 0
CANDIDATE = 1
LEADER = 2
ELECT_TIMEOUT_MIN = 150 * time.Millisecond
ELECT_TIMEOUT_MAX = 300 * time.Millisecond
HEARTBEAT_INTERVAL = 70 * time.Millisecond
)
//
// as each Raft peer becomes aware that successive log entries are
// committed, the peer should send an ApplyMsg to the service (or
// tester) on the same server, via the applyCh passed to Make(). set
// CommandValid to true to indicate that the ApplyMsg contains a newly
// committed log entry.
//
// in part 2D you'll want to send other kinds of messages (e.g.,
// snapshots) on the applyCh, but set CommandValid to false for these
// other uses.
//
type ApplyMsg struct {
CommandValid bool
Command interface{}
CommandIndex int
// For 2D:
SnapshotValid bool
Snapshot []byte
SnapshotTerm int
SnapshotIndex int
}
type Entry struct {
//Index int
Term int
Command interface{}
}
//
// A Go object implementing a single Raft peer.
//
type Raft struct {
mu sync.Mutex // Lock to protect shared access to this peer's state
peers []*labrpc.ClientEnd // RPC end points of all peers
persister *Persister // Object to hold this peer's persisted state
me int // this peer's index into peers[]
dead int32 // set by Kill()
// Your data here (2A, 2B, 2C).
// Look at the paper's Figure 2 for a description of what
// state a Raft server must maintain.
leaderId int
role int
votedCount int
baseLogIndex int
receivedHeartbeat bool
grantedVote bool
chanApply chan ApplyMsg
currentTerm int
votedFor int
log []Entry
commitIndex int
lastApplied int
nextIndex []int
matchIndex []int
}
// return currentTerm and whether this server
// believes it is the leader.
func (rf *Raft) GetState() (int, bool) {
rf.mu.Lock()
defer rf.mu.Unlock()
return rf.currentTerm, rf.role == LEADER
}
//
// save Raft's persistent state to stable storage,
// where it can later be retrieved after a crash and restart.
// see paper's Figure 2 for a description of what should be persistent.
//
func (rf *Raft) persist() {
// Your code here (2C).
// Example:
raftState := rf.encodeRaftState()
rf.persister.SaveRaftState(raftState)
}
func (rf *Raft) encodeRaftState() []byte {
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
_ = e.Encode(rf.currentTerm)
_ = e.Encode(rf.votedFor)
_ = e.Encode(rf.baseLogIndex)
_ = e.Encode(rf.log)
data := w.Bytes()
return data
}
func (rf *Raft) persistWithSnapshot(snapshot []byte) {
// Your code here (2C).
// Example:
raftState := rf.encodeRaftState()
rf.persister.SaveStateAndSnapshot(raftState, snapshot)
}
//
// restore previously persisted state.
//
func (rf *Raft) readPersist(data []byte) {
if data == nil || len(data) < 1 { // bootstrap without any state?
return
}
// Your code here (2C).
// Example:
r := bytes.NewBuffer(data)
d := labgob.NewDecoder(r)
var currentTerm int
var votedFor int
var baseLogIndex int
var log []Entry
_ = d.Decode(¤tTerm)
_ = d.Decode(&votedFor)
_ = d.Decode(&baseLogIndex)
_ = d.Decode(&log)
rf.currentTerm = currentTerm
rf.votedFor = votedFor
rf.baseLogIndex = baseLogIndex
rf.log = log
rf.lastApplied = rf.baseLogIndex
}
//
// A service wants to switch to snapshot. Only do so if Raft hasn't
// have more recent info since it communicate the snapshot on applyCh.
//
func (rf *Raft) CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool {
// Your code here (2D).
return true
}
// the service says it has created a snapshot that has
// all info up to and including index. this means the
// service no longer needs the log through (and including)
// that index. Raft should now trim its log as much as possible.
func (rf *Raft) Snapshot(index int, snapshot []byte) {
// Your code here (2D).
// need another thread to use Lock, otherwise deadlock with applyCommitted.
go func() {
rf.mu.Lock()
defer rf.mu.Unlock()
rf.debugPrint("Start snapshot index: %d, lastApplied: %d, baseLogIndex: %d",
index, rf.lastApplied, rf.baseLogIndex)
if index > rf.lastApplied || index <= rf.baseLogIndex {
return
}
localLogIndex := index - rf.baseLogIndex
rf.log = rf.log[localLogIndex:]
rf.baseLogIndex = index
rf.persistWithSnapshot(snapshot)
rf.debugPrint("after snapshot, baseLogIndex: %d", rf.baseLogIndex)
}()
}
//
// example RequestVote RPC arguments structure.
// field names must start with capital letters!
//
type RequestVoteArgs struct {
// Your data here (2A, 2B).
CandidateId int
Term int
LastLogIndex int
LastLogTerm int
}
//
// example RequestVote RPC reply structure.
// field names must start with capital letters!
//
type RequestVoteReply struct {
// Your data here (2A).
Success bool
Term int
}
type AppendEntriesArgs struct {
LeaderId int
Term int
LeaderCommit int
PrevLogIndex int
PrevLogTerm int
Entries []Entry
}
type AppendEntriesReply struct {
Success bool
Term int
NextIndex int
}
type InstallSnapshotArgs struct {
LeaderId int
Term int
LastIncludedIndex int
LastIncludedTerm int
Data []byte
// Done bool
// offset int
}
type InstallSnapshotReply struct {
Term int
}
//
// example RequestVote RPC handler.
//
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
// Your code here (2A, 2B).
rf.mu.Lock()
defer rf.mu.Unlock()
rf.debugPrint("Receive vote: %v", args)
if args.Term < rf.currentTerm {
reply.Success = false
reply.Term = rf.currentTerm
rf.debugPrint("Reject vote, because args term is old: %v", args)
return
}
if args.Term > rf.currentTerm {
rf.currentTerm = args.Term
rf.votedFor = -1
rf.votedCount = 0
rf.leaderId = -1
rf.becomeFollower()
rf.persist()
}
reply.Term = args.Term
if rf.votedFor != -1 && rf.votedFor != args.CandidateId {
reply.Success = false
rf.debugPrint("Sorry, can't vote: %d, already voted for others.", args.CandidateId)
} else if rf.logIsNewer(args.LastLogTerm, args.LastLogIndex) {
reply.Success = false
rf.debugPrint("Sorry, can't vote: %d, my log is newer.", args.CandidateId)
} else {
rf.grantedVote = true
rf.votedFor = args.CandidateId
rf.becomeFollower()
rf.persist()
reply.Success = true
rf.debugPrint("Granted vote to %d.", args.CandidateId)
}
}
func (rf *Raft) handleRequestVoteReply(reply RequestVoteReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
if reply.Success {
if reply.Term == rf.currentTerm {
rf.votedCount++
if rf.votedCount > len(rf.peers)/2 && rf.role == CANDIDATE {
rf.becomeLeader()
}
}
} else {
if reply.Term > rf.currentTerm {
rf.currentTerm = reply.Term
rf.votedFor = -1
rf.votedCount = 0
rf.leaderId = -1
rf.becomeFollower()
rf.persist()
}
}
}
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
// Your code here (2A, 2B).
rf.mu.Lock()
defer rf.mu.Unlock()
rf.debugPrint("Receive heartbeat: %v", args)
if args.Term < rf.currentTerm {
reply.Success = false
reply.Term = rf.currentTerm
reply.NextIndex = args.PrevLogIndex + 1
rf.debugPrint("Old leader: %d, not your term, now.", args.LeaderId)
return
}
rf.receivedHeartbeat = true
rf.leaderId = args.LeaderId
rf.becomeFollower()
if args.Term > rf.currentTerm {
rf.currentTerm = args.Term
rf.votedFor = -1
rf.persist()
}
myNextLogIndex := rf.genNextLogIndex()
myLastLogIndex := myNextLogIndex - 1
if myLastLogIndex < args.PrevLogIndex {
reply.Success = false
reply.Term = args.Term
reply.NextIndex = myNextLogIndex
rf.debugPrint("My log is too short, need more entries.")
return
}
localLogIndex := args.PrevLogIndex - rf.baseLogIndex
sameIndexLogEntry := rf.log[localLogIndex]
if args.PrevLogTerm == sameIndexLogEntry.Term {
if len(args.Entries) > 0 {
if len(rf.log) > localLogIndex+1 {
rf.log = rf.log[0 : localLogIndex+1]
}
rf.log = append(rf.log, copyEntryArr(args.Entries)...)
rf.persist()
}
if args.LeaderCommit > rf.commitIndex {
rf.commitIndex = min(args.LeaderCommit, rf.genNextLogIndex()-1)
rf.applyCommitted()
}
reply.Success = true
reply.Term = args.Term
reply.NextIndex = rf.genNextLogIndex()
if len(args.Entries) > 0 {
rf.debugPrint("Append log from %d success.", args.LeaderId)
} else {
rf.debugPrint("No append entries, reply heartbeat to %d success.", args.LeaderId)
}
} else {
// improve performance, skip all items with the same conflict term
for localLogIndex > 0 && rf.log[localLogIndex-1].Term == sameIndexLogEntry.Term {
localLogIndex--
}
reply.Success = false
reply.Term = args.Term
// Binary search to improve performance, it depends on the network, in normal situation, we don't need
// binary search, because partition does not occur frequently. For test, like Figure8Unreliable, it works.
reply.NextIndex = rf.baseLogIndex + (localLogIndex+1)/2
rf.debugPrint("Can't append the entries, the prev term conflict, next index change to: %d.",
reply.NextIndex)
}
}
func (rf *Raft) handleAppendEntriesReply(serverId int, reply AppendEntriesReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
rf.debugPrint("Handle heartbeat reply from %d: %v", serverId, reply)
rf.nextIndex[serverId] = reply.NextIndex
if reply.Success {
if reply.Term == rf.currentTerm && rf.role == LEADER {
rf.matchIndex[serverId] = reply.NextIndex - 1
rf.commitIndex = max(rf.commitIndex, rf.calculateCommitIndex())
rf.applyCommitted()
}
} else {
if reply.Term > rf.currentTerm {
rf.currentTerm = reply.Term
rf.votedFor = -1
rf.votedCount = 0
rf.leaderId = -1
rf.becomeFollower()
rf.persist()
}
}
}
func (rf *Raft) InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {
// Your code here (2A, 2B).
rf.mu.Lock()
defer rf.mu.Unlock()
rf.debugPrint("Receive install snapshot: %v", args)
if args.Term < rf.currentTerm {
reply.Term = rf.currentTerm
rf.debugPrint("Reject install snapshot, because args term is old: %v", args)
return
}
if args.Term > rf.currentTerm {
rf.currentTerm = args.Term
rf.votedFor = -1
rf.votedCount = 0
rf.becomeFollower()
rf.persist()
}
rf.leaderId = args.LeaderId
reply.Term = args.Term
if args.LastIncludedIndex <= rf.baseLogIndex {
rf.debugPrint("Sorry, can't install snapshot from: %d, snapshot not newer, index: %d, my base index: %d",
args.LeaderId, args.LastIncludedIndex, rf.baseLogIndex)
} else {
if args.LastIncludedIndex > rf.genNextLogIndex()-1 {
rf.log = make([]Entry, 0)
rf.log = append(rf.log, Entry{Term: args.LastIncludedTerm, Command: nil})
} else {
rf.log = rf.log[args.LastIncludedIndex-rf.baseLogIndex:]
}
rf.baseLogIndex = args.LastIncludedIndex
rf.lastApplied = rf.baseLogIndex
rf.persistWithSnapshot(args.Data)
rf.debugPrint("Install snapshot from %d, last index: %d", args.LeaderId, args.LastIncludedIndex)
applyMsg := ApplyMsg{
CommandValid: false,
SnapshotValid: true,
SnapshotIndex: args.LastIncludedIndex,
SnapshotTerm: args.LastIncludedTerm,
Snapshot: args.Data,
}
rf.chanApply <- applyMsg
rf.debugPrint("Apply msg for snapshot from: %d, last index: %d", args.LeaderId, applyMsg.SnapshotIndex)
}
}
func (rf *Raft) handleInstallSnapshotReply(reply InstallSnapshotReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
if reply.Term > rf.currentTerm {
rf.currentTerm = reply.Term
rf.votedFor = -1
rf.votedCount = 0
rf.leaderId = -1
rf.becomeFollower()
rf.persist()
}
}
func (rf *Raft) becomeFollower() {
if rf.role != FOLLOWER {
rf.role = FOLLOWER
rf.debugPrint("Become follower")
}
}
func (rf *Raft) becomeCandidate() {
rf.mu.Lock()
defer rf.mu.Unlock()
rf.role = CANDIDATE
rf.leaderId = -1
rf.votedFor = rf.me
rf.votedCount = 1
rf.currentTerm++
rf.persist()
rf.debugPrint("Become candidate")
for serverId := range rf.peers {
rf.debugPrint("Send vote to: %d", serverId)
if serverId == rf.me {
rf.grantedVote = true
} else {
args := RequestVoteArgs{
Term: rf.currentTerm,
CandidateId: rf.me,
LastLogTerm: rf.log[len(rf.log)-1].Term,
LastLogIndex: rf.genNextLogIndex() - 1,
}
reply := RequestVoteReply{}
go func(serverId int) {
if rf.sendRequestVote(serverId, &args, &reply) {
rf.handleRequestVoteReply(reply)
}
}(serverId)
}
}
}
func (rf *Raft) becomeLeader() {
rf.role = LEADER
rf.leaderId = rf.me
nextIndex := rf.genNextLogIndex()
for serverId := range rf.peers {
rf.nextIndex[serverId] = nextIndex
rf.matchIndex[serverId] = 0
}
rf.debugPrint("Become leader")
go rf.heartbeatLoop()
}
func (rf *Raft) heartbeatLoop() {
for rf.killed() == false {
rf.mu.Lock()
if rf.role != LEADER {
rf.mu.Unlock()
return
}
for serverId := range rf.peers {
if serverId == rf.me {
rf.debugPrint("Send heartbeat to: %d, myself", serverId)
rf.receivedHeartbeat = true
nextIndex := rf.genNextLogIndex()
rf.nextIndex[serverId] = nextIndex
rf.matchIndex[serverId] = nextIndex - 1
} else {
// figure8 can't replicate log, if last log term != currentTerm
var nextIndex int
lastLogTerm := rf.log[len(rf.log)-1].Term
if lastLogTerm != rf.currentTerm {
nextIndex = rf.genNextLogIndex()
rf.debugPrint("Send heartbeat to: %d, my term gets no new commands yet.", serverId)
} else {
nextIndex = min(rf.genNextLogIndex(), rf.nextIndex[serverId])
if nextIndex > rf.baseLogIndex {
rf.debugPrint("Send heartbeat to: %d", serverId)
} else {
args := InstallSnapshotArgs{
Term: rf.currentTerm,
LeaderId: rf.me,
LastIncludedIndex: rf.baseLogIndex,
LastIncludedTerm: rf.log[0].Term,
Data: rf.persister.ReadSnapshot(),
}
reply := InstallSnapshotReply{}
go func(serverId int) {
if rf.sendInstallSnapshot(serverId, &args, &reply) {
rf.handleInstallSnapshotReply(reply)
}
}(serverId)
rf.debugPrint("Need read log back from snapshot.")
continue
}
}
args := AppendEntriesArgs{
Term: rf.currentTerm,
LeaderId: rf.me,
LeaderCommit: rf.commitIndex,
PrevLogIndex: nextIndex - 1,
PrevLogTerm: rf.log[nextIndex-rf.baseLogIndex-1].Term,
Entries: rf.log[nextIndex-rf.baseLogIndex:],
}
reply := AppendEntriesReply{}
go func(serverId int) {
if rf.sendAppendEntries(serverId, &args, &reply) {
rf.handleAppendEntriesReply(serverId, reply)
}
}(serverId)
}
}
rf.mu.Unlock()
time.Sleep(HEARTBEAT_INTERVAL)
}
}
//
// example code to send a RequestVote RPC to a server.
// server is the index of the target server in rf.peers[].
// expects RPC arguments in args.
// fills in *reply with RPC reply, so caller should
// pass &reply.
// the types of the args and reply passed to Call() must be
// the same as the types of the arguments declared in the
// handler function (including whether they are pointers).
//
// The labrpc package simulates a lossy network, in which servers
// may be unreachable, and in which requests and replies may be lost.
// Call() sends a request and waits for a reply. If a reply arrives
// within a timeout interval, Call() returns true; otherwise
// Call() returns false. Thus Call() may not return for a while.
// A false return can be caused by a dead server, a live server that
// can't be reached, a lost request, or a lost reply.
//
// Call() is guaranteed to return (perhaps after a delay) *except* if the
// handler function on the server side does not return. Thus there
// is no need to implement your own timeouts around Call().
//
// look at the comments in ../labrpc/labrpc.go for more details.
//
// if you're having trouble getting RPC to work, check that you've
// capitalized all field names in structs passed over RPC, and
// that the caller passes the address of the reply struct with &, not
// the struct itself.
//
func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
ok := rf.peers[server].Call("Raft.RequestVote", args, reply)
return ok
}
func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool {
ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
return ok
}
func (rf *Raft) sendInstallSnapshot(server int, args *InstallSnapshotArgs, reply *InstallSnapshotReply) bool {
ok := rf.peers[server].Call("Raft.InstallSnapshot", args, reply)
return ok
}
//
// the service using Raft (e.g. a k/v server) wants to start
// agreement on the next command to be appended to Raft's log. if this
// server isn't the leader, returns false. otherwise start the
// agreement and return immediately. there is no guarantee that this
// command will ever be committed to the Raft log, since the leader
// may fail or lose an election. even if the Raft instance has been killed,
// this function should return gracefully.
//
// the first return value is the index that the command will appear at
// if it's ever committed. the second return value is the current
// term. the third return value is true if this server believes it is
// the leader.
//
func (rf *Raft) Start(command interface{}) (int, int, bool) {
index := -1
term := -1
isLeader := false
// Your code here (2B).
rf.mu.Lock()
defer rf.mu.Unlock()
isLeader = rf.role == LEADER
if isLeader {
index = rf.genNextLogIndex()
term = rf.currentTerm
entry := Entry{Term: rf.currentTerm, Command: command}
rf.log = append(rf.log, entry)
rf.persist()
rf.debugPrint("Start new command: %v", command)
}
return index, term, isLeader
}
//
// the tester doesn't halt goroutines created by Raft after each test,
// but it does call the Kill() method. your code can use killed() to
// check whether Kill() has been called. the use of atomic avoids the
// need for a lock.
//
// the issue is that long-running goroutines use memory and may chew
// up CPU time, perhaps causing later tests to fail and generating
// confusing debug output. any goroutine with a long-running loop
// should call killed() to check whether it should stop.
//
func (rf *Raft) Kill() {
atomic.StoreInt32(&rf.dead, 1)
// Your code here, if desired.
}
func (rf *Raft) killed() bool {
z := atomic.LoadInt32(&rf.dead)
return z == 1
}
// The ticker go routine starts a new election if this peer hasn't received
// heartsbeats recently.
func (rf *Raft) ticker() {
for rf.killed() == false {
// Your code here to check if a leader election should
// be started and to randomize sleeping time using
// time.Sleep().
time.Sleep(rf.getRandElectTimeout())
rf.mu.Lock()
if rf.receivedHeartbeat {
rf.receivedHeartbeat = false
//rf.debugPrint("received heartbeat, leader is alive, don't elect.")
} else if rf.grantedVote {
rf.grantedVote = false
//rf.debugPrint("granted vote, don't elect.")
} else {
go rf.becomeCandidate()
}
rf.mu.Unlock()
}
}
func (rf *Raft) debugPrint(format string, args ...interface{}) {
DPrintf(fmt.Sprintf("id:%d,term:%d,role:%d,l:%d,v:%d,commit:%d,apply:%d,base:%d,match:%v,next:%v,log:%v msg: %s",
rf.me, rf.currentTerm, rf.role, rf.leaderId, rf.votedFor, rf.commitIndex, rf.lastApplied, rf.baseLogIndex,
rf.matchIndex, rf.nextIndex, rf.log, format), args...)
}
func (rf *Raft) genNextLogIndex() int {
return rf.baseLogIndex + len(rf.log)
}
func (rf *Raft) getRandElectTimeout() time.Duration {
return ELECT_TIMEOUT_MIN +
time.Duration(rand.Intn(int((ELECT_TIMEOUT_MAX-ELECT_TIMEOUT_MIN)/time.Millisecond)))*time.Millisecond
}
func (rf *Raft) logIsNewer(candidateLogTerm int, candidateLogIndex int) bool {
myLogTerm := rf.log[len(rf.log)-1].Term
if myLogTerm != candidateLogTerm {
return myLogTerm > candidateLogTerm
} else {
myLogIndex := rf.genNextLogIndex() - 1
return myLogIndex > candidateLogIndex
}
}
func (rf *Raft) applyCommitted() {
for rf.commitIndex > rf.lastApplied {
localLogIndex := rf.lastApplied + 1 - rf.baseLogIndex
applyMsg := ApplyMsg{
Command: rf.log[localLogIndex].Command,
CommandIndex: rf.lastApplied + 1,
CommandValid: true,
}
rf.chanApply <- applyMsg
rf.lastApplied++
rf.debugPrint("Apply msg for command: %v", applyMsg)
}
}
func (rf *Raft) calculateCommitIndex() int {
beforeMid := (len(rf.peers) - 1) / 2
matchIndexCopy := copyIntArr(rf.matchIndex)
sort.Ints(matchIndexCopy)
return matchIndexCopy[beforeMid]
}
func copyIntArr(arr []int) []int {
arrCopy := make([]int, len(arr))
copy(arrCopy, arr)
return arrCopy
}
func copyEntryArr(arr []Entry) []Entry {
arrCopy := make([]Entry, len(arr))
copy(arrCopy, arr)
return arrCopy
}
func max(a int, b int) int {
if a > b {
return a
} else {
return b
}
}
func min(a int, b int) int {
if a < b {
return a
} else {
return b
}
}
//
// the service or tester wants to create a Raft server. the ports
// of all the Raft servers (including this one) are in peers[]. this
// server's port is peers[me]. all the servers' peers[] arrays
// have the same order. persister is a place for this server to
// save its persistent state, and also initially holds the most
// recent saved state, if any. applyCh is a channel on which the
// tester or service expects Raft to send ApplyMsg messages.
// Make() must return quickly, so it should start goroutines
// for any long-running work.
//
func Make(peers []*labrpc.ClientEnd, me int,
persister *Persister, applyCh chan ApplyMsg) *Raft {
rf := &Raft{}
rf.peers = peers
rf.persister = persister
rf.me = me
// Your initialization code here (2A, 2B, 2C).
rand.Seed(time.Now().UnixNano())
rf.chanApply = applyCh
rf.dead = 0
rf.currentTerm = 0
rf.votedFor = -1
rf.log = make([]Entry, 0)
rf.log = append(rf.log, Entry{Term: 0, Command: nil})
rf.role = FOLLOWER
rf.leaderId = -1
rf.votedCount = 0
rf.receivedHeartbeat = false
rf.grantedVote = false
rf.baseLogIndex = 0
rf.nextIndex = make([]int, len(peers))
rf.matchIndex = make([]int, len(peers))
rf.commitIndex = 0
rf.lastApplied = 0
// initialize from state persisted before a crash
rf.readPersist(persister.ReadRaftState())
// start ticker goroutine to start elections
go rf.ticker()
return rf
}
我的执行结果
标签:...,log,args,rf,reply,一致性,Raft,分布式 From: https://www.cnblogs.com/yangwen0228/p/17132712.htmlTest (2A): initial election ...
... Passed -- 3.0 3 84 21300 0
Test (2A): election after network failure ...
... Passed -- 4.6 3 156 30024 0
Test (2A): multiple elections ...
... Passed -- 6.0 7 816 156260 0
Test (2B): basic agreement ...
... Passed -- 0.6 3 16 4078 3
Test (2B): RPC byte count ...
... Passed -- 1.7 3 48 112826 11
Test (2B): agreement after follower reconnects ...
... Passed -- 5.7 3 158 39113 7
Test (2B): no agreement if too many followers disconnect ...
... Passed -- 3.4 5 232 47462 3
Test (2B): concurrent Start()s ...
... Passed -- 0.6 3 16 4094 6
Test (2B): rejoin of partitioned leader ...
... Passed -- 6.4 3 254 57971 4
Test (2B): leader backs up quickly over incorrect follower logs ...
... Passed -- 21.2 5 2264 1722320 102
Test (2B): RPC counts aren't too high ...
... Passed -- 2.2 3 58 15322 12
Test (2C): basic persistence ...
... Passed -- 3.6 3 104 24614 6
Test (2C): more persistence ...
... Passed -- 15.6 5 1152 238836 16
Test (2C): partitioned leader and one follower crash, leader restarts ...
... Passed -- 1.5 3 38 9224 4
Test (2C): Figure 8 ...
... Passed -- 27.7 5 844 159399 21
Test (2C): unreliable agreement ...
... Passed -- 4.2 5 224 72989 246
Test (2C): Figure 8 (unreliable) ...
... Passed -- 43.9 5 5120 9238145 424
Test (2C): churn ...
... Passed -- 16.5 5 904 617927 497
Test (2C): unreliable churn ...
... Passed -- 16.3 5 708 167976 157
Test (2D): snapshots basic ...
... Passed -- 5.2 3 142 46258 211
Test (2D): install snapshots (disconnect) ...
... Passed -- 85.4 3 2374 1014944 351
Test (2D): install snapshots (disconnect+unreliable) ...
... Passed -- 107.8 3 2978 1237609 359
Test (2D): install snapshots (crash) ...
... Passed -- 72.5 3 1800 689554 324
Test (2D): install snapshots (unreliable+crash) ...
... Passed -- 88.4 3 2216 938538 363
Test (2D): crash and restart all servers ...
... Passed -- 9.3 3 250 66712 56
PASS
ok 6.824/raft 553.424s