首页 > 其他分享 >MIT 6.5840 2023 Spring(6.824)LAB1:MapReduce

MIT 6.5840 2023 Spring(6.824)LAB1:MapReduce

时间:2023-04-10 11:33:42浏览次数:43  
标签:6.824 string int 6.5840 Coordinator 任务 2023 msg reply

MIT 6.5840 2023 Spring(6.824)LAB1:MapReduce

前言

本次lab主要是完成一个基于RPC远程调用的单机单文件系统的简单MapReduce框架,并完成单词计数任务。基于golang实现,单Master,多Worker。实现worker的奔溃恢复(Fault Torrance),通过超时重新执行实现。主要的任务有,RPC调用参数及返回参数的设计,Master RPC Handler设计,Master控制逻辑设计,Worker任务执行逻辑设计,任务完成判断逻辑设计。

什么是MapReduce?

具体框架图如下

  • 一个核心思想:分治

  • 两个重要阶段:Map阶段,Reduce阶段

    1. 在Map阶段中,将一个Task分为多个小Task,发送到不同机器上运行,并将输出的结果根据Key的哈希值,分发到不同的中间文件中。
    2. 等待所有Map结束后,启动Reduce阶段。在Reduce阶段中,每个Reduce对一个中间文件执行Reduce操作,得到输出,输出为文件
  • MapReduce具体信息,请参照MapReduce原论文。

环境准备

  • 操作系统:Ubuntu 22.04 LTS
  • golang toolchain:go1.20.3 linux/amd64
  • IDE:Goland
  • 操作方式:虚拟机安装OS,goland ssh到虚拟机上进行编码
  • 版本控制:公司内部的gitlab

本次LAB我的系统整体框架图

概要设计解析

RPC设计

rpc分为调用参数和返回参数。其中,worker调用rpc,可以分为两种行为。

  • 初始化调用:Worker第一次调用RPC,这时候Master将为其进行注册WorkerId

  • 任务控制信息调用:附带workerId,以及任务的状态,任务ID等,以及任务的输出

    1. 任务完成
    2. 任务失败

worker调用rpc后,会进行阻塞,直到rpc调用成功,获得Reply。Reply有两种

  • 任务分配Reply

    1. Map任务
    2. Reduce任务
  • 控制信息Reply

    1. worker退出控制信息

我的类设计如下

type WorkerArgs struct {
	WorkerId    int
	RequestType int // request的类型

	// 下面的两个参数仅任务完成时存在且发送
	Output   []string // map任务生成的中间文件名或者reduce任务生成的文件名
	Input    []string
	TaskId   int // 对应的task id
	TaskType int
}

const (
	InitialRequest  = 0
	FinishedRequest = 1
	FailedRequest   = 2
)

type WorkerReply struct {
	WorkerId  int // 记录worker id
	TaskType  int // 任务的类型
	NReduce   int // reduce需要输出文件的数量
	TaskId    int // task的id
	Input     []string
	ReduceNum int
	ExitMsg   bool
}

Master设计

master有哪些职责?

  1. 回复Worker的RPC调用
  2. 任务的状态控制
  3. 崩溃恢复(通过超时重传)
  4. 任务完成判断

先从简单的讲起

  • 任务完成判断:所有Reduce任务完成,并且没有Worker阻塞在RPC调用中,那么任务即完成,可以安全退出。
  • 崩溃恢复:超时重新执行任务
  • 任务的状态控制:通过单独的任务状态控制模块来实现
  • 回复worker rpc调用:单独的handler

其中有一些问题:

  1. 是否有可能相同任务会造成多次完成?因为有超时重传机制存在,因此可能会有相同任务的多次Finish信息的存在,因此需要在任务控制模块中进行一些相应的控制
  2. 基于共享内存的状态控制太难写了,很多锁套锁,太难受了,怎么办(一开始我就是这么设计的)?使用基于通信的并发控制,将状态控制解耦,成为一个单独的模块。
  3. TODO

下面是我的类设计

