产生问题于数据集群的数节点存储磁盘大小不同,造成使用一段时间以后容量小的磁盘空间紧张。
其实,早期配置了磁盘使用存储策略,就能解决该问题,部分网来上说这个策略无效,再hadoop2.0.1 本版有效,该版本应用于CHD4.6中。
为了找到准确的程序定位点,参考了以下的Hadoop设计文档。
参考
Hadoop中HDFS文件系统的Append/Hflush/Read设计文档:
文档中给出:
在一个DN的disk中,每个DN具有三个目录:current\tem\rbw,current包含finallized的replica,tmp包含temporary replica,rbw包含rbw,rwr,rur replicas。当一个replica第一次被dfs client发起请求而创建的时候,将会放到rbw中。当第一次创建是在block replication和clust balance过程中发起的话,replica就会放置到tmp中。一旦一个replica被finallized,他就会被move到current中。当一个DN重启之后,tmp中的replica将会被删除,rbw中的将会被加载为rwr状态,current中的会load为finallized状态
我们就从tmp 或 rbw 文件创建开始。
1.参见java class BlockPoolSlice
/**
* A block pool slice represents a portion of a block pool stored on a volume.
* Taken together, all BlockPoolSlices sharing a block pool ID across a
* cluster represent a single block pool.
*
* This class is synchronized by {@link FsVolumeImpl}.
*/
class BlockPoolSlice {
private final String bpid;
private final FsVolumeImpl volume; // volume to which this BlockPool belongs to
private final File currentDir; // StorageDirectory/current/bpid/current
private final LDir finalizedDir; // directory store Finalized replica
private final File rbwDir; // directory store RBW replica
private final File tmpDir; // directory store Temporary replica
从类的描述中看出BlockPoolSlice 是创建集群数据block的基础。
/**
* Temporary files. They get moved to the finalized block directory when
* the block is finalized.
*/
File createTmpFile(Block b) throws IOException {
File f = new File(tmpDir, b.getBlockName());
return DatanodeUtil.createTmpFile(b, f);
}
/**
* RBW files. They get moved to the finalized block directory when
* the block is finalized.
*/
File createRbwFile(Block b) throws IOException {
File f = new File(rbwDir, b.getBlockName());
return DatanodeUtil.createTmpFile(b, f);
}
这是创建基础文件的方法。
2.该方法的实现
/** Provide utility methods for Datanode. */
@InterfaceAudience.Private
public class DatanodeUtil {
public static final String UNLINK_BLOCK_SUFFIX = ".unlinked";
public static final String DISK_ERROR = "Possible disk error: ";
/** Get the cause of an I/O exception if caused by a possible disk error
* @param ioe an I/O exception
* @return cause if the I/O exception is caused by a possible disk error;
* null otherwise.
*/
static IOException getCauseIfDiskError(IOException ioe) {
if (ioe.getMessage()!=null && ioe.getMessage().startsWith(DISK_ERROR)) {
return (IOException)ioe.getCause();
} else {
return null;
}
}
/**
* Create a new file.
* @throws IOException
* if the file already exists or if the file cannot be created.
*/
public static File createTmpFile(Block b, File f) throws IOException {
if (f.exists()) {
throw new IOException("Failed to create temporary file for " + b
+ ". File " + f + " should not be present, but is.");
}
// Create the zero-length temp file
final boolean fileCreated;
try {
fileCreated = f.createNewFile();
} catch (IOException ioe) {
throw new IOException(DISK_ERROR + "Failed to create " + f, ioe);
}
if (!fileCreated) {
throw new IOException("Failed to create temporary file for " + b
+ ". File " + f + " should be creatable, but is already present.");
}
return f;
}
在调用该方法创建数据block时,并没有我们关心的存储路径的选择策略。
3.我们再来查找createRbwFile调用出处
/**************************************************
* FSDataset manages a set of data blocks. Each block
* has a unique name and an extent on disk.
*
***************************************************/
@InterfaceAudience.Private
class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
static final Log LOG = LogFactory.getLog(FsDatasetImpl.class);
block管理操作类
@Override // FsDatasetSpi
public synchronized ReplicaInPipeline createRbw(ExtendedBlock b)
throws IOException {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
b.getBlockId());
if (replicaInfo != null) {
throw new ReplicaAlreadyExistsException("Block " + b +
" already exists in state " + replicaInfo.getState() +
" and thus cannot be created.");
}
// create a new block
FsVolumeImpl v = volumes.getNextVolume(b.getNumBytes());
// create a rbw file to hold block in the designated volume
File f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(),
b.getGenerationStamp(), v, f.getParentFile());
volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
return newReplicaInfo;
}
调用了
createRbwFile 方法,该方法同样创建rbw文件。
这里发现了我们关系的volumes,它是配置的存储路径。
4.查看volumes 的初始
volumnes是在构造函数中初始化的,使用了volArray
/**
* An FSDataset has a directory where it loads its data files.
*/
FsDatasetImpl(DataNode datanode, DataStorage storage, Configuration conf
) throws IOException {
this.datanode = datanode;
// The number of volumes required for operation is the total number
// of volumes minus the number of failed volumes we can tolerate.
final int volFailuresTolerated =
conf.getInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY,
DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT);
String[] dataDirs = conf.getTrimmedStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
int volsConfigured = (dataDirs == null) ? 0 : dataDirs.length;
int volsFailed = volsConfigured - storage.getNumStorageDirs();
this.validVolsRequired = volsConfigured - volFailuresTolerated;
if (volFailuresTolerated < 0 || volFailuresTolerated >= volsConfigured) {
throw new DiskErrorException("Invalid volume failure "
+ " config value: " + volFailuresTolerated);
}
if (volsFailed > volFailuresTolerated) {
throw new DiskErrorException("Too many failed volumes - "
+ "current valid volumes: " + storage.getNumStorageDirs()
+ ", volumes configured: " + volsConfigured
+ ", volumes failed: " + volsFailed
+ ", volume failures tolerated: " + volFailuresTolerated);
}
final List<FsVolumeImpl> volArray = new ArrayList<FsVolumeImpl>(
storage.getNumStorageDirs());
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
final File dir = storage.getStorageDir(idx).getCurrentDir();
volArray.add(new FsVolumeImpl(this, storage.getStorageID(), dir, conf));
LOG.info("Added volume - " + dir);
}
volumeMap = new ReplicaMap(this);
@SuppressWarnings("unchecked")
final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl =
ReflectionUtils.newInstance(conf.getClass(
DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY,
RoundRobinVolumeChoosingPolicy.class,
VolumeChoosingPolicy.class), conf);
volumes = new FsVolumeList(volArray, volsFailed, blockChooserImpl);
volumes.getVolumeMap(volumeMap);
File[] roots = new File[storage.getNumStorageDirs()];
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
roots[idx] = storage.getStorageDir(idx).getCurrentDir();
}
asyncDiskService = new FsDatasetAsyncDiskService(datanode, roots);
registerMBean(storage.getStorageID());
}
而volArray 如下生成的:
final List<FsVolumeImpl> volArray = new ArrayList<FsVolumeImpl>(
storage.getNumStorageDirs());
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
final File dir = storage.getStorageDir(idx).getCurrentDir();
volArray.add(new FsVolumeImpl(this, storage.getStorageID(), dir, conf));
LOG.info("Added volume - " + dir);
}
正式配置文件中的存储路径。
到此,我们找到了需要的存储路径,下面再找到如何选择的路径的就容易多了。
5.路径选择从getNextVolume开始
class FsVolumeList {
/**
* Read access to this unmodifiable list is not synchronized.
* This list is replaced on modification holding "this" lock.
*/
volatile List<FsVolumeImpl> volumes = null;
private final VolumeChoosingPolicy<FsVolumeImpl> blockChooser;
private volatile int numFailedVolumes;
FsVolumeList(List<FsVolumeImpl> volumes, int failedVols,
VolumeChoosingPolicy<FsVolumeImpl> blockChooser) {
this.volumes = Collections.unmodifiableList(volumes);
this.blockChooser = blockChooser;
this.numFailedVolumes = failedVols;
}
int numberOfFailedVolumes() {
return numFailedVolumes;
}
/**
* Get next volume. Synchronized to ensure {@link #curVolume} is updated
* by a single thread and next volume is chosen with no concurrent
* update to {@link #volumes}.
* @param blockSize free space needed on the volume
* @return next volume to store the block in.
*/
synchronized FsVolumeImpl getNextVolume(long blockSize) throws IOException {
return blockChooser.chooseVolume(volumes, blockSize);
}
6.继续chooseVolume 源自于 blockChooser 类型是 VolumeChoosingPolicy ,该方法实现在下面的类中:
/**
* A DN volume choosing policy which takes into account the amount of free
* space on each of the available volumes when considering where to assign a
* new replica allocation. By default this policy prefers assigning replicas to
* those volumes with more available free space, so as to over time balance the
* available space of all the volumes within a DN.
*/
public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi>
implements VolumeChoosingPolicy<V>, Configurable {
private static final Log LOG = LogFactory.getLog(AvailableSpaceVolumeChoosingPolicy.class);
private static final Random RAND = new Random();
private long balancedSpaceThreshold = DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_DEFAULT;
private float balancedPreferencePercent = DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_DEFAULT;
从描述中可以看出了,这就是策略文件。
7.策略实现就是这样的:
@Override
public synchronized V chooseVolume(List<V> volumes,
final long replicaSize) throws IOException {
if (volumes.size() < 1) {
throw new DiskOutOfSpaceException("No more available volumes");
}
AvailableSpaceVolumeList volumesWithSpaces =
new AvailableSpaceVolumeList(volumes);
if (volumesWithSpaces.areAllVolumesWithinFreeSpaceThreshold()) {
// If they're actually not too far out of whack, fall back on pure round
// robin.
V volume = roundRobinPolicyBalanced.chooseVolume(volumes, replicaSize);
if (LOG.isDebugEnabled()) {
LOG.debug("All volumes are within the configured free space balance " +
"threshold. Selecting " + volume + " for write of block size " +
replicaSize);
}
return volume;
} else {
V volume = null;
// If none of the volumes with low free space have enough space for the
// replica, always try to choose a volume with a lot of free space.
long mostAvailableAmongLowVolumes = volumesWithSpaces
.getMostAvailableSpaceAmongVolumesWithLowAvailableSpace();
List<V> highAvailableVolumes = extractVolumesFromPairs(
volumesWithSpaces.getVolumesWithHighAvailableSpace());
List<V> lowAvailableVolumes = extractVolumesFromPairs(
volumesWithSpaces.getVolumesWithLowAvailableSpace());
float preferencePercentScaler =
(highAvailableVolumes.size() * balancedPreferencePercent) +
(lowAvailableVolumes.size() * (1 - balancedPreferencePercent));
float scaledPreferencePercent =
(highAvailableVolumes.size() * balancedPreferencePercent) /
preferencePercentScaler;
if (mostAvailableAmongLowVolumes < replicaSize ||
RAND.nextFloat() < scaledPreferencePercent) {
volume = roundRobinPolicyHighAvailable.chooseVolume(
highAvailableVolumes,
replicaSize);
if (LOG.isDebugEnabled()) {
LOG.debug("Volumes are imbalanced. Selecting " + volume +
" from high available space volumes for write of block size "
+ replicaSize);
}
} else {
volume = roundRobinPolicyLowAvailable.chooseVolume(
lowAvailableVolumes,
replicaSize);
if (LOG.isDebugEnabled()) {
LOG.debug("Volumes are imbalanced. Selecting " + volume +
" from low available space volumes for write of block size "
+ replicaSize);
}
}
return volume;
}
}
关于配置中各个存储路径如何选择及选择策略都在这里了,sigh 累死了~~
花费了接近3天的时间,纯代码看着实累,可以步进就好了。
相关的配置说明。
dfs.datanode.fsdataset.volume.choosing.policy
dfs.datanode.available-space-volume-choosing-policy.balanced-space-threshold
Only used when the dfs.datanode.fsdataset.volume.choosing.policy is set to org.apache.hadoop.hdfs.server.datanode.fsdataset.AvailableSpaceVolumeChoosingPolicy. This setting controls how much DN volumes are allowed to differ in terms of bytes of free disk space before they are considered imbalanced. If the free space of all the volumes are within this range of each other, the volumes will be considered balanced and block assignments will be done on a pure round robin basis.
dfs.datanode.available-space-volume-choosing-policy.balanced-space-preference-fraction
0.75f
Only used when the dfs.datanode.fsdataset.volume.choosing.policy is set to org.apache.hadoop.hdfs.server.datanode.fsdataset.AvailableSpaceVolumeChoosingPolicy. This setting controls what percentage of new block allocations will be sent to volumes with more available disk space than others. This setting should be in the range 0.0 - 1.0, though in practice 0.5 - 1.0, since there should be no reason to prefer that volumes with less available disk space receive more block allocations.
另附上其他的一些类分析:
DataNode的相关重要类
FSDataset:所有和数据块相关的操作,都在FSDataset相关的类。详细分析参考 http://caibinbupt.iteye.com/blog/284365
DataXceiverServer:处理数据块的流读写的的服务器,处理逻辑由DataXceiver完成。详细分析参考 http://caibinbupt.iteye.com/blog/284979
DataXceiver:处理数据块的流读写的线程。详细分析参考 http://caibinbupt.iteye.com/blog/284979
还有处理非读写的非主流的流程。详细分析参考 http://caibinbupt.iteye.com/blog/286533
BlockReceiver:完成数据块的流写操作。详细分析参考 http://caibinbupt.iteye.com/blog/286259
BlockSender:完成数据块的流读操作。
DataBlockScanner:用于定时对数据块文件进行校验。详细分析参考http://caibinbupt.iteye.com/blog/286650