JournalNode Metadata Processing

Published: 2023-11-01 14:31:51

1. Background

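Before Hadoop 2.x, a single NameNode served all client requests, while a separate Secondary NameNode only merged edits into the fsimage and did not serve metadata requests. Both the NameNode and the Secondary NameNode were therefore single points of failure.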

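To remove this single point of failure, HDFS introduced a group of JournalNodes (JNs) that store the operation log, replacing the single Secondary NameNode. JournalNodes rely on a quorum mechanism: when the Active NameNode writes the EditLog, it writes the operation log to its local disk and also sends a write request to every JournalNode. A cluster of 2N+1 JNs tolerates the failure of at most N JNs.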

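Because a write succeeds as long as a majority of JournalNodes acknowledge it, a minority of JournalNodes may miss writes and lag behind. When the NameNode recovers the editlog, it therefore first calls FSEditLog.recoverUnclosedStreams() to bring the EditLog on all JournalNodes into agreement. NameNode metadata is thus eventually consistent rather than strongly consistent. During metadata recovery the NameNode is in safe mode and cannot be accessed, which is why HDFS is described as a CP system.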

2. JournalNode concepts

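  1. fsimage: the fsimage file is a persistent checkpoint of the Hadoop file system metadata. It holds the serialized inode information of every directory and file in the system.
  2. edits: the edits log records every operation performed on HDFS. An operation is first appended to the edits file, and edits are periodically merged into the fsimage. Without HA, the Secondary NameNode performs the merge. With HA, the Standby NameNode performs it (see sections 4 and 5), while the JournalNodes store and serve the edits.
  3. epoch: a concept borrowed from the Paxos protocol that identifies the active NameNode. Each NameNode failover increments the epoch by 1, and every request sent to a JournalNode carries the epoch. If the epoch is smaller than the last-promised-epoch kept by the JournalNode, the request comes from the NameNode that was active before the failover. The cluster is then in a split-brain situation, and that NameNode is not allowed to send editlog entries to the JournalNode.
  4. txid: every editlog entry has a transaction ID (txid).
  5. segment: a run of consecutive txids stored in one editlog file. Only one segment is being written at any time (in-progress); all other segment files are closed (finalized). When a new segment is started, the epoch is compared with the last-writer-epoch kept with the editlog, and the segment is written only if the epoch is larger.
  6. committed-txid: the transaction ID that the JournalNode has recorded as accepted and is currently handling.
  7. edits_$starttxid-$endtxid: a finalized segment file. It holds the consecutive editlog entries from the start transaction ID to the end transaction ID.
  8. edits_inprogress_$starttxid: the segment file currently being written. The file name records its start transaction ID.
  9. VERSION: records cluster information, including the namespace ID, the cluster ID, and the creation time.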
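The epoch check from item 3 can be sketched as follows. This is a minimal, hypothetical model rather than the actual JournalNode code: the class `EpochFencingSketch` and its methods `promise` and `checkWrite` are invented for illustration, and it throws an unchecked exception where a real RPC server would report an I/O error.

```java
/** Hypothetical, simplified model of epoch fencing on one JournalNode. */
class EpochFencingSketch {
    // Highest epoch this node has promised to honor (assumed to be restored from disk).
    private long lastPromisedEpoch;

    EpochFencingSketch(long lastPromisedEpoch) {
        this.lastPromisedEpoch = lastPromisedEpoch;
    }

    /** A NameNode becoming active proposes a new epoch; it must exceed every earlier promise. */
    synchronized void promise(long proposedEpoch) {
        if (proposedEpoch <= lastPromisedEpoch) {
            throw new IllegalStateException("Proposed epoch " + proposedEpoch
                + " is not greater than last promised epoch " + lastPromisedEpoch);
        }
        lastPromisedEpoch = proposedEpoch;
    }

    /** Every write carries the writer's epoch; a writer from before the failover is rejected. */
    synchronized void checkWrite(long requestEpoch) {
        if (requestEpoch < lastPromisedEpoch) {
            throw new IllegalStateException("Epoch " + requestEpoch
                + " is less than last promised epoch " + lastPromisedEpoch);
        }
    }

    public static void main(String[] args) {
        EpochFencingSketch jn = new EpochFencingSketch(1);
        jn.checkWrite(1);      // old active NameNode writes with epoch 1: accepted
        jn.promise(2);         // failover: the new active NameNode claims epoch 2
        jn.checkWrite(2);      // new writer: accepted
        try {
            jn.checkWrite(1);  // the old writer is now fenced off
        } catch (IllegalStateException e) {
            System.out.println("Rejected stale writer: " + e.getMessage());
        }
    }
}
```

The point of the design is that the JournalNode, not the NameNode, decides who may write, so an old active NameNode that has not noticed the failover cannot corrupt the log.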

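The dfs.namenode.name.dir setting defines where the NameNode stores its fsimage and, unless dfs.namenode.edits.dir is set separately, its editlog:

<property>
    <name>dfs.namenode.name.dir</name>
    <value>file:///disk1/dfs/name,file:///disk2/dfs/name</value>
</property>

The dfs.journalnode.edits.dir setting defines where a JournalNode stores its editlog:

<property>
    <name>dfs.journalnode.edits.dir</name>
    <value>/home/data/jn</value>
</property>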

The editlog and fsimage files on the NameNode look like this:

Untitled.png

The editlog files on a JournalNode look like this:

Untitled 1.png
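The segment naming convention from section 2 can be illustrated with a small parser. This is a sketch under assumptions: the class `SegmentNameSketch` is hypothetical, and the zero-padded file names in `main` are made-up examples of the edits_$starttxid-$endtxid and edits_inprogress_$starttxid patterns, not names taken from the screenshots.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper that classifies edit log segment file names. */
class SegmentNameSketch {
    private static final Pattern FINALIZED = Pattern.compile("edits_(\\d+)-(\\d+)");
    private static final Pattern IN_PROGRESS = Pattern.compile("edits_inprogress_(\\d+)");

    /**
     * Returns {startTxId, endTxId} for a segment file name.
     * endTxId is -1 for an in-progress segment; the result is null for any other file.
     */
    static long[] parse(String fileName) {
        Matcher m = FINALIZED.matcher(fileName);
        if (m.matches()) {
            return new long[] {Long.parseLong(m.group(1)), Long.parseLong(m.group(2))};
        }
        m = IN_PROGRESS.matcher(fileName);
        if (m.matches()) {
            return new long[] {Long.parseLong(m.group(1)), -1L};
        }
        return null;
    }

    public static void main(String[] args) {
        String[] names = {
            "edits_0000000000000000001-0000000000000000010", // assumed example name
            "edits_inprogress_0000000000000000011",          // assumed example name
            "VERSION"
        };
        for (String name : names) {
            long[] range = parse(name);
            String description = (range == null)
                ? "not a segment"
                : (range[1] < 0 ? "in-progress from txid " + range[0]
                                : "finalized txids " + range[0] + " to " + range[1]);
            System.out.println(name + " -> " + description);
        }
    }
}
```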

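3. How the NameNode writes the operation log to the JournalNodes

3.1 From client RPC request to edit op on the NameNode

The article at https://blog.51cto.com/u_15327484/8089923 analyzes how, for a client create request, the NameNode updates the corresponding INodeFile object in the in-memory FSNamesystem file system tree.

During NameNodeRpcServer.create, FSNamesystem.startFileInt first creates the file INode in the HDFS file system tree and then syncs the operation log through getEditLog().logSync():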

private HdfsFileStatus startFileInt(String src,
      PermissionStatus permissions, String holder, String clientMachine,
      EnumSet<CreateFlag> flag, boolean createParent, short replication,
      long blockSize, CryptoProtocolVersion[] supportedVersions,
      String ecPolicyName, boolean logRetryCache) throws IOException {
      // Create the file in the HDFS file system tree
      try {
        stat = FSDirWriteFileOp.startFile(this, iip, permissions, holder,
            clientMachine, flag, createParent, replication, blockSize, feInfo,
            toRemoveBlocks, shouldReplicate, ecPolicyName, logRetryCache);
    
    } finally {
      writeUnlock("create");
      // There might be transactions logged while trying to recover the lease.
      // They need to be sync'ed even when an exception was thrown.
      if (!skipSync) {
        // Sync the operation log to the editlog
        getEditLog().logSync();
        if (toRemoveBlocks != null) {
          removeBlocks(toRemoveBlocks);
          toRemoveBlocks.clear();
        }
      }
    }

    return stat;
  }

FSDirWriteFileOp.startFile first calls addFile to add the INodeFile to the file system tree, then calls fsd.getEditLog().logOpenFile to record the OpenFile operation that corresponds to the create request:

static HdfsFileStatus startFile(
      FSNamesystem fsn, INodesInPath iip,
      PermissionStatus permissions, String holder, String clientMachine,
      EnumSet<CreateFlag> flag, boolean createParent,
      short replication, long blockSize,
      FileEncryptionInfo feInfo, INode.BlocksMapUpdateInfo toRemoveBlocks,
      boolean shouldReplicate, String ecPolicyName, boolean logRetryEntry)
      throws IOException {
    // omitted
    // Add the INodeFile to the file system tree
    if (parent != null) {
      iip = addFile(fsd, parent, iip.getLastLocalName(), permissions,
          replication, blockSize, holder, clientMachine, shouldReplicate,
          ecPolicyName);
      newNode = iip != null ? iip.getLastINode().asFile() : null;
    }
    // omitted
    // Record an OpenFile operation in the editlog
    fsd.getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);
    if (NameNode.stateChangeLog.isDebugEnabled()) {
      NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: added " +
          src + " inode " + newNode.getId() + " " + holder);
    }
    return FSDirStatAndListingOp.getFileInfo(fsd, iip, false, false);
  }

FSEditLog.logOpenFile builds an AddOp object that describes the operation and then passes it to logEdit, which writes it to the output stream:

public void logOpenFile(String path, INodeFile newNode, boolean overwrite,
      boolean toLogRpcIds) {
    Preconditions.checkArgument(newNode.isUnderConstruction());
    PermissionStatus permissions = newNode.getPermissionStatus();
    // Build the operation log entry
    AddOp op = AddOp.getInstance(cache.get())
      .setInodeId(newNode.getId())
      .setPath(path)
      .setReplication(newNode.getFileReplication())
      .setModificationTime(newNode.getModificationTime())
      .setAccessTime(newNode.getAccessTime())
      .setBlockSize(newNode.getPreferredBlockSize())
      .setBlocks(newNode.getBlocks())
      .setPermissionStatus(permissions)
      .setClientName(newNode.getFileUnderConstructionFeature().getClientName())
      .setClientMachine(
          newNode.getFileUnderConstructionFeature().getClientMachine())
      .setOverwrite(overwrite)
      .setStoragePolicyId(newNode.getLocalStoragePolicyID())
      .setErasureCodingPolicyId(newNode.getErasureCodingPolicyID());

    AclFeature f = newNode.getAclFeature();
    if (f != null) {
      op.setAclEntries(AclStorage.readINodeLogicalAcl(newNode));
    }

    XAttrFeature x = newNode.getXAttrFeature();
    if (x != null) {
      op.setXAttrs(x.getXAttrs());
    }

    logRpcIds(op, toLogRpcIds);
    // Write it out
    logEdit(op);
  }

The AddOp used by logOpenFile is documented as follows. An AddOp entry is generated for both ClientProtocol.create and ClientProtocol.append:

/**
   * {@literal @AtMostOnce} for {@link ClientProtocol#create} and
   * {@link ClientProtocol#append}
   */

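FSEditLog.logEdit writes the transaction:

  1. It calls beginTransaction to start a transaction, which increments the txid.
  2. It writes the AddOp into the editLogStream output stream.
  3. If the buffer needs a forced sync, it calls logSync to flush the data to disk and to the remote JournalNodes.
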
void logEdit(final FSEditLogOp op) {
    boolean needsSync = false;
    // omitted
      // check if it is time to schedule an automatic sync
      // Write the edit as a transaction
      needsSync = doEditTransaction(op);
      // omitted
    // Sync the log if an automatic sync is required.
    if (needsSync) {
      logSync();
    }
  }

// Run one edit transaction
synchronized boolean doEditTransaction(final FSEditLogOp op) {
    // Begin the transaction
    long start = beginTransaction();
    op.setTransactionId(txid);

    try {
      // Write the AddOp into the editLogStream output stream
      editLogStream.write(op);
    } catch (IOException ex) {
      // All journals failed, it is handled in logSync.
    } finally {
      op.reset();
    }
    endTransaction(start);
    return shouldForceSync();
  }

// Beginning a transaction increments the txid
private long beginTransaction() {
    assert Thread.holdsLock(this);
    // get a new transactionId
    txid++;

    //
    // record the transactionId when new data was written to the edits log
    //
    TransactionId id = myTransactionId.get();
    id.txid = txid;
    return monotonicNow();
  }

The editLogStream.write call ends up writing into bufCurrent:

public void writeOp(FSEditLogOp op, int logVersion) throws IOException {
  bufCurrent.writeOp(op, logVersion);
}

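The logSync method works as follows:

  1. If this thread's edit has not been synced yet and another sync is already running, wait 1s and check again.
  2. If the edit has already been synced, return.
  3. Swap the currentBuffer and ReadyBuffer pointers.
  4. Write the ReadyBuffer contents to disk and to the JournalNodes.

The NameNode's double-buffer design is shown below:

Untitled 2.png

The code is as follows: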

EditLogOutputStream logStream = null;
      synchronized (this) {
        try {
          printStatistics(false);
          // If our txid is not synced yet and a sync is in progress, wait 1s and check again.
          // if somebody is already syncing, then wait
          while (mytxid > synctxid && isSyncRunning) {
            try {
              wait(1000);
            } catch (InterruptedException ie) {
            }
          }
  
          //
          // If this transaction was already flushed, then nothing to do
          //
          if (mytxid <= synctxid) {
            return;
          }

          // now, this thread will do the sync.  track if other edits were
          // included in the sync - ie. batched.  if this is the only edit
          // synced then the batched count is 0
          editsBatchedInSync = txid - synctxid - 1;
          syncStart = txid;
          isSyncRunning = true;
          sync = true;

          // swap buffers
          try {
            if (journalSet.isEmpty()) {
              throw new IOException("No journals available to flush");
            }
            // Swap the currentBuffer and ReadyBuffer pointers
            editLogStream.setReadyToFlush();
          } catch (IOException e) {
            final String msg =
                "Could not sync enough journals to persistent storage " +
                "due to " + e.getMessage() + ". " +
                "Unsynced transactions: " + (txid - synctxid);
            LOG.error(msg, new Exception());
            synchronized(journalSetLock) {
              IOUtils.cleanupWithLogger(LOG, journalSet);
            }
            terminate(1, msg);
          }
        } finally {
          // Prevent RuntimeException from blocking other log edit write 
          doneWithAutoSyncScheduling();
        }
        //editLogStream may become null,
        //so store a local variable for flush.
        logStream = editLogStream;
      }
      
      // do the sync
      long start = monotonicNow();
      try {
        if (logStream != null) {
          // Write the ReadyBuffer contents to disk and to the JournalNodes
          logStream.flush();
        }
      } catch (IOException ex) {
        synchronized (this) {
          final String msg =
              "Could not sync enough journals to persistent storage. "
              + "Unsynced transactions: " + (txid - synctxid);
          LOG.error(msg, new Exception());
          synchronized(journalSetLock) {
            IOUtils.cleanupWithLogger(LOG, journalSet);
          }
          terminate(1, msg);
        }
      }
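The four logSync steps amount to a group commit: one thread flushes on behalf of every edit written before the swap. The sketch below models that with plain lists. It is a simplified, hypothetical model (the class `GroupCommitSketch` is invented here). The excerpt above does not show what happens after the flush, so the final bookkeeping (advancing synctxid and waking the waiters) is an assumption, and the `durable` list merely stands in for the local disk and the JournalNodes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of the logSync group-commit protocol. */
class GroupCommitSketch {
    private long txid = 0;                 // last transaction ID handed out
    private long synctxid = 0;             // last transaction ID known to be flushed
    private boolean isSyncRunning = false;
    private List<Long> bufCurrent = new ArrayList<>();     // receives new edits
    private List<Long> bufReady = new ArrayList<>();       // being flushed
    private final List<Long> durable = new ArrayList<>();  // stands in for disk + JournalNodes
    private int flushCount = 0;

    /** Appends an edit to the current buffer and returns its transaction ID. */
    synchronized long logEdit() {
        txid++;
        bufCurrent.add(txid);
        return txid;
    }

    /** Returns true if this call performed a flush, false if mytxid was already synced. */
    boolean logSync(long mytxid) {
        long syncStart;
        List<Long> toFlush;
        synchronized (this) {
            // Step 1: another thread is flushing and has not covered our txid yet, so wait.
            while (mytxid > synctxid && isSyncRunning) {
                try {
                    wait(1000);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting for sync", ie);
                }
            }
            // Step 2: an earlier flush already covered our txid.
            if (mytxid <= synctxid) {
                return false;
            }
            // Step 3: this thread becomes the syncer and swaps the two buffers.
            syncStart = txid;
            isSyncRunning = true;
            List<Long> tmp = bufReady;
            bufReady = bufCurrent;
            bufCurrent = tmp;
            toFlush = bufReady;
        }
        // Step 4: the slow flush runs outside the lock, so logEdit can keep filling bufCurrent.
        List<Long> flushed = new ArrayList<>(toFlush);
        synchronized (this) {
            // Assumed bookkeeping after a successful flush.
            durable.addAll(flushed);
            bufReady.clear();
            flushCount++;
            synctxid = syncStart;
            isSyncRunning = false;
            notifyAll();
        }
        return true;
    }

    synchronized int flushCount() { return flushCount; }

    synchronized int durableCount() { return durable.size(); }

    public static void main(String[] args) {
        GroupCommitSketch log = new GroupCommitSketch();
        long t1 = log.logEdit();
        log.logEdit();
        long t3 = log.logEdit();
        System.out.println("sync(t1) flushed: " + log.logSync(t1));
        System.out.println("sync(t3) flushed: " + log.logSync(t3));
        System.out.println("flushes: " + log.flushCount() + ", durable edits: " + log.durableCount());
    }
}
```

Syncing the first edit also makes the two later edits durable, so the later logSync call has nothing left to do.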

editLogStream.setReadyToFlush swaps the bufCurrent and bufReady pointers so that the data now in bufReady can be written out:

public void setReadyToFlush() {
    assert isFlushed() : "previous data not flushed yet";
    TxnBuffer tmp = bufReady;
    bufReady = bufCurrent;
    bufCurrent = tmp;
  }

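3.2 Persisting the editlog on the NameNode and writing it to the JournalNodes

As section 3.1 showed, EditLogOutputStream.flush is how the NameNode writes the ReadyBuffer contents to disk and to the JournalNodes.

EditLogOutputStream is an abstract class. Its implementations are: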

Untitled 3.png

Where:

  1. EditLogFileOutputStream: writes to the local disk.
  2. QuorumOutputStream: writes to the JournalNodes.
  3. JournalSetOutputStream: a wrapper that holds the EditLogFileOutputStream and QuorumOutputStream objects and calls their methods in turn, writing to the local disk and then to the JournalNodes.
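The wrapper role of JournalSetOutputStream can be sketched as a simple fan-out. This is a hypothetical illustration, not the Hadoop implementation: `FanOutStreamSketch` and its `Sink` interface are invented names, and the policy of disabling a failed sink and failing only when no sink is left is a simplifying assumption (the real JournalSet also distinguishes required journals and a configurable minimum).

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical fan-out stream that forwards each flush to several sinks in turn. */
class FanOutStreamSketch {
    /** Stand-in for one underlying stream (local file or JournalNode quorum). */
    interface Sink {
        void flush(byte[] data);
    }

    private final List<Sink> sinks = new ArrayList<>();
    private final List<Sink> disabled = new ArrayList<>();

    void add(Sink sink) {
        sinks.add(sink);
    }

    /** Flushes to every active sink; a sink that fails is disabled for later flushes. */
    void flush(byte[] data) {
        for (Sink sink : sinks) {
            if (disabled.contains(sink)) {
                continue;
            }
            try {
                sink.flush(data);
            } catch (RuntimeException e) {
                disabled.add(sink);
            }
        }
        if (disabled.size() == sinks.size()) {
            throw new IllegalStateException("No journals available to flush");
        }
    }

    int activeCount() {
        return sinks.size() - disabled.size();
    }

    public static void main(String[] args) {
        List<String> localDisk = new ArrayList<>();
        FanOutStreamSketch stream = new FanOutStreamSketch();
        stream.add(data -> localDisk.add(new String(data, StandardCharsets.UTF_8)));
        stream.add(data -> { throw new IllegalStateException("JournalNodes unreachable"); });
        stream.flush("op1".getBytes(StandardCharsets.UTF_8));
        System.out.println("active sinks: " + stream.activeCount() + ", local disk: " + localDisk);
    }
}
```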

When the NameNode calls startLogSegment to begin writing a new batch (segment) of edits, it sets editLogStream to a JournalSetOutputStream:

editLogStream = journalSet.startLogSegment(segmentTxId, layoutVersion);

public EditLogOutputStream startLogSegment(final long txId,
      final int layoutVersion) throws IOException {
    mapJournalsAndReportErrors(new JournalClosure() {
      @Override
      public void apply(JournalAndStream jas) throws IOException {
        jas.startLogSegment(txId, layoutVersion);
      }
    }, "starting log segment " + txId);
    return new JournalSetOutputStream();
  }

Back in logSync, the flush ends up in JournalSet.mapJournalsAndReportErrors, which calls getCurrentStream().flushAndSync on each entry in journals:

protected void flushAndSync(final boolean durable) throws IOException {
      // Call mapJournalsAndReportErrors; the apply method is defined as jas.getCurrentStream().flushAndSync
      mapJournalsAndReportErrors(new JournalClosure() {
        @Override
        public void apply(JournalAndStream jas) throws IOException {
          if (jas.isActive()) {
            jas.getCurrentStream().flushAndSync(durable);
          }
        }
      }, "flushAndSync");
    }
// Runs the apply method, i.e. jas.getCurrentStream().flushAndSync
private void mapJournalsAndReportErrors(
      JournalClosure closure, String status) throws IOException{

    List<JournalAndStream> badJAS = Lists.newLinkedList();
    for (JournalAndStream jas : journals) {
      try {
        closure.apply(jas);
      // omitted
  }

Next, look at how journals is initialized. When the NameNode starts, it calls initJournals, which adds a FileJournalManager and a QuorumJournalManager to the journals object:

private synchronized void initJournals(List<URI> dirs) {
    int minimumRedundantJournals = conf.getInt(
        DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_KEY,
        DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_DEFAULT);

    synchronized(journalSetLock) {
      journalSet = new JournalSet(minimumRedundantJournals);

      for (URI u : dirs) {
        boolean required = FSNamesystem.getRequiredNamespaceEditsDirs(conf)
            .contains(u);
        if (u.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) {
          StorageDirectory sd = storage.getStorageDirectory(u);
          if (sd != null) {
            journalSet.add(new FileJournalManager(conf, sd, storage),
                required, sharedEditsDirs.contains(u));
          }
        } else {
          journalSet.add(createJournal(u), required,
              sharedEditsDirs.contains(u));
        }
      }
    }
 
    if (journalSet.isEmpty()) {
      LOG.error("No edits directories configured!");
    } 
  }

FileJournalManager and QuorumJournalManager each manage their own stream. Through startLogSegment they create an EditLogFileOutputStream and a QuorumOutputStream respectively:

public void startLogSegment(long txId, int layoutVersion) throws IOException {
      Preconditions.checkState(stream == null);
      disabled = false;
      stream = journal.startLogSegment(txId, layoutVersion);
    }

For the local disk, EditsDoubleBuffer.flushTo writes bufReady straight to disk:

public void flushTo(OutputStream out) throws IOException {
    bufReady.writeTo(out); // write data to file
    bufReady.reset(); // erase all data in the buffer
  }

For the JournalNodes, QuorumOutputStream.flushAndSync sends the edits through the AsyncLoggerSet:

QuorumCall<AsyncLogger, Void> qcall = loggers.sendEdits(
          segmentTxId, firstTxToFlush,
          numReadyTxns, data);

public QuorumCall<AsyncLogger, Void> sendEdits(
      long segmentTxId, long firstTxnId, int numTxns, byte[] data) {
    Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMap();
    // Send the edits through every AsyncLogger
    for (AsyncLogger logger : loggers) {
      ListenableFuture<Void> future = 
        logger.sendEdits(segmentTxId, firstTxnId, numTxns, data);
      calls.put(logger, future);
    }
    return QuorumCall.create(calls);
  }

An AsyncLoggerSet contains several AsyncLogger instances. Each AsyncLogger corresponds to one JournalNode and sends edits to it. The AsyncLoggers are initialized like this:

for (InetSocketAddress addr : addrs) {
      ret.add(factory.createLogger(conf, nsInfo, jid, nameServiceId, addr));
    }

The AsyncLogger submits the edits on its own thread. Inside that thread it obtains the JournalNode proxy and issues the RPC:

ret = singleThreadExecutor.submit(new Callable<Void>() {
        @Override
        public Void call() throws IOException {
          throwIfOutOfSync();

          long rpcSendTimeNanos = System.nanoTime();
          try {
            getProxy().journal(createReqInfo(),
                segmentTxId, firstTxnId, numTxns, data);
          } catch (IOException e) {
            QuorumJournalManager.LOG.warn(
                "Remote journal " + IPCLoggerChannel.this + " failed to " +
                "write txns " + firstTxnId + "-" + (firstTxnId + numTxns - 1) +
                ". Will try to write to this JN again after the next " +
                "log roll.", e); 
            synchronized (IPCLoggerChannel.this) {
              outOfSync = true;
            }
            throw e;
          }

Finally, AsyncLoggerSet uses getMajoritySize() to compute the minimum number of JournalNodes that must succeed. If fewer than that succeed, an exception is thrown:

int getMajoritySize() {
    return loggers.size() / 2 + 1;
  }

if (q.countSuccesses() < majority) {
      q.rethrowException("Got too many exceptions to achieve quorum size " +
          getMajorityString());
    }
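The arithmetic behind the quorum check is easy to verify. The sketch below reuses the `loggers.size() / 2 + 1` formula from getMajoritySize(); everything else (`QuorumSketch`, `toleratedFailures`, `checkQuorum`, and the list of booleans that stands in for per-JournalNode results) is hypothetical.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration of the majority rule used for JournalNode writes. */
class QuorumSketch {
    /** Minimum number of successful JournalNodes, same formula as getMajoritySize(). */
    static int majoritySize(int loggers) {
        return loggers / 2 + 1;
    }

    /** Number of JournalNodes that may fail while a majority can still succeed. */
    static int toleratedFailures(int loggers) {
        return loggers - majoritySize(loggers);
    }

    /** Throws if fewer than a majority of the acknowledgements are successful. */
    static void checkQuorum(List<Boolean> acks) {
        long successes = acks.stream().filter(ok -> ok).count();
        int majority = majoritySize(acks.size());
        if (successes < majority) {
            throw new IllegalStateException("Got too many exceptions to achieve quorum size "
                + majority + "/" + acks.size());
        }
    }

    public static void main(String[] args) {
        for (int n : new int[] {3, 4, 5}) {
            System.out.println(n + " JNs: majority " + majoritySize(n)
                + ", tolerated failures " + toleratedFailures(n));
        }
        checkQuorum(Arrays.asList(true, true, false)); // 2 of 3 succeeded: quorum reached
    }
}
```

With 2N+1 JournalNodes the majority is N+1, which leaves room for N failures and matches the statement in section 1. An even-sized cluster tolerates no more failures than the next smaller odd-sized one.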

3.3 How a JournalNode persists the editlog

The NameNode asks a JournalNode to write edits through QJournalProtocol.journal. On the JournalNode side, JournalNodeRpcServer.journal handles the request.

JournalNodeRpcServer delegates to Journal.journal():

public void journal(RequestInfo reqInfo,
      long segmentTxId, long firstTxnId,
      int numTxns, byte[] records) throws IOException {
    jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId())
       // Delegate to Journal.journal()
       .journal(reqInfo, segmentTxId, firstTxnId, numTxns, records);
  }

In Journal.journal(), the EditLogFileOutputStream writes the edits into curBuffer, setReadyToFlush swaps the curBuffer and ReadyBuffer pointers, and ReadyBuffer is then written to disk:

// curSegment is an EditLogFileOutputStream
curSegment.writeRaw(records, 0, records.length);
    curSegment.setReadyToFlush();
    curSegment.flush(shouldFsync);

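Note that the JournalNode does not push edits to the Standby NameNode. The Standby NameNode pulls them from the JournalNodes, as described in section 4.

3.4 Summary: persisting NameNode edits and sending them to the JournalNodes

The flow is shown below: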

Untitled 4.png

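4. How the Standby NameNode loads the latest edits

To keep the JournalNodes from accumulating too many edits files, the Standby NameNode periodically fetches edits from the JournalNodes and merges them into the fsimage. Edits that have been merged can then be deleted. This process is the checkpoint.

Every NameNode starts in Standby state. FSNamesystem calls startStandbyServices, which starts the EditLogTailer thread. When the NameNode leaves Standby state to become Active, it stops the EditLogTailer thread. This guarantees that only a Standby NameNode runs EditLogTailer. The Standby NameNode also starts the StandbyCheckpointer thread.

  • EditLogTailer thread: periodically applies the latest edits, read from the local disk or from the JournalNodes, to the in-memory fsimage.
  • StandbyCheckpointer thread: saves the in-memory fsimage to the local disk and then sends it to the active NameNode.
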
void startStandbyServices(final Configuration conf, boolean isObserver)
      throws IOException {
    // omitted
    // Start the EditLogTailer
    editLogTailer = new EditLogTailer(this, conf);
    editLogTailer.start();

    // Start the StandbyCheckpointer
    standbyCheckpointer = new StandbyCheckpointer(conf, this);
    standbyCheckpointer.start();
    // omitted
  }

void stopStandbyServices() throws IOException {
   if (standbyCheckpointer != null) {
      standbyCheckpointer.stop();
    }
    if (editLogTailer != null) {
      editLogTailer.stop();
    }
  }

EditLogTailer actually starts an EditLogTailerThread. EditLogTailerThread calls doWork and then doTailEdits in a loop, periodically applying edits to the in-memory fsimage:

public long doTailEdits() throws IOException, InterruptedException {
    // Write lock needs to be interruptible here because the 
    // transitionToActive RPC takes the write lock before calling
    // tailer.stop() -- so if we're not interruptible, it will
    // deadlock.
   
      // Get the Standby NameNode's in-memory fsimage
      FSImage image = namesystem.getFSImage();

     
      // omitted
        // Load edits from the input streams
        editsLoaded = image.loadEdits(
            streams, namesystem, maxTxnsPerLock, null, null);
      // omitted
    }
  }

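As shown below, every journal manager in journals (FileJournalManager for the local disk, QuorumJournalManager for the JournalNodes) contributes input streams, so edits are read from both the local disk and the JournalNodes: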

public void selectInputStreams(Collection<EditLogInputStream> streams,
      long fromTxId, boolean inProgressOk, boolean onlyDurableTxns) {
    final PriorityQueue<EditLogInputStream> allStreams = 
        new PriorityQueue<EditLogInputStream>(64,
            EDIT_LOG_INPUT_STREAM_COMPARATOR);
    for (JournalAndStream jas : journals) {
        jas.getManager().selectInputStreams(allStreams, fromTxId,
            inProgressOk, onlyDurableTxns);
    }
  }

FSImage.loadEdits then loads the latest edits from the local and JournalNode streams in turn:

for (EditLogInputStream editIn : editStreams) {
        // omitted
        try {
          remainingReadTxns -= loader.loadFSEdits(editIn, lastAppliedTxId + 1,
                  remainingReadTxns, startOpt, recovery);
        // omitted
      }

FSEditLogLoader.loadEditRecords parses the latest FSEditLogOp entries from the input stream and applies them to the file system tree:

try {
          FSEditLogOp op;
          try {
            op = in.readOp();
            if (op == null) {
              break;
            }
          } // omitted
            long inodeId = applyEditLogOp(op, fsDir, startOpt,
                in.getVersion(true), lastInodeId);
            if (lastInodeId < inodeId) {
              lastInodeId = inodeId;
            }
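The tailing logic above boils down to replaying ops in txid order, starting at lastAppliedTxId + 1. The sketch below is a hypothetical model of that idea: `EditReplaySketch` and its nested `Op` class are invented stand-ins for the loader and FSEditLogOp, the namespace is reduced to a list of paths, and skipping already-applied ops while rejecting gaps is an assumption about how a replay should behave, not a copy of FSEditLogLoader.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified model of replaying edits on a Standby NameNode. */
class EditReplaySketch {
    /** Stand-in for FSEditLogOp: a transaction ID plus the path it creates. */
    static final class Op {
        final long txid;
        final String path;

        Op(long txid, String path) {
            this.txid = txid;
            this.path = path;
        }
    }

    private long lastAppliedTxId = 0;
    private final List<String> namespace = new ArrayList<>();

    /** Applies every op newer than lastAppliedTxId and returns how many were applied. */
    int loadEdits(List<Op> stream) {
        int applied = 0;
        for (Op op : stream) {
            if (op.txid <= lastAppliedTxId) {
                continue; // already reflected in the in-memory namespace
            }
            if (op.txid != lastAppliedTxId + 1) {
                throw new IllegalStateException("Gap in edit log: expected txid "
                    + (lastAppliedTxId + 1) + " but found " + op.txid);
            }
            namespace.add(op.path); // stand-in for applying the op to the file system tree
            lastAppliedTxId = op.txid;
            applied++;
        }
        return applied;
    }

    long lastAppliedTxId() {
        return lastAppliedTxId;
    }

    List<String> namespace() {
        return namespace;
    }

    public static void main(String[] args) {
        EditReplaySketch standby = new EditReplaySketch();
        standby.loadEdits(Arrays.asList(new Op(1, "/a"), new Op(2, "/b")));
        // A later tailing round may overlap with edits that were already applied.
        standby.loadEdits(Arrays.asList(new Op(2, "/b"), new Op(3, "/c")));
        System.out.println("last applied txid: " + standby.lastAppliedTxId()
            + ", namespace: " + standby.namespace());
    }
}
```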

The flow for loading the latest editlog is shown below:

Untitled 5.png

5. Standby NameNode checkpoint

When the Standby NameNode starts, it starts the CheckpointerThread. The thread periodically calls doCheckpoint(sendRequest) in a loop to run a checkpoint. doCheckpoint first saves the fsimage to the local disk and then sends the fsimage file from the local disk to the active NameNode:

// Save the in-memory fsimage to the local disk
FSImage img = namesystem.getFSImage();
img.saveNamespace(namesystem, imageType, canceler);
// Send the fsimage file on the local disk to the active NameNode
List<Future<TransferFsImage.TransferResult>> uploads =
        new ArrayList<Future<TransferFsImage.TransferResult>>();
    for (final URL activeNNAddress : activeNNAddresses) {
      Future<TransferFsImage.TransferResult> upload =
          executor.submit(new Callable<TransferFsImage.TransferResult>() {
            @Override
            public TransferFsImage.TransferResult call()
                throws IOException, InterruptedException {
              CheckpointFaultInjector.getInstance().duringUploadInProgess();
              return TransferFsImage.uploadImageFromStorage(activeNNAddress, conf, namesystem
                  .getFSImage().getStorage(), imageType, txid, canceler);
            }
          });
      uploads.add(upload);
    }

Start with FSImage.saveNamespace, which saves the fsimage to the local disk:

public synchronized void saveNamespace(FSNamesystem source, NameNodeFile nnf,
      Canceler canceler) throws IOException {
    // omitted
        saveFSImageInAllDirs(source, nnf, imageTxId, canceler);
   // omitted
  }

FSImage starts FSImageSaver threads, which call saveFSImage to write the fsimage and its MD5 digest into separate files:

void saveFSImage(SaveNamespaceContext context, StorageDirectory sd,
      NameNodeFile dstType) throws IOException {
    long txid = context.getTxId();
    File newFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE_NEW, txid);
    File dstFile = NNStorage.getStorageFile(sd, dstType, txid);
    
    FSImageFormatProtobuf.Saver saver = new FSImageFormatProtobuf.Saver(context);
    FSImageCompression compression = FSImageCompression.createCompression(conf);
    // Write the fsimage to a file
    long numErrors = saver.save(newFile, compression);
    if (numErrors > 0) {
      // The image is likely corrupted.
      LOG.error("Detected " + numErrors + " errors while saving FsImage " +
          dstFile);
      exitAfterSave.set(true);
    }
    // Write the MD5 file
    MD5FileUtils.saveMD5File(dstFile, saver.getSavedDigest());
    storage.setMostRecentCheckpointInfo(txid, Time.now());
  }
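Storing the digest in a separate file lets a reader verify the image before loading it. The sketch below shows the general technique with the JDK only. It is not MD5FileUtils: the class `Md5SidecarSketch` is hypothetical, and both the `.md5` suffix and the md5sum-style line format (`<hex> *<file name>`) are assumptions made for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical helper that writes an MD5 digest next to an image file. */
class Md5SidecarSketch {
    /** Returns the MD5 digest of the data as 32 lowercase hex characters. */
    static String md5Hex(byte[] data) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            StringBuilder hex = new StringBuilder();
            for (byte b : md.digest(data)) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    /** Formats one sidecar line in md5sum style (assumed format). */
    static String sidecarLine(String hex, String fileName) {
        return hex + " *" + fileName + "\n";
    }

    /** Writes "<image>.md5" next to the image (assumed naming) and returns its path. */
    static Path writeSidecar(Path image) {
        try {
            String hex = md5Hex(Files.readAllBytes(image));
            String name = image.getFileName().toString();
            Path sidecar = image.resolveSibling(name + ".md5");
            Files.write(sidecar, sidecarLine(hex, name).getBytes(StandardCharsets.UTF_8));
            return sidecar;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path image = Files.createTempFile("fsimage_sketch", "");
        Files.write(image, "namespace bytes".getBytes(StandardCharsets.UTF_8));
        Path sidecar = writeSidecar(image);
        System.out.print(new String(Files.readAllBytes(sidecar), StandardCharsets.UTF_8));
    }
}
```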

After saving the fsimage, the Standby NameNode calls TransferFsImage.uploadImageFromStorage on an asynchronous thread to send the fsimage file to the active NameNode. The upload is an HTTP PUT request:

private static void uploadImage(URL url, Configuration conf,
      NNStorage storage, NameNodeFile nnf, long txId, Canceler canceler)
      throws IOException {

    File imageFile = storage.findImageFile(nnf, txId);
    if (imageFile == null) {
      throw new IOException("Could not find image with txid " + txId);
    }

    HttpURLConnection connection = null;
      // Open a connection to the Active NameNode
      connection = (HttpURLConnection) connectionFactory.openConnection(
          urlWithParams, UserGroupInformation.isSecurityEnabled());
      // Set the request to PUT
      connection.setRequestMethod("PUT");
      connection.setDoOutput(true);

      // omitted
      // Send the fsimage file to the Active NameNode
      // Write the file to output stream.
      writeFileToPutRequest(conf, connection, imageFile, canceler);

      // omitted
  }

The checkpoint flow is shown below:

Untitled 6.png

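In addition, when a NameNode switches from Standby to Active, it loads the local fsimage file and also incrementally reads the latest editlog from the JournalNodes. It calls EditLogTailer.catchupDuringFailover, which runs doTailEdits, whose flow is described above: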

public void catchupDuringFailover() throws IOException {
    Preconditions.checkState(tailerThread == null ||
        !tailerThread.isAlive(),
        "Tailer thread should not be running once failover starts");
    // Important to do tailing as the login user, in case the shared
    // edits storage is implemented by a JournalManager that depends
    // on security credentials to access the logs (eg QuorumJournalManager).
    SecurityUtil.doAsLoginUser(new PrivilegedExceptionAction<Void>() {
      @Override
      public Void run() throws Exception {
        try {
          // It is already under the full name system lock and the checkpointer
          // thread is already stopped. No need to acquire any other lock.
          doTailEdits();
        } catch (InterruptedException e) {
          throw new IOException(e);
        }
        return null;
      }
    });
  }

From: https://blog.51cto.com/u_15327484/8122340