type Coordinator struct {
	NReduce  int
	InputNum int
	Timeout  time.Duration

	NextWorkerId      int
	NextWorkerIdMutex sync.Mutex

	NextTaskId      int
	NextTaskIdMutex sync.Mutex

	TaskChan    chan Task
	TaskMsgChan chan TaskMsg

	MapResult      []string // 收到worker发来的结束消息后,task池里面的task就会删除然后放入MapResult
	MapResultMutex sync.Mutex

	WorkingTask      map[int]TaskStatus // 正在运行的task就放在这里面
	WorkingTaskMutex sync.Mutex

	//ReduceFinished      bool
	//ReduceFinishedMutex sync.Mutex

	ReduceTaskNum      int
	ReduceTaskNumMutex sync.Mutex

	WorkerNum      int
	WorkerNumMutex sync.Mutex

	// 阻塞在RPC调用的任务的数量
	WaitingNum      int
	WaitingNumMutex sync.Mutex
}
type Task struct {
	Input           []string
	CreateTask      bool
	TaskType        int
	TaskId          int
	ExcludeWorkerId int
	ReduceNum       int
	ExitTask        bool
}
type TaskMsg struct {
	Msg       int
	TaskId    int
	TaskType  int
	Input     []string
	Output    []string
	TimeStamp time.Time
	WorkerId  int
	ReduceNum int
}

const (
	CreateTaskMsg = 0
	FinishTaskMsg = 1
	UpdateTaskMsg = 2
	FailedTaskMsg = 3
)
const (
	TaskTypeMap    = 0
	TaskTypeReduce = 1
)

type TaskStatus struct {
	TaskId    int       // task的id
	TaskType  int       // task的类型
	Input     []string  // task的输入
	StartTime time.Time // task的开始时间
	WorkerId  int
	ReduceNum int
}

Worker设计

worker设计就比较简单了,其实就是调用RPC,然后对reply做出相应的动作,这里略了。

详细设计

master任务状态控制模块

状态控制信息有四种。

  • 任务状态创建信息
    case CreateTaskMsg:
    		{
    			taskss := TaskStatus{
    				msg.TaskId,
    				msg.TaskType,
    				msg.Input,
    				msg.TimeStamp,
    				msg.WorkerId,
    				msg.ReduceNum,
    			}
    			c.WorkingTaskMutex.Lock()
    			c.WorkingTask[taskss.TaskId] = taskss
    			c.WorkingTaskMutex.Unlock()
    		}
    
  • 任务完成信息
    case FinishTaskMsg:
    		{
    			c.WorkingTaskMutex.Lock()
    			if _, ok := c.WorkingTask[msg.TaskId]; ok {
    				// 如果存在,那么表示这个task结束了,直接删除
    				if msg.TaskType == TaskTypeMap {
    					c.MapResultMutex.Lock()
    					c.MapResult = append(c.MapResult, msg.Output...)
    					//fmt.Printf("%v %v", len(c.MapResult), c.InputNum)
    					if len(c.MapResult) == c.InputNum*c.NReduce {
    						// 表示所有map任务都已经完成,可以加入reduce任务了
    						reduceTask := make([][]string, c.NReduce)
    
    						for _, mres := range c.MapResult {
    							var idx int
    							var taskId int
    							var workerId int
    							fmt.Sscanf(mres, "mr-int-%d-%d-%d", &taskId, &workerId, &idx)
    							reduceTask[idx] = append(reduceTask[idx], mres)
    						}
    						for i := 0; i < c.NReduce; i++ {
    							task := Task{
    								reduceTask[i],
    								true,
    								TaskTypeReduce,
    								c.getNextTaskId(),
    								0,
    								i,
    								false,
    							}
    							c.TaskChan <- task
    						}
    					}
    					c.MapResultMutex.Unlock()
    				} else {
    					c.ReduceTaskNumMutex.Lock()
    					c.ReduceTaskNum -= 1
    					c.ReduceTaskNumMutex.Unlock()
    				}
    				delete(c.WorkingTask, msg.TaskId)
    			}
    			c.WorkingTaskMutex.Unlock()
    			if c.getReduceTaskNum() == 0 {
    				task := Task{ExitTask: true}
    				for {
    					//进入收尾阶段,啥都不干了
    					c.TaskChan <- task
    				}
    			}
    		}
    
  • 任务失败信息
    case UpdateTaskMsg:
    		{
    			c.WorkingTaskMutex.Lock()
    			if _, ok := c.WorkingTask[msg.TaskId]; ok {
    				c.WorkingTask[msg.TaskId] = TaskStatus{
    					msg.TaskId,
    					msg.TaskType,
    					msg.Input,
    					msg.TimeStamp,
    					msg.WorkerId,
    					msg.ReduceNum,
    				}
    			}
    			c.WorkingTaskMutex.Unlock()
    		}
    
  • 任务状态更新信息
    case FailedTaskMsg:
    		{
    			c.WorkingTaskMutex.Lock()
    			if _, ok := c.WorkingTask[msg.TaskId]; ok {
    				c.TaskChan <- Task{
    					msg.Input,
    					false,
    					msg.TaskType,
    					msg.TaskId,
    					msg.WorkerId,
    					c.WorkingTask[msg.TaskId].ReduceNum,
    					false,
    				}
    			}
    			c.WorkingTaskMutex.Unlock()
    		}
    

