首页 > 其他分享 >6.824 lab1-MapReduce

6.824 lab1-MapReduce

时间:2022-10-25 14:46:55浏览次数:81  
标签:文件 6.824 string reduce MapReduce lab1 task 任务 go

lab1是实现MapReduce

老师完成了框架的大部分
我我们只需要做的是填充哪重要的几个部分

2022的官方连接
https://pdos.csail.mit.edu/6.824/labs/lab-mr.html

准备工作

下载

git clone git://g.csail.mit.edu/6.824-golabs-2022 6.824

image

运行示例

编译库

首先进入main目录中


在最新的2022版本中已经没有这个问题了
go build -race -buildmode=plugin ../mrapps/wc.go
但是会出错,因为这个代码比较早,1.16
我们需要重新指定一下go mod
同时将项目中的相对路径改了
修改src/main/mrsequential.go,修改为import "modename/src/mr"
修改src/mrapps/wc.go修改为import "modename/src/mr"


执行

go build -race -buildmode=plugin ../mrapps/wc.go
mac下去掉-race

win中无法进行此操作,我们要生成so文件,这是linux中的文件,只能在linux,macos,freebsd,

编译完成之后会在main下生成wc.so

so,share object,是linux中的动态库,相较于静态连接。动态库系统只会加载一次,供所有人用,节省内存。如果是静态连接,每链接一次就多一个份,不够灵活

go run -race mrsequential.go wc.so pg*.txt
项目中给我们实现了一个简单的mapreduce的实现,在main/mrsequential.go
其中用到了wc中几个函数,所以需要加上wc.go
功能也是论文经典操作,统计单词频次,做倒排索引
统计的文件是 pg*.txt,结果会被输出到mr-out-0

倒排索引
这是一种信息检索的方式
平时我么能见到的多是正排索引,由文件找到文件内容
倒排索引是反过来,通过文件内容关键词找到文件
举个例子搜索引擎可以通过关键词定位网页

我们的任务

