首页 > 其他分享 >MapReduce分布式计算(二)

MapReduce分布式计算(二)

时间:2023-05-12 18:57:20浏览次数:38  
标签:文件 combiner 04 map reduce 分布式计算 MapReduce 2022

练习
同一时间不同地区的温度 求每天的最高温度

2022-04-03,21.2
2022-04-03,18.5
2022-04-03,24.3
2022-04-03,16.5
2022-04-03,10.0
2022-04-04,28.3
2022-04-04,18.7
2022-04-04,30.0
2022-04-04,21.1

代码实现

package com.doit.demo04;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class Test {

    /*
        2022-04-03,21.2
        K1:行的起始位置    LongWritable
        V1:一行数据     Text
        K2:日期        Text
        V2:温度        DoubleWritable
     */
    private static class TemperatureMapper extends Mapper<LongWritable, Text,Text, DoubleWritable>{
       //每行数据都会调用一次map方法 这样可以减少对象创建
        private Text k2 = new Text();
        private DoubleWritable v2 = new DoubleWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] arr = line.split(",");

            k2.set(arr[0]);
            v2.set(Double.parseDouble(arr[1]));

            context.write(k2,v2);

        }
    }

    /*
         K2:日期        Text
         V2:温度        DoubleWritable
         K3:日期        Text
         V3:最高温度    DoubleWritable
     */
    private static class  TemperatureReducer extends Reducer<Text,DoubleWritable,Text,DoubleWritable>{

        private DoubleWritable d = new DoubleWritable();
        /**
         * @param key   K2
         * @param values   V2的集合
         */
        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {

            double max = values.iterator().next().get();
            for (DoubleWritable value : values) {
                 double v =  value.get();
                if(v>max){
                    max = v;
                }
            }

            d.set(max);
            context.write(key,d);
        }
    }


    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();

        //创建任务
        Job job = Job.getInstance(conf, "temprature");
        //设置Mapper类
        job.setMapperClass(TemperatureMapper.class);
        //设置Reduce类
        job.setReducerClass(TemperatureReducer.class);
        //设置map的输出类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        //设置reduce的输出类型
        job.setOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);

        //设置输入文件位置
        FileInputFormat.setInputPaths(job,new Path("d:\\work\\abc\\temprature.txt"));
        //设置输出文件位置
        FileOutputFormat.setOutputPath(job,new Path("d:\\work\\abc\\out_put2"));

        //将任务提交 并等待完成
        job.waitForCompletion(true);


    }
}
结果
	2022-04-03	24.3
	2022-04-04	30.0

MapReduce工作流程

原始数据File

1T数据被切分成块存放在HDFS上,每一个块有128M大小

数据块Block

hdfs上数据存储的一个单元,同一个文件中块的大小都是相同的
因为数据存储到HDFS上不可变,所以有可能块的数量和集群的计算能力不匹配 我们需要一个动态调整本次参与计算节点数量的一个单位

切片Split

切片是一个逻辑概念
在不改变现在数据存储的情况下,可以控制参与计算的节点数目 通过切片大小可以达到控制计算节点数量的目的
有多少个切片就会执行多少个Map任务

image

image

image

一般切片大小为Block的整数倍(2 1/2)
防止多余创建和很多的数据连接
如果Split>Block ,计算节点少了
如果Split<Block ,计算节点多了
默认情况下,Split切片的大小等于Block的大小 ,默认128M 一个切片对应一个MapTask

MapTask

map默认从所属切片读取数据,每次读取一行(默认读取器)到内存中
我们可以根据自己书写的分词逻辑(空格分隔).计算每个单词出现的次数 这是就会产生 (Map<String,Integer>)临时数据,存放在内存中
但是内存大小是有限的,如果多个任务同时执行有可能内存溢出(OOM) 如果把数据都直接存放到硬盘,效率太低
我们需要在OOM和效率低之间提供一个有效方案
可以现在内存中写入一部分,然后写出到硬盘

