首页 > 编程语言 >hadoop单个数据节点的不同存储路径的存储策略源码分析。

hadoop单个数据节点的不同存储路径的存储策略源码分析。

时间:2022-11-11 23:35:52浏览次数:49  
标签:存储 File hadoop volume 源码 volumes new final block


产生问题于数据集群的数节点存储磁盘大小不同,造成使用一段时间以后容量小的磁盘空间紧张。

其实,早期配置了磁盘使用存储策略,就能解决该问题,部分网来上说这个策略无效,再hadoop2.0.1 本版有效,该版本应用于CHD4.6中。

为了找到准确的程序定位点,参考了以下的Hadoop设计文档。

参考 

Hadoop中HDFS文件系统的Append/Hflush/Read设计文档:


文档中给出:

在一个DN的disk中,每个DN具有三个目录:current\tem\rbw,current包含finallized的replica,tmp包含temporary replica,rbw包含rbw,rwr,rur replicas。当一个replica第一次被dfs client发起请求而创建的时候,将会放到rbw中。当第一次创建是在block replication和clust balance过程中发起的话,replica就会放置到tmp中。一旦一个replica被finallized,他就会被move到current中。当一个DN重启之后,tmp中的replica将会被删除,rbw中的将会被加载为rwr状态,current中的会load为finallized状态

我们就从tmp 或 rbw 文件创建开始。

1.参见java class BlockPoolSlice


/**
* A block pool slice represents a portion of a block pool stored on a volume.
* Taken together, all BlockPoolSlices sharing a block pool ID across a
* cluster represent a single block pool.
*
* This class is synchronized by {@link FsVolumeImpl}.
*/
class BlockPoolSlice {
private final String bpid;
private final FsVolumeImpl volume; // volume to which this BlockPool belongs to
private final File currentDir; // StorageDirectory/current/bpid/current
private final LDir finalizedDir; // directory store Finalized replica
private final File rbwDir; // directory store RBW replica
private final File tmpDir; // directory store Temporary replica


从类的描述中看出BlockPoolSlice  是创建集群数据block的基础。


/**
* Temporary files. They get moved to the finalized block directory when
* the block is finalized.
*/
File createTmpFile(Block b) throws IOException {
File f = new File(tmpDir, b.getBlockName());
return DatanodeUtil.createTmpFile(b, f);
}

/**
* RBW files. They get moved to the finalized block directory when
* the block is finalized.
*/
File createRbwFile(Block b) throws IOException {
File f = new File(rbwDir, b.getBlockName());
return DatanodeUtil.createTmpFile(b, f);
}

这是创建基础文件的方法。

2.该方法的实现


/** Provide utility methods for Datanode. */
@InterfaceAudience.Private
public class DatanodeUtil {
public static final String UNLINK_BLOCK_SUFFIX = ".unlinked";

public static final String DISK_ERROR = "Possible disk error: ";

/** Get the cause of an I/O exception if caused by a possible disk error
* @param ioe an I/O exception
* @return cause if the I/O exception is caused by a possible disk error;
* null otherwise.
*/
static IOException getCauseIfDiskError(IOException ioe) {
if (ioe.getMessage()!=null && ioe.getMessage().startsWith(DISK_ERROR)) {
return (IOException)ioe.getCause();
} else {
return null;
}
}

/**
* Create a new file.
* @throws IOException
* if the file already exists or if the file cannot be created.
*/
public static File createTmpFile(Block b, File f) throws IOException {
if (f.exists()) {
throw new IOException("Failed to create temporary file for " + b
+ ". File " + f + " should not be present, but is.");
}
// Create the zero-length temp file
final boolean fileCreated;
try {
fileCreated = f.createNewFile();
} catch (IOException ioe) {
throw new IOException(DISK_ERROR + "Failed to create " + f, ioe);
}
if (!fileCreated) {
throw new IOException("Failed to create temporary file for " + b
+ ". File " + f + " should be creatable, but is already present.");
}
return f;
}

在调用该方法创建数据block时,并没有我们关心的存储路径的选择策略。


3.我们再来查找createRbwFile调用出处

/**************************************************
* FSDataset manages a set of data blocks. Each block
* has a unique name and an extent on disk.
*
***************************************************/
@InterfaceAudience.Private
class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
static final Log LOG = LogFactory.getLog(FsDatasetImpl.class);

block管理操作类

@Override // FsDatasetSpi
public synchronized ReplicaInPipeline createRbw(ExtendedBlock b)
throws IOException {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
b.getBlockId());
if (replicaInfo != null) {
throw new ReplicaAlreadyExistsException("Block " + b +
" already exists in state " + replicaInfo.getState() +
" and thus cannot be created.");
}
// create a new block
FsVolumeImpl v = volumes.getNextVolume(b.getNumBytes());
// create a rbw file to hold block in the designated volume
File f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(),
b.getGenerationStamp(), v, f.getParentFile());
volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
return newReplicaInfo;
}