Master RPC Handler模块

  • Handler模块
        // worker阻塞数量加1
        c.incWaitingNum()
        defer c.decWaitingNum()
        if args.RequestType == InitialRequest {
            reply.WorkerId = c.getNextWorkerId()
            args.WorkerId = reply.WorkerId
            c.incWorkerNum()
        } else if args.RequestType == FinishedRequest {
            taskmsg := TaskMsg{
                FinishTaskMsg,
                args.TaskId,
                args.TaskType,
                args.Input,
                args.Output,
                time.Now(),
                args.WorkerId,
                0,
            }
            c.TaskMsgChan <- taskmsg
        } else if args.RequestType == FailedRequest {
            taskmsg := TaskMsg{
                FailedTaskMsg,
                args.TaskId,
                args.TaskType,
                args.Input,
                args.Output,
                time.Now(),
                args.WorkerId,
                0,
            }
            c.TaskMsgChan <- taskmsg
        }
        task := <-c.TaskChan
        if task.ExitTask {
            reply.ExitMsg = true
            c.decWorkerNum()
            return nil
        }
        for task.ExcludeWorkerId == args.WorkerId {
            c.TaskChan <- task
            time.Sleep(time.Duration(time.Millisecond))
            task = <-c.TaskChan
        }
        taskmsg := TaskMsg{}
        if task.CreateTask {
            taskmsg.Msg = CreateTaskMsg
        } else {
            taskmsg.Msg = UpdateTaskMsg
        }
        taskmsg.TaskId = task.TaskId
        taskmsg.TimeStamp = time.Now()
        taskmsg.Input = task.Input
        taskmsg.WorkerId = args.WorkerId
        taskmsg.TaskType = task.TaskType
        taskmsg.ReduceNum = task.ReduceNum
        c.TaskMsgChan <- taskmsg
        reply.TaskId = taskmsg.TaskId
        reply.TaskType = taskmsg.TaskType
        reply.Input = taskmsg.Input
        reply.NReduce = c.NReduce
        reply.WorkerId = args.WorkerId
        reply.ReduceNum = task.ReduceNum
        return nil
    
  • 辅助函数模块
    func (c *Coordinator) getNextTaskId() int {
    c.NextTaskIdMutex.Lock()
    defer c.NextTaskIdMutex.Unlock()
    res := c.NextTaskId
    c.NextTaskId += 1
    return res
    }
    func (c *Coordinator) getNextWorkerId() int {
        c.NextWorkerIdMutex.Lock()
        defer c.NextWorkerIdMutex.Unlock()
        res := c.NextWorkerId
        c.NextWorkerId += 1
        return res
    }
    func (c *Coordinator) incWorkerNum() {
        c.WorkerNumMutex.Lock()
        defer c.WorkerNumMutex.Unlock()
        c.WorkerNum += 1
    }
    func (c *Coordinator) decWorkerNum() {
        c.WorkerNumMutex.Lock()
        defer c.WorkerNumMutex.Unlock()
        c.WorkerNum -= 1
    }
    func (c *Coordinator) getReduceTaskNum() int {
        c.ReduceTaskNumMutex.Lock()
        defer c.ReduceTaskNumMutex.Unlock()
        return c.ReduceTaskNum
    }
    func (c *Coordinator) incWaitingNum() {
        c.WaitingNumMutex.Lock()
        defer c.WaitingNumMutex.Unlock()
        c.WaitingNum += 1
    }
    func (c *Coordinator) decWaitingNum() {
        c.WaitingNumMutex.Lock()
        defer c.WaitingNumMutex.Unlock()
        c.WaitingNum -= 1
    }
    func (c *Coordinator) getWaitingNum() int {
        c.WaitingNumMutex.Lock()
        defer c.WaitingNumMutex.Unlock()
        return c.WaitingNum
    }
    
    

超时重传模块

func (c *Coordinator) CheckTimeout() {
	for {
		c.WorkingTaskMutex.Lock()
		now := time.Now()
		for key, _ := range c.WorkingTask {
			if now.Sub(c.WorkingTask[key].StartTime) >= c.Timeout {
				// 超时了
				task := Task{
					c.WorkingTask[key].Input,
					false,
					c.WorkingTask[key].TaskType,
					c.WorkingTask[key].TaskId,
					c.WorkingTask[key].WorkerId,
					c.WorkingTask[key].ReduceNum,
					false,
				}
				c.TaskChan <- task
				// 其实这边可以只发送一个msg给task status manager
			}
		}
		c.WorkingTaskMutex.Unlock()
		time.Sleep(c.Timeout)
	}
}

