前言
为遵守 mit 的约定,这个帖子不贴太多具体的代码,主要聊聊自己在码代码时的一些想法和遇到的问题。
这个实验需要我们去实现一个 map-reduce 的功能。实质上,这个实验分为两个大的板块,map 和 reduce 两个阶段,也就是这个实验的核心部分,两个阶段都包含若干小的子任务,然后用户通过编写 map 和 reduce 函数。这个实验里,我们的任务是,读取 main 文件夹下的八个 txt 文档,扫描其中的单词,并计数,将结果输出到若干个子文件中,最后的话,测试脚本会读取这八个文件,把里面的结果输出到另一个 txt 中并进行排序,比对给出的标准答案,来评判该实验是否通过。
做这个实验的前提是,已经读过这个实验配套的论文:mapreduce-osdi04.pdf (googleusercontent.com) 知道这个实验以及想要做这个实验的人多少都会有点手段上谷歌(当然,也可以去找国内转载的,看不懂的话就看中文的吧,实验文档也是。
Getting Started
当我们打开这个项目工程,我们阅读这个项目的所有文件,以及 lab1 中给出的提示,我们可以先试着运行以下部分代码,来看看我们最重要得到什么结果:
# 当前目录为 src/main
go build -race -buildmode=plugin ../mrapps/wc.go
rm -rf mr-out*
go run -race mrsequential.go wc.so pg*.txt
more mr-out-0
这个是一个单线程的 map-reduce,我们可以查看 mrsequential.go
的内容,大概了解下整个 map-reduce 的过程是怎样的。
然后,我们把目光聚集到以下文件中:
---main
|---mrworker.go
|---mrcoordinator.go
|---mrsequential.go
---mr
|---worker.go
|---coordinator.go
|---rpc.go
---mrapp
|---wc.go
后面我们在这个实验中很多内容都要参考这些文件的内容,其中包含一些函数的来源,其中,尤其 main/mrsequential.go
尤其重要。
实现 rpc 通信
如果说想要实现 map-reduce,那么第一步就是实现 worker 和 coordinator 的 rpc 通信,观察 mr 目录下的文件后,我们需要在 rpc.go
和 coordinator.go
中定义以下结构体 (目前仅实现 rpc 通信):
// coordinator.go
// 专门定义一个Task,用于coordinator向worker分发任务
type Task struct {
FileName string
}
// 这里声明coordinator相关的结构体
type Coordinator struct {
task Task
}
// rpc.go
// 这里参考了上面的两个Example
type TaskRequest struct {
X int
}
type TaskReply struct {
XTask Task
}
下一步要做的是,需要让 coordinator 和 worker 之间能够进行 rpc 通信。
实现两者之间的通信是完成这个实验的基础。
worker接收消息
worker 调用 coordinator 的获取任务函数,获取要处理的文件名,然后执行打开操作。
在构建中间体 intermediate
时,可以留意到 mrsequential.go
有提示:
// a big difference from real MapReduce is that all the
// intermediate data is in one place, intermediate[],
// rather than being partitioned into NxM buckets.
根据这个思路,相当于提示我们,在构建桶存放中间体时,可能会用到二维 NxM 的数组。
然后经过 map 处理后的键值对切片,需要进一步经过 json 处理,并且将这个结果分成 nReduce 份,存放的文件命名规则是 mr-X-Y
,其中 x 是 map 任务的序号,y 是 reduce 任务的序号。
在进行 reduce 任务时,读取结果也需要经过 json 处理,这里很多步骤都可以借鉴 mrsequential.go
,包括读取文件等。
在创建目标文件时,可以使用 ioutil.TempFile
来创建临时文件,最后再重新命名。
此阶段的结构体声明如下:
// coordinator.go
type Coordinator struct {
State int // 0 map 1 reduce 2 none
MapTask Task
ReduceTask Task
NumMapTask int
NumReduceTask int
MapTaskFinish chan bool
ReduceTaskFinish chan bool
}
type Task struct {
FileName string
IDMap int
IDReduce string
}
// rpc.go
type TaskRequest struct {
X int
}
type TaskReply struct {
XTask Task
NumMapTask int
NumReduceTask int
CurNumMapTask int
CurNumReduceTask int
}
实现 rpc 通信
根据文档的指引,我们首先要实现 coordinator 和 worker 之间的通信,我们看到 worker.go
中有 call 和 CallExample 两个函数,那也照葫芦画瓢,自己搞一个 CallGetTask,实现 rpc 通信。
Worker 申领 task
看着 Worker()
里的注释,有一行 CallExample()
,是需要我们在这个函数里调用自定义的 CallGetTask 函数来获取 coordinator 分发的 task,在 call 之前,我们先要给 coordinator 的成员 MapTask 初始化,在 MakeCoordinator
中,我们可以看到 files 和 nReduce 这两个参数,那就从这两个入手,进行简单的初始化后,我们尝试在 Worker 中输出,能够输出文件名就是阶段性胜利。
照抄 mrsequential.go
文档中有提到,可以随意借鉴 mrsequential 中的函数,那么,走起。不过也要看看注释和文档,可以创建一个 NxM 的桶,和利用 encoder 和 decoder 来处理中间产物。
向 coordinator 的报告
每执行完一个任务,就向 coordinator 报告,方便 coordinator 记录,当所有任务都执行完时,修改 Done
中的条件,解除阻塞。
其实,走到这一步,可以说这个 lab 完成一半了,剩下就是各种断点打印 debug。
如果在执行过程提示无法打开文件,那说明,map 或 reduce 任务完成个数的条件没有控制好,mrsequential.go
中规定了一共会生成 3 个 workers,无法打开文件,只可能是,并发申请 task 时,已经快要到 task 的容量数,没分配到的 worker 自然也就没有分配到 FileName 和 MapID,所以需要设置好这些控制条件
解决 crash
做完上面,7 个 test 就可以 pass 6 个了,剩下一个 crash 的,需要用到锁或原子变量方面的知识。在进行 GetTask 时,我们传递的参数,需要确保其原子性,不然会出现 data race 现象;同时,也要对超过 10s 的任务进行舍弃处理,这里我们加一个时间戳,来记录任务的完成情况和开始时间。
又考虑到在记录任务完成情况时,是一个并发状态,这里考虑使用 sync.Map
。在进行最后的 Done
之前,我们还要再定义一个检查函数,来遍历检查是否还有 crash 的任务。
参考链接
6.5840 Lab 1: MapReduce (mit.edu)
mit6.824分布式lab1-MapReduce(1)_哔哩哔哩_bilibili
标签:map,6.824,int,reduce,rpc,coordinator,lab1,go,mit From: https://www.cnblogs.com/jaydenchang/p/17096604.html