实验
干嘛
实现一个分布式的 MapReduce, 由两部分组成,master 和 worker。一个master,多个worker。在本机运行,worker 和 master 用 rpc 通信。每个worker 向 master 索要任务,从一个或多个文件读取任务的输入,执行任务,并将任务的输出写入一个或更多文件。如果超时(10s)将工作分配给其他worker
注意的一些点:
-
输出文件名 必须是
mr-out-%v
-
map -> file -> reduce ,这个 file 里面存储数据的格式需要注意
-
如果一个分配的任务 工作的时长 > 超时时间,更改状态为 未分配,重新给其他人分配
-
当 worker 发现 请求不了任务,套接字关闭,则退出(master 工作完毕)。
-
前提:我们不需要考虑,map工作完成后,map 宕机的情况,解释:因为我们是在本机,如果map完成工作,我们
master
会将他输出的临时文件 重命名为 正式文件。 -
worker 对于 master 的rpc调用只有两条
- 请求任务
我们可以发现只需要只有两种类型 map,reduce
然后请求返回的请求实体,
map: 有多少个 reduce, 因为他要输出的文件切片; 当前的任务编号([0,m)
),然后输入文件名。
reduce: 输入文件名,当前任务编号([0,r)
).
- 提交任务
map 和 reduce 都需要将自己的 临时输出文件名传回,还有自己的 任务编号,以供 master
将该任务标记为 已完成,并且将 临时输出文件 转正。
master 需要维护:
- 每个map或者 reduce 工作状态
- 开始时间(以提供后面判断是否超时,或者需要重新分配)
- 输出文件 (将传回的临时文件名 重命名后的文件名)
- 任务状态 (完成,已分配,未分配)。
- map 任务全部是否完成的标志(用于判断是否开始分配给reduce),reduce是否全部完成的标志(用于判断是否结束)
- 输入的文件列表。
// 类型
type TaskType uint
const (
NoAssign TaskType = iota // 表示没有分配任务 worker 就睡1秒
MapTask
RedTask
)
type taskStatus uint
const (
UnAssigned = iota
Assigned
Finished
)
type TaskStatus struct {
Type TaskType
Index int // 第几个任务
Status taskStatus
StartTime time.Time
//如果是 map 的话就是 Files[0]
// 如果是reduce的话就是 这个切片
Files []string // 输出输入文件表
// 如果是提交,就是提交的临时文件名
}
// 和工人通话的唯一通道
func (c *Coordinator) Talk(args *TalkReq, resp *TalkResp) error {
}
type Coordinator struct {
// Your definitions here.
lock sync.RWMutex
inputFiles []string //map 输入文件表
nReduce int //多少个 reduce 任务
// 维护 map reduce 任务的转台
mapTask []TaskStatus
mapFinish bool //如果map完成我们就开始分配 reduce
mapOutput [][]string // map 的中间输出文件名
redTask []TaskStatus
redFinish bool // 如果 reduce 完成我们就可以结束了
// 因为我们是提交后重命名,如果放在worker 中,我们可能出现 一起重命名,最好的方式是文件名传送给 master 让其执行原子操作(lock)
redOutput []string // reduce 的中间输出文件名
}
// 初始化 方便理解各字段的含义
func MakeCoordinator(files []string, nReduce int) *Coordinator {
var err error
logFile, err = os.OpenFile("/home/zsj/mit/6.824/src/main/logs/master.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
panic(err)
}
log.SetOutput(logFile)
log.Println("================================Start Working==================================")
// files 的长度就是 map 工作任务的个数
nMap := len(files)
c := Coordinator{
lock: sync.RWMutex{},
inputFiles: files,
nReduce: nReduce,
mapTask: make([]TaskStatus, nMap),
mapOutput: make([][]string, nMap),
redTask: make([]TaskStatus, nReduce),
redOutput: make([]string, nReduce),
}
for i := 0; i < nMap; i++ {
c.mapTask[i] = TaskStatus{
Type: MapTask,
Index: i,
Status: UnAssigned,
Files: []string{files[i]},
}
c.mapOutput[i] = make([]string, nReduce) // 在这就初始话把
for j := 0; j < nReduce; j++ {
//map任务产生的文件的名字格式为: mr-output-(MapID)-(ReduceID)
c.mapOutput[i][j] = fmt.Sprintf("/home/zsj/mit/6.824/src/main/mr-output/map-%v-%v", i, j)
log.Println("map temp:", c.mapOutput[i][j])
}
}
for i := 0; i < nReduce; i++ {
c.redTask[i] = TaskStatus{
Type: RedTask,
Index: i,
Status: UnAssigned,
}
c.redOutput[i] = fmt.Sprintf("/home/zsj/mit/6.824/src/main/mr-tmp/mr-out-%v", i)
}
go c.checkTimeOut()
// Your code here.
c.server()
return &c
}
学到的
库函数
package plugin
// Plugin is a loaded Go plugin.
type Plugin struct {
pluginpath string
err string // set if plugin failed to load
loaded chan struct{} // closed when loaded
syms map[string]interface{}
}
// Open opens a Go plugin.
// If a path has already been opened, then the existing *Plugin is returned.
// It is safe for concurrent use by multiple goroutines.
func Open(path string) (*Plugin, error) {
return open(path)
}
// Lookup searches for a symbol named symName in plugin p.
// A symbol is any exported variable or function.
// It reports an error if the symbol is not found.
// It is safe for concurrent use by multiple goroutines.
func (p *Plugin) Lookup(symName string) (Symbol, error) {
return lookup(p, symName)
}
// A Symbol is a pointer to a variable or function.
//
// For example, a plugin defined as
//
// package main
//
// import "fmt"
//
// var V int
//
// func F() { fmt.Printf("Hello, number %d\n", V) }
//
// may be loaded with the Open function and then the exported package
// symbols V and F can be accessed
//
// p, err := plugin.Open("plugin_name.so")
// if err != nil {
// panic(err)
// }
// v, err := p.Lookup("V")
// if err != nil {
// panic(err)
// }
// f, err := p.Lookup("F")
// if err != nil {
// panic(err)
// }
// *v.(*int) = 7
// f.(func())() // prints "Hello, number 7"
type Symbol interface{}
plugin
可以查询到符号表的东西,类似于将ELF表读入内存,解析,然后将地址翻译为 函数指针。
具体用法 看第 42 行
// Golang program
// demostrating how to use strings.FieldsFunc() Function
package main
import (
"fmt"
"strings"
"unicode"
)
func main() {
myFunc := func(c rune) bool {
fmt.Println(string(c),unicode.IsNumber(c))
return !unicode.IsNumber(c)
} // 如果满足则删除
// When a number is encountered,
// the FieldsFunc() method separates the string
// and returns all non-numbers.
fmt.Printf("The Fields are: %q\n",
strings.FieldsFunc("This018is92a6shot179on89Golang864string34FieldFunc", myFunc))
}
=========
T false
h false
i false
s false
0 true
1 true
8 true
i false
s false
9 true
2 true
a false
6 true
s false
h false
o false
t false
1 true
7 true
9 true
o false
n false
8 true
9 true
G false
o false
l false
a false
n false
g false
8 true
6 true
4 true
s false
t false
r false
i false
n false
g false
3 true
4 true
F false
i false
e false
l false
d false
F false
u false
n false
c false
The Fields are: ["018" "92" "6" "179" "89" "864" "34"]
sed -i 's/\r//' one-more.sh
标签:map,false,string,err,reduce,MapReduce,true
From: https://www.cnblogs.com/jgjg/p/16986663.html