首页 > 其他分享 >hadoop之shuffle阶段相关面试题解析

hadoop之shuffle阶段相关面试题解析

时间:2023-02-15 12:05:00浏览次数:34  
标签:map 面试题 shuffle -- 分区 reduce hadoop key class


--思考1:map()方法写出的数据存储到哪里?                                              --内存中
1、在内存中存有一个环形缓冲区,该缓冲区默认大小是100M,map()方法中写出的kv数据会进入到环形缓冲区内,但是map()写出的kv数据是很大的,环形缓冲区不可能存的下,考虑到这一情况,设计者设置环形缓冲区的运行机制设置为:当写入环形缓冲区的数据达到整个缓冲区的80%时,发生溢写操作(落盘),将缓冲区内80%的kv数据溢写到磁盘,保证后续的数据可以陆续写入

--思考2:为什么不等到将缓冲区写满100%后再发生溢写操作?
为了避免map()方法写数据的停止,设计者预留出20%的空间,当缓冲区内80%的数据在执行溢写操作时,另外20%的空间仍然可以进行kv数据的写入操作(写入到缓冲区),从而不用终止map()方法写入的操作

--思考3:map()方法写出到环形缓冲区中的数据有哪些?
map()方法写入缓冲区的数据不仅仅是kv数据,同时还含有~kv数据的元数据meta信息、kv所属分区(在map方法写出数据进入到缓冲区前 ~就计算出分区号)

2、缓冲区内记录的N多个kv数据写入磁盘时,并不是直接进行一次性写入的,而是要对多个kv进行排序(按照key排序),默认情况下是按照字典序排序,将排好序的kv数据写出到不同的分区内,比如分区1、分区2...
注意:各分区内部的数据是独立进行排序的,互不影响                             --map阶段第1次排序(快排)
注意:本次排序只是对索引进行排序,并不产生位置的交换,根据索引去内存中查找数据,溢写数据时直接根据索引找值

3、溢写操作~排好序的分区数据会溢写到一个文件中,该文件中存储多个分区的数据
4、那么整个map()方法写出的数据经过缓冲区处理后,可能是产生N多个溢写文件
5、对于reduce来讲,缓冲区的N多个溢写文件就是它要处理的数据,

--思考4:reduce是直接从多个MapTask~map()方法 写出的多个溢写文件中 拷贝对应分区的数据做处理?
不是,原因:在一个MR程序中,MapTask的个数一般是大于ReduceTask个数的,因为分数据可以多台机器做,但是合数据reduce一定是少的,代表计算能力比较弱

6、考虑到合操作较弱的情况,因此设计者设计在每个map()方法经过缓冲区处理后产生的N多个溢写文件提前进行一次合并(归并操作),从而减缓reduce端合数据计算的压力
注意:N多个溢写文件在进行归并排序时,各分区的数据在合并时仍然是做排序的    --map阶段第2次排序(归并)
                                              局部有序的数据在做整体排序时,归并排序效率是很高的
7、N多个MapTask任务执行完毕后,N多个map()方法,都会最终产生归并排序后的 大~总溢写文件,那么ReduceTask在获取多个总溢写文件时,根据分区 拷贝多个总溢写文件上 对应的分区数据交给对应的reduce程序处理,内存不够,写到磁盘,等全部数据读取完毕后,reduce对属于同分区的kv数据再次做归并排序,最后分组,分组原因:相同key的多个kv组进入到同一个reduce方法做运算                                       --reduce端排序(归并)
    ( 每个ReduceTask按照所要处理的分区, 到每个MapTask中拷贝对应的分区的数据.)
8、reduce端对排好序的数据进行分组,然后进入reduce方法进行业务处理

--思考5:用mapreduce怎么处理数据倾斜问题?

数据倾斜问题分析:当map /reduce程序执行时,reduce节点大部分执行完毕,但是有一个或者几个reduce节点运行很慢,导致整个程序的处理时间很长,这是因为某一个key的条数比其他key多很多(有时是百倍或者千倍之多),这条key所在的reduce节点所处理的数据量比其他节点就大很多,从而导致某几个节点迟迟运行不完,此称之为数据倾斜。

解决方案:

(1)局部聚合加全局聚合。

