首页 > 其他分享 >MapReduce执行流程

MapReduce执行流程

时间:2024-03-20 09:03:04浏览次数:25  
标签:文件 流程 ReduceTask MapReduce MapTask mapreduce 缓冲区 执行 数据

MapReduce

执行流程

MapTask执行流程

  1. Read:读取阶段

    1. MapTask会调用InputFormat中的getSplits方法来对文件进行切片

    2. 切片之后,针对每一个Split,产生一个RecordReader流用于读取数据

    3. 数据是以Key-Value形式来产生,交给map方法来处理。每一个键值对触发调用一次map方法

  2. Map:映射阶段

    1. map方法在获取到键值对之后,按照要求对键值对中的数据进行拆分解析,解析之后按照要求输出键值对形式的结果

  3. Collect:收集阶段

    1. MapTask拆分产生数据之后,并不是直接将数据传输给ReduceTask,而是会调用OutputCollector.collect方法来收集输出结果

    2. OutputCollector.collect在收集到数据之后,会先按照指定的规则,对数据进行分区,分区完成之后,会将数据写到缓冲区中

    3. 缓冲区本质上是一个环形的字节数组,默认大小是100M(可以通过属性mapreduce.task.io.sort.mb来调节),默认阈值是0.8(可以通过属性mapreduce.map.sort.spill.percent来调节)

  4. Spill:溢写阶段

    1. 当缓冲区使用达到指定阈值的时候,MapTask会将缓冲区中的数据冲刷(flush)到磁盘上,这个过程称之为溢写(spill)

    2. 在溢写的时候,会按照如下步骤进行

      1. 排序(sort)。此时,是将毫无规律的数据整理成有序数据,采用的Quick Sort(快速排序)。需要注意的是,数据在排序的时候,是按照分区内进行的排序,即先按照分区大小进行分区号的升序,然后每一个分区内按照指定规则排序。因此,数据是分区内有序

      2. 合并(combine)。如果用户指定了Combiner,那么此时会将数据进行合并处理

      3. 写出(flush)。按照分区的顺序,将每一个分区中的数据依次写入任务的工作目录的临时文件output/spillN.out中。N表示溢写次数。溢写次数不能完全由原始数据大小来决定,还得考虑map方法的处理过程。此时,单个结果文件中是分区且有序的,整体而言是局部有序

      4. 压缩(compress)。如果用户指定了对MapTask的结果进行压缩,那么数据在写出之后还会进行压缩处理

      5. 记录(record)。将分区数据的元信息记录到内存索引数据结果SpillRecord中。元信息中包含:每一个分区在每一个临时结果文件中的偏移量(offset),每一个分区压缩前的数据大小以及压缩后的数据大小。如果SpillRecord中记录的所有的元信息大小之和不超过1M,那么SpillRecord中的数据也会写到output/spillN.out.index

  5. Merge:合并阶段

    1. 当MapTask将所有的数据处理完成之后,会将所有的临时结果文件spillN.out合并(merge)成一个大的结果文件output/file.out,同时会为这个文件生成索引文件file.out.index

    2. 在merge过程中,数据会再次进行分区。分区之后数据会再次进行排序。注意,此次排序是将局部有序的数据整理成整体有序,采用的是Merge Sort(归并排序)

    3. 在merge过程中,如果指定了Combiner,那么数据会进行combine操作

    4. merge的时候,默认情况下,是每10个(可以通过属性mapreduce.task.io.sort.factor来调节)小文件合并成1个大文件,通过多次合并,最后会产生一个结果文件file.out。这样子,能够有效的避免同时打开大量文件带来的开销

    5. 注意:无论是否会进行Spill过程,最后都会产生一个file.out文件!!!

ReduceTask执行流程

  1. 当达到启动阈值的时候,ReduceTask就会启动。默认情况下,启动阈值是0.05,即5%的MapTask结束ReduceTask就会启动。可以通过属性mapreduce.job.reduce.slowstart.completedmaps来调节

  2. ReduceTask启动之后,会启动fetch线程来抓取数据。默认情况下,每一个ReduceTask最多可以启动5个fetch线程来抓取数据。可以通过属性mapreduce.reduce.shuffle.parrellelcopies来调节

  3. fetch启动之后,会通过http请求的get请求来获取数据,在请求的时候,会携带参数表示当前的分区号。MapTask在收到请求之后,根据参数(分区号)来解析file.out.index文件,从中获取指定分区的位置,然后才会读取file.out,将对应分区的数据返回

  4. fetch线程在抓取到数据之后,会先对数据进行大小判断。如果数据超过了ReduceTask的缓冲区阈值,那么会将数据直接以文件形式写到磁盘上;如果没有超过ReduceTask的缓冲区阈值,那么就先放到缓冲区中

    1. 缓冲区的大小由属性mapreduce.reduce.shuffle.input.buffer.percent来决定,默认是0.7,是ReduceTask执行过程中允许占用内存的70%

    2. 缓冲区的阈值由属性mapreduce.reduce.shuffle.merge.percent来决定,默认是0.66,即缓冲区大小的66%

  5. fetch线程在抓取数据的同时,ReduceTask会启动两个后台线程将抓取来的数据进行merge,以防内存以及磁盘占用过多

  6. 所有的fetch线程完成之后,ReduceTask会将抓取来的所有的数据进行排序。同样,此时也是将局部有序的数据整理成整体有序的数据,所以依然采用的是归并排序(Merge Sort)

  7. 经过排序和merge,最终产生了一个大的临时文件来交给ReduceTask来处理。此时,ReduceTask会将相同的键对应的值放到一起,形成一个伪迭代器(本质上就是一个流,来读取刚刚产生的临时文件),这个过程称之为分组(group)。也正因为是伪迭代器(将流包装成了迭代器,临时文件读取完之后就会被销毁),所以只能遍历一次!

  8. 分组完成之后,每一个键调用一次reduce方法,按照指定的规则来对数据进行聚合

  9. reduce处理完之后,会将结果传递给OutputFormat,按照指定规则将数据以指定形式写出到指定位置

