1. Background
Before Hadoop 2.x, a single NameNode served all metadata requests, while a secondary NameNode only merged the fsimage and provided no metadata service of its own. Both the NameNode and the secondary NameNode were therefore single points of failure.
To remove this single point of failure, HDFS introduces multiple JournalNode services that store the operation log, replacing the lone secondary NameNode. The JournalNodes use a quorum scheme: when the Active NameNode writes an EditLog entry, it writes the entry to its local disk and also sends a write request to every JournalNode. A cluster of 2N+1 JournalNodes tolerates the failure of up to N of them.
Because the NameNode tolerates write failures on up to half of the JournalNodes, when it recovers the editlog it first calls FSEditLog.recoverUnclosedStreams() to bring the EditLogs on the individual JournalNodes into agreement. NameNode metadata is therefore eventually consistent rather than strongly consistent. While metadata is being recovered, the NameNode is in safe mode and cannot be accessed, which makes HDFS a CP system.
2. JournalNode Concepts
- fsimage: the fsimage file is a permanent checkpoint of the Hadoop filesystem metadata, containing the serialized inode information of every directory and file in the namespace.
- edits: the edits log records every namespace operation. An operation is first appended to the edits file, and edits are periodically merged into the fsimage. Without HA the merge is performed by the secondary NameNode; with HA enabled the Standby NameNode performs the merge (the checkpoint described in section 5) while the JournalNodes store and synchronize the edits.
- epoch: epoch is a concept from the Paxos protocol that identifies the Active NameNode. Every failover increments the epoch, and every request sent to a JournalNode carries the sender's epoch. If the request's epoch is smaller than the last-promised-epoch maintained by the JournalNode, the sender is the pre-failover NameNode and the cluster has split-brained; that NameNode is then forbidden from sending editlog entries to the JournalNode (a minimal sketch of this check follows this list).
- txid: every editlog entry has a transaction id (txid).
- segment: a run of consecutive txids is stored in a single editlog file. Only one segment at a time is open for writing (Inprogress); all other segment files are closed (Finalized). Whenever a new segment is started, the writer's epoch is compared with the last-writer-epoch the editlog maintains; if it is larger, it is recorded as the new last-writer-epoch.
- committed-txid: the JournalNode's record of the highest transaction id it has accepted and is processing.
- edits_$starttxid-$endtxid: a finalized segment file holding the consecutive editlog entries from the start txid through the end txid.
- edits_inprogress_$lasttxid: the segment file currently being written; its name records the start transaction id.
- VERSION: records cluster information such as the namespace id, cluster id, and creation time.
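To make the epoch fencing concrete, here is a minimal Java sketch of the check a JournalNode applies to every request. This is an illustration of the idea only, not the actual Hadoop code (the real logic lives in the Journal class and also persists the promised epoch to disk):
import java.io.IOException;
// Simplified illustration of JournalNode-side epoch fencing.
class EpochFencing {
  // In the real JournalNode this value is persisted in the last-promised-epoch file.
  private long lastPromisedEpoch;
  // Called with the epoch carried by each NameNode request.
  synchronized void checkRequest(long requestEpoch) throws IOException {
    if (requestEpoch < lastPromisedEpoch) {
      // A stale, pre-failover NameNode: reject its writes to avoid split-brain.
      throw new IOException("epoch " + requestEpoch
          + " is less than the last promised epoch " + lastPromisedEpoch);
    }
    if (requestEpoch > lastPromisedEpoch) {
      // A newer Active NameNode has taken over; promise not to accept older writers.
      lastPromisedEpoch = requestEpoch;
    }
  }
}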
The dfs.namenode.name.dir setting defines where the NameNode stores its metadata, i.e. the fsimage and (unless dfs.namenode.edits.dir is configured separately) the editlog:
<property>
<name>dfs.namenode.name.dir</name>
<value>file:///disk1/dfs/name,file:///disk2/dfs/name</value>
</property>
The dfs.journalnode.edits.dir setting defines where the JournalNode stores its editlog:
<property>
<name>dfs.journalnode.edits.dir</name>
<value>/home/data/jn</value>
</property>
The editlog and fsimage files in the NameNode's metadata directory are shown below.
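A typical ${dfs.namenode.name.dir}/current layout (the transaction ids are illustrative, not from a real cluster):
VERSION
seen_txid
fsimage_0000000000000000000
fsimage_0000000000000000000.md5
edits_0000000000000000001-0000000000000000012
edits_inprogress_0000000000000000013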
The editlog files in the JournalNode's directory are shown below.
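A typical ${dfs.journalnode.edits.dir}/<nameservice>/current layout (again illustrative; note the committed-txid and epoch files defined above):
VERSION
committed-txid
last-promised-epoch
last-writer-epoch
paxos/
edits_0000000000000000001-0000000000000000012
edits_inprogress_0000000000000000013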
3. How the NameNode Writes Its Operation Log to the JournalNodes
3.1 From a client RPC to an edit op on the NameNode
The article at https://blog.51cto.com/u_15327484/8089923 analyzed how a client create request causes the NameNode to update the corresponding INodeFile object in the in-memory FSNamesystem namespace tree.
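For context, the whole path below is triggered by an ordinary client create call. A minimal sketch (the file path and payload are illustrative):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class CreateDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration(); // fs.defaultFS must point at the cluster
    FileSystem fs = FileSystem.get(conf);
    // This one call becomes a ClientProtocol.create RPC; the Active NameNode
    // records it as an OpenFile/AddOp entry in the editlog.
    try (FSDataOutputStream out = fs.create(new Path("/tmp/demo.txt"))) {
      out.writeBytes("hello");
    }
  }
}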
During the NameNodeRpcServer.create call, FSNamesystem.startFileInt first creates the file's INode in the HDFS namespace tree and then records the operation via getEditLog().logSync():
private HdfsFileStatus startFileInt(String src,
PermissionStatus permissions, String holder, String clientMachine,
EnumSet<CreateFlag> flag, boolean createParent, short replication,
long blockSize, CryptoProtocolVersion[] supportedVersions,
String ecPolicyName, boolean logRetryCache) throws IOException {
//create the file's INode in the HDFS namespace tree
try {
stat = FSDirWriteFileOp.startFile(this, iip, permissions, holder,
clientMachine, flag, createParent, replication, blockSize, feInfo,
toRemoveBlocks, shouldReplicate, ecPolicyName, logRetryCache);
} finally {
writeUnlock("create");
// There might be transactions logged while trying to recover the lease.
// They need to be sync'ed even when an exception was thrown.
if (!skipSync) {
//record the operation in the editlog
getEditLog().logSync();
if (toRemoveBlocks != null) {
removeBlocks(toRemoveBlocks);
toRemoveBlocks.clear();
}
}
}
return stat;
}
In FSDirWriteFileOp.startFile, addFile first adds an INodeFile to the namespace tree; fsd.getEditLog().logOpenFile then records an OpenFile-type edit for the create request:
static HdfsFileStatus startFile(
FSNamesystem fsn, INodesInPath iip,
PermissionStatus permissions, String holder, String clientMachine,
EnumSet<CreateFlag> flag, boolean createParent,
short replication, long blockSize,
FileEncryptionInfo feInfo, INode.BlocksMapUpdateInfo toRemoveBlocks,
boolean shouldReplicate, String ecPolicyName, boolean logRetryEntry)
throws IOException {
// ... omitted
//add the INodeFile to the namespace tree
if (parent != null) {
iip = addFile(fsd, parent, iip.getLastLocalName(), permissions,
replication, blockSize, holder, clientMachine, shouldReplicate,
ecPolicyName);
newNode = iip != null ? iip.getLastINode().asFile() : null;
}
// ... omitted
//record an OpenFile-type edit in the editlog
fsd.getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: added " +
src + " inode " + newNode.getId() + " " + holder);
}
return FSDirStatAndListingOp.getFileInfo(fsd, iip, false, false);
}
FSEditLog.logOpenFile builds an AddOp object capturing the operation's content and finally writes it to the output stream via logEdit:
public void logOpenFile(String path, INodeFile newNode, boolean overwrite,
boolean toLogRpcIds) {
Preconditions.checkArgument(newNode.isUnderConstruction());
PermissionStatus permissions = newNode.getPermissionStatus();
//build the edit log operation
AddOp op = AddOp.getInstance(cache.get())
.setInodeId(newNode.getId())
.setPath(path)
.setReplication(newNode.getFileReplication())
.setModificationTime(newNode.getModificationTime())
.setAccessTime(newNode.getAccessTime())
.setBlockSize(newNode.getPreferredBlockSize())
.setBlocks(newNode.getBlocks())
.setPermissionStatus(permissions)
.setClientName(newNode.getFileUnderConstructionFeature().getClientName())
.setClientMachine(
newNode.getFileUnderConstructionFeature().getClientMachine())
.setOverwrite(overwrite)
.setStoragePolicyId(newNode.getLocalStoragePolicyID())
.setErasureCodingPolicyId(newNode.getErasureCodingPolicyID());
AclFeature f = newNode.getAclFeature();
if (f != null) {
op.setAclEntries(AclStorage.readINodeLogicalAcl(newNode));
}
XAttrFeature x = newNode.getXAttrFeature();
if (x != null) {
op.setXAttrs(x.getXAttrs());
}
logRpcIds(op, toLogRpcIds);
//write the op out
logEdit(op);
}
Below is the AddOp definition used by logOpenFile; an AddOp edit is generated when ClientProtocol.create or ClientProtocol.append executes:
/**
* {@literal @AtMostOnce} for {@link ClientProtocol#create} and
* {@link ClientProtocol#append}
*/
FSEditLog.logEdit writes the transaction:
- beginTransaction starts the transaction and increments txid.
- The AddOp's content is written to the editLogStream output stream.
- logSync asynchronously flushes it to local disk and to the remote JournalNodes.
void logEdit(final FSEditLogOp op) {
boolean needsSync = false;
// ... omitted
// check if it is time to schedule an automatic sync
//write the edit as a transaction
needsSync = doEditTransaction(op);
// ... omitted
// Sync the log if an automatic sync is required.
if (needsSync) {
logSync();
}
}
//perform one edit as a transaction
synchronized boolean doEditTransaction(final FSEditLogOp op) {
//begin the transaction
long start = beginTransaction();
op.setTransactionId(txid);
try {
//write the AddOp into the editLogStream output stream
editLogStream.write(op);
} catch (IOException ex) {
// All journals failed, it is handled in logSync.
} finally {
op.reset();
}
endTransaction(start);
return shouldForceSync();
}
//beginning a transaction increments txid
private long beginTransaction() {
assert Thread.holdsLock(this);
// get a new transactionId
txid++;
//
// record the transactionId when new data was written to the edits log
//
TransactionId id = myTransactionId.get();
id.txid = txid;
return monotonicNow();
}
editLogStream.write actually writes into bufCurrent:
public void writeOp(FSEditLogOp op, int logVersion) throws IOException {
bufCurrent.writeOp(op, logVersion);
}
logSync works as follows:
- If this edit has not been synced yet and another sync is in progress, wait up to one second and check again.
- If it has already been synced, return.
- Swap bufCurrent and bufReady.
- Flush the contents of bufReady to local disk and to the JournalNodes.
The NameNode's double-buffer architecture: RPC handler threads append serialized edits to bufCurrent while the sync thread flushes bufReady, and setReadyToFlush swaps the two buffers under the FSEditLog lock.
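A minimal Java sketch of this double-buffer scheme (simplified relative to Hadoop's EditsDoubleBuffer, which buffers serialized FSEditLogOps rather than raw bytes):
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
class DoubleBuffer {
  private ByteArrayOutputStream bufCurrent = new ByteArrayOutputStream(); // written by RPC handlers
  private ByteArrayOutputStream bufReady = new ByteArrayOutputStream();   // flushed by the sync thread
  // Appending is cheap and in-memory only, so handler threads return quickly.
  synchronized void write(byte[] edit) {
    bufCurrent.write(edit, 0, edit.length);
  }
  // Swap the buffers under the lock; new edits keep landing in the now-empty
  // bufCurrent while bufReady is flushed without holding the lock.
  synchronized void setReadyToFlush() {
    assert bufReady.size() == 0 : "previous data not flushed yet";
    ByteArrayOutputStream tmp = bufReady;
    bufReady = bufCurrent;
    bufCurrent = tmp;
  }
  // Slow I/O (disk, JournalNode RPC) happens here, off the write path.
  void flushTo(OutputStream out) throws IOException {
    bufReady.writeTo(out);
    bufReady.reset();
  }
}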
The logSync code:
EditLogOutputStream logStream = null;
synchronized (this) {
try {
printStatistics(false);
// if somebody is already syncing, then wait
while (mytxid > synctxid && isSyncRunning) {
try {
wait(1000);
} catch (InterruptedException ie) {
}
}
//
// If this transaction was already flushed, then nothing to do
if (mytxid <= synctxid) {
return;
}
// now, this thread will do the sync. track if other edits were
// included in the sync - ie. batched. if this is the only edit
// synced then the batched count is 0
editsBatchedInSync = txid - synctxid - 1;
syncStart = txid;
isSyncRunning = true;
sync = true;
// swap buffers
try {
if (journalSet.isEmpty()) {
throw new IOException("No journals available to flush");
}
//swap bufCurrent and bufReady
editLogStream.setReadyToFlush();
} catch (IOException e) {
final String msg =
"Could not sync enough journals to persistent storage " +
"due to " + e.getMessage() + ". " +
"Unsynced transactions: " + (txid - synctxid);
LOG.error(msg, new Exception());
synchronized(journalSetLock) {
IOUtils.cleanupWithLogger(LOG, journalSet);
}
terminate(1, msg);
}
} finally {
// Prevent RuntimeException from blocking other log edit write
doneWithAutoSyncScheduling();
}
//editLogStream may become null,
//so store a local variable for flush.
logStream = editLogStream;
}
// do the sync
long start = monotonicNow();
try {
if (logStream != null) {
//flush bufReady to local disk and to the JournalNodes
logStream.flush();
}
} catch (IOException ex) {
synchronized (this) {
final String msg =
"Could not sync enough journals to persistent storage. "
+ "Unsynced transactions: " + (txid - synctxid);
LOG.error(msg, new Exception());
synchronized(journalSetLock) {
IOUtils.cleanupWithLogger(LOG, journalSet);
}
terminate(1, msg);
}
}
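Note the error handling in both catch blocks above: if too few journals can be synced, the NameNode logs the failure and calls terminate(1, msg), i.e. the process exits. Losing a quorum of JournalNodes therefore brings down the Active NameNode rather than letting it continue with a possibly divergent editlog.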
editLogStream.setReadyToFlush swaps the bufCurrent and bufReady pointers, readying the data in bufReady to be written out:
public void setReadyToFlush() {
assert isFlushed() : "previous data not flushed yet";
TxnBuffer tmp = bufReady;
bufReady = bufCurrent;
bufCurrent = tmp;
}
3.2 Flushing the NameNode editlog to local disk and to the JournalNodes
As section 3.1 showed, the NameNode writes the contents of bufReady to local disk and to the JournalNodes through EditLogOutputStream.flush.
EditLogOutputStream is an abstract class with three relevant implementations:
- EditLogFileOutputStream: writes to local disk.
- QuorumOutputStream: writes to the JournalNodes.
- JournalSetOutputStream: a composite that holds EditLogFileOutputStream and QuorumOutputStream objects and invokes their methods in turn, so a single call writes local disk first and then the JournalNodes.
When the NameNode calls startLogSegment to start a new batch of edits, it sets editLogStream to a JournalSetOutputStream:
editLogStream = journalSet.startLogSegment(segmentTxId, layoutVersion);
public EditLogOutputStream startLogSegment(final long txId,
final int layoutVersion) throws IOException {
mapJournalsAndReportErrors(new JournalClosure() {
@Override
public void apply(JournalAndStream jas) throws IOException {
jas.startLogSegment(txId, layoutVersion);
}
}, "starting log segment " + txId);
return new JournalSetOutputStream();
}
Back in logSync, the flush ultimately reaches JournalSet.mapJournalsAndReportErrors, which invokes getCurrentStream().flushAndSync() on every journal:
protected void flushAndSync(final boolean durable) throws IOException {
//call mapJournalsAndReportErrors; the closure's apply() runs jas.getCurrentStream().flushAndSync
mapJournalsAndReportErrors(new JournalClosure() {
@Override
public void apply(JournalAndStream jas) throws IOException {
if (jas.isActive()) {
jas.getCurrentStream().flushAndSync(durable);
}
}
}, "flushAndSync");
}
//run the closure's apply(), i.e. jas.getCurrentStream().flushAndSync
private void mapJournalsAndReportErrors(
JournalClosure closure, String status) throws IOException{
List<JournalAndStream> badJAS = Lists.newLinkedList();
for (JournalAndStream jas : journals) {
try {
closure.apply(jas);
// ... omitted
}
Next, how is journals initialized? At NameNode startup, initJournals adds a FileJournalManager and a QuorumJournalManager to the journals list:
private synchronized void initJournals(List<URI> dirs) {
int minimumRedundantJournals = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_KEY,
DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_DEFAULT);
synchronized(journalSetLock) {
journalSet = new JournalSet(minimumRedundantJournals);
for (URI u : dirs) {
boolean required = FSNamesystem.getRequiredNamespaceEditsDirs(conf)
.contains(u);
if (u.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) {
StorageDirectory sd = storage.getStorageDirectory(u);
if (sd != null) {
journalSet.add(new FileJournalManager(conf, sd, storage),
required, sharedEditsDirs.contains(u));
}
} else {
journalSet.add(createJournal(u), required,
sharedEditsDirs.contains(u));
}
}
}
if (journalSet.isEmpty()) {
LOG.error("No edits directories configured!");
}
}
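The non-local branch above (createJournal(u)) is taken for URIs with the qjournal scheme, which come from dfs.namenode.shared.edits.dir and produce a QuorumJournalManager. A typical HA setting (hostnames are illustrative):
<property>
<name>dfs.namenode.shared.edits.dir</name>
<value>qjournal://jn1:8485;jn2:8485;jn3:8485/mycluster</value>
</property>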
FileJournalManager and QuorumJournalManager each manage their own stream; both implement startLogSegment, creating an EditLogFileOutputStream and a QuorumOutputStream respectively for writing:
public void startLogSegment(long txId, int layoutVersion) throws IOException {
Preconditions.checkState(stream == null);
disabled = false;
stream = journal.startLogSegment(txId, layoutVersion);
}
For the local-disk path, EditsDoubleBuffer.flushTo writes bufReady straight to disk:
public void flushTo(OutputStream out) throws IOException {
bufReady.writeTo(out); // write data to file
bufReady.reset(); // erase all data in the buffer
}
For the JournalNode path, QuorumOutputStream.flushAndSync sends the edits through the AsyncLoggerSet:
QuorumCall<AsyncLogger, Void> qcall = loggers.sendEdits(
segmentTxId, firstTxToFlush,
numReadyTxns, data);
public QuorumCall<AsyncLogger, Void> sendEdits(
long segmentTxId, long firstTxnId, int numTxns, byte[] data) {
Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMap();
//send the edits to every AsyncLogger (one per JournalNode)
for (AsyncLogger logger : loggers) {
ListenableFuture<Void> future =
logger.sendEdits(segmentTxId, firstTxnId, numTxns, data);
calls.put(logger, future);
}
return QuorumCall.create(calls);
}
An AsyncLoggerSet contains multiple AsyncLoggers, one per JournalNode, each responsible for sending edits to its JournalNode. The AsyncLoggers are created as follows:
for (InetSocketAddress addr : addrs) {
ret.add(factory.createLogger(conf, nsInfo, jid, nameServiceId, addr));
}
Each AsyncLogger submits the edits on a single-threaded executor; the task obtains an RPC proxy to the JournalNode and calls journal():
ret = singleThreadExecutor.submit(new Callable<Void>() {
@Override
public Void call() throws IOException {
throwIfOutOfSync();
long rpcSendTimeNanos = System.nanoTime();
try {
getProxy().journal(createReqInfo(),
segmentTxId, firstTxnId, numTxns, data);
} catch (IOException e) {
QuorumJournalManager.LOG.warn(
"Remote journal " + IPCLoggerChannel.this + " failed to " +
"write txns " + firstTxnId + "-" + (firstTxnId + numTxns - 1) +
". Will try to write to this JN again after the next " +
"log roll.", e);
synchronized (IPCLoggerChannel.this) {
outOfSync = true;
}
throw e;
}
Finally, AsyncLoggerSet computes the quorum size with getMajoritySize(); if fewer JournalNodes succeed than the majority, the call fails:
int getMajoritySize() {
return loggers.size() / 2 + 1;
}
if (q.countSuccesses() < majority) {
q.rethrowException("Got too many exceptions to achieve quorum size " +
getMajorityString());
}
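As a quick worked example: with 3 JournalNodes, getMajoritySize() returns 3 / 2 + 1 = 2, so one JN may fail; with 5 it returns 3, tolerating two failures. This is the 2N+1 rule from section 1.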
3.3 How the JournalNode persists the editlog
The NameNode asks a JournalNode to write edits through the QJournalProtocol.journal RPC. The JournalNode server handles it in JournalNodeRpcServer.journal, which delegates to Journal.journal():
public void journal(RequestInfo reqInfo,
long segmentTxId, long firstTxnId,
int numTxns, byte[] records) throws IOException {
jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId())
//delegate to Journal.journal()
.journal(reqInfo, segmentTxId, firstTxnId, numTxns, records);
}
Inside Journal.journal(), an EditLogFileOutputStream writes the edits into bufCurrent, setReadyToFlush swaps the bufCurrent and bufReady pointers, and bufReady is flushed to disk:
//curSegment is an EditLogFileOutputStream
curSegment.writeRaw(records, 0, records.length);
curSegment.setReadyToFlush();
curSegment.flush(shouldFsync);
From there the edits are eventually consumed by the Standby NameNode; as section 4 shows, the Standby pulls them via selectInputStreams (the JournalNode-side serving code is not traced in this article).
3.4 Summary of the NameNode edits flush and JournalNode write flow
In summary: logEdit appends each op to bufCurrent under the FSEditLog lock; logSync swaps the buffers and flushes bufReady through JournalSetOutputStream, which writes local disk via EditLogFileOutputStream and each JournalNode via QuorumOutputStream and its AsyncLoggers, succeeding once a majority of JournalNodes acknowledge.
4. How the Standby NameNode Loads the Latest edits
To keep the JournalNodes from accumulating too many edits files, the Standby NameNode periodically fetches edits files from the JournalNodes and merges them into an fsimage; once merged, the edits files can be deleted. This process is the checkpoint.
A NameNode starts in Standby state by default. FSNamesystem.startStandbyServices starts the EditLogTailer thread; when the node leaves Standby to become Active, the thread is stopped. This guarantees that only the Standby NameNode runs the EditLogTailer. The Standby NameNode also starts a StandbyCheckpointer thread.
- EditLogTailer thread: periodically applies the newest edits, from local disk or the JournalNodes, to the in-memory namespace.
- StandbyCheckpointer thread: saves the in-memory namespace to an fsimage on local disk, then uploads it to the Active NameNode.
void startStandbyServices(final Configuration conf, boolean isObserver)
throws IOException {
// ... omitted
//start the EditLogTailer
editLogTailer = new EditLogTailer(this, conf);
editLogTailer.start();
//start the StandbyCheckpointer
standbyCheckpointer = new StandbyCheckpointer(conf, this);
standbyCheckpointer.start();
// ... omitted
}
void stopStandbyServices() throws IOException {
if (standbyCheckpointer != null) {
standbyCheckpointer.stop();
}
if (editLogTailer != null) {
editLogTailer.stop();
}
}
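How often the tailer wakes up is configurable. A typical setting (60 seconds is the usual default of dfs.ha.tail-edits.period):
<property>
<name>dfs.ha.tail-edits.period</name>
<value>60</value>
</property>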
EditLogTailer actually runs an EditLogTailerThread, which calls doWork and then doTailEdits, periodically applying new edits to the in-memory namespace in a loop:
public long doTailEdits() throws IOException, InterruptedException {
// Write lock needs to be interruptible here because the
// transitionToActive RPC takes the write lock before calling
// tailer.stop() -- so if we're not interruptible, it will
// deadlock.
//get the Standby NameNode's in-memory FSImage
FSImage image = namesystem.getFSImage();
// ... omitted
//load edits from the input streams
editsLoaded = image.loadEdits(
streams, namesystem, maxTxnsPerLock, null, null);
// ... omitted
}
Input streams are built by the journal managers: FileJournalManager reads edits from local disk and QuorumJournalManager reads them from the JournalNodes. JournalSet.selectInputStreams gathers the streams from every manager:
public void selectInputStreams(Collection<EditLogInputStream> streams,
long fromTxId, boolean inProgressOk, boolean onlyDurableTxns) {
final PriorityQueue<EditLogInputStream> allStreams =
new PriorityQueue<EditLogInputStream>(64,
EDIT_LOG_INPUT_STREAM_COMPARATOR);
for (JournalAndStream jas : journals) {
jas.getManager().selectInputStreams(allStreams, fromTxId,
inProgressOk, onlyDurableTxns);
}
}
FSImage.loadEdits then replays the newest edits from each stream in turn, covering both local disk and the JournalNodes:
for (EditLogInputStream editIn : editStreams) {
// ... omitted
try {
remainingReadTxns -= loader.loadFSEdits(editIn, lastAppliedTxId + 1,
remainingReadTxns, startOpt, recovery);
// ... omitted
}
FSEditLogLoader.loadEditRecords parses each FSEditLogOp from the input stream and applies it to the namespace tree:
try {
FSEditLogOp op;
try {
op = in.readOp();
if (op == null) {
break;
}
} // ... omitted
long inodeId = applyEditLogOp(op, fsDir, startOpt,
in.getVersion(true), lastInodeId);
if (lastInodeId < inodeId) {
lastInodeId = inodeId;
}
To summarize the tailing flow: EditLogTailer periodically invokes doTailEdits, selectInputStreams gathers edit streams from local disk and the JournalNodes, and FSEditLogLoader replays every op into the in-memory namespace.
5. The Standby NameNode Checkpoint Process
When the Standby NameNode starts, it launches the CheckpointerThread, which periodically runs doCheckpoint. doCheckpoint first saves the in-memory namespace to an fsimage on local disk, then sends the on-disk fsimage file to the Active NameNode:
//save the in-memory namespace to an fsimage on local disk
FSImage img = namesystem.getFSImage();
img.saveNamespace(namesystem, imageType, canceler);
//upload the on-disk fsimage file to the Active NameNode
List<Future<TransferFsImage.TransferResult>> uploads =
new ArrayList<Future<TransferFsImage.TransferResult>>();
for (final URL activeNNAddress : activeNNAddresses) {
Future<TransferFsImage.TransferResult> upload =
executor.submit(new Callable<TransferFsImage.TransferResult>() {
@Override
public TransferFsImage.TransferResult call()
throws IOException, InterruptedException {
CheckpointFaultInjector.getInstance().duringUploadInProgess();
return TransferFsImage.uploadImageFromStorage(activeNNAddress, conf, namesystem
.getFSImage().getStorage(), imageType, txid, canceler);
}
});
uploads.add(upload);
}
First, FSImage.saveNamespace, which saves the fsimage to local disk:
public synchronized void saveNamespace(FSNamesystem source, NameNodeFile nnf,
Canceler canceler) throws IOException {
// ... omitted
saveFSImageInAllDirs(source, nnf, imageTxId, canceler);
// ... omitted
}
FSImage starts FSImageSaver threads that call saveFSImage, writing the fsimage and its MD5 digest to separate files:
void saveFSImage(SaveNamespaceContext context, StorageDirectory sd,
NameNodeFile dstType) throws IOException {
long txid = context.getTxId();
File newFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE_NEW, txid);
File dstFile = NNStorage.getStorageFile(sd, dstType, txid);
FSImageFormatProtobuf.Saver saver = new FSImageFormatProtobuf.Saver(context);
FSImageCompression compression = FSImageCompression.createCompression(conf);
//write the fsimage to the new file
long numErrors = saver.save(newFile, compression);
if (numErrors > 0) {
// The image is likely corrupted.
LOG.error("Detected " + numErrors + " errors while saving FsImage " +
dstFile);
exitAfterSave.set(true);
}
//save the MD5 digest file
MD5FileUtils.saveMD5File(dstFile, saver.getSavedDigest());
storage.setMostRecentCheckpointInfo(txid, Time.now());
}
After the fsimage is saved, the Standby NameNode calls TransferFsImage.uploadImageFromStorage on an asynchronous thread to send the fsimage file to the Active NameNode; internally, uploadImage performs the transfer:
private static void uploadImage(URL url, Configuration conf,
NNStorage storage, NameNodeFile nnf, long txId, Canceler canceler)
throws IOException {
File imageFile = storage.findImageFile(nnf, txId);
if (imageFile == null) {
throw new IOException("Could not find image with txid " + txId);
}
HttpURLConnection connection = null;
//open an HTTP connection to the Active NameNode
connection = (HttpURLConnection) connectionFactory.openConnection(
urlWithParams, UserGroupInformation.isSecurityEnabled());
// Set the request to PUT
connection.setRequestMethod("PUT");
connection.setDoOutput(true);
// ... omitted
//send the fsimage file to the Active NameNode
// Write the file to output stream.
writeFileToPutRequest(conf, connection, imageFile, canceler);
// ... omitted
}
To summarize the checkpoint flow: the StandbyCheckpointer saves the in-memory namespace as a new fsimage on local disk, then uploads it to the Active NameNode over an HTTP PUT.
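Checkpoint frequency is driven by two settings, shown here with their usual defaults; a checkpoint is triggered when either threshold is reached:
<property>
<name>dfs.namenode.checkpoint.period</name>
<value>3600</value>
</property>
<property>
<name>dfs.namenode.checkpoint.txns</name>
<value>1000000</value>
</property>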
Finally, when a NameNode transitions from Standby to Active, it loads its local fsimage and incrementally reads the latest editlog from the JournalNodes. It does this via EditLogTailer.catchupDuringFailover, which runs the same doTailEdits described above:
public void catchupDuringFailover() throws IOException {
Preconditions.checkState(tailerThread == null ||
!tailerThread.isAlive(),
"Tailer thread should not be running once failover starts");
// Important to do tailing as the login user, in case the shared
// edits storage is implemented by a JournalManager that depends
// on security credentials to access the logs (eg QuorumJournalManager).
SecurityUtil.doAsLoginUser(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
try {
// It is already under the full name system lock and the checkpointer
// thread is already stopped. No need to acqure any other lock.
doTailEdits();
} catch (InterruptedException e) {
throw new IOException(e);
}
return null;
}
});
}