环形数据缓冲区

可以循环利用这块内存区域,减少数据溢写时map的停止时间
每一个Map可以独享的一个内存区域
在内存中构建一个环形数据缓冲区(kvBuffer),默认大小为100M
设置缓冲区的阈值为80%,当缓冲区的数据达到80M开始向外溢写到硬盘

溢写的时候还有20M的空间可以被使用效率并不会被减缓 而且将数据循环写到硬盘,不用担心OOM问题

分区Partation

根据Key直接计算出对应的Reduce
分区的数量和Reduce的数量是相等的
hash(key) % partation = num
默认分区的算法是Hash然后取余
Object的hashCode()---equals()
如果两个对象equals,那么两个对象的hashcode一定相等
如果两个对象的hashcode相等,但是对象不一定equlas

排序Sort

对要溢写的数据进行排序(QuickSort)
按照先Partation后Key的顺序排序-->相同分区在一起,相同Key的在一起
我们将来溢写出的小文件也都是有序的

溢写Spill

将内存中的数据循环写到硬盘,不用担心OOM问题
每次会产生一个80M的文件
如果本次Map产生的数据较多,可能会溢写多个文件

合并Merge

因为溢写会产生很多有序(分区 key)的小文件,而且小文件的数目不确定
后面向reduce传递数据带来很大的问题
所以将小文件合并成一个大文件,将来拉取的数据直接从大文件拉取即可
合并小文件的时候同样进行排序(归并排序),最终产生一个有序的大文件

组合器combiner

集群的带宽限制了mapreduce作业的数量,因此应该尽量避免map和reduce任务之间的数据传 输。hadoop允许用户对map的输出数据进行处理,用户可自定义combiner函数(如同map函数和 reduce函数一般),其逻辑一般和reduce函数一样,combiner的输入是map的输出,combiner 的输出作为reduce的输入,很多情况下可以直接将reduce函数作为conbiner函数来使用
(job.setCombinerClass(FlowCountReducer.class);)。
combiner属于优化方案,所以无法确定combiner函数会调用多少次,可以在环形缓存区溢出文件 时调用combiner函数,也可以在溢出的小文件合并成大文件时调用combiner。但要保证不管调用 几次combiner函数都不会影响最终的结果,所以不是所有处理逻辑都可以使用combiner组件,有 些逻辑如果在使用了combiner函数后会改变最后rerduce的输出结果(如求几个数的平均值,就不 能先用combiner求一次各个map输出结果的平均值,再求这些平均值的平均值,这将导致结果错 误)。
combiner的意义就是对每一个maptask的输出进行局部汇总,以减小网络传输量。
原先传给reduce的数据是 a1 a1 a1 a1 a1
第一次combiner组合之后变为a{1,1,1,1,..}
第二次combiner后传给reduce的数据变为a{4,2,3,5...}

拉取Fetch

我们需要将Map的临时结果拉取到Reduce节点
原则:
相同的Key必须拉取到同一个Reduce节点
但是一个Reduce节点可以有多个Key
未排序前拉取数据的时候必须对Map产生的最终的合并文件做全序遍历
而且每一个reduce都要做一个全序遍历
如果map产生的大文件是有序的,每一个reduce只需要从文件中读取自己所需的即可

合并Merge

因为reduce拉取的时候,会从多个map拉取数据
那么每个map都会产生一个小文件,这些小文件(文件与文件之间无序,文件内部有序) 为了方便计算(没必要读取N个小文件),需要合并文件
归并算法合并成2个
相同的key都在一起

归并Reduce

将文件中的数据读取到内存中
一次性将相同的key全部读取到内存中
直接将相同的key得到结果-->最终结果

写出Output

每个reduce将自己计算的最终结果都会存放到HDFS上

标签:文件,combiner,04,map,reduce,分布式计算,MapReduce,2022
From: https://www.cnblogs.com/paopaoT/p/17396045.html

