首页 > 其他分享 >分布式系统实验二报告

分布式系统实验二报告

时间:2024-01-14 10:56:57浏览次数:31  
标签:Term log 报告 int args rf 实验 分布式系统 reply

常量定义

根据论文中的内容,Raft中server节点共有三种状态,分别是 FollowerCandidateLeader,因此代码中将三种状态定义如下:

const (
	FOLLOWER = iota
	CANDIDATE
	LEADER
)

此外,根据大量测试所得经验,我分别定义以下时钟常量:选举超时间隔200-500ms、心跳间隔50ms。

const (
	HeartBeatTimeout   = 50
	ElectionTimeoutMin = 200
	ElectionTimeoutMax = 500
)

数据结构

日志信息

首先,根据论文中的要求,定义了 LogEntry 结构体表示单个日志信息:

type LogEntry struct {
	Index   int
	Term    int
	Command interface{}
}

Raft结构体

然后,根据论文中Figure 2,我在代码框架的基础上,定义了如下 Raft 结构体:

// Persistent state on all servers
currentTerm int        // implementing a single Raft peer.
votedFor    int        // candidateId that received vote in current term
log         []LogEntry // first index is 1

// Volatile state on all servers
commitIndex int // index of highest log entry known to be committed
lastApplied int // index of highest log entry applied to state machine

// Volatile state on leaders:
nextIndex  []int // for each server, index of the next log entry to send to that server
matchIndex []int // for each server, index of highest log entry known to be replicated on server

除了上述Figure 2中给定的内容外,我又在 Raft 结构体中定义了以下内容辅助实现实验功能:

role         int
timer        *time.Timer
voteCount    int
successCount int
applyCh      chan ApplyMsg

其中,role 表示当前该节点的状态,timer 用于选举和心跳计时,voteCount 表示候选者当前轮获得的投票数, successCount 表示领导者某次发送心跳获得的成功回应数。

RequestVote相关定义

根据Figure 2,定义如下结构体分别表示投票请求与投票回复:

type RequestVoteArgs struct {
	Term         int // candidate’s term
	CandidateId  int // candidate's id
	LastLogIndex int // index of candidate’s last log entry
	LastLogTerm  int // term of candidate’s last log entry
}

type RequestVoteReply struct {
	Term        int  // currentTerm, for candidate to update itself
	VoteGranted bool // true means candidate received vote
}

AppendEntries相关定义

同样根据Figure 2,定义了以下两个结构体。但为了能使 leader 中日志更快地回退,在 AppendEntriesReply 中添加了 RevisedIndex,用于告诉 leader 下次发送的日志该回退到哪个下标。

type AppendEntriesArgs struct {
	Term         int        // leader’s term
	LeaderId     int        // leader's id
	PrevLogIndex int        // index of log entry immediately preceding new ones
	PrevLogTerm  int        // term of prevLogIndex entry
	Entries      []LogEntry // log entries to store (empty for heartbeat;
	LeaderCommit int        // leader’s commitIndex
}

type AppendEntriesReply struct {
	Term         int  // currentTerm, for leader to update itself
	Success      bool // true if follower contained entry matching prevLogIndex and prevLogTerm
	RevisedIndex int  // nextindex to send after revising
}

Raft协议实现

Raft节点初始化

Make 函数中,我初始化了 Raft 中的成员变量,其中论文中要求 log 的下标是从1开始,因此我在初始化 log 时在开头添加了一个空 LogEntry

func Make(peers []*labrpc.ClientEnd, me int,
	persister *Persister, applyCh chan ApplyMsg) *Raft {
	rf := &Raft{}
	rf.peers = peers
	rf.persister = persister
	rf.me = me

	// Your initialization code here.
	rf.currentTerm = 0
	rf.votedFor = -1
	rf.log = make([]LogEntry, 1)

	var n = len(peers)
	rf.nextIndex = make([]int, n)
	rf.matchIndex = make([]int, n)

	rf.role = FOLLOWER
	rf.timer = time.NewTimer(randtime())
	rf.applyCh = applyCh

	// initialize from state persisted before a crash
	rf.readPersist(persister.ReadRaftState())

	go rf.mainLoop()
	go rf.applier()

	return rf
}

Make 中我加入了 mainLoopapplier 两个协程,其中 mainLoop 实现了节点的选举、发送心跳日志等一系列功能,applier 实现了定时将该节点已经 commit 的日志运用于状态机。

mainLoop实现

mainLoop 函数实现如下,外层一个大循环。每当计时器到时,若状态为 Follwer,则进入 Candidate 状态,并发起选举;若状态为 Leader,则发送心跳给其他节点。