第一次在 map 阶段对那些导致了数据倾斜的 key 加上 1 到 n 的随机前缀,这样本来相同的 key 也会被分到多个 Reducer 中进行局部聚合,数量就会大大降低。

第二次 mapreduce,去掉 key 的随机前缀,进行全局聚合。

思想:二次 mr,第一次将 key 随机散列到不同 reducer 进行处理达到负载均衡目的。第二次再根据去掉 key 的随机前缀,按原 key 进行 reduce 处理。

这个方法进行两次 mapreduce,性能稍差。

(2)增加 Reducer,提升并行度

JobConf.setNumReduceTasks(int)

(3)实现自定义分区

根据数据分布情况,自定义散列函数,将 key 均匀分配到不同 Reducer

--思考6:为什么设置reduce的个数,就可以实现分区的效果?

首先默认情况下,通过源码MapTask~run()方法~找到runNewMapper(//347行)方法进入~找到output = new NewOutputCollector(//782行)方法进入~查看下面代码并解析

//此位置获取的就是在Driver类中设置的reduce个数,如果没有设置默认就是1个
partitions = jobContext.getNumReduceTasks(); -711行
if (partitions > 1) {
//如果说reduce的个数大于1, 会尝试获取一个分区器类,通过mapreduce.job.partitioner.class参数获取,
// 默认mapreduce.job.partitioner.class没有配置,则直接返回HashPartitioner.class 。
partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>) --partitioner分区器对象
ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
//查看getPartitionerClass()源码~找JobContextImpl(232行)类~查看getClass()方法
//查看【mapred-default.xml】~mapreduce.job.partitioner.class未设置值~使用HashPartitioner
} else { --partitions不大于1,默认是1
// 最终的分区号就是固定的0号分区。
partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
@Override
public int getPartition(K key, V value, int numPartitions) {
return partitions - 1; --默认使用的是HashPartitioner分区,返回0,即0号分区
}
};
}

源码解析得出结论:
有多少个分区是由reduce个数决定的,
设置reduce的个数后,源码层面是生成一个分区器对象,而分区器对象中的业务处理就是计算最终要分区的个数(通过重写getPartition()方法),所以说,设置了reduce的个数就实现了分区的效果。

--思考7:获取到的分区器在哪里使用?
通过源码逐步分析:
1、context.write(outk,outv);                    --Mapper中的map()方法写出数据到缓冲区,查看源码
2、进入write()方法~TaskInputOutputContext接口~(ctrl+alt+B)查找该接口实现类~进入ChainMapContextImpl类(108行)~output.write()~进入RecordWriter抽象类~查找该抽象类的实现类NewOutputCollector(在MapTask中705行)~找到NewOutputCollector类中的write()方法726行
@Override
public void write(K key, V value) throws IOException, InterruptedException {
//collector可以看作就是缓冲区对象,它去收集kv,并同时通过分区器对象获取的分区号,所以在数据写出到缓冲区之前,分区号就已经是计算出来的
     collector.collect(key, value,
                 partitioner.getPartition(key, value, partitions));
}

--思考8:分区的数据具体是如何分的?
    1、首先明确一点,数据的分区是由分区器(Partitioner)对象来决定的.    
    2、Hadoop有默认的分区器对象 HashPartitioner .          --通过源码可以看到
          HashPartitioner会按照k的hash值对Reduce的个数进行取余操作~得到k所对应的分区.
    3、hadoop也支持用户自定义分区器
    
--思考9:HashPartitioner默认分区器具体是如何进行分区号计算的?
通过ctrl+N查找HashPartitioner类(mapreduce包下)~查看getPartition()方法
public int getPartition(K key, V value,int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}

源码分析得出结论:
通过计算每个key的hash值和Int类型的最大值做 与运算,然后对设置的reduce个数进行 取余操作,最终得到分区号。
通过取余操作,余数是0的数据,就进入0号分区文件中,余数是1的,就进入1号分区文件中
与运算的作用是保证得到的数值是正数。
注意:分区的计算不一定是非要按照key进行计算,HADOOP默认情况是通过key计算分区号,也可以自己定义按照value或者key+value等等。

--设置分区个数的注意事项

1、reduce个数的设置
     如果不设置,reduce的个数默认为1 ,则最终的分区号是固定的 0 
     如果 1 <   reduce个数  < 分区数, 报错 
     如果 reduce个数  > 分区数,  不报错, 多出的reduce节点空跑一趟.
     最佳: reduce的个数就设置为实际的分区数.