相关文章

  • MapReduce分布式计算
    MapReduce是Hadoop系统核心组件之一,它是一种可用于大数据并行处理的计算模型、框架和平台,主要解决海量数据的计算,是目前分布式计算模型中应用较为广泛的一种。练习:计算a.txt文件中每个单词出现的次数helloworldhellohadoophello51doithadoopmapreducemapreducespark......
  • Mapreduce二次排序时,将jar包上传至Hadoop上运行时,抛出异常"java.util.NoSuchElementEx
    查询原因后发现是java中实现实现Mapper时StringTokenizer类时使用了一个方法nextToken()会抛出这个异常,”我们可以使用hasMoreTokens()和hasMoreElements()方法来避免异常。如果标记器的字符串中有更多标记可用,则这两种方法都返回true。只有当hasMoreTokens()方法返回Tr......
  • mapreduce测试时出现INFO client.RMProxy: Connecting to ResourceManager at 0.0.0.0
    如运行wordcount后出现INFOclient.RMProxy:ConnectingtoResourceManagerat0.0.0.0:8032长时间不动,我尝试修改我的yarn-site.xml配置后可以成功运行  <property>    <name>yarn.nodemanager.aux-services</name>    <value>mapreduce_shuffle</value>  </pr......
  • ray-分布式计算框架-集群与异步Job管理
    0.ray简介ray是开源分布式计算框架,为并行处理提供计算层,用于扩展AI与Python应用程序,是ML工作负载统一工具包RayAIRuntimeML应用程序库集RayCore通用分布式计算库Task--Ray允许任意Python函数在单独的Pythonworker上运行,这些异步Python函数称为任务Actor......
  • MapReduce原理
         MapReduce运行流程  MapReduce容错机制 ......
  • hiveSQL mapreduce任务调优
    sethive.merge.mapredfiles=true;--在Map-Reduce的任务结束时合并小文件setmapred.max.split.size=30000000;--决定每个map处理的最大的文件大小,单位为B--setmapred.min.split.size=10000000;--公司集群默认值--setmapred.min.split.size.per.node=;......
  • 并行计算、分布式计算、集群计算和网格计算的介绍,以及主要有哪些区别?
    并行计算(ParallelComputing)并行计算或称平行计算是相对于串行计算来说的。并行计算(ParallelComputing)是指同时使用多种计算资源解决计算问题的过程。为执行并行计算,计算资源应包括一台配有多处理机(并行处理)的计算机、一个与网络相连的计算机专有编号,或者两者结合使用。......
  • Hadoop的生态体系,HDFS和MapReduce等的具体介绍
    Hadoop的两大核心就是HDFS和MapReduce,而整个Hadoop的体系结构主要是通过HDFS的分布式存储作为底层数据支持的。并且会通过MapReduce来进行计算分析。Hadoop1.x的核心:HadoopCommonHadoopDistributedFileSystem(HDFS)HadoopMapReduceHadoop2.x的核心:HadoopCommonHadoopDistribu......
  • 分布式计算技术(下):Impala、Apache Flink、星环Slipstream
    实时计算的发展历史只有十几年,它与基于数据库的计算模型有本质区别,实时计算是固定的计算任务加上流动的数据,而数据库大多是固定的数据和流动的计算任务,因此实时计算平台对数据抽象、延时性、容错性、数据语义等的要求与数据库明显不同,面向实时计算的数据架构也就发展起来。本篇我......
  • MIT 6.5840 2023 Spring(6.824)LAB1:MapReduce
    MIT6.58402023Spring(6.824)LAB1:MapReduce前言本次lab主要是完成一个基于RPC远程调用的单机单文件系统的简单MapReduce框架,并完成单词计数任务。基于golang实现,单Master,多Worker。实现worker的奔溃恢复(FaultTorrance),通过超时重新执行实现。主要的任务有,RPC调用参数及返回参数......