切片与MapTask并行度决定机制
- MapTask的并行度决定Map阶段的任务处理并发度,进而影响到整个Job的处理速度
- 数据块:Block是HDFS物理上把数据分成一块一块,数据块是HDFS存储数据单位
- 数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储,数据切片是MapReduce程序计算输入数据的单位,一个切片会对应启动一个MapTask
- 一个Job的Map阶段并行度由客户端在提交Job时的切片数决定
- 每一个Split切片分配一个MapTask并行实例处理
- 默认情况下,切片大小=BlockSize
- 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
Job提交流程源码详解
waitForCompletion()
submit();
// 1 建立连接
connect();
// 1)创建提交 Job 的代理
new Cluster(getConfiguration());
// (1)判断是本地运行环境还是 yarn 集群运行环境
initialize(jobTrackAddr, conf);
// 2 提交 job
submitter.submitJobInternal(Job.this, cluster)
// 1)创建给集群提交数据的 Stag 路径
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
// 2)获取 jobid ,并创建 Job 路径
JobID jobId = submitClient.getNewJobID();
// 3)拷贝 jar 包到集群
copyAndConfigureFiles(job, submitJobDir);
rUploader.uploadFiles(job, jobSubmitDir);
// 4)计算切片,生成切片规划文件
writeSplits(job, submitJobDir);
maps = writeNewSplits(job, jobSubmitDir);
input.getSplits(job);
// 5)向 Stag 路径写 XML 配置文件
writeConf(conf, submitJobFile);
conf.writeXml(out);
// 6)提交 Job,返回提交状态
status = submitClient.submitJob(jobId,submitJobDir.toString(),job.getCredentials());
FileInputFormat切片机制
1、程序首先找到数据存储的目录
2、开始遍历处理(规划切片)目录下的每一个文件
3、遍历第一个文件
- 获取文件大小
fs.sizeOf()
- 计算切片大小
computeSplitSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M
- 默认情况下,切片大小 = blocksize
- 开始切片,假设文件大小为300M,形成第一个切片:
0:128M
,第二个切片:128-256M
,第三个切片:256:300M
,每次切片时,都要判断切完剩下的部分是否大于块的1.1倍,不大于1.1倍就划分一块切片 - 将切片信息写到一个切片规划文件中
- 整个切片的核心过程在getSplit()方法中完成
- InputSplit只记录了切片的元数据信息,比如起始位置、长度以及所在的节点列表等
4、提交切片规划文件到YARN上,YARN上的MrAppMaster就可以根据切片规划文件计算开启MapTask个数
在运行MapReduce程序时,输入的文件格式包括:基于行的日志文件、二进制格式文件、数据库表等,FileInputFormat常见的接口实现类包括:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat和自定义InputFormat等
一、TextInputFormat
- TextInputFormat是默认的FileInputFormat实现类,按行读取每条记录
- 键是存储该行在整个文件中的起始字节偏移量, LongWritable类型,值是这行的内容,不包括任何行终止符(换行符和回车符),Text类型
示例
- 一个分片包含了如下4条文本记录
Rich learning form
Intelligent learning engine
Learning more convenient
From the real demand for more close to the enterprise
- 每条记录表示为以下键/值对:
(0,Rich learning form)
(20,Intelligent learning engine)
(49,Learning more convenient)
(74,From the real demand for more close to the enterprise)
二、CombineTextInputFormat切片机制
- TextInputFormat切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下
- CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理
- 虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值
切片机制:虚拟存储过程和切片过程
- 假设虚拟存储切片最大值为4M
- 现有文件a.txt(1.7M)、b.txt(5.1M)
(1)虚拟存储过程:将输入目录下所有文件大小,依次和设置的setMaxInputSplitSize值比较,如果不大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值2倍,此时将文件均分成2个虚拟存储块(防止出现太小切片)。例如上述文件a划分一块,文件b划分两块,块1=块2=2.55M
(2)切片过程:判断虚拟存储的文件大小是否大于setMaxInputSplitSize值,大于等于则单独形成一个切片,如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。例如上述三个块1.7M、2.55M、2.55M最终会形成两个切片(1.7+2.55)M、2.55M