2、分区号只能从0开始,逐一累加

--自定义分区器案例演示

1、FlowBean
public class FlowBean implements Writable {

// 上行流量
Integer upFlow;
// 下行流量
Integer downFlow;
// 总流量
Integer sumFlow;

public Integer getUpFlow() {return upFlow;}
public void setUpFlow(Integer upFlow) {this.upFlow = upFlow;}
public Integer getDownFlow() {return downFlow;}
public void setDownFlow(Integer downFlow) {this.downFlow = downFlow;}
public Integer getSumFlow() {return sumFlow;}
public void setSumFlow(Integer sumFlow) {this.sumFlow = sumFlow;}

/**
* 用于计算一个手机号,流量和
*/
public void setSumFlow(){setSumFlow(getUpFlow() + getDownFlow());}

//反序列化执行时,需要一个承载数据的载体,通过无参构造创建载体对象
public FlowBean() {}

public void write(DataOutput out) throws IOException {
out.writeInt(upFlow);
out.writeInt(downFlow);
out.writeInt(sumFlow);
}
public void readFields(DataInput in) throws IOException {
this.upFlow = in.readInt();
this.downFlow = in.readInt();
this.sumFlow = in.readInt();
}
@Override
public String toString() {
return this.upFlow + "\t" + this.downFlow + "\t" + this.sumFlow;
}
}

2、FlowMapper
public class FlowMapper extends Mapper<LongWritable, Text,Text, FlowBean> {
private Text outk = new Text();
private FlowBean outv = new FlowBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// super.map(key, value, context);

String line = value.toString();
String[] splits = line.split("\t");
// 封装key
outk.set(splits[1]);
// 封装value
outv.setUpFlow(Integer.parseInt(splits[splits.length - 3]));
outv.setDownFlow(Integer.parseInt(splits[splits.length - 2]));
outv.setSumFlow();
// 写出数据
context.write(outk,outv);
}
}

3、FlowReducer
public class FlowReducer extends Reducer<Text, FlowBean,Text, FlowBean> {
private FlowBean outv = new FlowBean();
@Override
protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
// super.reduce(key, values, context);
int totalUpFlow = 0;
int totalDownFlow = 0;
// 迭代values,计算当前key对应的总上行,总下行,总流量和
for (FlowBean value : values) {
totalUpFlow += value.getUpFlow();
totalDownFlow += value.getDownFlow();
}
// 封装输出的value
outv.setUpFlow(totalUpFlow);
outv.setDownFlow(totalDownFlow);
outv.setSumFlow();
// 写出计算后的结果
context.write(key,outv);
}
}

4、MyPartitioner --自定义分区器
public class MyPartitioner extends Partitioner<Text, FlowBean> {
public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
int partition;
String line = text.toString();
if(line.startsWith("136")){
partition = 0;
}else if(line.startsWith("137")){
partition = 1;
}else if(line.startsWith("138")){
partition = 2;
}else if(line.startsWith("139")){
partition = 3;
}else {
partition = 4;
}
return partition;
}
}

5、Driver
public class FlowDriver {
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(FlowDriver.class);
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
// 设置使用自定义分区
job.setPartitionerClass(MyPartitioner.class); --查看set源码
// 设置ReducerTask执行个数,根据分区器逻辑分区个数设置对应ReducerTask
job.setNumReduceTasks(5);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
FileInputFormat.setInputPaths(job,new Path("D:\\bigtools\\hadooptest\\inputflow"));
FileOutputFormat.setOutputPath(job,new Path("D:\\bigtools\\hadooptest\\combineoutput1"));
job.waitForCompletion(true);
}
}

--思考10:shuffle中排序有哪些?

1、全排序     所有的数据整体排序,要求只能有一个分区,一个reduce,效率极低,完全丧失了MR的并行机制
2、区内排序   每个分区内的数据整体排序.根据输入记录的键完成
3、辅助排序    (分组排序) reduce端
4、二次排序   在自定义排序中,如果compareTo中的判断条件为两个即为二次排序(比较规则中用到两个条件)

排序源码分析:

