MIT 6.5840 (6.824) 2023 Spring Lab 1: MapReduce
Preface
This lab builds a simple MapReduce framework on a single machine with a single shared file system, driven by RPC, and uses it to run a word-count job. It is implemented in Go, with one Master and multiple Workers. Worker fault tolerance is handled by re-executing tasks that time out. The main pieces of work are: designing the RPC request and reply parameters, the Master's RPC handler, the Master's control logic, the Worker's task-execution logic, and the logic that decides when the whole job is finished.
What is MapReduce?
The overall framework is as follows:
- One core idea: divide and conquer.
- Two key phases: the Map phase and the Reduce phase.
  - In the Map phase, the job is split into many small tasks that are dispatched to different machines; each map task partitions its output into different intermediate files according to the hash of each key (see the sketch after this list).
  - Once all map tasks have finished, the Reduce phase starts: each reduce task runs the reduce function over one group of intermediate files and writes its result to an output file.
- For further details on MapReduce, refer to the original MapReduce paper.
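To make the Map-side partitioning concrete, here is a minimal sketch of how a map task might scatter its key/value output across nReduce intermediate files by hashing the key. It assumes the KeyValue type and ihash helper from the lab skeleton, and uses the same file-naming scheme (mr-int-<taskId>-<workerId>-<partition>) as my worker code later in this post; treat it as an illustration rather than my exact implementation.

// Minimal sketch: write each key/value pair into the intermediate file of
// the reduce partition chosen by the hash of its key.
func writePartitions(taskId, workerId, nReduce int, kva []KeyValue) ([]string, error) {
    names := make([]string, nReduce)
    files := make([]*os.File, nReduce)
    for i := 0; i < nReduce; i++ {
        names[i] = fmt.Sprintf("mr-int-%d-%d-%d", taskId, workerId, i)
        f, err := os.Create(names[i])
        if err != nil {
            return nil, err
        }
        defer f.Close()
        files[i] = f
    }
    for _, kv := range kva {
        // The key's hash decides which reduce partition the pair belongs to.
        fmt.Fprintf(files[ihash(kv.Key)%nReduce], "%v %v\n", kv.Key, kv.Value)
    }
    return names, nil
}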
Environment Setup
- OS: Ubuntu 22.04 LTS
- Go toolchain: go1.20.3 linux/amd64
- IDE: GoLand
- Workflow: the OS is installed in a virtual machine; GoLand connects to the VM over SSH for coding
- Version control: company-internal GitLab
Overall architecture diagram of my system for this lab
High-Level Design
RPC Design
The RPC interface consists of request arguments and reply arguments. A worker's RPC calls fall into two kinds of behavior:
- Initial call: the worker's first RPC call, in which the Master registers the worker and assigns it a WorkerId.
- Task control call: carries the WorkerId, the task's status, the task id, and the task's output. It reports either:
  - task finished
  - task failed
After issuing an RPC call, the worker blocks until the call succeeds and a reply comes back. There are two kinds of replies:
- Task assignment reply
  - a Map task
  - a Reduce task
- Control message reply
  - a worker exit message
My type definitions are as follows:
type WorkerArgs struct {
WorkerId int
RequestType int // type of the request
// The following two fields are only set and sent when a task has finished.
Output []string // intermediate file names produced by a map task, or the output file names produced by a reduce task
Input []string
TaskId int // id of the corresponding task
TaskType int
}
const (
InitialRequest = 0
FinishedRequest = 1
FailedRequest = 2
)
type WorkerReply struct {
WorkerId int // the worker's id
TaskType int // type of the task
NReduce int // number of reduce output files (nReduce)
TaskId int // id of the task
Input []string
ReduceNum int
ExitMsg bool
}
Master Design
What are the Master's responsibilities?
- Responding to the workers' RPC calls
- Managing task state
- Crash recovery (via timeout and re-execution)
- Deciding when the whole job is done
Starting with the simple parts:
- Job completion: once all reduce tasks have finished and no worker is blocked in an RPC call, the job is done and the Master can exit safely (a sketch of this check follows this list).
- Crash recovery: re-execute a task when it times out.
- Task state management: handled by a dedicated task status manager module.
- Responding to worker RPC calls: a dedicated handler.
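As a concrete illustration of the completion check, here is a minimal sketch of what a Done() method could look like under this design (the lab framework polls the coordinator through such a method). It relies on the getReduceTaskNum and getWaitingNum helpers shown in the detailed design below; treat it as an assumption rather than the exact code I ended up with.

// Sketch only: the job is done once every reduce task has finished and
// no worker is still blocked inside an RPC call.
func (c *Coordinator) Done() bool {
    return c.getReduceTaskNum() == 0 && c.getWaitingNum() == 0
}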
A few issues to think about:
- Can the same task be reported as finished more than once? Yes: because of the timeout/re-execution mechanism, the Master may receive several Finish messages for the same task, so the task status manager has to guard against this (it only acts on a Finish message if the task is still in the working set).
- State management based on shared memory is painful to write: locks nested inside locks everywhere, and it feels terrible (this was my initial design). What to do? Switch to communication-based concurrency control and decouple state management into a module of its own.
- TODO
Here are my type definitions:
type Coordinator struct {
NReduce int
InputNum int
Timeout time.Duration
NextWorkerId int
NextWorkerIdMutex sync.Mutex
NextTaskId int
NextTaskIdMutex sync.Mutex
TaskChan chan Task
TaskMsgChan chan TaskMsg
MapResult []string // after a worker's finish message arrives, the task is removed from the working set and its outputs are appended to MapResult
MapResultMutex sync.Mutex
WorkingTask map[int]TaskStatus // tasks that are currently running
WorkingTaskMutex sync.Mutex
//ReduceFinished bool
//ReduceFinishedMutex sync.Mutex
ReduceTaskNum int
ReduceTaskNumMutex sync.Mutex
WorkerNum int
WorkerNumMutex sync.Mutex
// number of workers currently blocked in an RPC call
WaitingNum int
WaitingNumMutex sync.Mutex
}
type Task struct {
Input []string // input file names for the task
CreateTask bool // true for a brand-new task, false for a task being re-issued
TaskType int // map or reduce
TaskId int // id of the task
ExcludeWorkerId int // when re-issuing a failed task, do not hand it back to this worker
ReduceNum int // reduce partition index (meaningful for reduce tasks)
ExitTask bool // tells the worker to exit
}
type TaskMsg struct {
Msg int
TaskId int
TaskType int
Input []string
Output []string
TimeStamp time.Time
WorkerId int
ReduceNum int
}
const (
CreateTaskMsg = 0
FinishTaskMsg = 1
UpdateTaskMsg = 2
FailedTaskMsg = 3
)
const (
TaskTypeMap = 0
TaskTypeReduce = 1
)
type TaskStatus struct {
TaskId int // id of the task
TaskType int // type of the task
Input []string // input files of the task
StartTime time.Time // start time of the task (reset when the task is re-assigned)
WorkerId int
ReduceNum int
}
Worker Design
The worker design is comparatively simple: it just calls the RPC and then acts on the reply accordingly, so I omit it here (the full code appears in the detailed design below).
Detailed Design
Master task status manager module
There are four kinds of task status messages. Each one is handled by a case inside the manager goroutine; a sketch of the surrounding loop follows the four cases below.
- Task creation message
case CreateTaskMsg:
    {
        taskss := TaskStatus{
            msg.TaskId,
            msg.TaskType,
            msg.Input,
            msg.TimeStamp,
            msg.WorkerId,
            msg.ReduceNum,
        }
        c.WorkingTaskMutex.Lock()
        c.WorkingTask[taskss.TaskId] = taskss
        c.WorkingTaskMutex.Unlock()
    }
- Task finished message
case FinishTaskMsg:
    {
        c.WorkingTaskMutex.Lock()
        if _, ok := c.WorkingTask[msg.TaskId]; ok {
            // If the task is still in the working set, it has just finished; remove it.
            if msg.TaskType == TaskTypeMap {
                c.MapResultMutex.Lock()
                c.MapResult = append(c.MapResult, msg.Output...)
                //fmt.Printf("%v %v", len(c.MapResult), c.InputNum)
                if len(c.MapResult) == c.InputNum*c.NReduce {
                    // All map tasks have finished; the reduce tasks can now be enqueued.
                    reduceTask := make([][]string, c.NReduce)
                    for _, mres := range c.MapResult {
                        var idx int
                        var taskId int
                        var workerId int
                        fmt.Sscanf(mres, "mr-int-%d-%d-%d", &taskId, &workerId, &idx)
                        reduceTask[idx] = append(reduceTask[idx], mres)
                    }
                    for i := 0; i < c.NReduce; i++ {
                        task := Task{
                            reduceTask[i],
                            true,
                            TaskTypeReduce,
                            c.getNextTaskId(),
                            0,
                            i,
                            false,
                        }
                        c.TaskChan <- task
                    }
                }
                c.MapResultMutex.Unlock()
            } else {
                c.ReduceTaskNumMutex.Lock()
                c.ReduceTaskNum -= 1
                c.ReduceTaskNumMutex.Unlock()
            }
            delete(c.WorkingTask, msg.TaskId)
        }
        c.WorkingTaskMutex.Unlock()
        if c.getReduceTaskNum() == 0 {
            task := Task{ExitTask: true}
            for {
                // Wrap-up phase: keep feeding exit tasks and do nothing else.
                c.TaskChan <- task
            }
        }
    }
- Task status update message
case UpdateTaskMsg:
    {
        c.WorkingTaskMutex.Lock()
        if _, ok := c.WorkingTask[msg.TaskId]; ok {
            c.WorkingTask[msg.TaskId] = TaskStatus{
                msg.TaskId,
                msg.TaskType,
                msg.Input,
                msg.TimeStamp,
                msg.WorkerId,
                msg.ReduceNum,
            }
        }
        c.WorkingTaskMutex.Unlock()
    }
- Task failed message
case FailedTaskMsg:
    {
        c.WorkingTaskMutex.Lock()
        if _, ok := c.WorkingTask[msg.TaskId]; ok {
            c.TaskChan <- Task{
                msg.Input,
                false,
                msg.TaskType,
                msg.TaskId,
                msg.WorkerId,
                c.WorkingTask[msg.TaskId].ReduceNum,
                false,
            }
        }
        c.WorkingTaskMutex.Unlock()
    }
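The four case bodies above run inside a single manager goroutine that owns all task-state transitions (it is started as go c.TaskStatusManager() in the init code below). Roughly, the surrounding loop looks like this sketch; the exact structure of my TaskStatusManager may differ slightly:

// Sketch of the task status manager loop: drain TaskMsgChan and dispatch
// each message to the corresponding case shown above.
func (c *Coordinator) TaskStatusManager() {
    for msg := range c.TaskMsgChan {
        switch msg.Msg {
        case CreateTaskMsg:
            // ... (see "Task creation message" above)
        case FinishTaskMsg:
            // ... (see "Task finished message" above)
        case UpdateTaskMsg:
            // ... (see "Task status update message" above)
        case FailedTaskMsg:
            // ... (see "Task failed message" above)
        }
    }
}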
Master RPC handler module
- The handler
// RPC handler called by workers (registered as Coordinator.WorkerHandler).
func (c *Coordinator) WorkerHandler(args *WorkerArgs, reply *WorkerReply) error {
    // Increment the count of workers blocked in an RPC call.
    c.incWaitingNum()
    defer c.decWaitingNum()
    if args.RequestType == InitialRequest {
        reply.WorkerId = c.getNextWorkerId()
        args.WorkerId = reply.WorkerId
        c.incWorkerNum()
    } else if args.RequestType == FinishedRequest {
        taskmsg := TaskMsg{
            FinishTaskMsg,
            args.TaskId,
            args.TaskType,
            args.Input,
            args.Output,
            time.Now(),
            args.WorkerId,
            0,
        }
        c.TaskMsgChan <- taskmsg
    } else if args.RequestType == FailedRequest {
        taskmsg := TaskMsg{
            FailedTaskMsg,
            args.TaskId,
            args.TaskType,
            args.Input,
            args.Output,
            time.Now(),
            args.WorkerId,
            0,
        }
        c.TaskMsgChan <- taskmsg
    }
    task := <-c.TaskChan
    if task.ExitTask {
        reply.ExitMsg = true
        c.decWorkerNum()
        return nil
    }
    for task.ExcludeWorkerId == args.WorkerId {
        c.TaskChan <- task
        time.Sleep(time.Duration(time.Millisecond))
        task = <-c.TaskChan
    }
    taskmsg := TaskMsg{}
    if task.CreateTask {
        taskmsg.Msg = CreateTaskMsg
    } else {
        taskmsg.Msg = UpdateTaskMsg
    }
    taskmsg.TaskId = task.TaskId
    taskmsg.TimeStamp = time.Now()
    taskmsg.Input = task.Input
    taskmsg.WorkerId = args.WorkerId
    taskmsg.TaskType = task.TaskType
    taskmsg.ReduceNum = task.ReduceNum
    c.TaskMsgChan <- taskmsg
    reply.TaskId = taskmsg.TaskId
    reply.TaskType = taskmsg.TaskType
    reply.Input = taskmsg.Input
    reply.NReduce = c.NReduce
    reply.WorkerId = args.WorkerId
    reply.ReduceNum = task.ReduceNum
    return nil
}
- Helper functions
func (c *Coordinator) getNextTaskId() int {
    c.NextTaskIdMutex.Lock()
    defer c.NextTaskIdMutex.Unlock()
    res := c.NextTaskId
    c.NextTaskId += 1
    return res
}
func (c *Coordinator) getNextWorkerId() int {
    c.NextWorkerIdMutex.Lock()
    defer c.NextWorkerIdMutex.Unlock()
    res := c.NextWorkerId
    c.NextWorkerId += 1
    return res
}
func (c *Coordinator) incWorkerNum() {
    c.WorkerNumMutex.Lock()
    defer c.WorkerNumMutex.Unlock()
    c.WorkerNum += 1
}
func (c *Coordinator) decWorkerNum() {
    c.WorkerNumMutex.Lock()
    defer c.WorkerNumMutex.Unlock()
    c.WorkerNum -= 1
}
func (c *Coordinator) getReduceTaskNum() int {
    c.ReduceTaskNumMutex.Lock()
    defer c.ReduceTaskNumMutex.Unlock()
    return c.ReduceTaskNum
}
func (c *Coordinator) incWaitingNum() {
    c.WaitingNumMutex.Lock()
    defer c.WaitingNumMutex.Unlock()
    c.WaitingNum += 1
}
func (c *Coordinator) decWaitingNum() {
    c.WaitingNumMutex.Lock()
    defer c.WaitingNumMutex.Unlock()
    c.WaitingNum -= 1
}
func (c *Coordinator) getWaitingNum() int {
    c.WaitingNumMutex.Lock()
    defer c.WaitingNumMutex.Unlock()
    return c.WaitingNum
}
Timeout re-execution module
func (c *Coordinator) CheckTimeout() {
for {
c.WorkingTaskMutex.Lock()
now := time.Now()
for key, _ := range c.WorkingTask {
if now.Sub(c.WorkingTask[key].StartTime) >= c.Timeout {
// the task has timed out
task := Task{
c.WorkingTask[key].Input,
false,
c.WorkingTask[key].TaskType,
c.WorkingTask[key].TaskId,
c.WorkingTask[key].WorkerId,
c.WorkingTask[key].ReduceNum,
false,
}
c.TaskChan <- task
// This could instead just send a msg to the task status manager (see the sketch after this function).
}
}
c.WorkingTaskMutex.Unlock()
time.Sleep(c.Timeout)
}
}
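As the comment inside CheckTimeout notes, an alternative is to have the checker only report the timeout to the task status manager and let the manager re-enqueue the task, so that every state transition stays in one place. A hypothetical sketch of that variant follows; TimeoutTaskMsg and reportTimeout do not exist in my implementation and are only illustrative.

// Hypothetical alternative: report the timeout as a message and let the
// status manager decide how to re-issue the task.
const TimeoutTaskMsg = 4 // assumed new message type, not part of my code

func (c *Coordinator) reportTimeout(ts TaskStatus) {
    c.TaskMsgChan <- TaskMsg{
        Msg:       TimeoutTaskMsg,
        TaskId:    ts.TaskId,
        TaskType:  ts.TaskType,
        Input:     ts.Input,
        TimeStamp: time.Now(),
        WorkerId:  ts.WorkerId,
        ReduceNum: ts.ReduceNum,
    }
}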
Master init module
func MakeCoordinator(files []string, nReduce int) *Coordinator {
c := Coordinator{}
// Your code here.
c.NReduce = nReduce
c.InputNum = len(files)
c.NextTaskId = 1
c.NextWorkerId = 1
c.TaskChan = make(chan Task, len(files)+1)
c.TaskMsgChan = make(chan TaskMsg, len(files)+1)
c.Timeout = time.Duration(time.Second * 10)
for i := 0; i < len(files); i++ {
input := []string{files[i]}
task := Task{
input,
true,
TaskTypeMap,
c.getNextTaskId(),
0,
0,
false,
}
c.TaskChan <- task
}
c.MapResult = []string{}
c.WorkingTask = make(map[int]TaskStatus)
c.WorkerNum = 0
c.ReduceTaskNum = nReduce
go c.CheckTimeout()
go c.TaskStatusManager()
c.WaitingNum = 0
c.server()
return &c
}
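For context, MakeCoordinator is not called by my own code: the lab's main/mrcoordinator.go program constructs the coordinator with the input files and nReduce = 10, then polls Done() until the job is finished. Roughly (paraphrased from the course skeleton; details may differ between lab versions):

// Paraphrased body of main() in main/mrcoordinator.go from the lab skeleton.
m := mr.MakeCoordinator(os.Args[1:], 10)
for m.Done() == false {
    time.Sleep(time.Second)
}
time.Sleep(time.Second)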
Worker RPC call module
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {
// Your worker implementation here.
// uncomment to send the Example RPC to the coordinator.
// CallExample()
// initial request
var workerId int
var nReduce int
var taskId int
var taskType int
var inputs []string
var failed bool
var output []string
args := WorkerArgs{WorkerId: 0, RequestType: InitialRequest}
reply := WorkerReply{}
call("Coordinator.WorkerHandler", &args, &reply)
for {
if reply.ExitMsg {
break
}
failed = false
workerId = reply.WorkerId
nReduce = reply.NReduce
taskId = reply.TaskId
taskType = reply.TaskType
inputs = reply.Input
if taskType == TaskTypeMap {
// run the map task
intermediate := []KeyValue{}
for _, filename := range inputs {
file, err := os.Open(filename)
if err != nil {
file.Close()
fail(workerId, FailedRequest, make([]string, 0), inputs, taskId, taskType, &reply)
failed = true
break
}
data, err := io.ReadAll(file)
if err != nil {
file.Close()
fail(workerId, FailedRequest, make([]string, 0), inputs, taskId, taskType, &reply)
failed = true
break
}
kva := mapf(filename, string(data))
intermediate = append(intermediate, kva...)
}
sort.Sort(ByKey(intermediate))
files := []*os.File{}
output = []string{}
for i := 0; i < nReduce; i++ {
oname := "mr-int-" + strconv.Itoa(taskId) + "-" + strconv.Itoa(workerId) + "-" + strconv.Itoa(i)
file, _ := os.Create(oname)
file.Seek(0, 0)
files = append(files, file)
output = append(output, oname)
}
for _, kv := range intermediate {
fmt.Fprintf(files[ihash(kv.Key)%nReduce], "%v %v\n", kv.Key, kv.Value)
}
if failed {
continue
}
args = WorkerArgs{
workerId,
FinishedRequest,
output,
inputs,
taskId,
taskType,
}
call("Coordinator.WorkerHandler", &args, &reply)
} else {
// run the reduce task
res := ByKey{}
kvs := []KeyValue{}
for _, filename := range inputs {
file, err := os.Open(filename)
if err != nil {
file.Close()
fail(workerId, FailedRequest, make([]string, 0), inputs, taskId, taskType, &reply)
failed = true
break
}
var key string
var value string
for {
if _, err = fmt.Fscanf(file, "%v %v\n", &key, &value); err != nil {
break
}
kvs = append(kvs, KeyValue{key, value})
}
}
sort.Sort(ByKey(kvs))
if failed {
continue
}
i := 0
for i < len(kvs) {
j := i + 1
for j < len(kvs) && kvs[j].Key == kvs[i].Key {
j++
}
values := []string{}
for k := i; k < j; k++ {
values = append(values, kvs[k].Value)
}
output := reducef(kvs[i].Key, values)
res = append(res, KeyValue{kvs[i].Key, output})
i = j
}
oname := "mr-out-" + strconv.Itoa(reply.ReduceNum)
ofile, _ := os.Create(oname)
for _, kv := range res {
_, err := fmt.Fprintf(ofile, "%v %v\n", kv.Key, kv.Value)
if err != nil {
}
}
args = WorkerArgs{
workerId,
FinishedRequest,
output,
inputs,
taskId,
taskType,
}
call("Coordinator.WorkerHandler", &args, &reply)
}
}
}
func fail(workerId int, requestType int, output []string, input []string, taskId int, taskType int, reply *WorkerReply) {
args := WorkerArgs{
workerId,
FailedRequest,
make([]string, 0),
input,
taskId,
taskType,
}
call("Coordinator.WorkerHandler", &args, reply)
}
Test Results
Passed 10 rounds of the stress test with the data race detector enabled.
Gripes and Rambling
Go not having a real enum type is really painful... I wish it had enums as powerful as Rust's.