首页 > 其他分享 >MIT 6.824-Lab1. MapReduce 实现思路

MIT 6.824-Lab1. MapReduce 实现思路

时间:2023-03-14 17:34:12浏览次数:50  
标签:6.824 err fmt worker MapReduce Lab1 task 任务 Task

参考资料

  1. MapReduce 论文
  2. ((20221029103443-4pqvf1n "Lecture - MapReduce"))
  3. Lab1 实验文档
  4. Lab1 实验文档译文

任务需求

在一个分布式存储系统上(实验是单机),实现 coordinator 和 worker,同一时刻有 1 个 coordinator process、多个 worker process 运行,彼此间使用 RPC 进行通信

Coordinator

  1. 负责为每个 worker 分配 Map 或 Reduce 任务。
  2. 确保每个文件只被处理一次,且最终输出的结果是正确的。
  3. 处理 worker 超时或崩溃,将任务重新分配给其他的 worker。

Worker

  1. 向 coordinator 请求任务询问 coordinator 以从文件中读取任务输入,并将任务输出写入到文件中。
  2. 对于 map 任务,将输入为 pg-xxx.txt 的文档处理为一组键值对,并按 reduce 任务的数量分桶,得到一组中间文件 mr-X-Y。
  3. 对于 reduce 任务,将属于当前任务的中间文件全部读入,排序后输出到结果文件 mr-out-n 中。

设计选择

Channel-based or Mutex-based?

一些实现中希望做到 lock-free,但他们仅仅是不显式使用互斥锁,而用 channel 来替代 [1-2]。lock-free 本质上意味着阻止线程休眠,以减少“锁”带来的系统开销,但并不代表着不使用互斥锁 [3-4]。一种实现lock-free的方式是CAS,在 Go 中, channel 的底层结构包含 Sync.Mutex 互斥锁 [5],阻塞时也会使用到互斥锁,而 Sync.Mutex 使用了 CAS 以减少调度开销 [6],因此 channel 还是 mutex 从 lock-free 的角度是相差无几的。
那么如何决定使用 channel 还是 mutex 呢?参考 [7],当需要关注数据流动时使用channel,在本次实验中流动的数据是“任务”,因此可以考虑组织一个任务队列,以生产者消费者模式分发任务。当希望实现对某个共享资源的并发访问控制,只需要锁定少量共享资源时,可以使用 mutex,更简单直观。二者的使用并非是冲突的,需要结合场景选择,但需要注意的是,channel 中数据流动的本质在于值的复制,会带来额外的开销。
在本文的实现中仅采用 mutex,因为可以发现,对于一个分配给 worker 的特定的任务而言,状态可能会从“运行”转变为“失败”,这种转变既可能来自于 worker 执行失败后的响应,也可能来自于 coordinator 发现任务超时。一个特定的任务队列具有不同的输入角色可能会增加复杂性。

[1] MIT6.824-2021/lab1.md at master · OneSizeFitsQuorum/MIT6.824-2021 (github.com)

[2] 6.824-2022-public/lab1.md at main · endless-hu/6.824-2022-public (github.com)

[3] 并发与并行编程(三):无锁并发(Lock-Free Concurrency) - 知乎 (zhihu.com)

[4] 简化概念下的 lock-free 编程 - 知乎 (zhihu.com)

[5] ((20220910133356-gofuyu1 '通道底层原理'))

[6] ((20220821221809-znrk31q 'Sync.Mutex 互斥锁'))

[7] Golang并发:再也不愁选channel还是选锁 - Go语言实战 - SegmentFault 思否

Coordinator 需要保存 Worker 的信息吗?

考虑一个分布式存储系统,在其上的 worker 应该都是平等的,coordinator 也无需关注是否新增或崩溃了任一 worker,仅需要考虑每个任务都被正确执行且被执行一次,因此本文实现的 coordinator 不保存 worker 信息。需要注意的时,这也意味着 coordinator 无法主动询问 worker,RPC 请求是单向的。

Coordinator 如何发现任务超时并重新分配?

一种做法是启动一个 goroutine 定时检查任务的状态,但在本文的实现中是不必要的,因为 coordinator 由于没有保存 worker 的信息,即使发现了超时也无法主动重新分配任务。在本文的实现中,coordinator 仅在收到了任务分配请求的情况下贪婪地去查询可分配的任务,其中包括超时的任务。需要注意的是,为简单起见,本文没有将不同状态的任务分离,但这会导致额外的遍历开销。

