首页 > 编程语言 >Hadoop文件切分的源码

Hadoop文件切分的源码

时间:2023-02-02 10:35:36浏览次数:46  
标签:job Hadoop long 切分 length 源码 file blkLocations bytesRemaining


TextInputFormat

Hadoop文件的切分原则:

一 按每个文件切分

二 文件大小/分片大小《=1.1则划分为一个文件,否则切分为2个文件

三 一个切片一个Maptask,一个Maptask代表一个并行度

分片默认设置

Hadoop文件切分的源码_hadoop

分片切分的核心源码

public List<InputSplit> getSplits(JobContext job) throws IOException {
StopWatch sw = new StopWatch().start();
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);

// generate splits
List<InputSplit> splits = new ArrayList<InputSplit>();
List<FileStatus> files = listStatus(job);

boolean ignoreDirs = !getInputDirRecursive(job)
&& job.getConfiguration().getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false);
//循环文件
for (FileStatus file: files) {
if (ignoreDirs && file.isDirectory()) {
continue;
}
//获取文件的长度值
Path path = file.getPath();
long length = file.getLen();
if (length != 0) {
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
//是否支持切割
if (isSplitable(job, path)) {
long blockSize = file.getBlockSize();
//由集群指定的块大小,最小和最大 下面对切片大小有描述
long splitSize = computeSplitSize(blockSize, minSize, maxSize);

long bytesRemaining = length;
//判断是否可以切成一块,如果大于1.1切成两片,如果小于1.1形成一片
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}

if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else { // not splitable
if (LOG.isDebugEnabled()) {
// Log only if the file is big enough to be splitted
if (length > Math.min(file.getBlockSize(), minSize)) {
LOG.debug("File is not splittable so no parallelization "
+ "is possible: " + file.getPath());
}
}
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
// Save the number of input files for metrics/loadgen
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
}
return splits;
}

修改分片规则

//取maxsize和blocksize的最小值
//可以控制minsize和maxsize来控制切片大小
protected long computeSplitSize(long blockSize, long minSize,
long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}

流程的总结

Hadoop文件切分的源码_List_02

CombineTextInputFormat

Hadoop文件切分的源码_hadoop_03


Hadoop文件切分的源码_List_04


Hadoop文件切分的源码_List_05


标签:job,Hadoop,long,切分,length,源码,file,blkLocations,bytesRemaining
From: https://blog.51cto.com/u_15063934/6033048

相关文章