Master Init模块

func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := Coordinator{}
	// Your code here.
	c.NReduce = nReduce
	c.InputNum = len(files)
	c.NextTaskId = 1
	c.NextWorkerId = 1
	c.TaskChan = make(chan Task, len(files)+1)
	c.TaskMsgChan = make(chan TaskMsg, len(files)+1)
	c.Timeout = time.Duration(time.Second * 10)
	for i := 0; i < len(files); i++ {
		input := []string{files[i]}
		task := Task{
			input,
			true,
			TaskTypeMap,
			c.getNextTaskId(),
			0,
			0,
			false,
		}
		c.TaskChan <- task
	}
	c.MapResult = []string{}
	c.WorkingTask = make(map[int]TaskStatus)
	c.WorkerNum = 0
	c.ReduceTaskNum = nReduce
	go c.CheckTimeout()
	go c.TaskStatusManager()
	c.WaitingNum = 0
	c.server()
	return &c
}

Worker RPC调用模块

func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	// Your worker implementation here.

	// uncomment to send the Example RPC to the coordinator.
	// CallExample()
	// initial request
	var workerId int
	var nReduce int
	var taskId int
	var taskType int
	var inputs []string
	var failed bool
	var output []string
	args := WorkerArgs{WorkerId: 0, RequestType: InitialRequest}
	reply := WorkerReply{}
	call("Coordinator.WorkerHandler", &args, &reply)
	for {
		if reply.ExitMsg {
			break
		}
		failed = false
		workerId = reply.WorkerId
		nReduce = reply.NReduce
		taskId = reply.TaskId
		taskType = reply.TaskType
		inputs = reply.Input
		if taskType == TaskTypeMap {
			// 执行map操作
			intermediate := []KeyValue{}
			for _, filename := range inputs {
				file, err := os.Open(filename)
				if err != nil {
					file.Close()
					fail(workerId, FailedRequest, make([]string, 0), inputs, taskId, taskType, &reply)
					failed = true
					break
				}
				data, err := io.ReadAll(file)
				if err != nil {
					file.Close()
					fail(workerId, FailedRequest, make([]string, 0), inputs, taskId, taskType, &reply)
					failed = true
					break
				}
				kva := mapf(filename, string(data))
				intermediate = append(intermediate, kva...)
			}
			sort.Sort(ByKey(intermediate))
			files := []*os.File{}
			output = []string{}
			for i := 0; i < nReduce; i++ {
				oname := "mr-int-" + strconv.Itoa(taskId) + "-" + strconv.Itoa(workerId) + "-" + strconv.Itoa(i)
				file, _ := os.Create(oname)
				file.Seek(0, 0)
				files = append(files, file)
				output = append(output, oname)
			}

			for _, kv := range intermediate {
				fmt.Fprintf(files[ihash(kv.Key)%nReduce], "%v %v\n", kv.Key, kv.Value)
			}
			if failed {
				continue
			}
			args = WorkerArgs{
				workerId,
				FinishedRequest,
				output,
				inputs,
				taskId,
				taskType,
			}
			call("Coordinator.WorkerHandler", &args, &reply)
		} else {
			// 执行reduce操作
			res := ByKey{}
			kvs := []KeyValue{}
			for _, filename := range inputs {
				file, err := os.Open(filename)
				if err != nil {
					file.Close()
					fail(workerId, FailedRequest, make([]string, 0), inputs, taskId, taskType, &reply)
					failed = true
					break
				}
				var key string
				var value string
				for {
					if _, err = fmt.Fscanf(file, "%v %v\n", &key, &value); err != nil {
						break
					}
					kvs = append(kvs, KeyValue{key, value})
				}

			}
			sort.Sort(ByKey(kvs))
			if failed {
				continue
			}
			i := 0
			for i < len(kvs) {
				j := i + 1
				for j < len(kvs) && kvs[j].Key == kvs[i].Key {
					j++
				}
				values := []string{}
				for k := i; k < j; k++ {
					values = append(values, kvs[k].Value)
				}
				output := reducef(kvs[i].Key, values)
				res = append(res, KeyValue{kvs[i].Key, output})
				i = j
			}
			oname := "mr-out-" + strconv.Itoa(reply.ReduceNum)
			ofile, _ := os.Create(oname)
			for _, kv := range res {
				_, err := fmt.Fprintf(ofile, "%v %v\n", kv.Key, kv.Value)
				if err != nil {
				}
			}
			args = WorkerArgs{
				workerId,
				FinishedRequest,
				output,
				inputs,
				taskId,
				taskType,
			}
			call("Coordinator.WorkerHandler", &args, &reply)
		}
	}
}
func fail(workerId int, requestType int, output []string, input []string, taskId int, taskType int, reply *WorkerReply) {
	args := WorkerArgs{
		workerId,
		FailedRequest,
		make([]string, 0),
		input,
		taskId,
		taskType,
	}
	call("Coordinator.WorkerHandler", &args, reply)
}