流程图

常见优化方案

  1. 减少MapTask的溢写次数。溢写是将数据写到磁盘上,程序和磁盘交互次数越多,效率越低;此时,如果需要提高效率,就可以考虑减少MapTask和磁盘的交互次数

    1. 调节缓冲区的大小。通过mapreduce.task.io.sort.mb来调节,实际过程中,一般会将这个值调节为250M~400M

    2. 调节缓冲区阈值的大小。通过属性mapreduce.map.sort.spill.percent来调节

  2. 减少Merge次数。通过属性mapreduce.task.io.sort.factor来调节

  3. 增加Combiner。如果计算可以传递,那么建议在程序中使用Combiner。根据经验,使用Combiner,大约可以提升40%的效率

  4. 减少Reduce。如果MapTask处理完成之后,不需要使用Reduce聚合,那么此时可以直接省略Reduce

  5. 合理设置ReduceTask的执行内存。默认情况下,每一个ReduceTask最多占用1G内存,如果试图超过1G内存,就会被kill掉

    1. 调大ReducedTask的执行内存。通过mapreduce.reduce.memory.mb属性来调节,单位是MB,默认值是1024

    2. 调大缓冲区的占比。通过mapreduce.reduce.shuffle.input.buffer.percent属性来调节

    3. 调大缓冲区的阈值。通过mapreduce.reduce.shuffle.merge.percent属性来调节

  6. 增加fetch线程的数量。通过mapreduce.reduce.shuffle.parrellelcopies属性来调节

  7. 可以考虑对数据进行压缩。即将MapTask产生的结果进行压缩之后传递给ReduceTask,但是这种方案是在网络和解压效率之间进行了平衡/对比

其他

小文件问题

  1. 在大数据开发环境中,虽然实际处理的文件大部分都是大文件,但是依然无法避免产生小文件

  2. 一般而言,如果文件大小≤Block*0.8,那么此时就认为这是一个小文件。实际过程中,一般认为不超过100M的文件就是小文件

  3. 小文件在分布式环境下的问题

    1. 存储:在HDFS中,每一个小文件对应一条元数据,如果存储大量的小文件,那么会产生大量的元数据,此时会导致占用较多的内存,同时导致元数据的读写效率降低

    2. 计算:在MapReduce中,每一个小文件对应一个切片,每一个切片对应一个MapTask。如果需要对大量的小文件进行处理,那么就意味着需要产生大量的MapTask,导致集群的资源被同时大量的占用和释放

  4. 目前对小文件的处理方案无非两种:合并(merge)和打包

  5. MapReduce提供了一种原生的打包方案:Hadoop Archive,将多个小文件打成一个.har

    # 将/txt/打成txt.har包,放到/result下
    hadoop archive -archiveName txt.har -p /txt /result

数据倾斜

  1. 在集群中,因为处理的数据量不均等导致任务执行时间不一致而产生的等待,称之为数据倾斜

  2. 数据倾斜可能发生在Map端,也可能会发生在Reduce端

    1. Map端产生数据倾斜的直接原因:需要同时处理多个文件,且文件大小不均等,文件不可切

    2. Reduce产生数据倾斜的直接原因:对数据进行分类

  3. 数据倾斜无法避免,因为数据本身就有倾斜特性

  4. 理论上来说,数据倾斜产生之后可以解决,但是实际过程中,不太好解决;对于Reduce端的数据倾斜,实际过程中,可能会采用二阶段聚合方式来处理

推测执行机制

  1. 推测执行机制本质是MapReduce针对慢任务的一种优化。当出现慢任务的时候,MapReduce会将这个任务拷贝一份到其他节点上,两个节点同时执行相同的任务,谁先执行完谁的结果就作为最终结果,另一个没有执行完的就会被kill掉

  2. 慢任务的出现场景

    1. 任务分配不均匀:每一个节点被分配到的任务数量不同

    2. 节点性能不一致:每一台服务器的配置不同

    3. 数据倾斜:任务之间处理的数据量不均等

  3. 实际工作中,因为数据倾斜而导致的慢任务出现的概率更高,此时推测执行机制除了会占用更多的集群资源以外,并不能提高执行效率。因此,实际过程中,一般会考虑关闭推测执行机制

    <!-- MapTask的推测执行机制 -->
    <property>
        <name>mapreduce.map.speculative</name>
        <value>true</value>
    </property>
    <!-- ReduceTask的推测执行机制 -->
    <property>
        <name>mapreduce.reduce.speculative</name>
        <value>true</value>
    </property>

