首页 > 其他分享 >MapReduce

MapReduce

时间:2022-12-16 10:45:36浏览次数:34  
标签:map false string err reduce MapReduce true

实验

干嘛

实现一个分布式的 MapReduce, 由两部分组成,master 和 worker。一个master,多个worker。在本机运行,worker 和 master 用 rpc 通信。每个worker 向 master 索要任务,从一个或多个文件读取任务的输入,执行任务,并将任务的输出写入一个或更多文件。如果超时(10s)将工作分配给其他worker
注意的一些点:

  1. 输出文件名 必须是mr-out-%v

  2. map -> file -> reduce ,这个 file 里面存储数据的格式需要注意

  3. 如果一个分配的任务 工作的时长 > 超时时间,更改状态为 未分配,重新给其他人分配

  4. 当 worker 发现 请求不了任务,套接字关闭,则退出(master 工作完毕)。

  5. 前提:我们不需要考虑,map工作完成后,map 宕机的情况,解释:因为我们是在本机,如果map完成工作,我们master会将他输出的临时文件 重命名为 正式文件。

  6. worker 对于 master 的rpc调用只有两条

    1. 请求任务

我们可以发现只需要只有两种类型 map,reduce然后请求返回的请求实体,
map: 有多少个 reduce, 因为他要输出的文件切片; 当前的任务编号([0,m)),然后输入文件名。
reduce: 输入文件名,当前任务编号([0,r)).

  1. 提交任务

map 和 reduce 都需要将自己的 临时输出文件名传回,还有自己的 任务编号,以供 master 将该任务标记为 已完成,并且将 临时输出文件 转正。

master 需要维护:

  1. 每个map或者 reduce 工作状态
  2. 开始时间(以提供后面判断是否超时,或者需要重新分配)
  3. 输出文件 (将传回的临时文件名 重命名后的文件名)
  4. 任务状态 (完成,已分配,未分配)。
  5. map 任务全部是否完成的标志(用于判断是否开始分配给reduce),reduce是否全部完成的标志(用于判断是否结束)
  6. 输入的文件列表。
// 类型 
type TaskType uint

const (
	NoAssign TaskType = iota // 表示没有分配任务 worker 就睡1秒
	MapTask
	RedTask
)

type taskStatus uint

const (
	UnAssigned = iota
	Assigned
	Finished
)

type TaskStatus struct {
	Type      TaskType
	Index     int // 第几个任务
	Status    taskStatus
	StartTime time.Time
	//如果是 map 的话就是 Files[0]
	// 如果是reduce的话就是 这个切片
	Files []string // 输出输入文件表
	// 如果是提交,就是提交的临时文件名
}
// 和工人通话的唯一通道
func (c *Coordinator) Talk(args *TalkReq, resp *TalkResp) error {
}

type Coordinator struct {
	// Your definitions here.
	lock       sync.RWMutex
	inputFiles []string //map 输入文件表
	nReduce    int      //多少个 reduce 任务
	// 维护 map reduce 任务的转台
	mapTask   []TaskStatus
	mapFinish bool       //如果map完成我们就开始分配 reduce
	mapOutput [][]string // map  的中间输出文件名

	redTask   []TaskStatus
	redFinish bool // 如果 reduce 完成我们就可以结束了
	// 因为我们是提交后重命名,如果放在worker 中,我们可能出现 一起重命名,最好的方式是文件名传送给 master 让其执行原子操作(lock)
	redOutput []string // reduce  的中间输出文件名
}