1、查找MapTask~run()方法~找到runNewMapper(//347行)方法~找到new NewOutputCollector(//782行)进入~找到createSortingCollector(//710行)方法进入~找到collector.init(context);进入init方法408行~找到实现类MapOutputCollector定位到1018行~comparator = job.getOutputKeyComparator();获取key的比较器对象~进入getOutputKeyComparator(//JobConf的882行)方法~查看源码

public RawComparator getOutputKeyComparator() {
Class<? extends RawComparator> theClass = getClass(
//KEY_COMPARATOR = "mapreduce.job.output.key.comparator.class",默认没有设置值
JobContext.KEY_COMPARATOR, null, RawComparator.class);
if (theClass != null)
如果能通过参数获取到,则通过反射创建比较器对象
return ReflectionUtils.newInstance(theClass, this);
// 如果通过参数获取不到,则获取到在driver中设置的map的输出的key的类型,
// 并判断key的类型是否属于WritableComparable类型,再尝试为key获取比较器对象.
return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
}

2、进入WritableComparator.get()方法
private static final ConcurrentHashMap<Class, WritableComparator> comparators
= new ConcurrentHashMap<Class, WritableComparator>();
//comparators是一个map集合,键是输出的某个key的类型,value是该类型的比较器对象
public static WritableComparator get(
Class<? extends WritableComparable> c, Configuration conf) {
WritableComparator comparator = comparators.get(c); --从map集合中得到一个比较器对象
if (comparator == null) {
// force the static initializers to run
forceInit(c); -- 强制进行类加载
// look to see if it is defined now
comparator = comparators.get(c); -- 再次进行获取
// if not, use the generic one
if (comparator == null) { --如果还获取不到,则直接new一个comparator对象出来。
comparator = new WritableComparator(c, conf, true);
}
}
// Newly passed Configuration objects should be used.
ReflectionUtils.setConf(comparator, conf);
return comparator; --正常获取到比较器对象
}

 

标签:map,面试题,shuffle,--,分区,reduce,hadoop,key,class
From: https://blog.51cto.com/u_14389461/6059075

相关文章

  • Hive 面试题——HiveSQL 执行顺序
    描述今天刷到了一个面试题:hivesql执行顺序,接下来就从一个带有groupby的例子看看hivesql的执行顺序执行顺序为from..on..join..where..groupby..having......
  • hadoop模板虚拟机配置
    在安装好虚拟机软件后,进行IP配置 配置windows系统的ip 配置Vmware的ip 配置虚拟机的ip首先输入suroot切换至root身份。然后配置ip和网关vim/etc/sysconfig......
  • ​​面试题 01.05. 一次编辑​
    字符串有三种编辑操作:插入一个英文字符、删除一个英文字符或者替换一个英文字符。给定两个字符串,编写一个函数判定它们是否只需要一次(或者零次)编辑。链接: ​​https:/......
  • #yyds干货盘点# LeetCode面试题:最长公共前缀
    1.简述:编写一个函数来查找字符串数组中的最长公共前缀。如果不存在公共前缀,返回空字符串 ""。 示例1:输入:strs=["flower","flow","flight"]输出:"fl"示例2:输入:strs=["......
  • 集合面试题
    Collection:①List:  Vector(Stack),  ArrayList,  LinekdList ②Set:  HashSet(LinkedHashSet),  TreeSet③Queue:  PriorityQueue,  ArrayDeque......
  • 热点面试题: Array中有哪些非破坏性方法?
    前言极度投入,深度沉浸,边界清晰前端小菜鸡一枚,分享的文章纯属个人见解,若有不正确或可待讨论点可随意评论,与各位同学一起学习~欢迎关注​​『前端进阶圈』​​公众号,一起探......
  • java面试题(七)
    1.21说一说hashCode()和equals()的关系参考答案hashCode()用于获取哈希码(散列码),eauqls()用于比较两个对象是否相等,它们应遵守如下规定:如果两个对象相等,则它们必须有相同的......
  • 面试题,反射创建类实例的三种方式是什么
    1、获得Class:主要有三种方法:(1)Object-->getClass(2)任何数据类型(包括基本的数据类型)都有一个“静态”的class属性(3)通过class类的静态方法:forName(StringclassName)(最常用)publi......
  • 面试题 HashMap和HashTable有什么区别
    ......
  • 面试题如何实现一个IOC容器
     ......