Coordinator 如何识别过期的响应

一种做法是在请求中携带 Term,当超时并重新分配后 Term+1。本文考虑到 Task 本身要保存分配的时间,为了简单起见,传递时间戳而非 Term。但需要注意的是,由于数据类型不同,这种方式相比于传递 Term 需要更多的开销。

Coordinator 和 worker正确退出

正常退出是指 Woker 从 Coordinator 处了解到所有任务完成后退出,Coordinator 通知完所有Worker后退出。不能简单地根据 RPC 失败就认为 Coordinator 已完成任务退出,因为 RPC 失败也可能是网络丢包导致的。
由于本文中设计的 Coordinator 无法主动通知 worker,仅能通过 Worker 请求任务时才能同步任务已完成的消息。这样可以解决 Worker 的退出问题,但 Coordinator 无法知道是否已通知所有的 worker。本文的解决方案是设置一个定时器,这个定时器的时间间隔应该远大于任务从分配到响应的耗时,并在每次响应 worker 后充值。这样可以保证未崩溃的的 worker 都能在超时前通过请求任务来同步任务完成情况,超时则表明所有正常工作(除崩溃和网络故障外)的 worker 都已通知完毕。

如何处理异常?

在本次实验中,有可能出现的异常包括:任务失败但正常返回、任务成功但超时返回、worker崩溃任务超时且无返回:

  • 任务失败但正常响应:coordinator 应检查任务的完成状态。

  • 任务成功但超时响应:

    • coordinator 应检查返回对应的任务是否已超时,超时则丢弃响应;
    • 如果任务已成功,重新分配到任务的worker应能感知这种成功并避免重复处理。
  • worker崩溃任务超时且无响应:

    • coordinator 在分配任务时检查超时任务并重新分配;
    • worker 如果是程序性崩溃,应该在退出前执行清理工作;
    • worker 如果是机器导致的崩溃,无法执行清理工作,则其他重新分配到任务的worker应该感知到这种崩溃以避免被未完成的清理工作影响

实现细节

Hints: One way to get started is to modify mr/worker.go's Worker() to send an RPC to the coordinator asking for a task. Then modify the coordinator to respond with the file name of an as-yet-unstarted map task. Then modify the worker to read that file and call the application Map function, as in mrsequential.go.

Worker 实现

Worker 工作流程表现为三段式:1. 通过RPC调用获取任务;2. 根据任务类型执行任务;3.将任务执行情况返回给Coordinator。
在设计时,需要避免worker提前退出导致Coordinator无法分配未完成的任务,以及保证worker的正常退出。本文将任务完成报告这一职责交由Map和Reduce各自的执行函数,以分离不同的响应和不同的异常收尾流程。

// main/mrworker.go calls this function.
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {

	for {
		// 1. 通过 RPC 调用获取任务
		task := callGetTask()
		if task == nil {
			// 无可用任务,但仍有任务未完成,不可提前退出
			fmt.Println("task is nil, continue")
			time.Sleep(2 * time.Second)
			continue
		}

		fmt.Printf("worker: receive coordinators get task: %v\n", task)

		// 2. 执行特定的任务
		switch task.TaskType {
		case MapTask:
			doMap(mapf, task)
		case ReduceTask:
			doReduce(reducef, task)
		case AllTasksDone:
			// 当收到 Coordinator 的退出信号时,正常退出
			return
		default:
			// 非期望的任务类型,重试
			fmt.Printf("unexpected TaskType %v", task.TaskType)
			time.Sleep(time.Second)
			continue
		}

		// 3. 通过 RPC 调用,报告 Map/Reduce 任务完成状况
	}
}

Uber go style guide 中提到应在生产代码中避免使用panic。但可以注意到以下代码中,错误被当做异常通过 panic-recover 的形式捕获,这种实现方式是否合理呢?

笔者个人认为是合理的。首先是错误和可能的 IO panic 是否有区分的必要?在本次实验中错误没有被本层或上层恢复的可能,均体现为本次任务的执行失败,因此无需向上层传递错误,自然也可以将错误或panic在本层截获以生成任务失败的响应。其次是 panic 完全在函数内截获,仅能表示当前任务的失败,而不会影响到 worker 的正常执行,不会导致 worker 的崩溃。