// 初始化  方便理解各字段的含义
func MakeCoordinator(files []string, nReduce int) *Coordinator {
	var err error
	logFile, err = os.OpenFile("/home/zsj/mit/6.824/src/main/logs/master.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
	if err != nil {
		panic(err)
	}
	log.SetOutput(logFile)
	log.Println("================================Start Working==================================")
	// files 的长度就是 map 工作任务的个数
	nMap := len(files)
	c := Coordinator{
		lock:       sync.RWMutex{},
		inputFiles: files,
		nReduce:    nReduce,
		mapTask:    make([]TaskStatus, nMap),
		mapOutput:  make([][]string, nMap),
		redTask:    make([]TaskStatus, nReduce),
		redOutput:  make([]string, nReduce),
	}
	for i := 0; i < nMap; i++ {
		c.mapTask[i] = TaskStatus{
			Type:   MapTask,
			Index:  i,
			Status: UnAssigned,
			Files:  []string{files[i]},
		}
		c.mapOutput[i] = make([]string, nReduce) // 在这就初始话把
		for j := 0; j < nReduce; j++ {
			//map任务产生的文件的名字格式为: mr-output-(MapID)-(ReduceID)
			c.mapOutput[i][j] = fmt.Sprintf("/home/zsj/mit/6.824/src/main/mr-output/map-%v-%v", i, j)
			log.Println("map temp:", c.mapOutput[i][j])
		}
	}

	for i := 0; i < nReduce; i++ {
		c.redTask[i] = TaskStatus{
			Type:   RedTask,
			Index:  i,
			Status: UnAssigned,
		}
		c.redOutput[i] = fmt.Sprintf("/home/zsj/mit/6.824/src/main/mr-tmp/mr-out-%v", i)
	}

	go c.checkTimeOut()

	// Your code here.
	c.server()
	return &c
}

学到的

库函数


package plugin

// Plugin is a loaded Go plugin.
type Plugin struct {
	pluginpath string
	err        string        // set if plugin failed to load
	loaded     chan struct{} // closed when loaded
	syms       map[string]interface{}
}

// Open opens a Go plugin.
// If a path has already been opened, then the existing *Plugin is returned.
// It is safe for concurrent use by multiple goroutines.
func Open(path string) (*Plugin, error) {
	return open(path)
}

// Lookup searches for a symbol named symName in plugin p.
// A symbol is any exported variable or function.
// It reports an error if the symbol is not found.
// It is safe for concurrent use by multiple goroutines.
func (p *Plugin) Lookup(symName string) (Symbol, error) {
	return lookup(p, symName)
}

// A Symbol is a pointer to a variable or function.
//
// For example, a plugin defined as
//
//	package main
//
//	import "fmt"
//
//	var V int
//
//	func F() { fmt.Printf("Hello, number %d\n", V) }
//
// may be loaded with the Open function and then the exported package
// symbols V and F can be accessed
//
//	p, err := plugin.Open("plugin_name.so")
//	if err != nil {
//		panic(err)
//	}
//	v, err := p.Lookup("V")
//	if err != nil {
//		panic(err)
//	}
//	f, err := p.Lookup("F")
//	if err != nil {
//		panic(err)
//	}
//	*v.(*int) = 7
//	f.(func())() // prints "Hello, number 7"
type Symbol interface{}

plugin可以查询到符号表的东西,类似于将ELF表读入内存,解析,然后将地址翻译为 函数指针。
具体用法 看第 42 行


// Golang program
// demostrating how to use strings.FieldsFunc() Function
  
package main
  
import (
    "fmt"
    "strings"
    "unicode"
)
  
func main() {

    myFunc := func(c rune) bool {
        fmt.Println(string(c),unicode.IsNumber(c))
        return !unicode.IsNumber(c)
    } // 如果满足则删除
  
    // When a number is encountered, 
    // the FieldsFunc() method separates the string 
    // and returns all non-numbers.
    fmt.Printf("The Fields are: %q\n", 
       strings.FieldsFunc("This018is92a6shot179on89Golang864string34FieldFunc", myFunc))
}
=========
T false
h false
i false
s false
0 true
1 true
8 true
i false
s false
9 true
2 true
a false
6 true
s false
h false
o false
t false
1 true
7 true
9 true
o false
n false
8 true
9 true
G false
o false
l false
a false
n false
g false
8 true
6 true
4 true
s false
t false
r false
i false
n false
g false
3 true
4 true
F false
i false
e false
l false
d false
F false
u false
n false
c false
The Fields are: ["018" "92" "6" "179" "89" "864" "34"]
sed -i 's/\r//' one-more.sh

标签:map,false,string,err,reduce,MapReduce,true
From: https://www.cnblogs.com/jgjg/p/16986663.html

相关文章