首页 > 其他分享 >6.824 lab2

6.824 lab2

时间:2022-11-07 21:55:43浏览次数:69  
标签:Term 6.824 log args len lab2 rf nextIndex

任务

完成log replication,论文5.3
基于lab2A

逻辑

这个网站有动画演示
大致过程

  1. 命令发送给leader,leader复制到本地
  2. leader将日志发给followers
  3. follower复制到本地,返回成功消息
  4. leader收到大多数的回复消息,提交日志,再向follower发消息
  5. follower提交日志
    上面过程的是有非常多细节的,之后展开

实现

首先是发送命令给leader,这个是通过start实现的,具体看注释

func (rf *Raft) Start(command interface{}) (int, int, bool) {
	// Your code here (2B).
	index := -1
	term := -1
	if rf.state != Leader {
		return index, term, false
	}
	rf.mu.Lock()
	defer rf.mu.Unlock()

	index = len(rf.log)
	term = rf.currentTerm
       //首先将日志复制到本地
	rf.log = append(rf.log, Entry{Term: term, Command: command, Index: index})

	return index, term, true
}

在论文中说接到命令之后立刻发送AppendEntries,但是此处实现需要新建一个函数,功能和2A中的boardcastheartbeat是相似的,代码会有冗余。当然也可以通过传参来决定是运行一次还是循环运行。最后选择了让boardcastheartbeat循环执行的时候带上发送,这样会牺牲一些性能

下一步是将消息发送folloers,这个发送的时候我门需要带上log信息,此时引出了新的问题,我门要将哪部分log传过去,现在就需要nextIndex matchIndex登场了
前者记录每个foller下一条日志的位置,但是当一个节点刚成为leader时候,并不知道其他节点的日志情况,所以nextIndex在成为leader时初始化为自己日志的长度

func (rf *Raft) convertLeader() {
	rf.state = Leader
	for i := range rf.peers {
		//initialized to leader last log index+1
		rf.nextIndex[i] = len(rf.log)
	}
}

matchIndex是在我们实际匹配过程中探测到匹配的日志,Make初始化为0
我们需要传送的日志就是nextIndex[i]之后的内容

args := AppendEntriesArgs{
  Term:         rf.currentTerm,
  LeaderId:     rf.me,
  PrevLogIndex: rf.nextIndex[i] - 1,
  LedaerCommit: rf.commitIndex,
  Entries:      make([]Entry, 0),
}

if args.PrevLogIndex >= 0 {
  args.PrevLogTerm = rf.log[args.PrevLogIndex].Term
}
if rf.nextIndex[i] >= 0 {
  args.Entries = append(args.Entries, rf.log[rf.nextIndex[i]:]...)
}

接下来考虑怎么处理返回结果

  1. 如果Success=false,代表着我们log的因为某种原因匹配失败了,我们需要减少next[i]的值,在论文中有表述
  2. 如果Success=true,代表复制成功,修改nextIndex matchIndex,按照论文的意思我们需要收集到多数的确认之后,提交日志。但是日志的数量可能很多,节点也多,维护计数器会变的复杂。参考大佬的思想,我们将所有节点的matchIndex排序,在中间位置上的index就是“多数”
// sync success
if reply.Success {
  rf.matchIndex[i] = args.PrevLogIndex + len(args.Entries)
  rf.nextIndex[i] = rf.matchIndex[i] + 1
  //更新commiindex
  arr := make([]int, len(rf.matchIndex))
  copy(arr, rf.matchIndex)
  //添加leader节点的
  arr[rf.me] = len(rf.log) - 1
  sort.Ints(arr)
  newCIndex := arr[len(rf.peers)/2]
  if rf.state == Leader && rf.log[newCIndex].Term == rf.currentTerm && newCIndex > rf.commitIndex {
    rf.commitIndex = newCIndex
    go rf.applyLog()
  }
} else { //sync fail
  rf.nextIndex[I]--
}

