首页 > 其他分享 >Hadoop之MapReduce性能优化

Hadoop之MapReduce性能优化

时间:2023-05-29 22:11:47浏览次数:28  
标签:文件 conf Text Hadoop MapReduce SequenceFile key new 优化

现在大家已经掌握了MapReduce程序的开发步骤,注意了,针对MapReduce的案例我们并没有讲太多,主要是因为在实际工作中真正需要我们去写MapReduce代码的场景已经是凤毛麟角了,因为后面我们会学习一个大数据框架Hive,Hive支持SQL,这个Hive底层会把SQL转化为MapReduce执行,不需要我们写一行代码。

但是MapReduce代码的开发毕竟是基本功,所以前面我们也详细的讲解了它的开发流程。虽然现在MapReduce代码写的很少了,但是针对MapReduce程序的性能优化是少不了的,面试也是经常会问到的,所以下面我们就来分析一下MapReduce中典型的性能优化场景

第一个场景是:小文件问题
第二个场景是:数据倾斜问题

小文件问题

咱们前面分析过,Hadoop的HDFS和MapReduce都是针对大数据文件来设计的,在小文件的处理上不但效率低下,而且十分消耗内存资源

针对HDFS而言,每一个小文件在namenode中都会占用150字节的内存空间,最终会导致集群中虽然存储了很多个文件,但是文件的体积并不大,这样就没有意义了。

针对MapReduce而言,每一个小文件都是一个Block,都会产生一个InputSplit,最终每一个小文件都会产生一个map任务,这样会导致同时启动太多的Map任务,Map任务的启动是非常消耗性能的,但是启动了以后执行了很短时间就停止了,因为小文件的数据量太小了,这样就会造成任务执行消耗的时间还没启动任务消耗的时间多,这样也会影响MapReduce执行的效率。

针对这个问题,解决办法通常是选择一个容器,将这些小文件组织起来统一存储,HDFS提供了两种类型的容器,分别是SequenceFile 和 MapFile

SequeceFile是Hadoop 提供的一种二进制文件,这种二进制文件直接将<key, value>对序列化到文件中。一般对小文件可以使用这种文件合并,即将小文件的文件名作为key,文件内容作为value序列化到大文件中。但是这个文件有一个缺点,就是它需要一个合并文件的过程,最终合并的文件会比较大,并且合并后的文件查看起来不方便,必须通过遍历才能查看里面的每一个小文件。所以这个SequenceFile 其实可以理解为把很多小文件压缩成一个大的压缩包了。

SequeceFile

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

import java.io.File;
import java.io.IOException;
import java.util.Objects;

public class TestSequenceFile {
    public static void main(String[] args) throws IOException {
        write("C:\\D-myfiles\\testjar\\hadoop", "/sequenceFile");
        read("/sequenceFile");
    }

    private static void read(String inputFile) throws IOException {
        //创建一个配置对象
        Configuration conf = new Configuration();
        //指定HDFS的地址
        conf.set("fs.defaultFS", "hdfs://bigdata01:9000");
        conf.set("dfs.client.use.datanode.hostname", "true");
        //创建阅读器
        SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(new Path(inputFile)));
        Text key = new Text();
        Text value = new Text();
        //循环读取数据
        while (reader.next(key, value)) {
            //输出文件名称
            System.out.print("文件名:" + key.toString() + ",");
            //输出文件的内容
            System.out.println("文件内容:" + value.toString());
        }
        reader.close();

    }


    private static void write(String inputDir, String outputFile) throws IOException {
        //创建一个配置对象
        Configuration conf = new Configuration();
        //指定HDFS的地址
        conf.set("fs.defaultFS", "hdfs://bigdata01:9000");
        conf.set("dfs.client.use.datanode.hostname", "true");
        //获取操作HDFS的对象
        FileSystem fileSystem = FileSystem.get(conf);
        //删除输出文件
        fileSystem.delete(new Path(outputFile), true);
        //构造opts数组,有三个元素
         /*
         第一个是输出路径
         第二个是key类型
         第三个是value类型
         */
        SequenceFile.Writer.Option[] opts = new SequenceFile.Writer.Option[]{
                SequenceFile.Writer.file(new Path(outputFile)),
                SequenceFile.Writer.keyClass(Text.class),
                SequenceFile.Writer.valueClass(Text.class)};
        //创建一个writer实例
        SequenceFile.Writer writer = SequenceFile.createWriter(conf, opts);
        //指定要压缩的文件的目录
        File inputDirPath = new File(inputDir);
        if (inputDirPath.isDirectory()) {
            File[] files = inputDirPath.listFiles();
            if (Objects.nonNull(files)) {
                for (File file : files) {
                    //获取文件全部内容
                    String content = FileUtils.readFileToString(file, "UTF-8");
                    //文件名作为key
                    Text key = new Text(file.getName());
                    //文件内容作为value
                    Text value = new Text(content);
                    writer.append(key, value);
                }
            }
        }
        writer.close();
    }
}