要小心的是,为了保证任务仅被正确执行一次,Map 和 Reduce 都需要以重命名的方式检查任务是否冲突。但二者略有差别,Map 任务由于需要重命名一系列中间文件,也存在中途崩溃的可能。在这种情况下,后续执行该任务的 worker 在前几次重命名时却会检查到冲突,导致该任务始终无法被完成。因此 Map 任务需要在重命名时感知曾经的worker是否在重命名时崩溃,并处理失效的输出文件。
另外需要明确,删除崩溃导致的输出文件应交由下一个worker进行,而中间文件应交由当前任务来删除。中间文件在正常情况下能被覆盖重命名,但有可能出现一种情况:任务被正确处理,但响应超时。任务会被重新分配,而下一个worker仅会在重命名时感知到任务被正确处理,直接退出可能会导致中间文件残留,占用大量存储空间。

func doMap(mapf func(string, string) []KeyValue, task *Task) {
	// 异常处理
	defer func() {
		task.Status = Done
		if err := recover(); err != nil {
			// panic 前执行收尾工作
			fmt.Printf("worker: doMap panic, %s", err)
			task.Status = Waiting // 通知 Coordinator 本次任务执行失败
		}
		callTaskDoneOr(task)

		// 删除可能存在的中间文件
		for i := 0; i < task.NReduce; i++ {
			intermediateFile := fmt.Sprintf("./intermediates/mr-%s-%d.tmp%v", strings.TrimSuffix(filepath.Base(task.FileNames[0]), ".txt"), i, task.StartTime)
			os.Remove(intermediateFile)
		}
	}()

	// 1. 读取输入
	content, err := ioutil.ReadFile(task.FileNames[0])
	if err != nil {
		panic(fmt.Sprintf("worker crash on map task, can not read %s", task.FileNames[0]))
	}

	// 2. 覅用 plugin map function
	kvs := mapf(task.FileNames[0], string(content))

	// 3. 将键值对写入到 nReduce 个中间文件中
	bucketfiles := make([]*os.File, task.NReduce)
	os.MkdirAll("./intermediates/", os.ModeDir) // create dir and parents if not exists
	for i := 0; i < task.NReduce; i++ {
		// 创建中间文件
		intermediateFile := fmt.Sprintf("./intermediates/mr-%s-%d.tmp%v", strings.TrimSuffix(filepath.Base(task.FileNames[0]), ".txt"), i, task.StartTime)
		f, err := os.OpenFile(intermediateFile, os.O_CREATE|os.O_RDWR|os.O_APPEND, os.ModePerm) // ModePerm=0777
		if err != nil {
			panic(err)
		}
		defer f.Close()
		os.Chmod(intermediateFile, os.ModePerm) // Go 创建文件设置权限的坑,重新设置保证权限的正确
		bucketfiles[i] = f
	}

	// 将键值对写入到对应中间文件中
	for _, kv := range kvs {
		idx := ihash(kv.Key) % task.NReduce
		_, err := bucketfiles[idx].WriteString(fmt.Sprintf("%s %s\n", kv.Key, kv.Value))
		if err != nil {
			// fail to wrtite
			panic(fmt.Sprintf("worker crash on map task, fail append kv (%s, %s) to intermediate file %s", kv.Key, kv.Value, bucketfiles[idx].Name()))
		}
	}

	// 5. 保证输入仅被正确处理一次
	for _, file := range bucketfiles {
		oname := strings.TrimSuffix(file.Name(), fmt.Sprintf(".tmp%v", task.StartTime))
		// 如果重命名失败,说明已有其他 worker 执行了该任务,但由于需要重命名多个文件,仍可能在过程中崩溃,导致只重命名了部分文件
		if err := os.Rename(file.Name(), oname); err != nil {
			lastfile := strings.TrimSuffix(bucketfiles[len(bucketfiles)-1].Name(), fmt.Sprintf(".tmp%v", task.StartTime))
			// 如果最后一个中间文件未被重命名,说明其他worker在重命名时崩溃
			fmt.Println(err)
			if _, err := os.Lstat(lastfile); err != nil {
				// 尝试删除已被命名的文件并重试重命名
				os.Remove(strings.TrimSuffix(file.Name(), fmt.Sprintf(".tmp%v", task.StartTime)))
				os.Rename(file.Name(), strings.TrimSuffix(file.Name(), fmt.Sprintf(".tmp%v", task.StartTime)))
			}
		}
	}

}

