lab1是实现MapReduce
老师完成了框架的大部分
我我们只需要做的是填充哪重要的几个部分
2022的官方连接
https://pdos.csail.mit.edu/6.824/labs/lab-mr.html
准备工作
下载
git clone git://g.csail.mit.edu/6.824-golabs-2022 6.824
运行示例
编译库
首先进入main目录中
在最新的2022版本中已经没有这个问题了
go build -race -buildmode=plugin ../mrapps/wc.go
但是会出错,因为这个代码比较早,1.16
我们需要重新指定一下go mod
同时将项目中的相对路径改了
修改src/main/mrsequential.go,修改为import "modename/src/mr"
修改src/mrapps/wc.go修改为import "modename/src/mr"
执行
go build -race -buildmode=plugin ../mrapps/wc.go
mac下去掉-race
win中无法进行此操作,我们要生成so文件,这是linux中的文件,只能在linux,macos,freebsd,
编译完成之后会在main下生成wc.so
so,share object,是linux中的动态库,相较于静态连接。动态库系统只会加载一次,供所有人用,节省内存。如果是静态连接,每链接一次就多一个份,不够灵活
go run -race mrsequential.go wc.so pg*.txt
项目中给我们实现了一个简单的mapreduce的实现,在main/mrsequential.go
其中用到了wc中几个函数,所以需要加上wc.go
功能也是论文经典操作,统计单词频次,做倒排索引
统计的文件是 pg*.txt,结果会被输出到mr-out-0
倒排索引
这是一种信息检索的方式
平时我么能见到的多是正排索引,由文件找到文件内容
倒排索引是反过来,通过文件内容关键词找到文件
举个例子搜索引擎可以通过关键词定位网页
我们的任务
实现一个分布式的mapreduce,实例是一个单体的应用
我们需要实现coordinator
worker
任务介绍中,和论文中已经告诉我们需要怎么做了
我想提一点是关于中间结果的保存问题,这个比较特殊,刚开始也应该是我的理解难点因为论文中并没这个规定,6.824网站提示如下
- A reasonable naming convention for intermediate files is mr-X-Y, where X is the Map task number, and Y is the reduce task number.
我们map生成的结果要保存为mr-x-y的式样,x是任务的编号,y是reduce的编号,要求中间有10个reduce(在mrcoordinator.go中定义的
那么0号任务产生mr-0-0
到mr-0-9
一共十个文件
1号任务也产生mr-1-0
到mr-1-9
共十个文件
当所有的map运算完成之后,我们要把所有y相同的文件组合到一起,这样我们就得到了10个文件,对应10个reduce任务
reduce读取这10个文件进行统计即可
实现
基于要求,我不放太多代码,写一下解决的思路,建议自己尝试去写,真要最后写不出来,去参考一下别人的实现
coordinator需要实现任务的管理,上述中间结果的合并,还有任务的控制
即然需要管理任务我们就需要对应的数据结构来表示和管理这些任务
我们的任务分为了两个部分Map,和Reduce
在Map阶段,我们需要
- 文件的编号
- 文件位置,因为我们直接在本地存取,直接用文件名就可以
- 任务阶段,该做map,reduce还是任务完成exit
- reduce数量,实验中是10,也可以定死不要这个量,也能通过测试,显然这样处理不太好
- 输出的mr-x-y中间文件
在Reduce阶段
- 任务编号,其实可以不用
- 输入文件
- 任务阶段
- reduce数量
- 输出最终结果
这两个阶段有不少相同的字段,我们可以将其合并为一个,不合并也没问题
type Task struct {
TaskId int
File string //文件位置,mapper阶段用
TaskState Status //需要做哪一步处理
NReduce int
Temp []string //用于保存每个work产生的n个文件、进行reduce时处理的文件
Output string //输出文件名,reduce阶段用
}
之后就是关于coordinator,这个类是需要管理任务
我们需要的数据
- 任务队列
- 任务的控制信息。任务是否在运行状态,启动时间,任务指针
- reduce数量
- 输入文件
- map的中间结果
- 状态,是否退出
go中没有提供队列,可以自己实现,或者利用有缓冲的chan
type Coordinator struct {
TaskQueue chan *Task
TaskInfo map[int]*TaskInfo
NReduce int
Files []string
Temp [][]string
Exit bool
}
之后是coordinator的初始化构建
//简化
func MakeCoordinator(files []string, nReduce int) *Coordinator {
c := Coordinator{ //... } //初始化Coordinator
//初始化任务,每一个文件就是一个任务
for id, file := range files {
task := Task{ //..,}
tashInfo的添加
任务队列的添加
}
c.server()
go c.heartBeat()
return &c
}
看到这离引入一个heratBeat()
这个是用来做超时检测的
其逻辑也很简单
//简化版本,提示一下什么时候停止这个协程
func (c Coordinator) heartBeat() {
for {
for _, taks := range c.TaskInfo {
if taks 超时
重制一下任务
}
time.Sleep(time.Second * 5)
}
}
接下来实现RPC函数,Assign任务分配,worker通过这个RPC得到任务
//提示处理安全访问问题
func (c *Coordinator) Assign(args *ExampleArgs, reply *Task) error {
if 还有任务 {
队列取一个任务
修改这个任务的状态
} else coordinator退出 {
返回 coordinator退出
} else 没有任务 {
返回 等待
}
}
接下来要coordinator要实现什么函数,估计没有什么概念,因为我们还不知道worker的其他需求
所以我们可以转头去实现work一部分
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {
for {
请求分配任务
switch reply.TaskState {
case Map:
mapper(&reply, mapf) //执行map
case Reduce:
reduce(&reply, reducef) //执行reduce
case Wait:
time.Sleep(5 * time.Second)
case Exit:
return
}
}
}
我们首先实现这个mapper
func mapper(task *Task, mapf func(string, string) []KeyValue) {
读取文件内容
keyvalues := mapf(reply.File, 文件内容) //mapf是实验框架已经实现好的函数,通过参数传递
keyvalues分成n组,n为指定reduce数量,提示使用ihash函数,在worker.go内已经提供
将这n分组写到本地n个文件中 ,格式mr-x-y
将文件位置保存在Task.Temp中
c通知coordinator任务完成
}
上面函数实现过程我们发现,当完成任务之后需要通知coordinator,所以coordinator需要一个方法来处理
//coordinator.go内
func (c *Coordinator) Completed(task *Task, reply *ExampleReply) error {
if 任务已经完成 { //这个文件已经被别的worker处理完了
return nil
}
设置完成状态
/*
每个worker都会产生n个中间文件,我们需要将这些信息汇总给coordinater,方便之后的任务分配
每个任务完成时机,也是我们检查是不是所有任务都完成的时机,如果都完成了,就退出
这个过程要完成任务很多,代码还要上锁,为了提高效率,我们开新的协程
*/
go c.GenerateResult(task)
return nil
}
//合并中间结果,检查所有任务是否完成
func (c *Coordinator) GenerateResult(task *Task) {
switch task.TaskState {
case Map: //map结束,进入reduce阶段
for id, file := range task.Temp {
//将task 产生了temp给coordinator进行统一处理
c.Temp[id] = append(c.Temp[id], file)
}
if c.IsAllDown() { //所有任务进入reduce
创建reduce的任务队列
for id, file := range c.Temp {
task := Task{/...} //创建任务
加入队列
设置任务开始信息
}
}
case Reduce:
if c.IsAllDown() {
c.Exit = true //退出coordinator
}
}
}
然后我们完成worker部分
//此部分有较多文件读写,直接贴代码
func reduce(task *Task, reducef func(string, []string) string) {
//将文件内的数据,重新序列化为keyvalue
intermediate := []KeyValue{} //reduce处理的时keyvalue,我们需要将文件序列化为keyvalue
for _, file := range task.Temp {
f, err := os.Open(file)
if err != nil {
log.Fatal("Failed to open file", err)
}
con := json.NewDecoder(f)
for {
var t KeyValue
if err := con.Decode(&t); err != nil {
break
}
intermediate = append(intermediate, t)
}
f.Close()
}
//下面可以参考mrsequential.go
sort.Sort(ByKey(intermediate))
src, _ := os.Getwd()
tempFile, err := ioutil.TempFile(src, "mr-tmp-*")
if err != nil {
log.Fatal("Failed to create temp file", err)
}
i := 0
for i < len(intermediate) {
j := i + 1
for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
j++
}
values := []string{}
for k := i; k < j; k++ {
values = append(values, intermediate[k].Value)
}
output := reducef(intermediate[i].Key, values)
// this is the correct format for each line of Reduce output.
fmt.Fprintf(tempFile, "%v %v\n", intermediate[i].Key, output)
i = j
}
tempFile.Close()
newname := fmt.Sprintf("mr-out-%d", task.TaskId)
os.Rename(tempFile.Name(), newname)
task.Output = newname
r := ExampleReply{}
call("Coordinator.Completed", task, &r) //rpc通知coordinator任务完成
}
上面所有代码都有删减,在实现基本逻辑上还都需要的补充一部分内容,需要注意(比如我删了所有的锁)
标签:文件,6.824,string,reduce,MapReduce,lab1,task,任务,go From: https://www.cnblogs.com/beifangcc/p/16736339.html