join

  1. 案例:统计每一天的利润(文件:order.txt和product.txt)

  2. 需要同时处理多个文件,且信息来源不是只依靠单个文件就能处理,此时需要确定一个主文件,将其他的文件作为缓存文件进行处理,这个过程称之为join

标签:文件,流程,ReduceTask,MapReduce,MapTask,mapreduce,缓冲区,执行,数据
From: https://blog.csdn.net/m0_51388399/article/details/136863093

相关文章

  • android App启动流程三-Activity启动流程
    上一篇我们介绍了从App的进程创建到Application启动执行,今天我们继续深入学习一下,Activity的启动流程。realStartActivityLocked我们接着上一篇,从ActivityTaskManagerService.attachApplication函数看起,最终发现会执行到ActivityTaskSupervisor.realStartActivityLocked方法......
  • MyBatis3源码深度解析(十六)SqlSession的创建与执行(三)Mapper方法的调用过程
    文章目录前言5.9Mapper方法的调用过程5.10小结前言上一节【MyBatis3源码深度解析(十五)SqlSession的创建与执行(二)Mapper接口和XML配置文件的注册与获取】已经知道,调用SqlSession对象的getMapper(Class)方法,传入指定的Mapper接口对应的Class对象,即可获得一个动态......
  • shell执行sh
    假设我们有一个名为“shell_script”的脚本文件,文件内容如下#!/bin/bash然后我们准备执行这个文件$chmodu+xshell_script$./shell_script当我们执行./shell_script这行命令的时候由于脚本添加了shebang,相当于在命令行这样执行:/bin/bashshell_script  #!/usr/bin/ba......
  • 使用Selenium执行JavaScript脚本:探索Web自动化的新领域
    前言在我们使用selenium进行自动化测试的时候,selenium能够帮助我们实现元素定位和点击输入等操作,但是有的时候,我们会发现,即使我们的元素定位没有问题,元素也无法执行操作;也有部分情况是我们无法直接定位滚动条河时间控件来进行操作,这个时候,我们就需要借助JavaScript来解决问题。......
  • 直播预约丨《袋鼠云大数据实操指南》No.1:从理论到实践,离线开发全流程解析
    近年来,新质生产力、数据要素及数据资产入表等新兴概念犹如一股强劲的浪潮,持续冲击并革新着企业数字化转型的观念视野,昭示着一个以数据为核心驱动力的新时代正稳步启幕。面对这些引领经济转型的新兴概念,为了更好地服务于客户并提供切实可行的实践指导,自3月20日起,袋鼠云将推出全新......
  • git在单分支(自己分支)上的操作流程
    文章目录一、git命令整体操作流程(了解)二、idea中git操作流程(常用-图文)1、add2、commit,提交代码3、pull拉取最新代码4、push推送代码到远程仓库5、最后就可以在远程仓库中看你提交的代码了。平时在idea中,在自己的git分支上的操作还是比较频繁的,但是很多刚开始操作......
  • 鸿鹄电子招投标系统源码实现与立项流程:基于Spring Boot、Mybatis、Redis和Layui的企业
    随着企业的快速发展,招采管理逐渐成为企业运营中的重要环节。为了满足公司对内部招采管理提升的要求,建立一个公平、公开、公正的采购环境至关重要。在这个背景下,我们开发了一款电子招标采购软件,以最大限度地控制采购成本,提高招投标工作的公开性和透明性,并确保符合国家电子招投标......
  • linux 执行 PHP脚本
    phpapiroot.phpc=crontaba=indexphpD:\www\ddhd\www\apiroot.phpc=crontaba=index上面是运行脚本的命令,适合MVC框架,在入口文件处需要对控制器c和方法a进行特殊处理才能接收到参数 $c=$_GET['c']?:'index';$a=$_GET['a']?:'index';//start......
  • STM32_LVGL移植流程及注意事项
    STM32——LVGL移植流程及注意事项下载源码(lvgl8.2):点击git下载.源码精简lvgl-8.2​|build:使用Cmake工具编译的相关文件​|demos:lvgl官方的测试demos​|docs:lvgl文档​......
  • 基于jmeter的性能全流程测试
    01、做性能测试的步骤1、服务器性能监控首先要在对应服务器上面安装性能监控工具,比如linux系统下的服务器,可以选择nmon或者其他的监控工具,然后在jmeter模拟场景跑脚本的时候,同时启动监控工具,这样就可以获得jmeter的聚合报告和服务器的性能报告,然后分析这两份报告,得到性能测试的......