func (rf *Raft) mainLoop() {
	for {
		<-rf.timer.C

		if rf.role == FOLLOWER {
			rf.mu.Lock()
			rf.role = CANDIDATE
			rf.timer.Reset(0)
			rf.mu.Unlock()

		} else if rf.role == CANDIDATE {
			rf.Election()

		} else { // LEADER
			rf.heartBeat()
		}
	}
}

Candidate发起选举

候选者将当前任期加一,随机重置计时器,投票给自己,并发送投票请求给其他所有节点。经过一段时间等待,若当前节点状态仍为 Candidate且获取投票数超过全部节点数的一半,则该节点进入 Leader 状态。代码实现如下:

func (rf *Raft) Election() {
	n := len(rf.peers)
	rf.mu.Lock()

	rf.currentTerm++
	rf.voteCount = 1
	rf.votedFor = rf.me
	rf.persist()
	rf.timer.Reset(randtime())
	m := len(rf.log)

	voteFlag := make(chan bool, n-1)

	args := RequestVoteArgs{rf.currentTerm, rf.me, rf.log[m-1].Index, rf.log[m-1].Term}

	for i := range rf.peers {
		if i == rf.me {
			continue
		}

		go rf.voteResult(i, args, voteFlag)
	}
	rf.mu.Unlock()

	rf.wait(voteFlag)

	rf.mu.Lock()
	if rf.role == CANDIDATE && rf.voteCount > n/2 {
		rf.role = LEADER
		for i := range rf.peers {
			rf.nextIndex[i] = m
		}
		rf.matchIndex[rf.me] = m - 1
		rf.timer.Reset(0)
		rf.persist()
	}
	rf.mu.Unlock()
}

其中,对于其他每个节点的投票,我使用 voteResult 函数获取其结果。若获取回复的任期大于自身的任期,则转为 Follower 状态并随机重置计时器;若获取的投票结果为 true,则将获取投票计数加一。代码实现如下:

func (rf *Raft) voteResult(id int, args RequestVoteArgs, voteFlag chan bool) {
	reply := &RequestVoteReply{}
	ok := rf.sendRequestVote(id, args, reply)
	rf.mu.Lock()
	if rf.role != CANDIDATE {
		rf.mu.Unlock()
		return
	}

	if ok {
		if reply.Term > rf.currentTerm {
			rf.currentTerm = reply.Term
			rf.votedFor = -1
			rf.role = FOLLOWER
			rf.voteCount = 0
			rf.persist()
			rf.timer.Reset(randtime())
		}
		if reply.VoteGranted {
			rf.voteCount++
		}
		voteFlag <- reply.VoteGranted
	}
	rf.mu.Unlock()
}

接收者回复选举

对于接收到的选举请求,若请求的任期小于自身任期,则拒绝请求;若请求任期大于自身任期,无论是否投赞成票,都更新自身任期、转为 Follower 状态并将本轮的投赞成票对象清空。若当前轮还没有投赞成票对象或本轮之前投赞成票的对象和这次相同,且请求的 LastLogTermLastLogIndex 皆不小于自身,则为其投赞成票并随机重置计时器,否则投反对票。

代码实现如下:

func (rf *Raft) RequestVote(args RequestVoteArgs, reply *RequestVoteReply) {
	// Your code here.
	rf.mu.Lock()
	defer rf.mu.Unlock()

	reply.Term = rf.currentTerm

	if args.Term < rf.currentTerm {
		reply.VoteGranted = false
		return
	}
	if args.Term > rf.currentTerm {
		rf.role = FOLLOWER
		rf.currentTerm = args.Term
		rf.votedFor = -1
	}

	if rf.votedFor != -1 && rf.votedFor != args.CandidateId {
		reply.VoteGranted = false
	} else {
		if args.LastLogTerm > rf.log[len(rf.log)-1].Term ||
			args.LastLogTerm == rf.log[len(rf.log)-1].Term &&
				args.LastLogIndex >= rf.log[len(rf.log)-1].Index {
			reply.VoteGranted = true
			rf.votedFor = args.CandidateId
			rf.persist()
			rf.timer.Reset(randtime())
		} else {
			reply.VoteGranted = false
		}
	}
}

Leader发送心跳

每当心跳计时超时,Leader 重置心跳计时器,并将成功回应数 successCount 设为1(表示自己成功回应自己心跳),接下来将心跳发送给其他所有节点,根据 nextIndex 数组,分别确定发送给每个节点心跳中的日志内容。

经过一段时间等待,若当前节点状态仍为 Leader 且获得的成功回应数大于总节点数的一半,似乎即可更新 commitIndex 为该节点最后一个日志的下标 m-1(因为超过一半节点的 matchIndex[i] >= m-1)。但为了解决论文中Figure 8的情况,这里额外增加了 rf.currentTerm == rf.log[m-1].Term 的限制,即只会提交当前任期下产生的日志。

