
A Study of Raft Distributed Consensus

I had some spare time before the New Year and decided to implement the Raft distributed consensus protocol myself to deepen my understanding. It really drove home the old line: "what you learn on paper always feels shallow; to truly know a thing you must do it yourself." Coding straight from the protocol's rules is quick, since there are only a handful of them, but getting the code to run correctly and pass every test case takes a real grind. I stayed in Shanghai over the holiday instead of going back to my hometown, spent several days debugging, and finally got all the test cases to pass.

This was also my first time using Go. After a short adjustment period I found that, apart from slightly different syntax for declaring variables and functions, much of it feels like C, and I was soon writing it comfortably; I quite like the language. The three features that come to mind first: 1. it is concise; 2. concurrency is very easy to use; 3. the -race data race detector is simply brilliant, making it very convenient to check multithreading safety. At first I did not enable the option because I did not know what it was for; later the course notes said to add this flag when debugging locally, so I tried it and it turned up a pile of race conditions, which gave me a cold sweat. Without that check, or if I had been using Java, how risky that would have been. I consider myself fairly experienced with multithreading and I still made plenty of mistakes, so having -race supported at the language level is excellent; with the flag on, I am much more confident about the concurrency handling.
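
As a concrete illustration of what -race catches (this sketch is not part of the lab code; the names are made up), here is the classic bug next to its fix: a counter incremented from several goroutines without synchronization, and the same loop guarded by a mutex, which is the rf.mu pattern used throughout the Raft code below. Running it with the race detector enabled (go run -race, or go test -race for the labs) reports the data race in the first function.

// Illustration only, not from the lab code.
package main

import (
    "fmt"
    "sync"
)

// racyCount increments a shared counter from several goroutines without any
// synchronization; the race detector flags the unsynchronized writes to count.
func racyCount() int {
    count := 0
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            count++ // unsynchronized read-modify-write: a data race
        }()
    }
    wg.Wait()
    return count
}

// safeCount does the same work but protects the counter with a mutex.
func safeCount() int {
    count := 0
    var mu sync.Mutex
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            mu.Lock()
            count++
            mu.Unlock()
        }()
    }
    wg.Wait()
    return count
}

func main() {
    fmt.Println(racyCount(), safeCount())
}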

The 6.824 Course

The project is MIT's 6.824 course: http://nil.csail.mit.edu/6.824/2022/labs/lab-raft.html
Clone the repository locally and open the project in an IDE you are familiar with. I installed Go directly inside WSL on Windows and compiled and ran everything from the Linux command line. I am not very familiar with gdb and did not use a debugger; I located problems mainly through logs, and after reading enough of them I became familiar with how the protocol messages flow.
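
For the log-based debugging mentioned above, the lab skeleton ships a small logging helper (in util.go) gated by a Debug constant; the sketch below is reconstructed from memory, so treat the exact contents as an assumption rather than the official file. Flipping Debug to true is what makes the debugPrint/DPrintf output in the implementation below visible, while keeping the test output clean otherwise.

// Sketch of the lab's util.go logging helper (reconstructed; exact contents may differ).
package raft

import "log"

// Debug toggles verbose logging; set it to true while debugging locally.
const Debug = false

// DPrintf prints only when Debug is enabled, so the code can call it freely.
func DPrintf(format string, a ...interface{}) (n int, err error) {
    if Debug {
        log.Printf(format, a...)
    }
    return
}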

References

This is the official Raft paper. Every word counts, and the single page of Figure 2 basically captures the essence of the whole protocol; it is well worth studying:
https://raft.github.io/raft.pdf
Also, the animation on the official homepage, https://raft.github.io/, is worth watching a few times; it gives a good sense of the election process, and the timeouts are very intuitive in it.
Finally, the interactive walkthrough at http://thesecretlivesofdata.com/raft/ explains Raft's leader election and log replication very clearly. I went through it many times; every state change in it has a purpose, and after a few passes it all makes sense.

My Implementation

package raft

//
// this is an outline of the API that raft must expose to
// the service (or tester). see comments below for
// each of these functions for more details.
//
// rf = Make(...)
//   create a new Raft server.
// rf.Start(command interface{}) (index, term, isleader)
//   start agreement on a new log entry
// rf.GetState() (term, isLeader)
//   ask a Raft for its current term, and whether it thinks it is leader
// ApplyMsg
//   each time a new entry is committed to the log, each Raft peer
//   should send an ApplyMsg to the service (or tester)
//   in the same server.
//

import (
    "bytes"
    "fmt"
    "math/rand"
    "sort"
    "sync"
    "sync/atomic"
    "time"

    "6.824/labgob"
    "6.824/labrpc"
)

const (
    FOLLOWER  = 0
    CANDIDATE = 1
    LEADER    = 2

    ELECT_TIMEOUT_MIN  = 150 * time.Millisecond
    ELECT_TIMEOUT_MAX  = 300 * time.Millisecond
    HEARTBEAT_INTERVAL = 70 * time.Millisecond
)

//
// as each Raft peer becomes aware that successive log entries are
// committed, the peer should send an ApplyMsg to the service (or
// tester) on the same server, via the applyCh passed to Make(). set
// CommandValid to true to indicate that the ApplyMsg contains a newly
// committed log entry.
//
// in part 2D you'll want to send other kinds of messages (e.g.,
// snapshots) on the applyCh, but set CommandValid to false for these
// other uses.
//
type ApplyMsg struct {
    CommandValid bool
    Command      interface{}
    CommandIndex int

    // For 2D:
    SnapshotValid bool
    Snapshot      []byte
    SnapshotTerm  int
    SnapshotIndex int
}

type Entry struct {
    //Index int
    Term    int
    Command interface{}
}

//
// A Go object implementing a single Raft peer.
//
type Raft struct {
    mu        sync.Mutex          // Lock to protect shared access to this peer's state
    peers     []*labrpc.ClientEnd // RPC end points of all peers
    persister *Persister          // Object to hold this peer's persisted state
    me        int                 // this peer's index into peers[]
    dead      int32               // set by Kill()

    // Your data here (2A, 2B, 2C).
    // Look at the paper's Figure 2 for a description of what
    // state a Raft server must maintain.
    leaderId     int
    role         int
    votedCount   int
    baseLogIndex int

    receivedHeartbeat bool
    grantedVote       bool
    chanApply         chan ApplyMsg

    currentTerm int
    votedFor    int
    log         []Entry

    commitIndex int
    lastApplied int

    nextIndex  []int
    matchIndex []int
}

// return currentTerm and whether this server
// believes it is the leader.
func (rf *Raft) GetState() (int, bool) {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    return rf.currentTerm, rf.role == LEADER
}

//
// save Raft's persistent state to stable storage,
// where it can later be retrieved after a crash and restart.
// see paper's Figure 2 for a description of what should be persistent.
//
func (rf *Raft) persist() {
    // Your code here (2C).
    // Example:
    raftState := rf.encodeRaftState()
    rf.persister.SaveRaftState(raftState)
}

func (rf *Raft) encodeRaftState() []byte {
    w := new(bytes.Buffer)
    e := labgob.NewEncoder(w)
    _ = e.Encode(rf.currentTerm)
    _ = e.Encode(rf.votedFor)
    _ = e.Encode(rf.baseLogIndex)
    _ = e.Encode(rf.log)
    data := w.Bytes()
    return data
}

func (rf *Raft) persistWithSnapshot(snapshot []byte) {
    // Your code here (2C).
    // Example:
    raftState := rf.encodeRaftState()
    rf.persister.SaveStateAndSnapshot(raftState, snapshot)
}

//
// restore previously persisted state.
//
func (rf *Raft) readPersist(data []byte) {
    if data == nil || len(data) < 1 { // bootstrap without any state?
        return
    }
    // Your code here (2C).
    // Example:
    r := bytes.NewBuffer(data)
    d := labgob.NewDecoder(r)
    var currentTerm int
    var votedFor int
    var baseLogIndex int
    var log []Entry
    _ = d.Decode(&currentTerm)
    _ = d.Decode(&votedFor)
    _ = d.Decode(&baseLogIndex)
    _ = d.Decode(&log)
    rf.currentTerm = currentTerm
    rf.votedFor = votedFor
    rf.baseLogIndex = baseLogIndex
    rf.log = log
    rf.lastApplied = rf.baseLogIndex
}

//
// A service wants to switch to snapshot. Only do so if Raft doesn't
// have more recent info, since it communicated the snapshot on applyCh.
//
func (rf *Raft) CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool {

    // Your code here (2D).

    return true
}

// the service says it has created a snapshot that has
// all info up to and including index. this means the
// service no longer needs the log through (and including)
// that index. Raft should now trim its log as much as possible.
func (rf *Raft) Snapshot(index int, snapshot []byte) {
    // Your code here (2D).
    // Take the lock in a separate goroutine; locking inline here could
    // deadlock with applyCommitted, which holds rf.mu while sending on applyCh.
    go func() {
        rf.mu.Lock()
        defer rf.mu.Unlock()

        rf.debugPrint("Start snapshot index: %d, lastApplied: %d, baseLogIndex: %d",
            index, rf.lastApplied, rf.baseLogIndex)
        if index > rf.lastApplied || index <= rf.baseLogIndex {
            return
        }
        localLogIndex := index - rf.baseLogIndex
        rf.log = rf.log[localLogIndex:]
        rf.baseLogIndex = index
        rf.persistWithSnapshot(snapshot)
        rf.debugPrint("after snapshot, baseLogIndex: %d", rf.baseLogIndex)
    }()
}

//
// example RequestVote RPC arguments structure.
// field names must start with capital letters!
//
type RequestVoteArgs struct {
    // Your data here (2A, 2B).
    CandidateId  int
    Term         int
    LastLogIndex int
    LastLogTerm  int
}

//
// example RequestVote RPC reply structure.
// field names must start with capital letters!
//
type RequestVoteReply struct {
    // Your data here (2A).
    Success bool
    Term    int
}
type AppendEntriesArgs struct {
    LeaderId     int
    Term         int
    LeaderCommit int
    PrevLogIndex int
    PrevLogTerm  int
    Entries      []Entry
}
type AppendEntriesReply struct {
    Success   bool
    Term      int
    NextIndex int
}
type InstallSnapshotArgs struct {
    LeaderId          int
    Term              int
    LastIncludedIndex int
    LastIncludedTerm  int
    Data              []byte
    // Done bool
    // offset int
}
type InstallSnapshotReply struct {
    Term int
}

//
// example RequestVote RPC handler.
//
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
    // Your code here (2A, 2B).
    rf.mu.Lock()
    defer rf.mu.Unlock()
    rf.debugPrint("Receive vote: %v", args)
    if args.Term < rf.currentTerm {
        reply.Success = false
        reply.Term = rf.currentTerm
        rf.debugPrint("Reject vote, because args term is old: %v", args)
        return
    }

    if args.Term > rf.currentTerm {
        rf.currentTerm = args.Term
        rf.votedFor = -1
        rf.votedCount = 0
        rf.leaderId = -1
        rf.becomeFollower()
        rf.persist()
    }

    reply.Term = args.Term
    if rf.votedFor != -1 && rf.votedFor != args.CandidateId {
        reply.Success = false
        rf.debugPrint("Sorry, can't vote: %d, already voted for others.", args.CandidateId)
    } else if rf.logIsNewer(args.LastLogTerm, args.LastLogIndex) {
        reply.Success = false
        rf.debugPrint("Sorry, can't vote: %d, my log is newer.", args.CandidateId)
    } else {
        rf.grantedVote = true
        rf.votedFor = args.CandidateId
        rf.becomeFollower()
        rf.persist()
        reply.Success = true
        rf.debugPrint("Granted vote to %d.", args.CandidateId)
    }
}
func (rf *Raft) handleRequestVoteReply(reply RequestVoteReply) {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    if reply.Success {
        if reply.Term == rf.currentTerm {
            rf.votedCount++
            if rf.votedCount > len(rf.peers)/2 && rf.role == CANDIDATE {
                rf.becomeLeader()
            }
        }
    } else {
        if reply.Term > rf.currentTerm {
            rf.currentTerm = reply.Term
            rf.votedFor = -1
            rf.votedCount = 0
            rf.leaderId = -1
            rf.becomeFollower()
            rf.persist()
        }
    }
}
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
    // Your code here (2A, 2B).
    rf.mu.Lock()
    defer rf.mu.Unlock()

    rf.debugPrint("Receive heartbeat: %v", args)
    if args.Term < rf.currentTerm {
        reply.Success = false
        reply.Term = rf.currentTerm
        reply.NextIndex = args.PrevLogIndex + 1
        rf.debugPrint("Old leader: %d, not your term, now.", args.LeaderId)
        return
    }

    rf.receivedHeartbeat = true
    rf.leaderId = args.LeaderId
    rf.becomeFollower()
    if args.Term > rf.currentTerm {
        rf.currentTerm = args.Term
        rf.votedFor = -1
        rf.persist()
    }
    myNextLogIndex := rf.genNextLogIndex()
    myLastLogIndex := myNextLogIndex - 1
    if myLastLogIndex < args.PrevLogIndex {
        reply.Success = false
        reply.Term = args.Term
        reply.NextIndex = myNextLogIndex
        rf.debugPrint("My log is too short, need more entries.")
        return
    }

    localLogIndex := args.PrevLogIndex - rf.baseLogIndex
    sameIndexLogEntry := rf.log[localLogIndex]
    if args.PrevLogTerm == sameIndexLogEntry.Term {
        if len(args.Entries) > 0 {
            if len(rf.log) > localLogIndex+1 {
                rf.log = rf.log[0 : localLogIndex+1]
            }
            rf.log = append(rf.log, copyEntryArr(args.Entries)...)
            rf.persist()
        }
        if args.LeaderCommit > rf.commitIndex {
            rf.commitIndex = min(args.LeaderCommit, rf.genNextLogIndex()-1)
            rf.applyCommitted()
        }
        reply.Success = true
        reply.Term = args.Term
        reply.NextIndex = rf.genNextLogIndex()
        if len(args.Entries) > 0 {
            rf.debugPrint("Append log from %d success.", args.LeaderId)
        } else {
            rf.debugPrint("No append entries, reply heartbeat to %d success.", args.LeaderId)
        }
    } else {
        // improve performance, skip all items with the same conflict term
        for localLogIndex > 0 && rf.log[localLogIndex-1].Term == sameIndexLogEntry.Term {
            localLogIndex--
        }
        reply.Success = false
        reply.Term = args.Term
        // Binary search to improve performance, it depends on the network, in normal situation, we don't need
        // binary search, because partition does not occur frequently. For test, like Figure8Unreliable, it works.
        reply.NextIndex = rf.baseLogIndex + (localLogIndex+1)/2
        rf.debugPrint("Can't append the entries, the prev term conflict, next index change to: %d.",
            reply.NextIndex)
    }
}
func (rf *Raft) handleAppendEntriesReply(serverId int, reply AppendEntriesReply) {
    rf.mu.Lock()
    defer rf.mu.Unlock()

    rf.debugPrint("Handle heartbeat reply from %d: %v", serverId, reply)
    rf.nextIndex[serverId] = reply.NextIndex
    if reply.Success {
        if reply.Term == rf.currentTerm && rf.role == LEADER {
            rf.matchIndex[serverId] = reply.NextIndex - 1
            rf.commitIndex = max(rf.commitIndex, rf.calculateCommitIndex())
            rf.applyCommitted()
        }
    } else {
        if reply.Term > rf.currentTerm {
            rf.currentTerm = reply.Term
            rf.votedFor = -1
            rf.votedCount = 0
            rf.leaderId = -1
            rf.becomeFollower()
            rf.persist()
        }
    }
}
func (rf *Raft) InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {
    // Your code here (2D).
    rf.mu.Lock()
    defer rf.mu.Unlock()
    rf.debugPrint("Receive install snapshot: %v", args)
    if args.Term < rf.currentTerm {
        reply.Term = rf.currentTerm
        rf.debugPrint("Reject install snapshot, because args term is old: %v", args)
        return
    }

    if args.Term > rf.currentTerm {
        rf.currentTerm = args.Term
        rf.votedFor = -1
        rf.votedCount = 0
        rf.becomeFollower()
        rf.persist()
    }

    rf.leaderId = args.LeaderId
    reply.Term = args.Term
    if args.LastIncludedIndex <= rf.baseLogIndex {
        rf.debugPrint("Sorry, can't install snapshot from: %d, snapshot not newer, index: %d, my base index: %d",
            args.LeaderId, args.LastIncludedIndex, rf.baseLogIndex)
    } else {
        if args.LastIncludedIndex > rf.genNextLogIndex()-1 {
            rf.log = make([]Entry, 0)
            rf.log = append(rf.log, Entry{Term: args.LastIncludedTerm, Command: nil})
        } else {
            rf.log = rf.log[args.LastIncludedIndex-rf.baseLogIndex:]
        }
        rf.baseLogIndex = args.LastIncludedIndex
        rf.lastApplied = rf.baseLogIndex
        rf.persistWithSnapshot(args.Data)
        rf.debugPrint("Install snapshot from %d, last index: %d", args.LeaderId, args.LastIncludedIndex)

        applyMsg := ApplyMsg{
            CommandValid:  false,
            SnapshotValid: true,
            SnapshotIndex: args.LastIncludedIndex,
            SnapshotTerm:  args.LastIncludedTerm,
            Snapshot:      args.Data,
        }
        rf.chanApply <- applyMsg
        rf.debugPrint("Apply msg for snapshot from: %d, last index: %d", args.LeaderId, applyMsg.SnapshotIndex)
    }
}
func (rf *Raft) handleInstallSnapshotReply(reply InstallSnapshotReply) {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    if reply.Term > rf.currentTerm {
        rf.currentTerm = reply.Term
        rf.votedFor = -1
        rf.votedCount = 0
        rf.leaderId = -1
        rf.becomeFollower()
        rf.persist()
    }
}
func (rf *Raft) becomeFollower() {
    if rf.role != FOLLOWER {
        rf.role = FOLLOWER
        rf.debugPrint("Become follower")
    }
}
func (rf *Raft) becomeCandidate() {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    rf.role = CANDIDATE
    rf.leaderId = -1
    rf.votedFor = rf.me
    rf.votedCount = 1
    rf.currentTerm++
    rf.persist()
    rf.debugPrint("Become candidate")

    for serverId := range rf.peers {
        rf.debugPrint("Send vote to: %d", serverId)
        if serverId == rf.me {
            rf.grantedVote = true
        } else {
            args := RequestVoteArgs{
                Term:         rf.currentTerm,
                CandidateId:  rf.me,
                LastLogTerm:  rf.log[len(rf.log)-1].Term,
                LastLogIndex: rf.genNextLogIndex() - 1,
            }
            reply := RequestVoteReply{}
            go func(serverId int) {
                if rf.sendRequestVote(serverId, &args, &reply) {
                    rf.handleRequestVoteReply(reply)
                }
            }(serverId)
        }
    }
}
func (rf *Raft) becomeLeader() {
    rf.role = LEADER
    rf.leaderId = rf.me
    nextIndex := rf.genNextLogIndex()
    for serverId := range rf.peers {
        rf.nextIndex[serverId] = nextIndex
        rf.matchIndex[serverId] = 0
    }
    rf.debugPrint("Become leader")

    go rf.heartbeatLoop()
}
func (rf *Raft) heartbeatLoop() {
    for rf.killed() == false {
        rf.mu.Lock()
        if rf.role != LEADER {
            rf.mu.Unlock()
            return
        }
        for serverId := range rf.peers {
            if serverId == rf.me {
                rf.debugPrint("Send heartbeat to: %d, myself", serverId)
                rf.receivedHeartbeat = true
                nextIndex := rf.genNextLogIndex()
                rf.nextIndex[serverId] = nextIndex
                rf.matchIndex[serverId] = nextIndex - 1
            } else {
                // Figure 8: if the last log entry's term != currentTerm, don't
                // replicate old-term entries here; just send a plain heartbeat.
                var nextIndex int
                lastLogTerm := rf.log[len(rf.log)-1].Term
                if lastLogTerm != rf.currentTerm {
                    nextIndex = rf.genNextLogIndex()
                    rf.debugPrint("Send heartbeat to: %d, my term gets no new commands yet.", serverId)
                } else {
                    nextIndex = min(rf.genNextLogIndex(), rf.nextIndex[serverId])
                    if nextIndex > rf.baseLogIndex {
                        rf.debugPrint("Send heartbeat to: %d", serverId)
                    } else {
                        args := InstallSnapshotArgs{
                            Term:              rf.currentTerm,
                            LeaderId:          rf.me,
                            LastIncludedIndex: rf.baseLogIndex,
                            LastIncludedTerm:  rf.log[0].Term,
                            Data:              rf.persister.ReadSnapshot(),
                        }
                        reply := InstallSnapshotReply{}
                        go func(serverId int) {
                            if rf.sendInstallSnapshot(serverId, &args, &reply) {
                                rf.handleInstallSnapshotReply(reply)
                            }
                        }(serverId)
                        rf.debugPrint("Need read log back from snapshot.")
                        continue
                    }
                }
                args := AppendEntriesArgs{
                    Term:         rf.currentTerm,
                    LeaderId:     rf.me,
                    LeaderCommit: rf.commitIndex,
                    PrevLogIndex: nextIndex - 1,
                    PrevLogTerm:  rf.log[nextIndex-rf.baseLogIndex-1].Term,
                    Entries:      rf.log[nextIndex-rf.baseLogIndex:],
                }
                reply := AppendEntriesReply{}
                go func(serverId int) {
                    if rf.sendAppendEntries(serverId, &args, &reply) {
                        rf.handleAppendEntriesReply(serverId, reply)
                    }
                }(serverId)
            }
        }
        rf.mu.Unlock()
        time.Sleep(HEARTBEAT_INTERVAL)
    }
}

//
// example code to send a RequestVote RPC to a server.
// server is the index of the target server in rf.peers[].
// expects RPC arguments in args.
// fills in *reply with RPC reply, so caller should
// pass &reply.
// the types of the args and reply passed to Call() must be
// the same as the types of the arguments declared in the
// handler function (including whether they are pointers).
//
// The labrpc package simulates a lossy network, in which servers
// may be unreachable, and in which requests and replies may be lost.
// Call() sends a request and waits for a reply. If a reply arrives
// within a timeout interval, Call() returns true; otherwise
// Call() returns false. Thus Call() may not return for a while.
// A false return can be caused by a dead server, a live server that
// can't be reached, a lost request, or a lost reply.
//
// Call() is guaranteed to return (perhaps after a delay) *except* if the
// handler function on the server side does not return.  Thus there
// is no need to implement your own timeouts around Call().
//
// look at the comments in ../labrpc/labrpc.go for more details.
//
// if you're having trouble getting RPC to work, check that you've
// capitalized all field names in structs passed over RPC, and
// that the caller passes the address of the reply struct with &, not
// the struct itself.
//
func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
    ok := rf.peers[server].Call("Raft.RequestVote", args, reply)
    return ok
}

func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool {
    ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
    return ok
}
func (rf *Raft) sendInstallSnapshot(server int, args *InstallSnapshotArgs, reply *InstallSnapshotReply) bool {
    ok := rf.peers[server].Call("Raft.InstallSnapshot", args, reply)
    return ok
}

//
// the service using Raft (e.g. a k/v server) wants to start
// agreement on the next command to be appended to Raft's log. if this
// server isn't the leader, returns false. otherwise start the
// agreement and return immediately. there is no guarantee that this
// command will ever be committed to the Raft log, since the leader
// may fail or lose an election. even if the Raft instance has been killed,
// this function should return gracefully.
//
// the first return value is the index that the command will appear at
// if it's ever committed. the second return value is the current
// term. the third return value is true if this server believes it is
// the leader.
//
func (rf *Raft) Start(command interface{}) (int, int, bool) {
    index := -1
    term := -1
    isLeader := false

    // Your code here (2B).
    rf.mu.Lock()
    defer rf.mu.Unlock()
    isLeader = rf.role == LEADER
    if isLeader {
        index = rf.genNextLogIndex()
        term = rf.currentTerm
        entry := Entry{Term: rf.currentTerm, Command: command}
        rf.log = append(rf.log, entry)
        rf.persist()
        rf.debugPrint("Start new command: %v", command)
    }
    return index, term, isLeader
}

//
// the tester doesn't halt goroutines created by Raft after each test,
// but it does call the Kill() method. your code can use killed() to
// check whether Kill() has been called. the use of atomic avoids the
// need for a lock.
//
// the issue is that long-running goroutines use memory and may chew
// up CPU time, perhaps causing later tests to fail and generating
// confusing debug output. any goroutine with a long-running loop
// should call killed() to check whether it should stop.
//
func (rf *Raft) Kill() {
    atomic.StoreInt32(&rf.dead, 1)
    // Your code here, if desired.
}

func (rf *Raft) killed() bool {
    z := atomic.LoadInt32(&rf.dead)
    return z == 1
}

// The ticker go routine starts a new election if this peer hasn't received
// heartbeats recently.
func (rf *Raft) ticker() {
    for rf.killed() == false {

        // Your code here to check if a leader election should
        // be started and to randomize sleeping time using
        // time.Sleep().
        time.Sleep(rf.getRandElectTimeout())
        rf.mu.Lock()
        if rf.receivedHeartbeat {
            rf.receivedHeartbeat = false
            //rf.debugPrint("received heartbeat, leader is alive, don't elect.")
        } else if rf.grantedVote {
            rf.grantedVote = false
            //rf.debugPrint("granted vote, don't elect.")
        } else {
            go rf.becomeCandidate()
        }
        rf.mu.Unlock()
    }
}

func (rf *Raft) debugPrint(format string, args ...interface{}) {
    DPrintf(fmt.Sprintf("id:%d,term:%d,role:%d,l:%d,v:%d,commit:%d,apply:%d,base:%d,match:%v,next:%v,log:%v  msg: %s",
        rf.me, rf.currentTerm, rf.role, rf.leaderId, rf.votedFor, rf.commitIndex, rf.lastApplied, rf.baseLogIndex,
        rf.matchIndex, rf.nextIndex, rf.log, format), args...)
}

func (rf *Raft) genNextLogIndex() int {
    return rf.baseLogIndex + len(rf.log)
}

func (rf *Raft) getRandElectTimeout() time.Duration {
    return ELECT_TIMEOUT_MIN +
        time.Duration(rand.Intn(int((ELECT_TIMEOUT_MAX-ELECT_TIMEOUT_MIN)/time.Millisecond)))*time.Millisecond
}

// logIsNewer reports whether this peer's log is more up-to-date than the
// candidate's (the election restriction of Raft Section 5.4.1): compare the
// last log term first, then the last log index.
func (rf *Raft) logIsNewer(candidateLogTerm int, candidateLogIndex int) bool {
    myLogTerm := rf.log[len(rf.log)-1].Term
    if myLogTerm != candidateLogTerm {
        return myLogTerm > candidateLogTerm
    } else {
        myLogIndex := rf.genNextLogIndex() - 1
        return myLogIndex > candidateLogIndex
    }
}

func (rf *Raft) applyCommitted() {
    for rf.commitIndex > rf.lastApplied {
        localLogIndex := rf.lastApplied + 1 - rf.baseLogIndex
        applyMsg := ApplyMsg{
            Command:      rf.log[localLogIndex].Command,
            CommandIndex: rf.lastApplied + 1,
            CommandValid: true,
        }
        rf.chanApply <- applyMsg
        rf.lastApplied++
        rf.debugPrint("Apply msg for command: %v", applyMsg)
    }
}

// calculateCommitIndex returns the highest log index known to be replicated on
// a majority of peers: sort a copy of matchIndex ascending and take the element
// at position (n-1)/2; at least a majority of peers (including the leader, whose
// own matchIndex entry is refreshed in heartbeatLoop) have reached that index.
func (rf *Raft) calculateCommitIndex() int {
    beforeMid := (len(rf.peers) - 1) / 2
    matchIndexCopy := copyIntArr(rf.matchIndex)
    sort.Ints(matchIndexCopy)
    return matchIndexCopy[beforeMid]
}

func copyIntArr(arr []int) []int {
    arrCopy := make([]int, len(arr))
    copy(arrCopy, arr)
    return arrCopy
}

func copyEntryArr(arr []Entry) []Entry {
    arrCopy := make([]Entry, len(arr))
    copy(arrCopy, arr)
    return arrCopy
}

func max(a int, b int) int {
    if a > b {
        return a
    } else {
        return b
    }
}
func min(a int, b int) int {
    if a < b {
        return a
    } else {
        return b
    }
}

//
// the service or tester wants to create a Raft server. the ports
// of all the Raft servers (including this one) are in peers[]. this
// server's port is peers[me]. all the servers' peers[] arrays
// have the same order. persister is a place for this server to
// save its persistent state, and also initially holds the most
// recent saved state, if any. applyCh is a channel on which the
// tester or service expects Raft to send ApplyMsg messages.
// Make() must return quickly, so it should start goroutines
// for any long-running work.
//
func Make(peers []*labrpc.ClientEnd, me int,
    persister *Persister, applyCh chan ApplyMsg) *Raft {
    rf := &Raft{}
    rf.peers = peers
    rf.persister = persister
    rf.me = me

    // Your initialization code here (2A, 2B, 2C).
    rand.Seed(time.Now().UnixNano())
    rf.chanApply = applyCh
    rf.dead = 0

    rf.currentTerm = 0
    rf.votedFor = -1
    rf.log = make([]Entry, 0)
    rf.log = append(rf.log, Entry{Term: 0, Command: nil})

    rf.role = FOLLOWER
    rf.leaderId = -1
    rf.votedCount = 0
    rf.receivedHeartbeat = false
    rf.grantedVote = false
    rf.baseLogIndex = 0
    rf.nextIndex = make([]int, len(peers))
    rf.matchIndex = make([]int, len(peers))
    rf.commitIndex = 0
    rf.lastApplied = 0

    // initialize from state persisted before a crash
    rf.readPersist(persister.ReadRaftState())

    // start ticker goroutine to start elections
    go rf.ticker()

    return rf
}

My Test Results

Test (2A): initial election ...
... Passed -- 3.0 3 84 21300 0
Test (2A): election after network failure ...
... Passed -- 4.6 3 156 30024 0
Test (2A): multiple elections ...
... Passed -- 6.0 7 816 156260 0
Test (2B): basic agreement ...
... Passed -- 0.6 3 16 4078 3
Test (2B): RPC byte count ...
... Passed -- 1.7 3 48 112826 11
Test (2B): agreement after follower reconnects ...
... Passed -- 5.7 3 158 39113 7
Test (2B): no agreement if too many followers disconnect ...
... Passed -- 3.4 5 232 47462 3
Test (2B): concurrent Start()s ...
... Passed -- 0.6 3 16 4094 6
Test (2B): rejoin of partitioned leader ...
... Passed -- 6.4 3 254 57971 4
Test (2B): leader backs up quickly over incorrect follower logs ...
... Passed -- 21.2 5 2264 1722320 102
Test (2B): RPC counts aren't too high ...
... Passed -- 2.2 3 58 15322 12
Test (2C): basic persistence ...
... Passed -- 3.6 3 104 24614 6
Test (2C): more persistence ...
... Passed -- 15.6 5 1152 238836 16
Test (2C): partitioned leader and one follower crash, leader restarts ...
... Passed -- 1.5 3 38 9224 4
Test (2C): Figure 8 ...
... Passed -- 27.7 5 844 159399 21
Test (2C): unreliable agreement ...
... Passed -- 4.2 5 224 72989 246
Test (2C): Figure 8 (unreliable) ...
... Passed -- 43.9 5 5120 9238145 424
Test (2C): churn ...
... Passed -- 16.5 5 904 617927 497
Test (2C): unreliable churn ...
... Passed -- 16.3 5 708 167976 157
Test (2D): snapshots basic ...
... Passed -- 5.2 3 142 46258 211
Test (2D): install snapshots (disconnect) ...
... Passed -- 85.4 3 2374 1014944 351
Test (2D): install snapshots (disconnect+unreliable) ...
... Passed -- 107.8 3 2978 1237609 359
Test (2D): install snapshots (crash) ...
... Passed -- 72.5 3 1800 689554 324
Test (2D): install snapshots (unreliable+crash) ...
... Passed -- 88.4 3 2216 938538 363
Test (2D): crash and restart all servers ...
... Passed -- 9.3 3 250 66712 56
PASS
ok 6.824/raft 553.424s
