时间:2022-12-16  
实现一个分布式的 MapReduce, 由两部分组成,master 和 worker。一个master,多个worker。在本机运行,worker 和 master 用 rpc 通信。每个worker 向 master 索要任务,从一个或多个文件读取任务的输入,执行任务,并将任务的输出写入一个或更多文件。如果超时(10s)将工作分配给其他worker

  1. 输出文件名 必须是mr-out-%v

  2. map -> file -> reduce ,这个 file 里面存储数据的格式需要注意

  3. 如果一个分配的任务 工作的时长 > 超时时间,更改状态为 未分配,重新给其他人分配

  4. 当 worker 发现 请求不了任务,套接字关闭,则退出(master 工作完毕)。

  5. 前提:我们不需要考虑,map工作完成后,map 宕机的情况,解释:因为我们是在本机,如果map完成工作,我们master会将他输出的临时文件 重命名为 正式文件。

  6. worker 对于 master 的rpc调用只有两条

    1. 请求任务

我们可以发现只需要只有两种类型 map,reduce然后请求返回的请求实体,
map: 有多少个 reduce, 因为他要输出的文件切片; 当前的任务编号([0,m)),然后输入文件名。
reduce: 输入文件名,当前任务编号([0,r)).

  1. 提交任务

map 和 reduce 都需要将自己的 临时输出文件名传回,还有自己的 任务编号,以供 master 将该任务标记为 已完成,并且将 临时输出文件 转正。

master 需要维护:

  1. 每个map或者 reduce 工作状态
  2. 开始时间(以提供后面判断是否超时,或者需要重新分配)
  3. 输出文件 (将传回的临时文件名 重命名后的文件名)
  4. 任务状态 (完成,已分配,未分配)。
  5. map 任务全部是否完成的标志(用于判断是否开始分配给reduce),reduce是否全部完成的标志(用于判断是否结束)
  6. 输入的文件列表。
// 类型 
type TaskType uint

const (
	NoAssign TaskType = iota // 表示没有分配任务 worker 就睡1秒

type taskStatus uint

const (
	UnAssigned = iota

type TaskStatus struct {
	Type      TaskType
	Index     int // 第几个任务
	Status    taskStatus
	StartTime time.Time
	//如果是 map 的话就是 Files[0]
	// 如果是reduce的话就是 这个切片
	Files []string // 输出输入文件表
	// 如果是提交,就是提交的临时文件名
// 和工人通话的唯一通道
func (c *Coordinator) Talk(args *TalkReq, resp *TalkResp) error {

type Coordinator struct {
	// Your definitions here.
	lock       sync.RWMutex
	inputFiles []string //map 输入文件表
	nReduce    int      //多少个 reduce 任务
	// 维护 map reduce 任务的转台
	mapTask   []TaskStatus
	mapFinish bool       //如果map完成我们就开始分配 reduce
	mapOutput [][]string // map  的中间输出文件名

	redTask   []TaskStatus
	redFinish bool // 如果 reduce 完成我们就可以结束了
	// 因为我们是提交后重命名,如果放在worker 中,我们可能出现 一起重命名,最好的方式是文件名传送给 master 让其执行原子操作(lock)
	redOutput []string // reduce  的中间输出文件名

// 初始化  方便理解各字段的含义
func MakeCoordinator(files []string, nReduce int) *Coordinator {
	var err error
	logFile, err = os.OpenFile("/home/zsj/mit/6.824/src/main/logs/master.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
	if err != nil {
	log.Println("================================Start Working==================================")
	// files 的长度就是 map 工作任务的个数
	nMap := len(files)
	c := Coordinator{
		lock:       sync.RWMutex{},
		inputFiles: files,
		nReduce:    nReduce,
		mapTask:    make([]TaskStatus, nMap),
		mapOutput:  make([][]string, nMap),
		redTask:    make([]TaskStatus, nReduce),
		redOutput:  make([]string, nReduce),
	for i := 0; i < nMap; i++ {
		c.mapTask[i] = TaskStatus{
			Type:   MapTask,
			Index:  i,
			Status: UnAssigned,
			Files:  []string{files[i]},
		c.mapOutput[i] = make([]string, nReduce) // 在这就初始话把
		for j := 0; j < nReduce; j++ {
			//map任务产生的文件的名字格式为: mr-output-(MapID)-(ReduceID)
			c.mapOutput[i][j] = fmt.Sprintf("/home/zsj/mit/6.824/src/main/mr-output/map-%v-%v", i, j)
			log.Println("map temp:", c.mapOutput[i][j])

	for i := 0; i < nReduce; i++ {
		c.redTask[i] = TaskStatus{
			Type:   RedTask,
			Index:  i,
			Status: UnAssigned,
		c.redOutput[i] = fmt.Sprintf("/home/zsj/mit/6.824/src/main/mr-tmp/mr-out-%v", i)

	go c.checkTimeOut()

	// Your code here.
	return &c



From: https://www.cnblogs.com/jgjg/p/16986663.html