调用了

createRbwFile 方法,该方法同样创建rbw文件。

这里发现了我们关系的volumes,它是配置的存储路径。

4.查看volumes 的初始

volumnes是在构造函数中初始化的,使用了volArray

/**
* An FSDataset has a directory where it loads its data files.
*/
FsDatasetImpl(DataNode datanode, DataStorage storage, Configuration conf
) throws IOException {
this.datanode = datanode;
// The number of volumes required for operation is the total number
// of volumes minus the number of failed volumes we can tolerate.
final int volFailuresTolerated =
conf.getInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY,
DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT);

String[] dataDirs = conf.getTrimmedStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);

int volsConfigured = (dataDirs == null) ? 0 : dataDirs.length;
int volsFailed = volsConfigured - storage.getNumStorageDirs();
this.validVolsRequired = volsConfigured - volFailuresTolerated;

if (volFailuresTolerated < 0 || volFailuresTolerated >= volsConfigured) {
throw new DiskErrorException("Invalid volume failure "
+ " config value: " + volFailuresTolerated);
}
if (volsFailed > volFailuresTolerated) {
throw new DiskErrorException("Too many failed volumes - "
+ "current valid volumes: " + storage.getNumStorageDirs()
+ ", volumes configured: " + volsConfigured
+ ", volumes failed: " + volsFailed
+ ", volume failures tolerated: " + volFailuresTolerated);
}

final List<FsVolumeImpl> volArray = new ArrayList<FsVolumeImpl>(
storage.getNumStorageDirs());
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
final File dir = storage.getStorageDir(idx).getCurrentDir();
volArray.add(new FsVolumeImpl(this, storage.getStorageID(), dir, conf));
LOG.info("Added volume - " + dir);
}
volumeMap = new ReplicaMap(this);

@SuppressWarnings("unchecked")
final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl =
ReflectionUtils.newInstance(conf.getClass(
DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY,
RoundRobinVolumeChoosingPolicy.class,
VolumeChoosingPolicy.class), conf);
volumes = new FsVolumeList(volArray, volsFailed, blockChooserImpl);
volumes.getVolumeMap(volumeMap);

File[] roots = new File[storage.getNumStorageDirs()];
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
roots[idx] = storage.getStorageDir(idx).getCurrentDir();
}
asyncDiskService = new FsDatasetAsyncDiskService(datanode, roots);
registerMBean(storage.getStorageID());
}

而volArray 如下生成的:

final List<FsVolumeImpl> volArray = new ArrayList<FsVolumeImpl>(
storage.getNumStorageDirs());
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
final File dir = storage.getStorageDir(idx).getCurrentDir();
volArray.add(new FsVolumeImpl(this, storage.getStorageID(), dir, conf));
LOG.info("Added volume - " + dir);
}

正式配置文件中的存储路径。

到此,我们找到了需要的存储路径,下面再找到如何选择的路径的就容易多了。

5.路径选择从getNextVolume开始