实现一个分布式的mapreduce,实例是一个单体的应用
我们需要实现coordinator worker
任务介绍中,和论文中已经告诉我们需要怎么做了
我想提一点是关于中间结果的保存问题,这个比较特殊,刚开始也应该是我的理解难点因为论文中并没这个规定,6.824网站提示如下

  • A reasonable naming convention for intermediate files is mr-X-Y, where X is the Map task number, and Y is the reduce task number.
    我们map生成的结果要保存为mr-x-y的式样,x是任务的编号,y是reduce的编号,要求中间有10个reduce(在mrcoordinator.go中定义的
    那么0号任务产生mr-0-0mr-0-9一共十个文件
    1号任务也产生mr-1-0mr-1-9共十个文件
    当所有的map运算完成之后,我们要把所有y相同的文件组合到一起,这样我们就得到了10个文件,对应10个reduce任务
    reduce读取这10个文件进行统计即可

实现

基于要求,我不放太多代码,写一下解决的思路,建议自己尝试去写,真要最后写不出来,去参考一下别人的实现

coordinator需要实现任务的管理,上述中间结果的合并,还有任务的控制
即然需要管理任务我们就需要对应的数据结构来表示和管理这些任务
我们的任务分为了两个部分Map,和Reduce
在Map阶段,我们需要

  1. 文件的编号
  2. 文件位置,因为我们直接在本地存取,直接用文件名就可以
  3. 任务阶段,该做map,reduce还是任务完成exit
  4. reduce数量,实验中是10,也可以定死不要这个量,也能通过测试,显然这样处理不太好
  5. 输出的mr-x-y中间文件

在Reduce阶段

  1. 任务编号,其实可以不用
  2. 输入文件
  3. 任务阶段
  4. reduce数量
  5. 输出最终结果
    这两个阶段有不少相同的字段,我们可以将其合并为一个,不合并也没问题
type Task struct {
	TaskId    int
	File      string //文件位置,mapper阶段用
	TaskState Status //需要做哪一步处理
	NReduce   int
	Temp      []string //用于保存每个work产生的n个文件、进行reduce时处理的文件
	Output    string   //输出文件名,reduce阶段用
}

之后就是关于coordinator,这个类是需要管理任务
我们需要的数据

  1. 任务队列
  2. 任务的控制信息。任务是否在运行状态,启动时间,任务指针
  3. reduce数量
  4. 输入文件
  5. map的中间结果
  6. 状态,是否退出

go中没有提供队列,可以自己实现,或者利用有缓冲的chan

type Coordinator struct {
	TaskQueue chan *Task
	TaskInfo  map[int]*TaskInfo
	NReduce   int       
	Files     []string   
	Temp      [][]string 
	Exit      bool       
}

之后是coordinator的初始化构建

//简化
func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := Coordinator{ //... }   //初始化Coordinator
	//初始化任务,每一个文件就是一个任务
	for id, file := range files {
		task := Task{ //..,}
		tashInfo的添加
                任务队列的添加
	}
	c.server()
	go c.heartBeat()
	return &c
}

看到这离引入一个heratBeat()这个是用来做超时检测的
其逻辑也很简单

//简化版本,提示一下什么时候停止这个协程
func (c Coordinator) heartBeat() {
	for {
		for _, taks := range c.TaskInfo {
		    if taks 超时
                        重制一下任务
		}
		time.Sleep(time.Second * 5)
	}
}

接下来实现RPC函数,Assign任务分配,worker通过这个RPC得到任务

//提示处理安全访问问题
func (c *Coordinator) Assign(args *ExampleArgs, reply *Task) error {
	if 还有任务 { 
		队列取一个任务
		修改这个任务的状态
	} else  coordinator退出 { 
		返回 coordinator退出
	} else 没有任务 { 
		返回 等待  
	}
}

接下来要coordinator要实现什么函数,估计没有什么概念,因为我们还不知道worker的其他需求
所以我们可以转头去实现work一部分

func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	for {
		请求分配任务
		switch reply.TaskState {
		case Map:
			mapper(&reply, mapf)  //执行map
		case Reduce:
			reduce(&reply, reducef)  //执行reduce
		case Wait:
			time.Sleep(5 * time.Second)
		case Exit:
			return
		}
	}

}

我们首先实现这个mapper

func mapper(task *Task, mapf func(string, string) []KeyValue) {
	读取文件内容
	keyvalues := mapf(reply.File, 文件内容)  //mapf是实验框架已经实现好的函数,通过参数传递
        keyvalues分成n组,n为指定reduce数量,提示使用ihash函数,在worker.go内已经提供
        将这n分组写到本地n个文件中 ,格式mr-x-y

	将文件位置保存在Task.Temp中
        
	c通知coordinator任务完成
}

上面函数实现过程我们发现,当完成任务之后需要通知coordinator,所以coordinator需要一个方法来处理

//coordinator.go内

func (c *Coordinator) Completed(task *Task, reply *ExampleReply) error {
	if 任务已经完成 {    //这个文件已经被别的worker处理完了
		return nil
	}

	设置完成状态

	/*
		每个worker都会产生n个中间文件,我们需要将这些信息汇总给coordinater,方便之后的任务分配
		每个任务完成时机,也是我们检查是不是所有任务都完成的时机,如果都完成了,就退出
		这个过程要完成任务很多,代码还要上锁,为了提高效率,我们开新的协程
	*/
	go c.GenerateResult(task)
	return nil
}

//合并中间结果,检查所有任务是否完成
func (c *Coordinator) GenerateResult(task *Task) { 
	switch task.TaskState {
	case Map: //map结束,进入reduce阶段
		for id, file := range task.Temp { 
                //将task 产生了temp给coordinator进行统一处理
			c.Temp[id] = append(c.Temp[id], file)  
		}
		if c.IsAllDown() {   //所有任务进入reduce
			创建reduce的任务队列
			for id, file := range c.Temp { 
				task := Task{/...}   //创建任务
				加入队列
				设置任务开始信息
			}
		}
	case Reduce:
		if c.IsAllDown() {
			c.Exit = true //退出coordinator
		}
	}
}

然后我们完成worker部分

//此部分有较多文件读写,直接贴代码

func reduce(task *Task, reducef func(string, []string) string) {
      //将文件内的数据,重新序列化为keyvalue
	intermediate := []KeyValue{} //reduce处理的时keyvalue,我们需要将文件序列化为keyvalue
	for _, file := range task.Temp {
		f, err := os.Open(file)
		if err != nil {
			log.Fatal("Failed to open file", err)
		}
		con := json.NewDecoder(f)
		for {
			var t KeyValue
			if err := con.Decode(&t); err != nil {
				break
			}
			intermediate = append(intermediate, t)
		}
		f.Close()
	}

	//下面可以参考mrsequential.go

	sort.Sort(ByKey(intermediate))
	src, _ := os.Getwd()
	tempFile, err := ioutil.TempFile(src, "mr-tmp-*")
	if err != nil {
		log.Fatal("Failed to create temp file", err)
	}

	i := 0
	for i < len(intermediate) {
		j := i + 1
		for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
			j++
		}
		values := []string{}
		for k := i; k < j; k++ {
			values = append(values, intermediate[k].Value)
		}
		output := reducef(intermediate[i].Key, values)

		// this is the correct format for each line of Reduce output.
		fmt.Fprintf(tempFile, "%v %v\n", intermediate[i].Key, output)

		i = j
	}
	tempFile.Close()
	newname := fmt.Sprintf("mr-out-%d", task.TaskId)
	os.Rename(tempFile.Name(), newname)
	task.Output = newname

	r := ExampleReply{}

	call("Coordinator.Completed", task, &r) //rpc通知coordinator任务完成
}