func doReduce(reducef func(string, []string) string, task *Task) {
	defer func() {
		task.Status = Done
		if err := recover(); err != nil {
			// panic 前的收尾工作
			fmt.Printf("worker: doReduce panic, %s\n", err)
			task.Status = Waiting // 通知 coordinator 任务失败
		}
		callTaskDoneOr(task)

		// 删除可能的中间文件
		os.Remove(fmt.Sprintf("mr-out-%d.tmp%v", task.Id, task.StartTime))
	}()

	var kvs ByKey // kvs sorted by key

	// 1. 读取中间文件
	for _, fileName := range task.FileNames {
		f, err := os.Open(fileName)
		if err != nil {
			panic(fmt.Sprintf("worker crash on reduce task, fail to open %s", fileName))
		}
		defer f.Close()

		// 逐行读取
		sc := bufio.NewScanner(f)
		for sc.Scan() {
			kvStrs := strings.Fields(sc.Text())
			if len(kvStrs) != 2 {
				panic("worker crash on reduce task, kv fomat wrong")
			}
			kvs = append(kvs, KeyValue{
				Key:   kvStrs[0],
				Value: kvStrs[1],
			})
		}
	}

	sort.Sort(kvs) // 根据 key 排序

	// 2. 写入到一个临时文件中
	oname := fmt.Sprintf("mr-out-%d.tmp%v", task.Id, task.StartTime)
	ofile, err := os.Create(oname)
	if err != nil {
		panic(err)
	}
	defer ofile.Close()

	i := 0 // start idx for same keys
	for i < len(kvs) {
		j := i + 1 // end idx for same keys
		for j < len(kvs) && kvs[j].Key == kvs[i].Key {
			j++
		}

		// collect values with the same key
		values := []string{}
		for k := i; k < j; k++ {
			values = append(values, kvs[k].Value)
		}
		output := reducef(kvs[i].Key, values)

		// this is the correct format for each line of Reduce output.
		fmt.Fprintf(ofile, "%v %v\n", kvs[i].Key, output)

		i = j
	}

	// 3. 重命名以保证任务仅正确执行一次
	os.Rename(ofile.Name(), strings.TrimSuffix(ofile.Name(), fmt.Sprintf(".tmp%v", task.StartTime)))
}

Task 及 RPC 消息定义

Task 中 Map/Reduce 的输入文件均使用同一个变量保存,但对于 Map 任务而言,输入文件列表的长度始终为 1。在定义任务类型时,考虑到 0 通常表示一种异常值,为了避免歧义从 1 开始设置。

为简单起见,RPC 消息传递整个任务的信息,但要意识到这种做法存在冗余的开销。

// Add your RPC definitions here.
type TaskType int

const (
	_ TaskType = iota // 0 is unknown
	MapTask
	ReduceTask
	AllTasksDone
)

type TaskStatus int

const (
	Waiting TaskStatus = iota // wait for getting
	Runing
	Done
)

type Task struct {
	TaskType  TaskType
	FileNames []string // len slice for map task is 1
	Id        int
	StartTime int64 // time.Time().UnixNano()
	Status    TaskStatus
	NReduce   int
}

type GetTaskRequest struct {
	// all workers are equal
}

type GetTaskReply struct {
	Task *Task
}

type TaskDoneOrRequest struct {
	Task *Task
}

type TaskDoneOrReply struct {
	// coordinator does not reply
}

Coordinator 实现

const (
	TaskTimeout = 10 * time.Second // 任务超时
)

type phase int

const (
	duringMap phase = iota
	duringReduce
	finish
)

var clock time.Timer // 可重用的计时器

type Coordinator struct {
	// Your definitions here.
	tasks map[TaskType][]*Task

	nMap    int
	nReduce int

	phase          phase
	finishedMap    int
	finishedReduce int

	mu sync.Mutex
}

// Your code here -- RPC handlers for the worker to call.
func (c *Coordinator) GetTask(req *GetTaskRequest, reply *GetTaskReply) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	// 分配任务或nil,nil 表示暂无可分配的任务
	switch c.phase {
	case duringMap:
		reply.Task = findAvaliableTask(c.tasks[MapTask])
	case duringReduce:
		reply.Task = findAvaliableTask(c.tasks[ReduceTask])
	case finish:
		// 重置定时器
		clock.Reset(10 * time.Second)

		reply.Task = &Task{
			TaskType: AllTasksDone,
		}
	}

	return nil
}

func findAvaliableTask(tasks []*Task) *Task {
	for _, task := range tasks {
		// 从未分配的任务或超时的任务中分配
		if task.Status == Waiting || (task.Status == Runing && time.Now().UnixNano()-task.StartTime > TaskTimeout.Nanoseconds()) {
			task.StartTime = time.Now().UnixNano()
			task.Status = Runing
			return task
		}
	}
	return nil
}