执行代码中的write方法,可以看到在HDFS上会产生一个/seqFile文件,这个文件就是最终生成的大文件。执行代码中的read方法,可以输出小文件的名称和内容

MapFile

MapFile是排序后的SequenceFile,MapFile由两部分组成,分别是index和data。index作为文件的数据索引,主要记录了每个Record的key值,以及该Record在文件中的偏移位置。在MapFile被访问的时候,索引文件会被加载到内存,通过索引映射关系可迅速定位到指定Record所在文件位置,因此,相对SequenceFile而言,MapFile的检索效率是高效的,缺点是会消耗一部分内存来存储index数据。

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

import java.io.File;
import java.io.IOException;
import java.util.Objects;

/**
 * 小文件解决方案之MapFile
 */
public class TestMapFile {
    public static void main(String[] args) throws IOException {
        write("C:\\D-myfiles\\testjar\\hadoop", "/mapFile");
        read("/mapFile");
    }

    private static void read(String inputFile) throws IOException {
        //创建一个配置对象
        Configuration conf = new Configuration();
        //指定HDFS的地址
        conf.set("fs.defaultFS", "hdfs://bigdata01:9000");
        conf.set("dfs.client.use.datanode.hostname", "true");
        //创建阅读器
        MapFile.Reader reader = new MapFile.Reader(new Path(inputFile), conf);
        Text key = new Text();
        Text value = new Text();
        //循环读取数据
        while (reader.next(key, value)) {
            //输出文件名称
            System.out.print("文件名:" + key.toString() + ",");
            //输出文件的内容
            System.out.println("文件内容:" + value.toString());
        }
        reader.close();

    }


    private static void write(String inputDir, String outputFile) throws IOException {
        //创建一个配置对象
        Configuration conf = new Configuration();
        //指定HDFS的地址
        conf.set("fs.defaultFS", "hdfs://bigdata01:9000");
        conf.set("dfs.client.use.datanode.hostname", "true");
        //获取操作HDFS的对象
        FileSystem fileSystem = FileSystem.get(conf);
        //删除输出文件
        fileSystem.delete(new Path(outputFile), true);
        //构造opts数组,有两个元素
        SequenceFile.Writer.Option[] opts = new SequenceFile.Writer.Option[]{
                MapFile.Writer.keyClass(Text.class),
                MapFile.Writer.valueClass(Text.class)};
        //创建一个writer实例
        MapFile.Writer writer = new MapFile.Writer(conf, new Path(outputFile), opts);
        //指定要压缩的文件的目录
        File inputDirPath = new File(inputDir);
        if (inputDirPath.isDirectory()) {
            File[] files = inputDirPath.listFiles();
            if (Objects.nonNull(files)) {
                for (File file : files) {
                    //获取文件全部内容
                    String content = FileUtils.readFileToString(file, "UTF-8");
                    //文件名作为key
                    Text key = new Text(file.getName());
                    //文件内容作为value
                    Text value = new Text(content);
                    writer.append(key, value);
                }
            }
        }
        writer.close();
    }
}

执行代码中的write方法,可以看到在HDFS上会产生一个/mapFile目录,这个目录里面有两个文件,一个
index索引文件,一个data数据文件。执行代码中的read方法,可以输出小文件的名称和内容

通过MapReduce读取SequenceFile

们之前的代码默认只能读取普通文本文件,针对SequenceFile是无法读取的
那该如何设置才能让mapreduce可以读取SequenceFile呢?
很简单,只需要在job中设置输入数据处理类就行了,默认情况下使用的是TextInputFormat

