参考资料
- MapReduce 论文
- ((20221029103443-4pqvf1n "Lecture - MapReduce"))
- Lab1 实验文档
- Lab1 实验文档译文
任务需求
在一个分布式存储系统上(实验是单机),实现 coordinator 和 worker,同一时刻有 1 个 coordinator process、多个 worker process 运行,彼此间使用 RPC 进行通信
Coordinator
- 负责为每个 worker 分配 Map 或 Reduce 任务。
- 确保每个文件只被处理一次,且最终输出的结果是正确的。
- 处理 worker 超时或崩溃,将任务重新分配给其他的 worker。
Worker
- 向 coordinator 请求任务询问 coordinator 以从文件中读取任务输入,并将任务输出写入到文件中。
- 对于 map 任务,将输入为 pg-xxx.txt 的文档处理为一组键值对,并按 reduce 任务的数量分桶,得到一组中间文件 mr-X-Y。
- 对于 reduce 任务,将属于当前任务的中间文件全部读入,排序后输出到结果文件 mr-out-n 中。
设计选择
Channel-based or Mutex-based?
一些实现中希望做到 lock-free,但他们仅仅是不显式使用互斥锁,而用 channel 来替代 [1-2]。lock-free 本质上意味着阻止线程休眠,以减少“锁”带来的系统开销,但并不代表着不使用互斥锁 [3-4]。一种实现lock-free的方式是CAS,在 Go 中, channel 的底层结构包含 Sync.Mutex 互斥锁 [5],阻塞时也会使用到互斥锁,而 Sync.Mutex 使用了 CAS 以减少调度开销 [6],因此 channel 还是 mutex 从 lock-free 的角度是相差无几的。
那么如何决定使用 channel 还是 mutex 呢?参考 [7],当需要关注数据流动时使用channel,在本次实验中流动的数据是“任务”,因此可以考虑组织一个任务队列,以生产者消费者模式分发任务。当希望实现对某个共享资源的并发访问控制,只需要锁定少量共享资源时,可以使用 mutex,更简单直观。二者的使用并非是冲突的,需要结合场景选择,但需要注意的是,channel 中数据流动的本质在于值的复制,会带来额外的开销。
在本文的实现中仅采用 mutex,因为可以发现,对于一个分配给 worker 的特定的任务而言,状态可能会从“运行”转变为“失败”,这种转变既可能来自于 worker 执行失败后的响应,也可能来自于 coordinator 发现任务超时。一个特定的任务队列具有不同的输入角色可能会增加复杂性。
[1] MIT6.824-2021/lab1.md at master · OneSizeFitsQuorum/MIT6.824-2021 (github.com)
[2] 6.824-2022-public/lab1.md at main · endless-hu/6.824-2022-public (github.com)
[3] 并发与并行编程(三):无锁并发(Lock-Free Concurrency) - 知乎 (zhihu.com)
[4] 简化概念下的 lock-free 编程 - 知乎 (zhihu.com)
[5] ((20220910133356-gofuyu1 '通道底层原理'))
[6] ((20220821221809-znrk31q 'Sync.Mutex 互斥锁'))
[7] Golang并发:再也不愁选channel还是选锁 - Go语言实战 - SegmentFault 思否
Coordinator 需要保存 Worker 的信息吗?
考虑一个分布式存储系统,在其上的 worker 应该都是平等的,coordinator 也无需关注是否新增或崩溃了任一 worker,仅需要考虑每个任务都被正确执行且被执行一次,因此本文实现的 coordinator 不保存 worker 信息。需要注意的时,这也意味着 coordinator 无法主动询问 worker,RPC 请求是单向的。
Coordinator 如何发现任务超时并重新分配?
一种做法是启动一个 goroutine 定时检查任务的状态,但在本文的实现中是不必要的,因为 coordinator 由于没有保存 worker 的信息,即使发现了超时也无法主动重新分配任务。在本文的实现中,coordinator 仅在收到了任务分配请求的情况下贪婪地去查询可分配的任务,其中包括超时的任务。需要注意的是,为简单起见,本文没有将不同状态的任务分离,但这会导致额外的遍历开销。
Coordinator 如何识别过期的响应
一种做法是在请求中携带 Term,当超时并重新分配后 Term+1。本文考虑到 Task 本身要保存分配的时间,为了简单起见,传递时间戳而非 Term。但需要注意的是,由于数据类型不同,这种方式相比于传递 Term 需要更多的开销。
Coordinator 和 worker正确退出
正常退出是指 Woker 从 Coordinator 处了解到所有任务完成后退出,Coordinator 通知完所有Worker后退出。不能简单地根据 RPC 失败就认为 Coordinator 已完成任务退出,因为 RPC 失败也可能是网络丢包导致的。
由于本文中设计的 Coordinator 无法主动通知 worker,仅能通过 Worker 请求任务时才能同步任务已完成的消息。这样可以解决 Worker 的退出问题,但 Coordinator 无法知道是否已通知所有的 worker。本文的解决方案是设置一个定时器,这个定时器的时间间隔应该远大于任务从分配到响应的耗时,并在每次响应 worker 后充值。这样可以保证未崩溃的的 worker 都能在超时前通过请求任务来同步任务完成情况,超时则表明所有正常工作(除崩溃和网络故障外)的 worker 都已通知完毕。
如何处理异常?
在本次实验中,有可能出现的异常包括:任务失败但正常返回、任务成功但超时返回、worker崩溃任务超时且无返回:
-
任务失败但正常响应:coordinator 应检查任务的完成状态。
-
任务成功但超时响应:
- coordinator 应检查返回对应的任务是否已超时,超时则丢弃响应;
- 如果任务已成功,重新分配到任务的worker应能感知这种成功并避免重复处理。
-
worker崩溃任务超时且无响应:
- coordinator 在分配任务时检查超时任务并重新分配;
- worker 如果是程序性崩溃,应该在退出前执行清理工作;
- worker 如果是机器导致的崩溃,无法执行清理工作,则其他重新分配到任务的worker应该感知到这种崩溃以避免被未完成的清理工作影响
实现细节
Hints: One way to get started is to modify
mr/worker.go
'sWorker()
to send an RPC to the coordinator asking for a task. Then modify the coordinator to respond with the file name of an as-yet-unstarted map task. Then modify the worker to read that file and call the application Map function, as in mrsequential.go.
Worker 实现
Worker 工作流程表现为三段式:1. 通过RPC调用获取任务;2. 根据任务类型执行任务;3.将任务执行情况返回给Coordinator。
在设计时,需要避免worker提前退出导致Coordinator无法分配未完成的任务,以及保证worker的正常退出。本文将任务完成报告这一职责交由Map和Reduce各自的执行函数,以分离不同的响应和不同的异常收尾流程。
// main/mrworker.go calls this function.
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {
for {
// 1. 通过 RPC 调用获取任务
task := callGetTask()
if task == nil {
// 无可用任务,但仍有任务未完成,不可提前退出
fmt.Println("task is nil, continue")
time.Sleep(2 * time.Second)
continue
}
fmt.Printf("worker: receive coordinators get task: %v\n", task)
// 2. 执行特定的任务
switch task.TaskType {
case MapTask:
doMap(mapf, task)
case ReduceTask:
doReduce(reducef, task)
case AllTasksDone:
// 当收到 Coordinator 的退出信号时,正常退出
return
default:
// 非期望的任务类型,重试
fmt.Printf("unexpected TaskType %v", task.TaskType)
time.Sleep(time.Second)
continue
}
// 3. 通过 RPC 调用,报告 Map/Reduce 任务完成状况
}
}
Uber go style guide 中提到应在生产代码中避免使用panic。但可以注意到以下代码中,错误被当做异常通过 panic-recover 的形式捕获,这种实现方式是否合理呢?
笔者个人认为是合理的。首先是错误和可能的 IO panic 是否有区分的必要?在本次实验中错误没有被本层或上层恢复的可能,均体现为本次任务的执行失败,因此无需向上层传递错误,自然也可以将错误或panic在本层截获以生成任务失败的响应。其次是 panic 完全在函数内截获,仅能表示当前任务的失败,而不会影响到 worker 的正常执行,不会导致 worker 的崩溃。
要小心的是,为了保证任务仅被正确执行一次,Map 和 Reduce 都需要以重命名的方式检查任务是否冲突。但二者略有差别,Map 任务由于需要重命名一系列中间文件,也存在中途崩溃的可能。在这种情况下,后续执行该任务的 worker 在前几次重命名时却会检查到冲突,导致该任务始终无法被完成。因此 Map 任务需要在重命名时感知曾经的worker是否在重命名时崩溃,并处理失效的输出文件。
另外需要明确,删除崩溃导致的输出文件应交由下一个worker进行,而中间文件应交由当前任务来删除。中间文件在正常情况下能被覆盖重命名,但有可能出现一种情况:任务被正确处理,但响应超时。任务会被重新分配,而下一个worker仅会在重命名时感知到任务被正确处理,直接退出可能会导致中间文件残留,占用大量存储空间。
func doMap(mapf func(string, string) []KeyValue, task *Task) {
// 异常处理
defer func() {
task.Status = Done
if err := recover(); err != nil {
// panic 前执行收尾工作
fmt.Printf("worker: doMap panic, %s", err)
task.Status = Waiting // 通知 Coordinator 本次任务执行失败
}
callTaskDoneOr(task)
// 删除可能存在的中间文件
for i := 0; i < task.NReduce; i++ {
intermediateFile := fmt.Sprintf("./intermediates/mr-%s-%d.tmp%v", strings.TrimSuffix(filepath.Base(task.FileNames[0]), ".txt"), i, task.StartTime)
os.Remove(intermediateFile)
}
}()
// 1. 读取输入
content, err := ioutil.ReadFile(task.FileNames[0])
if err != nil {
panic(fmt.Sprintf("worker crash on map task, can not read %s", task.FileNames[0]))
}
// 2. 覅用 plugin map function
kvs := mapf(task.FileNames[0], string(content))
// 3. 将键值对写入到 nReduce 个中间文件中
bucketfiles := make([]*os.File, task.NReduce)
os.MkdirAll("./intermediates/", os.ModeDir) // create dir and parents if not exists
for i := 0; i < task.NReduce; i++ {
// 创建中间文件
intermediateFile := fmt.Sprintf("./intermediates/mr-%s-%d.tmp%v", strings.TrimSuffix(filepath.Base(task.FileNames[0]), ".txt"), i, task.StartTime)
f, err := os.OpenFile(intermediateFile, os.O_CREATE|os.O_RDWR|os.O_APPEND, os.ModePerm) // ModePerm=0777
if err != nil {
panic(err)
}
defer f.Close()
os.Chmod(intermediateFile, os.ModePerm) // Go 创建文件设置权限的坑,重新设置保证权限的正确
bucketfiles[i] = f
}
// 将键值对写入到对应中间文件中
for _, kv := range kvs {
idx := ihash(kv.Key) % task.NReduce
_, err := bucketfiles[idx].WriteString(fmt.Sprintf("%s %s\n", kv.Key, kv.Value))
if err != nil {
// fail to wrtite
panic(fmt.Sprintf("worker crash on map task, fail append kv (%s, %s) to intermediate file %s", kv.Key, kv.Value, bucketfiles[idx].Name()))
}
}
// 5. 保证输入仅被正确处理一次
for _, file := range bucketfiles {
oname := strings.TrimSuffix(file.Name(), fmt.Sprintf(".tmp%v", task.StartTime))
// 如果重命名失败,说明已有其他 worker 执行了该任务,但由于需要重命名多个文件,仍可能在过程中崩溃,导致只重命名了部分文件
if err := os.Rename(file.Name(), oname); err != nil {
lastfile := strings.TrimSuffix(bucketfiles[len(bucketfiles)-1].Name(), fmt.Sprintf(".tmp%v", task.StartTime))
// 如果最后一个中间文件未被重命名,说明其他worker在重命名时崩溃
fmt.Println(err)
if _, err := os.Lstat(lastfile); err != nil {
// 尝试删除已被命名的文件并重试重命名
os.Remove(strings.TrimSuffix(file.Name(), fmt.Sprintf(".tmp%v", task.StartTime)))
os.Rename(file.Name(), strings.TrimSuffix(file.Name(), fmt.Sprintf(".tmp%v", task.StartTime)))
}
}
}
}
func doReduce(reducef func(string, []string) string, task *Task) {
defer func() {
task.Status = Done
if err := recover(); err != nil {
// panic 前的收尾工作
fmt.Printf("worker: doReduce panic, %s\n", err)
task.Status = Waiting // 通知 coordinator 任务失败
}
callTaskDoneOr(task)
// 删除可能的中间文件
os.Remove(fmt.Sprintf("mr-out-%d.tmp%v", task.Id, task.StartTime))
}()
var kvs ByKey // kvs sorted by key
// 1. 读取中间文件
for _, fileName := range task.FileNames {
f, err := os.Open(fileName)
if err != nil {
panic(fmt.Sprintf("worker crash on reduce task, fail to open %s", fileName))
}
defer f.Close()
// 逐行读取
sc := bufio.NewScanner(f)
for sc.Scan() {
kvStrs := strings.Fields(sc.Text())
if len(kvStrs) != 2 {
panic("worker crash on reduce task, kv fomat wrong")
}
kvs = append(kvs, KeyValue{
Key: kvStrs[0],
Value: kvStrs[1],
})
}
}
sort.Sort(kvs) // 根据 key 排序
// 2. 写入到一个临时文件中
oname := fmt.Sprintf("mr-out-%d.tmp%v", task.Id, task.StartTime)
ofile, err := os.Create(oname)
if err != nil {
panic(err)
}
defer ofile.Close()
i := 0 // start idx for same keys
for i < len(kvs) {
j := i + 1 // end idx for same keys
for j < len(kvs) && kvs[j].Key == kvs[i].Key {
j++
}
// collect values with the same key
values := []string{}
for k := i; k < j; k++ {
values = append(values, kvs[k].Value)
}
output := reducef(kvs[i].Key, values)
// this is the correct format for each line of Reduce output.
fmt.Fprintf(ofile, "%v %v\n", kvs[i].Key, output)
i = j
}
// 3. 重命名以保证任务仅正确执行一次
os.Rename(ofile.Name(), strings.TrimSuffix(ofile.Name(), fmt.Sprintf(".tmp%v", task.StartTime)))
}
Task 及 RPC 消息定义
Task 中 Map/Reduce 的输入文件均使用同一个变量保存,但对于 Map 任务而言,输入文件列表的长度始终为 1。在定义任务类型时,考虑到 0 通常表示一种异常值,为了避免歧义从 1 开始设置。
为简单起见,RPC 消息传递整个任务的信息,但要意识到这种做法存在冗余的开销。
// Add your RPC definitions here.
type TaskType int
const (
_ TaskType = iota // 0 is unknown
MapTask
ReduceTask
AllTasksDone
)
type TaskStatus int
const (
Waiting TaskStatus = iota // wait for getting
Runing
Done
)
type Task struct {
TaskType TaskType
FileNames []string // len slice for map task is 1
Id int
StartTime int64 // time.Time().UnixNano()
Status TaskStatus
NReduce int
}
type GetTaskRequest struct {
// all workers are equal
}
type GetTaskReply struct {
Task *Task
}
type TaskDoneOrRequest struct {
Task *Task
}
type TaskDoneOrReply struct {
// coordinator does not reply
}
Coordinator 实现
const (
TaskTimeout = 10 * time.Second // 任务超时
)
type phase int
const (
duringMap phase = iota
duringReduce
finish
)
var clock time.Timer // 可重用的计时器
type Coordinator struct {
// Your definitions here.
tasks map[TaskType][]*Task
nMap int
nReduce int
phase phase
finishedMap int
finishedReduce int
mu sync.Mutex
}
// Your code here -- RPC handlers for the worker to call.
func (c *Coordinator) GetTask(req *GetTaskRequest, reply *GetTaskReply) error {
c.mu.Lock()
defer c.mu.Unlock()
// 分配任务或nil,nil 表示暂无可分配的任务
switch c.phase {
case duringMap:
reply.Task = findAvaliableTask(c.tasks[MapTask])
case duringReduce:
reply.Task = findAvaliableTask(c.tasks[ReduceTask])
case finish:
// 重置定时器
clock.Reset(10 * time.Second)
reply.Task = &Task{
TaskType: AllTasksDone,
}
}
return nil
}
func findAvaliableTask(tasks []*Task) *Task {
for _, task := range tasks {
// 从未分配的任务或超时的任务中分配
if task.Status == Waiting || (task.Status == Runing && time.Now().UnixNano()-task.StartTime > TaskTimeout.Nanoseconds()) {
task.StartTime = time.Now().UnixNano()
task.Status = Runing
return task
}
}
return nil
}
func (c *Coordinator) TaskDoneOr(req *TaskDoneOrRequest, reply *TaskDoneOrReply) error {
c.mu.Lock()
defer c.mu.Unlock()
taskType, id := req.Task.TaskType, req.Task.Id
// 报告 Map 任务的完成情况
if taskType == MapTask && c.tasks[MapTask][id].Status != Done {
c.tasks[MapTask][id] = req.Task // 更新任务状态 Done or Waiting
c.finishedMap++
if c.finishedMap == c.nMap {
c.phase = duringReduce
fmt.Println("coodinator: all map tasks finished")
}
}
// 报告 Reduce 任务的完成情况
if taskType == ReduceTask && c.tasks[ReduceTask][id].Status != Done {
c.tasks[ReduceTask][id] = req.Task // Update Task.Status = Done
c.finishedReduce++
if c.finishedReduce == c.nReduce {
c.phase = finish
fmt.Println("coodinator: all reduce tasks finished")
// 当所有任务已完成,初始化定时器,并启动定时退出程序
clock = *time.NewTimer(10 * time.Second)
go func() {
for {
select {
case <-clock.C:
os.Exit(0) // 超时后正常退出
}
}
}()
}
}
return nil
}
调试及测试
单 worker 调试
-
先运行 coordinator:
go run -race mrcoordinator.go pg-*.txt
-
再载入 plugin 以运行 worker:
go build -race -buildmode=plugin ../mrapps/wc.go
go run -race mrworker.go wc.so
实验脚本测试
- 单次测试:
bash test-mr.sh
- 多次测试:
bash test-mr-many.sh 5