func (rf *Raft) heartBeat() {
	n := len(rf.peers)

	rf.mu.Lock()
	rf.timer.Reset(time.Duration(HeartBeatTimeout) * time.Millisecond)
	m := len(rf.log)
	rf.successCount = 1

	flag := make(chan bool, n-1)

	for i := range rf.peers {
		if i == rf.me {
			continue
		}
		args := AppendEntriesArgs{rf.currentTerm, rf.me, rf.log[rf.nextIndex[i]-1].Index,
			rf.log[rf.nextIndex[i]-1].Term, nil, rf.commitIndex}
		if rf.nextIndex[i] < m {
			args.Entries = append([]LogEntry{}, rf.log[rf.nextIndex[i]:]...)
		}

		go rf.entriesResult(i, args, flag)
	}
	rf.mu.Unlock()

	rf.wait(flag)

	rf.mu.Lock()
	if rf.role == LEADER && rf.successCount > n/2 {
		if m-1 > rf.commitIndex && rf.currentTerm == rf.log[m-1].Term {
			rf.commitIndex = m - 1
		}
	}
	rf.mu.Unlock()
}

其中,对于其他每个节点的心跳回复,我使用 entriesResult 函数获取其结果。若获取心跳回复的任期大于自身的任期,则转为 Follower 状态并随机重置计时器。若获取心跳回复结果为 true,则将 successCount 加一并更新回复节点的 nextIndex[id]matchIndex[id] 的值;否则则根据心跳回复的信息 reply.RevisedIndex,回退到下次发送给该节点的日志起始下标。

func (rf *Raft) entriesResult(id int, args AppendEntriesArgs, flag chan bool) {
	reply := &AppendEntriesReply{}
	ok := rf.sendAppendEntries(id, args, reply)
	rf.mu.Lock()

	if rf.role != LEADER {
		rf.mu.Unlock()
		return
	}

	if ok {
		if reply.Term > rf.currentTerm {
			rf.currentTerm = reply.Term
			rf.votedFor = -1
			rf.role = FOLLOWER
			rf.successCount = 0
			rf.timer.Reset(randtime())
			rf.persist()
		}

		if rf.role == LEADER {
			if reply.Success {
				rf.nextIndex[id] = args.PrevLogIndex + len(args.Entries) + 1
				rf.matchIndex[id] = rf.nextIndex[id] - 1
				rf.successCount++
			} else {
				if reply.RevisedIndex > 0 {
					rf.nextIndex[id] = reply.RevisedIndex
				} else {
					rf.nextIndex[id] = args.PrevLogIndex
					for rf.nextIndex[id] > 0 && rf.log[rf.nextIndex[id]].Term >= args.PrevLogTerm {
						rf.nextIndex[id]--
					}
					rf.nextIndex[id]++
				}
			}
			flag <- reply.Success
		}
	}
	rf.mu.Unlock()
}

接收者回复心跳

对于接收到的心跳,若请求的任期小于自身任期,则回复拒绝并直接返回。若心跳任期大于自身任期,无论是否回复赞成,都更新自身任期、转为 Follower 状态。若本节点不包含下标为 prevLogIndex 的日志或下标为 prevLogIndex 的日志的任期不等于 prevLogTerm,则回复拒绝与需要到的回退到的日志下标,并随机重置计时器;否则更新日志与 commitIndex,并随机重置计时器。

func (rf *Raft) AppendEntries(args AppendEntriesArgs, reply *AppendEntriesReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	reply.Term = rf.currentTerm

	if args.Term < rf.currentTerm {
		reply.Success = false
		return
	}

	if args.Term > rf.currentTerm {
		rf.currentTerm = args.Term
		rf.votedFor = -1
		rf.role = FOLLOWER
		rf.persist()
	}

	if args.PrevLogIndex >= len(rf.log) || args.PrevLogTerm != rf.log[args.PrevLogIndex].Term {
		reply.Success = false
		if args.PrevLogIndex >= len(rf.log) {
			reply.RevisedIndex = len(rf.log)
		} else {
			reply.RevisedIndex = -1
		}
	} else {
		if args.Entries != nil {
			rf.log = append(rf.log[:args.PrevLogIndex+1], args.Entries...)
			rf.persist()
		}

		if args.LeaderCommit > rf.commitIndex {
			rf.commitIndex = min(args.LeaderCommit, len(rf.log)-1)
		}
		reply.Success = true
	}

	rf.timer.Reset(randtime())
}

应用于状态机

每个节点每隔一段时间便检测有没有新的已经提交的日志,若有,则将这些新提交的日志应用于状态机。

func (rf *Raft) applier() {
	for {
		time.Sleep(10 * time.Millisecond)
		rf.mu.Lock()
		for rf.lastApplied < rf.commitIndex {
			rf.lastApplied++
			rf.applyCh <- ApplyMsg{rf.lastApplied, rf.log[rf.lastApplied].Command, false, nil}
		}
		rf.mu.Unlock()
	}
}