其中的applyLog很重要,他的作用是通知客户端提交成功了,也是我们测试要用的

func (rf *Raft) applyLog() {
	if len(rf.log) == 0 {
		return
	}
	for rf.commitIndex > rf.lastApplied {
		rf.lastApplied++
		entry := rf.log[rf.lastApplied]
		msg := ApplyMsg{
			CommandValid: true,
			Command:      entry.Command,
			CommandIndex: entry.Index,
		}
		rf.applyCh <- msg
	}
}

applyCh是raft结构体中需要心添加的字段,记得Make进行初始化

func Make(peers []*labrpc.ClientEnd, me int,
	persister *Persister, applyCh chan ApplyMsg) *Raft {
	rf := &Raft{}
	rf.peers = peers
	rf.persister = persister
	rf.me = me
  //......
  rf.applyCh = applyCh //这个chan从参数传过来的
  //......
}

broadcastHeartBeat代码

//发起心跳广播
func (rf *Raft) broadcastHeartBeat() {
	for !rf.killed() && rf.state == Leader {
		for i := range rf.peers {
			//跳过自己
			if i == rf.me {
				continue
			}
			go func(i int) {
				reply := AppendEntriesReply{}
				args := AppendEntriesArgs{
					Term:         rf.currentTerm,
					LeaderId:     rf.me,
					PrevLogIndex: rf.nextIndex[i] - 1,
					LedaerCommit: rf.commitIndex,
					Entries:      make([]Entry, 0),
				}
				if args.PrevLogIndex >= 0 {
					args.PrevLogTerm = rf.log[args.PrevLogIndex].Term
				}
				if rf.nextIndex[i] >= 0 {
					args.Entries = append(args.Entries, rf.log[rf.nextIndex[i]:]...)
				}

				//send RPC
				f := rf.sendAppendEntries(i, &args, &reply)
				//链接失败
				if !f {
					return
				}
				if reply.Term > rf.currentTerm {
					rf.currentTerm = reply.Term
					rf.convertFollower()
					return
				}
				//发送的是心跳,接下来的返回消息可以不用处理
				if len(args.Entries) == 0 {
					return
				}
				// sync success
				if reply.Success {
					rf.matchIndex[i] = args.PrevLogIndex + len(args.Entries)
					rf.nextIndex[i] = rf.matchIndex[i] + 1
					//更新commiindex
					arr := make([]int, len(rf.matchIndex))
					copy(arr, rf.matchIndex)
					//添加leader节点的
					arr[rf.me] = len(rf.log) - 1
					sort.Ints(arr)
					newCIndex := arr[len(rf.peers)/2]
					if rf.state == Leader && rf.log[newCIndex].Term == rf.currentTerm && newCIndex > rf.commitIndex {
						rf.commitIndex = newCIndex
						go rf.applyLog()
					}
				} else { //sync fail
					rf.nextIndex[i]--
				}
			}(i)
		}
		time.Sleep(150 * time.Millisecond)
	}
}

接下来是AppendEntries
需要将失配的日志删除,然后复制新的日志
还需要处理日志的提交

//AppendEntries RPC handler.
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	// Your code here (2A, 2B).

	reply.Term = rf.currentTerm
	reply.Success = false
	//拒绝响应
	if rf.currentTerm > args.Term {
		return
	}
	rf.resetTimeout()

	//if recetive term >currentTerm
	if rf.currentTerm < args.Term {
		rf.currentTerm = args.Term
		rf.convertFollower()
	}

	//本地日志缺少
	if len(rf.log)-1 < args.PrevLogIndex {
		return
	}
	//日志term不匹配
	if args.PrevLogIndex > 0 && rf.log[args.PrevLogIndex].Term != args.PrevLogTerm {
		return
	}

	for i, entry := range args.Entries {
		logIndex := args.PrevLogIndex + i + 1
		if logIndex >= len(rf.log)-1 {
			rf.log = append(rf.log, entry)
		} else {
			if rf.log[logIndex].Term != entry.Term {
				rf.log = rf.log[:logIndex] //删除失配内容
				rf.log = append(rf.log, entry)
			}
		}
	}

	//更新commitdex
	if args.LedaerCommit > rf.commitIndex {
		rf.commitIndex = args.LedaerCommit
		if rf.commitIndex > len(rf.log)-1 {
			rf.commitIndex = len(rf.log) - 1
		}
	}
	go rf.applyLog()
	reply.Success = true
}