上面所有代码都有删减,在实现基本逻辑上还都需要的补充一部分内容,需要注意(比如我删了所有的锁)

标签:文件,6.824,string,reduce,MapReduce,lab1,task,任务,go
From: https://www.cnblogs.com/beifangcc/p/16736339.html

相关文章

  • 6.824 Frangipani
    本文重点 缓存一致性、分布式事务、分布式故障恢复设计和功能之间的关联。缓存一致性是指,如果我缓存了一些数据,之后你修改了实际数据但是并没有考虑我缓存中的数据,必须......
  • MIT6.824-Distributed System
    Goversion:1.13.6wgethttps://dl.google.com/go/go1.13.6.linux-amd64.tar.gzsudotar-C/usr/local-xvfgo1.13-.6.linux-amd64.tar.gzsudonano~/.profile在......
  • 6.824笔记3
    大规模存储分布式的底层运行着一个大型分布式存储系统,并有一套接口,评估指标包括并行性能,容错,复制,一致性数据分割并放到多个服务器上,并且需要一个自动化的容错系统,一种容......
  • 6.824笔记2
    线程为每一个prc请求使用一个线程,当请求回收的时候,线程继续运作,多线程能能够开启多个网络请求,形成io并发并行化,线程用来实现并行化异步编程,事件驱动编程,又一个线程,一个循......
  • MIT6.824_LEC3_GFS_Outline
    为什么我们要阅读GFS论文?分布式存储是关键的抽象概念接口和语法应该是怎样的?内部是怎么运行的?GFS论文对6.824这门课的很多主题有指导意义并行性能容错副本......
  • MIT_6.824_LEC3_GFS_FAQ翻译
    GFSFAQQ:Whyisatomicrecordappendat-least-once,ratherthanexactlyonce?为什么记录的追加是至少一次,而不是仅仅只追加一次?Section3.1,Step7,saystha......
  • Hadoop MapReduce
    学习MapReduce,首先要理解它的思想——分而治之,先分再合,分而治之,所谓的分而治之,意思就是将一个复杂的问题,按照一定的分解方法分解为规模较小的若干的部分,再逐个解决,分别找出......
  • MapReduce执行流程
    hadoop中map分治执行流程: ......
  • 恶意代码分析实战 恶意代码的网络特征 lab14-1 14-2 14-3 都是http c2,并用到了自定义
       先反编译看看:函数在做base64加密:   验证下想法,果然:后面的功能,就是在下载执行了:   我们分析下细节: 问题1:使用wireshark进行监控网络特征,运......
  • 恶意代码分析实战 隐蔽的恶意代码启动 lab12-1 12-2 12-3 各种进程注入的姿势都在这里
    一、常用的隐藏技术启动器进程注入进程替换Hook注入DetoursAPC注入二、Lab12-11.行为分析执行之后的效果是每隔一段时间会弹窗。查看processmomitor。可以......