job.setInputFormatClass(SequenceFileInputFormat.class);

数据倾斜问题

在实际工作中,如果我们想提高MapReduce的执行效率,最直接的方法是什么呢?我们知道MapReduce是分为Map阶段和Reduce阶段,其实提高执行效率就是提高这两个阶段的执行效率。

默认情况下Map阶段中Map任务的个数是和数据的InputSplit相关的,InputSplit的个数一般是和Block块是有关联的,所以可以认为Map任务的个数和数据的block块个数有关系,针对Map任务的个数我们一般是不需要干预的,除非是前面我们说的海量小文件,那个时候可以考虑把小文件合并成大文件。其他情况是不需要调整的,那就剩下Reduce阶段了,咱们前面说过,默认情况下reduce的个数是1个,所以现在MapReduce任务的压力就集中在Reduce阶段了,如果说数据量比较大的时候,一个reduce任务处理起来肯定是比较慢的,所以我们可以考虑增加reduce任务的个数,这样就可以实现数据分流了,提高计算效率了。但是注意了,如果增加Reduce的个数,那肯定是要对数据进行分区的,分区之后,每一个分区的数据会被一个reduce任务处理。

如果想要多个分区,很简单,只需要把 numReduceTasks 的数目调大即可。

假设我们有一个文件,有1000W条数据,这里面的值主要都是数字,1,2,3,4,5,6,7,8,9,10,我们希望统计出来每个数字出现的次数
其实在私底下我们是知道这份数据的大致情况的,这里面这1000w条数据,值为5的数据有910w条左右,剩下的9个数字一共只有90w条,那也就意味着,这份数据中,值为5的数据比较集中,或者说值为5的数据属于倾斜的数据,在这一整份数据中,它占得比重比其他的数据多得多。

我们来分析一下,刚才我们说了我们这份数据中,值为5的数据有910w条,这就占了整份数据的90%了,那这90%的数据会被一个reduce任务处理,在这里假设是让reduce5处理了,reduce5这个任务执行的是比较慢的,其他reduce任务都执行结束很长时间了,它还没执行结束,因为reduce5中处理的数据量和其他reduce中处理的数据量规模相差太大了,所以最终reduce5拖了后腿。咱们mapreduce任务执行消耗的时间是一直统计到最后一个执行结束的reduce任务,所以就算其他reduce任务早都执行结束了也没有用,整个mapreduce任务是没有执行结束的。

那针对这种情况怎么办?这个时候单纯的增加reduce任务的个数已经不起多大作用了,如果启动太多可能还会适得其反。其实这个时候最好的办法是把这个值为5的数据尽量打散,把这个倾斜的数据分配到其他reduce任务中去计算,这样才能从根本上解决问题。这就是我们要分析的一个数据倾斜的问题。

具体怎么打散呢?其实就是给他加上一些有规律的随机数字就可以了
在这里我们这样处理,我把5这个数值的数据再分成10份,所以我就在这个数值5后面拼上一个0~9的随机数即可。

@Override
protected void map2(LongWritable k1, Text v1, Context context)
        throws IOException, InterruptedException {
    //输出k1,v1的值
    //System.out.println("<k1,v1>=<"+k1.get()+","+v1.toString()+">");
    //logger.info("<k1,v1>=<"+k1.get()+","+v1.toString()+">");
    //k1 代表的是每一行数据的行首偏移量,v1代表的是每一行内容
    //对获取到的每一行数据进行切割,把单词切割出来
    String[] words = v1.toString().split(" ");
    //把单词封装成<k2,v2>的形式
    String key = words[0];
    if ("5".equals(key)) {
        //把倾斜的key打散,分成10份
        key = "5" + "_" + random.nextInt(10);
    }
    Text k2 = new Text(key);
    LongWritable v2 = new LongWritable(1L);
    //把<k2,v2>写出去
    context.write(k2, v2);
}

最终得到的结果为

1   100000
5_3 1012097
2   100000
5_4 1011163
3   100000
5_5 1010498
4   100000
5_6 1010755
5_7 1010823
5_8 1012394
6   100000
7   100000
5_0 1011274
8   100000
10  100000
5_1 1009972
9   100000
5_2 1011024