func (c *Coordinator) TaskDoneOr(req *TaskDoneOrRequest, reply *TaskDoneOrReply) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	taskType, id := req.Task.TaskType, req.Task.Id
	// 报告 Map 任务的完成情况
	if taskType == MapTask && c.tasks[MapTask][id].Status != Done {
		c.tasks[MapTask][id] = req.Task // 更新任务状态 Done or Waiting
		c.finishedMap++
		if c.finishedMap == c.nMap {
			c.phase = duringReduce
			fmt.Println("coodinator: all map tasks finished")
		}
	}

	// 报告 Reduce 任务的完成情况
	if taskType == ReduceTask && c.tasks[ReduceTask][id].Status != Done {
		c.tasks[ReduceTask][id] = req.Task // Update Task.Status = Done
		c.finishedReduce++
		if c.finishedReduce == c.nReduce {
			c.phase = finish
			fmt.Println("coodinator: all reduce tasks finished")

			// 当所有任务已完成,初始化定时器,并启动定时退出程序
			clock = *time.NewTimer(10 * time.Second)
			go func() {
				for {
					select {
					case <-clock.C:
						os.Exit(0) // 超时后正常退出
					}
				}
			}()

		}
	}

	return nil
}

调试及测试

单 worker 调试

  1. 先运行 coordinator:go run -race mrcoordinator.go pg-*.txt

  2. 再载入 plugin 以运行 worker:

    • go build -race -buildmode=plugin ../mrapps/wc.go
    • go run -race mrworker.go wc.so

实验脚本测试

  1. 单次测试:bash test-mr.sh
  2. 多次测试:bash test-mr-many.sh 5

测试通过情况

标签:6.824,err,fmt,worker,MapReduce,Lab1,task,任务,Task
From: https://www.cnblogs.com/porient/p/17215701.html

相关文章

  • mit6.824 raft
    lab2A这部分的内容是用来实现leaderelection过程,按照解释,每个节点创建之初都为follower都会有一个超时时间,超时之后我们进入leaderelection状态。过程需要currentTIme+......
  • MapReduce原理——切片代码分析
    (1)程序先找到数据存储的目录(2)遍历目录对每个文件进行切片(3)遍历一个文件:获取文件大小计算切片大小默认情况下,切片大小等于blocksize......
  • MapReduce框架原理
    原理一:切片与MapTask并行度决定机制MapTask之前了解到了,他是在分布式程序在map阶段的一个进程,管理之一个map任务类似于一个master。那么什么是切片?说起切片,很明......
  • MapReduce概述
    MapReduce是一种分布式运算程序的编程框架,是用户开发“基于hadoop数据分析应用”的核心框架。核心功能是用户编写的业务逻辑代码和系统自带的组件组合在一起,构成一个分布......
  • 6.824 Lab 2
    题目:http://nil.csail.mit.edu/6.824/2022/labs/lab-raft.htmlPart2A:Leaderelection为了防止所有节点同时发起选举形成活锁,节点的超时时间需要带有随机性,论文中推荐1......
  • 6.824 Lab 3
    题目:http://nil.csail.mit.edu/6.824/2022/labs/lab-kvraft.htmlPartAPartA主要有两个场景:任务1:正常场景,没有消息丢失和节点失效任务2:异常场景,例如server失效导致消......
  • 6.824 Lab 4
    题目:http://nil.csail.mit.edu/6.824/2022/labs/lab-shard.html两个组件:replicagroups:每个group负责一部分shards;shardcontroller:决定哪一个replicagroup负责哪一个......
  • Google_MapReduce中文版
    笔者最近在看MIT6.824的lab1,实验内容是实现一个简易的MapReduce。本篇文章是MapReduce论文的中文翻译。@Author:Akai-yuan@更新时间:2023/2/13摘要MapReduce是一个编程......
  • mit 6.824 lab1 思路贴
    前言为遵守mit的约定,这个帖子不贴太多具体的代码,主要聊聊自己在码代码时的一些想法和遇到的问题。这个实验需要我们去实现一个map-reduce的功能。实质上,这个实验分为......
  • MapReduce文件切分个数计算方法
    Hadoop的MapReduce计算的第一个阶段是InputFormat处理的,先将文件进行切分,然后将每个切分传递给每个Map任务来执行,本文阐述切分个数,也就是Map任务数目的计算方法;Hadoop首......