持久化

根据Figure 2,每当有服务器上的持久状态(currentTermvoteForlog[])更新时,则调用 persist 函数进行将上述三个变量进行编码储存。

persistreadPersist 函数实现如下:

func (rf *Raft) persist() {
	w := new(bytes.Buffer)
	e := gob.NewEncoder(w)
	e.Encode(rf.currentTerm)
	e.Encode(rf.votedFor)
	e.Encode(rf.log)
	data := w.Bytes()
	rf.persister.SaveRaftState(data)
}

func (rf *Raft) readPersist(data []byte) {
	r := bytes.NewBuffer(data)
	d := gob.NewDecoder(r)
	d.Decode(&rf.currentTerm)
	d.Decode(&rf.votedFor)
	d.Decode(&rf.log)
}

实验演示

虽然PPT中只要求通过 "persist" 之前的测试用例,但我还是实现了所有功能,可以通过所有的测试用例。

image

一次通过可能会有一定的偶然性,因此我实现了一个脚本,循环测试了上百次所有的测试用例,除了 Figure8Unreliable 有一定概率无法通过外,其他测试用例皆可以每次通过。

至于为何 Figure8Unreliable 有一定概率无法通过,可能是因为在各个节点日志变化很快的情况下,因为通信不可靠,所以不同节点间日志不一致的情况很严重,无法在短时间内达成一致。或许可以增加选举超时的时间,避免各节点的 currentTerm 快速增加,如此相对来说各节点的日志可以更容易达成一致。

总结

本次的Raft实验具有一定的难度,在结合PPT、过程动画以及论文的情况下,我终于在一定程度上完成了这个实验。同时,为了完成这次实验,我简单学习了Go语言的基础语法,也是收获颇多。

标签:Term,log,报告,int,args,rf,实验,分布式系统,reply
From: https://www.cnblogs.com/Mobius-strip/p/17949561

相关文章

  • 实验七:Spark机器学习库Mtlib编程实践
    1、数据导入导入相关的jar包:importorg.apache.spark.ml.feature.PCAimportorg.apache.spark.sql.Rowimportorg.apache.spark.ml.linalg.{Vector,Vectors}importorg.apache.spark.ml.evaluation.MulticlassClassificationEvaluatorimportorg.apache.spark.ml.{Pipeline,......
  • 解题报告P2501 [HAOI2006] 数字序列
    P2501[HAOI2006]数字序列题目描述现在我们有一个长度为\(n\)的整数序列\(a\)。但是它太不好看了,于是我们希望把它变成一个单调严格上升的序列。但是不希望改变过多的数,也不希望改变的幅度太大。输入格式第一行是一个整数,表示序列长度\(n\)。第二行有\(n\)个整数,第\(......
  • [JMeter] JMeter的测试报告格式转换(.jtl => html)
    0序言近期在jmeter测试服务器上跑压测脚本,跑完后,生成.jtl的测试报告文件。但这份文件不便于直接阅读(尤其是统计分析能力欠缺),我需要转为html。1使用方式CASE1:基于JTL测试报告文件,转为HTML测试报告set"BASE_DIR=E:\work_data\xxxx\"jmeter-g"%BASE_DIR%\alarm_report-......
  • 软件测试报告模板
    ......
  • 系统竣工报告模板
    ......
  • 软件验收报告模板
    ......
  • VRRP配置实验
    1、实验拓扑图2、实验目的确定配置PC1、PC2访问路由器R1的路径3、实验配置3.1SW1配置<sw1>displaycurrent-configuration #sysnamesw1#vlanbatch102030405060100#clusterenablentdpenablendpenable#dropillegal-macalarm#diffservdomaindefault#d......
  • 软件测试报告模板
    ......
  • OSPF单区域实验
    1.实验背景你是公司的网络管理员。现在公司的网络中有三台AR路由器,分别作为销售部、技术部、财务部的接入路由器,通过交换机进行互联,需要实现各部门的PC之间的相互通信;所以你选择使用OSPF动态路由协议实现PC之间的相互通信。2.实验需求每个AR路由器都应学习到各个部门所有网段的路由......
  • 五十八、配置VXLAN不同子网互访(集中式网关)实验组网
    1、网络拓扑图2、实验目的通过OSPFv2的配置令设备leaf1、spine与leaf2相互联通,构建基于IPv4的underlay网络,PC1与PC2属于不同网段且不同VLAN的用户;通过在三台设备上配置VXLAN隧道,实现两台客户端主机跨网段之间的互访,VXLAN网关集中在spine上3、实验配置leaf1:<leaf1>displaycurrent-c......