这个时候我们获取到的最终结果是一个半成品,还需要进行一次加工,其实我们前面把这个倾斜的数据打散之后相当于做了一个局部聚合,现在还需要再开发一个mapreduce任务再做一次全局聚合,其实也很简单,获取到上一个map任务的输出,在map端读取到数据之后,对数据先使用空格分割,然后对第一列的数据再使用下划线分割,分割之后总是取第一列,这样就可以把值为5的数据还原出来了。

标签:文件,conf,Text,Hadoop,MapReduce,SequenceFile,key,new,优化
From: https://www.cnblogs.com/strongmore/p/17331087.html

相关文章

  • sse 与 编译器自动优化
    directx形式的矩阵和向量计算代码在编译的时候是自动汇编为sse汇编的何时使用手写sse指令呢,当你的应用程序需要写一些物理运算时候可以使用自己编写的sse计算函数来为3维运算加速关于amd指令集(3dnow)有的程序在编写的时候可以使用判断来判断是否是amd平台的cpu如果上了......
  • MySQL之慢查询sql排查及优化
    前言sql语句优化的方式:1.尽量少join2.尽量少排序3.尽量避免select*4.尽量少or5.尽量用unionall代替union…(优化的方式有很多,这里就不一一举例了)当你避免这些问题的时候,为什么sql查询还是这么慢?排查慢查询sqlps:mysql版本为5.71.连接mysqlmysql-uroot-p2.查......
  • 玩转MySQL数据库之SQL优化之慢查询
    本系列为:MySQL数据库详解,为千锋资深教学老师独家创作,致力于为大家讲解清晰MySQL数据库相关知识点,含有丰富的代码案例及讲解。如果感觉对大家有帮助的话,可以【关注】持续追更~文末有本文重点总结,技术类问题,也欢迎大家和我们沟通交流!前言从今天开始本系列文章就带各位小伙伴学习......
  • hadoop序列化相关问题
    什么时候需要使用序列化?需要在不同服务器传递内存数据时,用序列化。序列化后的所有属性需要再反序列化,那么有先后顺序反序列化吗?有的,比如序列化的属性有abc则反序列化的属性必须是cabc数据切片一般为数据块的倍数,为什么?一般一个数据切片对应启动一个maptask任务,可以保证......
  • MySQL优化思路及方向
    本系列为:MySQL数据库详解,为千锋资深教学老师独家创作,致力于为大家讲解清晰MySQL数据库相关知识点,含有丰富的代码案例及讲解。如果感觉对大家有帮助的话,可以【关注】持续追更~文末有本文重点总结,技术类问题,也欢迎大家和我们沟通交流!前言从今天开始本系列文章就带各位小伙伴学习......
  • 监控优化
    CPU使用率{"Namespace":"acs_ecs_dashboard","Dimensions":"{\"instanceId\":\"i-9vc05ajzjfyhyxnm62m9\"}","MetricName":"CPUUtilization","StartTime":1675817179610,"E......
  • 采集优化
    采集优化做了5项:1.全量同步,资源类型支持可配置,默认同步对象存储和快照。2.任务分片优化,map子任务数最多不超过150。3.单个任务执行完成后,校验资源删除逻辑,不再使用之前等待所有采集任务执行完成再进行校验资源是否删除的逻辑。4.CMDB资源表增加区域和区域项目字段,提供升级脚本。5.......
  • 发布-优化图片和js文件的存放路径
    把JavaScript文件同意生成到js目录中在webpack.config.js配置文件的output节点中,进行如下的配置:{test:/\.jpg|png|gif$/,use:'url-loader?limit=470&outputPath=images'}......
  • Hadoop全分布部署
    安装包下载(百度网盘)链接:https://pan.baidu.com/s/1XrnbpNNqcG20QG_hL4RJoQ?pwd=aec9提取码:aec9基础配置(所有节点)关闭防火墙,selinux安全子系统#关闭防火墙,设置开机自动关闭[root@localhost~]#systemctldisable--nowfirewalldRemoved/etc/systemd/system/multi-user......
  • centos7上Hadoop2.7.2完全分布式部署
    1.规划node1         node2           node3datanode       datanode         datanodenamenode     resourcemanager  secondarynamenodenodemanager   nodemanager     no......