测试结果

通过了10轮压力测试,开启了DATA RACE检测选项

吐槽,碎碎念

golang没有枚举类型真的好难用啊。。。如果有Rust那样强大的枚举类型就好了。

标签:6.824,string,int,6.5840,Coordinator,任务,2023,msg,reply
From: https://www.cnblogs.com/alyjay/p/17302388.html

相关文章

  • GreatSQL社区月报 | 2023.03
    GreatSQL社区月报|2023.03GreatSQL是一个开源的MySQL技术路线数据库社区,社区致力于通过开放的社区合作,构建国内自主MySQL版本及开源数据库技术,推动中国开源数据库及应用生态繁荣发展。为了帮助社区的小伙伴们更好地了解GreatSQL社区的实时进展,我们决定每月更新发布一次......
  • 【2023-04-08】连岳摘抄
    23:59我们没有权利假借后天的给予对别人颐指气使,也没有理由为后天的际遇而自怨自艾。在人之上,要视别人为人;在人之下,要视自己为人。                                             ......
  • 【2023-04-09】连岳摘抄
    23:59要想了解一个人,需要逐渐地、仔细地观察他,以免造成偏见和误解,那是过后很难纠正和挽回的。                                                 ——陀思妥耶夫斯基......
  • 20230409-Python-字符串-day6
    字符串4月9字符串是python中最常见的数据类型,我们可以使用单引号''、双引号""、三引号""""""来创建字符串,只要为变量分配一个值即可#单引号var1='helloword'#双引号var2="helloPython"#三引号,可以换行,如果没有变量名,这就是一个多行注释var......
  • 2023/4/10随笔
    今天,学习了Android的fragment的相关内容,如下:创建静态fragment创建动态fragmentfragment的生命周期,增删查换。建立可滑动图片集建立可滑动桌面主页建立可滑动点击桌面主页对于fragment的内容了解更深,fragment,意为碎片,是轻量级的类似于activity的一个,依赖于activity,一个activ......
  • 2023-04-09 有向图及相关算法
    有向图及相关算法1有向图的实现有向图的的应用场景社交网络中的关注互联网连接程序模块的引用任务调度学习计划食物链论文引用无向图是特殊的有向图,即每条边都是双向的改进Graph和WeightedGraph类使之支持有向图Graph类的改动WeightedGraph类的改动2有向图算......
  • # 2023被行计网实验二数据链路层实验的设计性实验部分
    设计型实验(选作)一个公司需要组建局域网,公司主要有财务、人事、工程、研发、市场等部门,每个部门人数都不超过20人,另外公司还有一些公共服务器。请给出设计方案,并提供实验验证。要求满足:所有部门不能互相访问;每个部门都可以访问公共服务器。VLAN端口的分类交换机的端口可以......
  • 2023年郑州轻工业大学校赛邀请赛myh
    赛程回顾和赛后总结赛程回顾although昨天刚复盘的,但还是记不住题号。就口胡下是那类型题吧。刚开始时,我和队长先看的a,让jc去找签到题。我们看了下a,队长说可能dp,但还是感觉没啥思路就逃了。看到别人有过别的,立马看,应该先a了那道数学签到题,然后开始看另一道签到题,感觉一眼题意,......
  • C/C++模拟校园卡消费记录查询系统[2023-04-09]
    C/C++模拟校园卡消费记录查询系统[2023-04-09]模拟校园卡1问题描述同学们都在机房做实验或自由上机,请根据自己实际使用情况编写一份模拟校园卡消费记录查询系统,实现登录,计费,挂失,统计等相关功能。2功能要求主要功能模块:(1)登录模块:同学根据自己设定的密码登录。三次错误则......
  • 【2023.04.09】乐乐兄弟8858航空飞船、8859航天火箭短评
    前言本人是自费购买积木,购买原因是给妹妹培养动手能力,减少短视频占用时间,其次是给家里做摆饰,所以选择积木多考虑了美观非专业评测,如果想看更多积木评测请点进我的博客主页分类查看正文东西的质量不错,也可以不用拼,做一些零件的MOC来用拼起来什么感觉呢,就是有点松,之前买过乐乐......