class FsVolumeList {
/**
* Read access to this unmodifiable list is not synchronized.
* This list is replaced on modification holding "this" lock.
*/
volatile List<FsVolumeImpl> volumes = null;

private final VolumeChoosingPolicy<FsVolumeImpl> blockChooser;
private volatile int numFailedVolumes;

FsVolumeList(List<FsVolumeImpl> volumes, int failedVols,
VolumeChoosingPolicy<FsVolumeImpl> blockChooser) {
this.volumes = Collections.unmodifiableList(volumes);
this.blockChooser = blockChooser;
this.numFailedVolumes = failedVols;
}

int numberOfFailedVolumes() {
return numFailedVolumes;
}

/**
* Get next volume. Synchronized to ensure {@link #curVolume} is updated
* by a single thread and next volume is chosen with no concurrent
* update to {@link #volumes}.
* @param blockSize free space needed on the volume
* @return next volume to store the block in.
*/
synchronized FsVolumeImpl getNextVolume(long blockSize) throws IOException {
return blockChooser.chooseVolume(volumes, blockSize);
}


6.继续chooseVolume 源自于 blockChooser  类型是 VolumeChoosingPolicy ,该方法实现在下面的类中:

/**
* A DN volume choosing policy which takes into account the amount of free
* space on each of the available volumes when considering where to assign a
* new replica allocation. By default this policy prefers assigning replicas to
* those volumes with more available free space, so as to over time balance the
* available space of all the volumes within a DN.
*/
public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi>
implements VolumeChoosingPolicy<V>, Configurable {

private static final Log LOG = LogFactory.getLog(AvailableSpaceVolumeChoosingPolicy.class);

private static final Random RAND = new Random();

private long balancedSpaceThreshold = DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_DEFAULT;
private float balancedPreferencePercent = DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_DEFAULT;

从描述中可以看出了,这就是策略文件。


7.策略实现就是这样的:

@Override
public synchronized V chooseVolume(List<V> volumes,
final long replicaSize) throws IOException {
if (volumes.size() < 1) {
throw new DiskOutOfSpaceException("No more available volumes");
}

AvailableSpaceVolumeList volumesWithSpaces =
new AvailableSpaceVolumeList(volumes);

if (volumesWithSpaces.areAllVolumesWithinFreeSpaceThreshold()) {
// If they're actually not too far out of whack, fall back on pure round
// robin.
V volume = roundRobinPolicyBalanced.chooseVolume(volumes, replicaSize);
if (LOG.isDebugEnabled()) {
LOG.debug("All volumes are within the configured free space balance " +
"threshold. Selecting " + volume + " for write of block size " +
replicaSize);
}
return volume;
} else {
V volume = null;
// If none of the volumes with low free space have enough space for the
// replica, always try to choose a volume with a lot of free space.
long mostAvailableAmongLowVolumes = volumesWithSpaces
.getMostAvailableSpaceAmongVolumesWithLowAvailableSpace();

List<V> highAvailableVolumes = extractVolumesFromPairs(
volumesWithSpaces.getVolumesWithHighAvailableSpace());
List<V> lowAvailableVolumes = extractVolumesFromPairs(
volumesWithSpaces.getVolumesWithLowAvailableSpace());

float preferencePercentScaler =
(highAvailableVolumes.size() * balancedPreferencePercent) +
(lowAvailableVolumes.size() * (1 - balancedPreferencePercent));
float scaledPreferencePercent =
(highAvailableVolumes.size() * balancedPreferencePercent) /
preferencePercentScaler;
if (mostAvailableAmongLowVolumes < replicaSize ||
RAND.nextFloat() < scaledPreferencePercent) {
volume = roundRobinPolicyHighAvailable.chooseVolume(
highAvailableVolumes,
replicaSize);
if (LOG.isDebugEnabled()) {
LOG.debug("Volumes are imbalanced. Selecting " + volume +
" from high available space volumes for write of block size "
+ replicaSize);
}
} else {
volume = roundRobinPolicyLowAvailable.chooseVolume(
lowAvailableVolumes,
replicaSize);
if (LOG.isDebugEnabled()) {
LOG.debug("Volumes are imbalanced. Selecting " + volume +
" from low available space volumes for write of block size "
+ replicaSize);
}
}
return volume;
}
}


关于配置中各个存储路径如何选择及选择策略都在这里了,sigh 累死了~~

花费了接近3天的时间,纯代码看着实累,可以步进就好了。


相关的配置说明。

dfs.datanode.fsdataset.volume.choosing.policy

dfs.datanode.available-space-volume-choosing-policy.balanced-space-threshold

Only used when the dfs.datanode.fsdataset.volume.choosing.policy is set to org.apache.hadoop.hdfs.server.datanode.fsdataset.AvailableSpaceVolumeChoosingPolicy. This setting controls how much DN volumes are allowed to differ in terms of bytes of free disk space before they are considered imbalanced. If the free space of all the volumes are within this range of each other, the volumes will be considered balanced and block assignments will be done on a pure round robin basis.

dfs.datanode.available-space-volume-choosing-policy.balanced-space-preference-fraction

0.75f

Only used when the dfs.datanode.fsdataset.volume.choosing.policy is set to org.apache.hadoop.hdfs.server.datanode.fsdataset.AvailableSpaceVolumeChoosingPolicy. This setting controls what percentage of new block allocations will be sent to volumes with more available disk space than others. This setting should be in the range 0.0 - 1.0, though in practice 0.5 - 1.0, since there should be no reason to prefer that volumes with less available disk space receive more block allocations.


另附上其他的一些类分析:

DataNode的相关重要类

FSDataset:所有和数据块相关的操作,都在FSDataset相关的类。详细分析参考 http://caibinbupt.iteye.com/blog/284365

DataXceiverServer:处理数据块的流读写的的服务器,处理逻辑由DataXceiver完成。详细分析参考 http://caibinbupt.iteye.com/blog/284979

DataXceiver:处理数据块的流读写的线程。详细分析参考 http://caibinbupt.iteye.com/blog/284979

                  还有处理非读写的非主流的流程。详细分析参考 http://caibinbupt.iteye.com/blog/286533

BlockReceiver:完成数据块的流写操作。详细分析参考 http://caibinbupt.iteye.com/blog/286259

BlockSender:完成数据块的流读操作。

DataBlockScanner:用于定时对数据块文件进行校验。详细分析参考http://caibinbupt.iteye.com/blog/286650












标签:存储,File,hadoop,volume,源码,volumes,new,final,block
From: https://blog.51cto.com/u_2776699/5845581

相关文章

  • 关于hadoop使用lzo压缩的流程
    1.为何要使用lzo看这里,http://blog.cloudera.com/blog/2009/11/hadoop-at-twitter-part-1-splittable-lzo-compression/中文的也很多,搜索一下吧2.安装流程(仅限linux cento......
  • Redis 集群模式的安装与配置【源码安装redis-7.0.5】
    Redis最新版下载地址:http://download.redis.io/releases/redis-7.0.5.tar.gz步骤如下:1)wget http://download.redis.io/releases/redis-7.0.5.tar.gz2)tar-zxf redis-7......
  • 【源码】902- 探索 Snabbdom 模块系统原理
    近几年随着React、Vue等前端框架不断兴起,VirtualDOM概念也越来越火,被用到越来越多的框架、库中。VirtualDOM是基于真实DOM的一层抽象,用简单的JS对象描述真实DOM......
  • Seata Server 1.5.2 源码学习
    Seata包括Server端和Client端。Seata中有三种角色:TC、TM、RM,其中,Server端就是TC,TM和RM属Client端。Client端的源码学习上一篇已讲过,详见《Seata1.5.2源码学习》,今天来......
  • 一周干货回顾&总结(附论文、源码、链接)
    ​作者:Edison_G本周我们“计算机视觉研究院”主要推送了目标检测干货及中国人工智能大会内容,今天给大家总结一下!公众号ID|ComputerVisionGzq学习群|扫码在主页获取加入方式​......
  • EventBridge助力阿里云视觉智能开放平台AI智能存储实践
    本文作者:李建,阿里巴巴达摩院技术专家。01视觉智能开放平台(VIAPI)业务场景介绍阿里云视觉智能开放平台(简称VIAPI),是基于之前很多技术实践经验积累的AI能力的沉淀平台。目......
  • EventBridge助力阿里云视觉智能开放平台AI智能存储实践
    本文作者:李建,阿里巴巴达摩院技术专家。01视觉智能开放平台(VIAPI)业务场景介绍阿里云视觉智能开放平台(简称VIAPI),是基于之前很多技术实践经验积累的AI能力的沉淀平台......
  • oracle的存储结构
    一、oracle体系结构oracle的体系结构分三类:内存结构、进程结构、存储结构 二、存储结构参考连接:Oracle存储结构数据库物理结构和逻辑结构的基本关系1、一个数据库......
  • hadoop cdh4 eclipse plugin
    1)downloadeclipse2)解压eclipse3)安装与配置Ant修改/etc/profile文件exportANT_HOME=[path]/apache-ant-1.7.1......
  • hadoop HA----Quorum Journal 设计…
    原文参考这个链接中的附件:https://issues.apache.org/jira/browse/HDFS-30771概述1.1背景1.2当前实现的一些局限自定义硬盘 -NAS设备和远程控制的PDU非常昂贵,也有别......