注意

以上代码在测试的时候会出现新的问题,我们代码log是从下标0开始的,而论文中是从1,我们的测试函数也是从1开始,所以直接测试有几个一定无法通过
需要修改测试函数,或者修改代码实现,可以在Make的时候添加一条日志用来占位
休要修改的测试函数
TestBasicAgree2B TestFailNoAgree2B

无法保证100%通过测试,有时会无法通过

标签:Term,6.824,log,args,len,lab2,rf,nextIndex
From: https://www.cnblogs.com/beifangcc/p/16846300.html

相关文章

  • Lab2_syscall
     操作系统实验报告 lab2【Systemcalls】       学生姓名:jeekzhang 学 号:20307130XXX 专 业:计算机科学与技术一、实验内容:PartA......
  • 6.824 lab1-MapReduce
    lab1是实现MapReduce老师完成了框架的大部分我我们只需要做的是填充哪重要的几个部分2022的官方连接https://pdos.csail.mit.edu/6.824/labs/lab-mr.html准备工作下......
  • 6.824 Frangipani
    本文重点 缓存一致性、分布式事务、分布式故障恢复设计和功能之间的关联。缓存一致性是指,如果我缓存了一些数据,之后你修改了实际数据但是并没有考虑我缓存中的数据,必须......
  • MIT6.824-Distributed System
    Goversion:1.13.6wgethttps://dl.google.com/go/go1.13.6.linux-amd64.tar.gzsudotar-C/usr/local-xvfgo1.13-.6.linux-amd64.tar.gzsudonano~/.profile在......
  • 6.824笔记3
    大规模存储分布式的底层运行着一个大型分布式存储系统,并有一套接口,评估指标包括并行性能,容错,复制,一致性数据分割并放到多个服务器上,并且需要一个自动化的容错系统,一种容......
  • 6.824笔记2
    线程为每一个prc请求使用一个线程,当请求回收的时候,线程继续运作,多线程能能够开启多个网络请求,形成io并发并行化,线程用来实现并行化异步编程,事件驱动编程,又一个线程,一个循......
  • 解决matlab2022a启动提示
    linux下安装完matlab2022a后,之后,每次启动都会有:MESA-LOADER:failedtoopeniris:/usr/local/MATLAB/R2022a/bin/glnxa64/../../sys/os/glnxa64/libstdc++.so.6:version......
  • MIT6.824_LEC3_GFS_Outline
    为什么我们要阅读GFS论文?分布式存储是关键的抽象概念接口和语法应该是怎样的?内部是怎么运行的?GFS论文对6.824这门课的很多主题有指导意义并行性能容错副本......
  • MIT_6.824_LEC3_GFS_FAQ翻译
    GFSFAQQ:Whyisatomicrecordappendat-least-once,ratherthanexactlyonce?为什么记录的追加是至少一次,而不是仅仅只追加一次?Section3.1,Step7,saystha......
  • 《安富莱嵌入式周报》第284期:Matlab2022b发布,支持从 .NET 调用,耳机放大器,牛屎芯片替换
    ​视频教程更新:GUI综合实战视频教程第1期:综合UI项目规划以及AppWizard和ThreadXGUIX初识(2022-09-24)​​https://www.armbbs.cn/forum.php?mod=viewthread&tid=115615​​视......