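I first ran into MapReduce (MR) in Lab 1 of MIT 6.824, back when I was just getting started. I recently reread the original paper and came away with some new thoughts, so here are a few brief notes.
MR is an important part of Google's distributed systems. The core of the paper is the parallelism that the map/reduce abstraction provides: once the interface is given, application programmers no longer have to deal much with the underlying mechanisms. In essence, though, MR is only a distributed parallel computing framework; the distributed infrastructure it actually relies on is still GFS.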
MapReduce: Simplified Data Processing on Large Clusters
Programming Model
K/V pairs
original task --map--> intermediate K/V pairs --shuffle--> (key, list of values) --reduce--> result
shuffle: group all values that share the same key into one list
the result for a key can be a set of values; merging the values is done by the user's reduce function
Here is a word count app from MIT 6.824 (Go):
// The map function is called once for each file of input. The first
// argument is the name of the input file, and the second is the
// file's complete contents. You should ignore the input file name,
// and look only at the contents argument. The return value is a slice
// of key/value pairs.
func Map(filename string, contents string) []mr.KeyValue {
	// function to detect word separators.
	ff := func(r rune) bool { return !unicode.IsLetter(r) }

	// split contents into an array of words.
	words := strings.FieldsFunc(contents, ff)

	kva := []mr.KeyValue{}
	for _, w := range words {
		kv := mr.KeyValue{w, "1"}
		kva = append(kva, kv)
	}
	return kva
}

// The reduce function is called once for each key generated by the
// map tasks, with a list of all the values created for that key by
// any map task.
func Reduce(key string, values []string) string {
	// return the number of occurrences of this word.
	return strconv.Itoa(len(values))
}
map (k1,v1) → list(k2,v2)
reduce (k2,list(v2)) → list(v2)
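To make the two signatures concrete, here is a minimal single-process sketch of map, shuffle, and reduce for word count. It is not the lab's or the paper's implementation: `KeyValue` only mirrors the lab's `mr.KeyValue`, and `mapF`, `reduceF`, `shuffle`, and `runSequential` are names made up for this illustration.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
	"unicode"
)

// KeyValue mirrors the mr.KeyValue type used by the lab code (assumed shape).
type KeyValue struct {
	Key   string
	Value string
}

// mapF emits ("word", "1") for every word in contents: (k1,v1) -> list(k2,v2).
func mapF(contents string) []KeyValue {
	words := strings.FieldsFunc(contents, func(r rune) bool { return !unicode.IsLetter(r) })
	kva := make([]KeyValue, 0, len(words))
	for _, w := range words {
		kva = append(kva, KeyValue{Key: w, Value: "1"})
	}
	return kva
}

// reduceF returns how many values were collected for one key.
func reduceF(key string, values []string) string {
	return strconv.Itoa(len(values))
}

// shuffle groups intermediate pairs by key: list(k2,v2) -> k2 -> list(v2).
func shuffle(kva []KeyValue) map[string][]string {
	groups := make(map[string][]string)
	for _, kv := range kva {
		groups[kv.Key] = append(groups[kv.Key], kv.Value)
	}
	return groups
}

// runSequential applies map, shuffle, and reduce in a single process.
func runSequential(inputs []string) map[string]string {
	var intermediate []KeyValue
	for _, in := range inputs {
		intermediate = append(intermediate, mapF(in)...)
	}
	out := make(map[string]string)
	for k, vs := range shuffle(intermediate) {
		out[k] = reduceF(k, vs) // reduce runs once per distinct key
	}
	return out
}

func main() {
	res := runSequential([]string{"a b a", "b c"})
	keys := make([]string, 0, len(res))
	for k := range res {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		fmt.Println(k, res[k])
	}
}
```

With the two inputs in `main`, this prints `a 2`, `b 2`, and `c 1`, one per line. The distributed version does the same three steps, only spread across machines.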
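Workflow
- split the input files into M pieces
- the master (coordinator) assigns map tasks (and later reduce tasks) to idle workers
- map workers run the map function and generate intermediate K/V pairs
- the intermediate K/V pairs are written to local disk, partitioned into R regions
- reduce workers read the intermediate files from the map machines via RPC, sort them by key (the sorting happens on the reduce worker, not on the master), and run the reduce function
- reduce output is appended to the final output files in GFS
- when all tasks are done, the master wakes up the user program and the MapReduce call returns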
After successful completion, the output of the mapreduce execution is available in the R output files (one per reduce task, with file names as specified by the user). Typically, users do not need to combine these R output files into one file – they often pass these files as input to another MapReduce call, or use them from another distributed application that is able to deal with input that is partitioned into multiple files.
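In practical programming, atomic operations are important (regardless of C++ or Go or ...): the paper relies on atomic commits of task output, e.g. an atomic rename of a temporary file to its final name.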
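One common way to get an atomic commit of task output is to write to a temporary file and rename it into place. The sketch below assumes a local POSIX-like file system where rename is atomic; `commitOutput`, `demo`, and the file name `mr-out-0` are illustrative names, not something the paper prescribes.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// commitOutput writes data to a temporary file in dir and then renames it to
// its final name, so readers see either no file or the complete file.
func commitOutput(dir, name string, data []byte) error {
	// Create the temp file in the same directory so the rename stays on one
	// file system.
	tmp, err := os.CreateTemp(dir, name+".tmp-*")
	if err != nil {
		return err
	}
	if _, err := tmp.Write(data); err != nil {
		tmp.Close()
		os.Remove(tmp.Name())
		return err
	}
	if err := tmp.Close(); err != nil {
		os.Remove(tmp.Name())
		return err
	}
	return os.Rename(tmp.Name(), filepath.Join(dir, name))
}

// demo commits the same output twice, as two executions of one reduce task
// would, and reads back the final file.
func demo() (string, error) {
	dir, err := os.MkdirTemp("", "mr-demo-")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)
	for i := 0; i < 2; i++ {
		if err := commitOutput(dir, "mr-out-0", []byte("a 2\n")); err != nil {
			return "", err
		}
	}
	b, err := os.ReadFile(filepath.Join(dir, "mr-out-0"))
	return string(b), err
}

func main() {
	got, err := demo()
	if err != nil {
		fmt.Println("commit failed:", err)
		return
	}
	fmt.Print(got)
}
```

Because both executions produce the same bytes and the rename replaces the file as a whole, a duplicate execution cannot leave a half-written output file behind.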
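Fault Tolerance
heartbeat: the master pings every worker periodically; a worker that does not respond within a certain amount of time is marked as failed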
Completed map tasks are re-executed on a failure because their output is stored on the local disk(s) of the failed machine and is therefore inaccessible. Completed reduce tasks do not need to be re-executed since their output is stored in a global file system.
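a reduce task that has not finished reading from a failed map machine is not re-executed itself (its RPC read would fail): the map task is re-executed on another worker, the reduce workers are notified, and the remaining data is read from the new worker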
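The re-execution rule can be written down as a small piece of master-side bookkeeping. This is only a sketch under assumptions: the `Task` struct, the `expired` and `handleWorkerFailure` helpers, and the 10-second timeout are invented for illustration, and a real master would also need locking and RPC plumbing.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

type Kind int

const (
	MapTask Kind = iota
	ReduceTask
)

type State int

const (
	Idle State = iota
	InProgress
	Completed
)

// Task is a hypothetical record the master keeps per task.
type Task struct {
	Kind   Kind
	State  State
	Worker string
}

// expired returns the workers whose last heartbeat is older than timeout.
func expired(lastSeen map[string]time.Time, now time.Time, timeout time.Duration) []string {
	var dead []string
	for w, seen := range lastSeen {
		if now.Sub(seen) > timeout {
			dead = append(dead, w)
		}
	}
	sort.Strings(dead)
	return dead
}

// handleWorkerFailure resets every task that must run again after worker
// died and returns how many tasks were reset.
func handleWorkerFailure(tasks []Task, worker string) int {
	reset := 0
	for i := range tasks {
		t := &tasks[i]
		if t.Worker != worker {
			continue
		}
		// In-progress work is lost. Completed map output sits on the dead
		// machine's local disk, so it is lost too. Completed reduce output is
		// already in the global file system and stays valid.
		if t.State == InProgress || (t.State == Completed && t.Kind == MapTask) {
			t.State = Idle
			t.Worker = ""
			reset++
		}
	}
	return reset
}

func main() {
	start := time.Now()
	lastSeen := map[string]time.Time{"w1": start, "w2": start.Add(9 * time.Second)}
	tasks := []Task{
		{Kind: MapTask, State: Completed, Worker: "w1"},
		{Kind: ReduceTask, State: Completed, Worker: "w1"},
		{Kind: ReduceTask, State: InProgress, Worker: "w1"},
		{Kind: MapTask, State: Completed, Worker: "w2"},
	}
	now := start.Add(11 * time.Second)
	for _, w := range expired(lastSeen, now, 10*time.Second) {
		fmt.Println(w, "failed, tasks reset:", handleWorkerFailure(tasks, w))
	}
}
```

In this example only `w1` misses the deadline, and two of its three tasks go back to idle: the completed map task and the in-progress reduce task.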
Task Granularity
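M and R should be much larger than the number of worker machines -> better dynamic load balancing and faster recovery
common: M > R; R is kept smaller because every reduce task produces a separate output file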
Backup Tasks
handles stragglers: When a MapReduce operation is close to completion, the master schedules backup executions of the remaining in-progress tasks. The task is marked as completed whenever either the primary or the backup execution completes.
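A rough sketch of how a master might decide when to launch backups. The paper does not say what "close to completion" means in numbers, so the `tailFraction` knob, the `Task` struct, and both helper functions are assumptions made for this example.

```go
package main

import "fmt"

// Task is a hypothetical per-task record kept by the master.
type Task struct {
	ID        int
	Done      bool
	HasBackup bool
}

// pickBackups returns the IDs of unfinished tasks that should get a backup
// execution. Nothing is scheduled until the unfinished share of all tasks
// has dropped to tailFraction or below.
func pickBackups(tasks []Task, tailFraction float64) []int {
	if len(tasks) == 0 {
		return nil
	}
	unfinished := 0
	for _, t := range tasks {
		if !t.Done {
			unfinished++
		}
	}
	if unfinished == 0 || float64(unfinished)/float64(len(tasks)) > tailFraction {
		return nil
	}
	var ids []int
	for i := range tasks {
		if !tasks[i].Done && !tasks[i].HasBackup {
			tasks[i].HasBackup = true
			ids = append(ids, tasks[i].ID)
		}
	}
	return ids
}

// complete marks a task done. The first report (primary or backup) wins and
// returns true; later reports for the same task return false.
func complete(tasks []Task, id int) bool {
	for i := range tasks {
		if tasks[i].ID == id {
			if tasks[i].Done {
				return false
			}
			tasks[i].Done = true
			return true
		}
	}
	return false
}

func main() {
	tasks := make([]Task, 10)
	for i := range tasks {
		tasks[i].ID = i
	}
	for id := 0; id < 9; id++ {
		complete(tasks, id)
	}
	fmt.Println("backups for:", pickBackups(tasks, 0.2))
	fmt.Println("first report accepted:", complete(tasks, 9))
	fmt.Println("second report accepted:", complete(tasks, 9))
}
```

Accepting only the first completion report is what makes the duplicate execution harmless for the bookkeeping; the output itself still needs an atomic commit.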
Refinements
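- Partitioning function: the user fixes the number of output files (R); keys are assigned to partitions by a hash by default, e.g. hash(key) mod R
- ordering (pre-sort): within a partition, the intermediate K/V pairs are processed in increasing key order
- combiner, e.g. in the word count map task, merge the pairs with the same key locally before they are sent over the network
- input/output types: different input formats (read by line or by offset); databases and in-memory data can also serve as sources
- side-effects: extra output files written by a task; the application writer has to make them atomic and idempotent
- errors in user code: optionally skip bad records that crash the task deterministically
- sequential execution on a local machine (helps debugging)
- display the task status (command line or GUI) -> useful for collecting and analysing data
- counters (in the library) for counting occurrences of events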
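For the partitioning function in the list above, a minimal sketch of the default "hash(key) mod R" scheme. The FNV-based `ihash` is one possible hash choice rather than what Google's implementation uses, and `partition` and `KeyValue` are names assumed for this example.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// KeyValue mirrors the mr.KeyValue type from the lab code (assumed shape).
type KeyValue struct {
	Key   string
	Value string
}

// ihash maps a key to a non-negative int using 32-bit FNV-1a.
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}

// partition splits intermediate pairs into nReduce buckets. Every pair with
// the same key lands in the same bucket, so one reduce task sees all values
// for that key.
func partition(kva []KeyValue, nReduce int) [][]KeyValue {
	buckets := make([][]KeyValue, nReduce)
	for _, kv := range kva {
		r := ihash(kv.Key) % nReduce
		buckets[r] = append(buckets[r], kv)
	}
	return buckets
}

func main() {
	kva := []KeyValue{
		{Key: "a", Value: "1"},
		{Key: "b", Value: "1"},
		{Key: "a", Value: "1"},
	}
	for r, b := range partition(kva, 3) {
		fmt.Printf("partition %d: %d pairs\n", r, len(b))
	}
}
```

A user-supplied partitioning function would replace `ihash(kv.Key) % nReduce`, for example to keep all URLs of one host in the same output file.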
Discussion
in some cases we could also store the intermediate files in the global file system,
so that completed map tasks do not need to be re-executed when a machine goes down,
and the reduce-side RPC read simply becomes a GFS read.
network bandwidth can be the main bottleneck; a p2p network can reduce the master's I/O pressure
MapReduce: open-source version: Hadoop (Yahoo/Apache)
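middle step: shuffle. Is each key reduced exactly once? Yes: the shuffle gathers all values of a key into a single reduce call, with or without a combiner.
so why do we need a combiner?
- with a combiner: each map task sends at most one pre-merged pair per key
- without a combiner: each map task sends every pair it produced, so the reduce call receives many more values for the same key
but where does the combiner run?
on the map machine (local disk)?: yes, this is the local combine
on the master?: no, the master does not do any data computation
before reduce?: that is the shuffle, not the combiner
shuffle & combine could be the bottleneck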
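A small sketch of a map-side combiner for word count, to show what it changes. Note one assumption that differs from the lab code: once counts are pre-merged, the reduce function has to sum the values instead of taking `len(values)`. `combine`, `reduceSum`, and `KeyValue` are names invented for this example.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
)

// KeyValue mirrors the mr.KeyValue type from the lab code (assumed shape).
type KeyValue struct {
	Key   string
	Value string
}

// combine merges the pairs produced by one map task so that each key is
// emitted once, carrying its local count. Output is sorted by key.
func combine(kva []KeyValue) ([]KeyValue, error) {
	sums := make(map[string]int)
	for _, kv := range kva {
		n, err := strconv.Atoi(kv.Value)
		if err != nil {
			return nil, fmt.Errorf("combine: bad count %q for key %q: %w", kv.Value, kv.Key, err)
		}
		sums[kv.Key] += n
	}
	keys := make([]string, 0, len(sums))
	for k := range sums {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	out := make([]KeyValue, 0, len(keys))
	for _, k := range keys {
		out = append(out, KeyValue{Key: k, Value: strconv.Itoa(sums[k])})
	}
	return out, nil
}

// reduceSum adds up partial counts; it is still called once per key.
func reduceSum(key string, values []string) (string, error) {
	total := 0
	for _, v := range values {
		n, err := strconv.Atoi(v)
		if err != nil {
			return "", err
		}
		total += n
	}
	return strconv.Itoa(total), nil
}

func main() {
	local, err := combine([]KeyValue{
		{Key: "a", Value: "1"},
		{Key: "b", Value: "1"},
		{Key: "a", Value: "1"},
	})
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(local) // three pairs shrink to two before the shuffle

	// Partial counts for "a" arriving from two different map tasks.
	total, err := reduceSum("a", []string{"2", "1"})
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("a", total)
}
```

This prints `[{a 2} {b 1}]` and then `a 3`. The number of reduce calls per key does not change; only the amount of data that crosses the network does.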
task failure: restart the task
node failure: restart its tasks on a new node, including all completed map tasks, because their intermediate files are lost