
DataNode Heartbeat and Block Report Flow

Date: 2023-10-31 19:06:41
Tags: node, nodeinfo, flow, datanode, DataNode, heartbeat, new, null, block

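1. Background

A change to file content usually means a change to block information. The DataNode reports changed blocks to the NameNode, which then updates its block metadata. This article walks through the DataNode heartbeat flow and the block report flow.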

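2. DataNode Heartbeat Thread Model

In an HDFS Federation deployment, each namespace is typically served by one Active/Standby NameNode pair. Each namespace owns a dedicated block pool, which is the set of blocks belonging to that namespace. Different namespaces can share the same DataNodes for storage: on each DataNode, every namespace stores its blocks under its own directory. For example, in a cluster with four namespaces, each DataNode creates four block pool directories:

[Figure: block pool directories on a DataNode, one per namespace (image not available)]

A DataNode reports to the NameNodes per block pool. Block pools are managed by the BlockPoolManager class. When the DataNode starts, BlockPoolManager.doRefreshNamenodes creates one BPOfferService object per block pool/namespace on that DataNode. Each BPOfferService sends heartbeats and block reports only to the Active/Standby NameNodes of its own namespace. BPOfferService instances are created as follows: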

private void doRefreshNamenodes(
      Map<String, Map<String, InetSocketAddress>> addrMap,
      Map<String, Map<String, InetSocketAddress>> lifelineAddrMap)
      throws IOException {
    assert Thread.holdsLock(refreshNamenodesLock);
    
    Set<String> toRefresh = Sets.newLinkedHashSet();
    Set<String> toAdd = Sets.newLinkedHashSet();
    Set<String> toRemove;
    // Look up the nameserviceIds that already exist on this DataNode; any
    // namespace without a BPOfferService is queued for creation.
    synchronized (this) {
      // Step 1. For each of the new nameservices, figure out whether
      // it's an update of the set of NNs for an existing NS,
      // or an entirely new nameservice.
      for (String nameserviceId : addrMap.keySet()) {
        if (bpByNameserviceId.containsKey(nameserviceId)) {
          toRefresh.add(nameserviceId);
        } else {
          toAdd.add(nameserviceId);
        }
      }
      // If a nameserviceId no longer exists, queue its BPOfferService for removal.
      // Step 2. Any nameservices we currently have but are no longer present
      // need to be removed.
      toRemove = Sets.newHashSet(Sets.difference(
          bpByNameserviceId.keySet(), addrMap.keySet()));
      
   
      // Step 3. Start new nameservices
      if (!toAdd.isEmpty()) {
        LOG.info("Starting BPOfferServices for nameservices: " +
            Joiner.on(",").useForNull("<default>").join(toAdd));
      
        for (String nsToAdd : toAdd) {
          Map<String, InetSocketAddress> nnIdToAddr = addrMap.get(nsToAdd);
          Map<String, InetSocketAddress> nnIdToLifelineAddr =
              lifelineAddrMap.get(nsToAdd);
          ArrayList<InetSocketAddress> addrs =
              Lists.newArrayListWithCapacity(nnIdToAddr.size());
          ArrayList<InetSocketAddress> lifelineAddrs =
              Lists.newArrayListWithCapacity(nnIdToAddr.size());
          for (String nnId : nnIdToAddr.keySet()) {
            addrs.add(nnIdToAddr.get(nnId));
            lifelineAddrs.add(nnIdToLifelineAddr != null ?
                nnIdToLifelineAddr.get(nnId) : null);
          }
          // Create the BPOfferService for this namespace
          BPOfferService bpos = createBPOS(nsToAdd, addrs, lifelineAddrs);
          bpByNameserviceId.put(nsToAdd, bpos);
          offerServices.add(bpos);
        }
      }
      startAll();
    }
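
Steps 1 and 2 above amount to comparing two sets of nameservice IDs: the ones in the configuration and the ones already running. The following is a minimal sketch of that classification using only java.util. The class NameserviceDiff and its members are hypothetical names chosen for illustration and are not part of Hadoop.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical helper (not a Hadoop class): classifies nameservice IDs. */
class NameserviceDiff {
    final Set<String> toAdd = new LinkedHashSet<>();
    final Set<String> toRefresh = new LinkedHashSet<>();
    final Set<String> toRemove = new LinkedHashSet<>();

    /**
     * @param configured nameservice IDs found in the current configuration
     * @param running    nameservice IDs that already have a BPOfferService
     */
    static NameserviceDiff compute(Set<String> configured, Set<String> running) {
        NameserviceDiff d = new NameserviceDiff();
        for (String ns : configured) {
            if (running.contains(ns)) {
                d.toRefresh.add(ns); // already running: refresh its NameNode list
            } else {
                d.toAdd.add(ns);     // new: a BPOfferService must be started
            }
        }
        for (String ns : running) {
            if (!configured.contains(ns)) {
                d.toRemove.add(ns);  // running but no longer configured
            }
        }
        return d;
    }
}
```

With configured = {ns1, ns2, ns3} and running = {ns2, ns4}, this sketch would queue ns1 and ns3 for creation, ns2 for refresh, and ns4 for removal.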

When a BPOfferService is created, it records the Active and Standby NameNodes and creates one BPServiceActor per NameNode. The BPServiceActor is what actually performs the heartbeats and related operations:

BPOfferService(
      final String nameserviceId,
      List<InetSocketAddress> nnAddrs,
      List<InetSocketAddress> lifelineNnAddrs,
      DataNode dn) {
    Preconditions.checkArgument(!nnAddrs.isEmpty(),
        "Must pass at least one NN.");
    Preconditions.checkArgument(nnAddrs.size() == lifelineNnAddrs.size(),
        "Must pass same number of NN addresses and lifeline addresses.");
    this.nameserviceId = nameserviceId;
    this.dn = dn;
    // One BPServiceActor per NameNode, used to send heartbeats and reports
    for (int i = 0; i < nnAddrs.size(); ++i) {
      this.bpServices.add(new BPServiceActor(nnAddrs.get(i),
          lifelineNnAddrs.get(i), this));
    }
  }

Finally, BlockPoolManager.startAll starts every BPServiceActor thread in every BPOfferService. Note that both the Active and the Standby NameNode receive heartbeats, but only the Active NameNode can send commands to the DataNode:

synchronized void startAll() throws IOException {
    try {
      UserGroupInformation.getLoginUser().doAs(
          new PrivilegedExceptionAction<Object>() {
            @Override
            public Object run() throws Exception {
              for (BPOfferService bpos : offerServices) {
                // Start the BPOfferService
                bpos.start();
              }
              return null;
            }
          });
    } catch (InterruptedException ex) {
      IOException ioe = new IOException();
      ioe.initCause(ex.getCause());
      throw ioe;
    }
  }

// Start the BPServiceActor threads
class BPOfferService {
  void start() {
    for (BPServiceActor actor : bpServices) {
      actor.start();
    }
  }
}

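3. DataNode Heartbeat and Block Report Flow

The BPServiceActor.run() thread first performs a handshake with the NameNode, then calls offerService to enter the heartbeat and block report loop: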

connectToNNAndHandshake();

while (shouldRun()) {
        try {
          offerService();
        } catch (Exception ex) {
          LOG.error("Exception in BPOfferService for " + this, ex);
          sleepAndLogInterrupts(5000, "offering service");
        }
      }

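offerService works as follows:

  1. scheduler.isHeartbeatDue checks whether a heartbeat is due at the current time.
  2. scheduler.isBlockReportDue checks whether a full block report is due.
  3. sendHeartBeat sends the heartbeat and, if needed, requests a block report lease.
  4. sendIBRs sends the incremental block report.
  5. blockReport sends the full block report.

private void offerService() throws Exception {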
    while (shouldRun()) {
      try {
        // Current time
        final long startTime = scheduler.monotonicNow();

        // Check the elapsed interval to decide whether a heartbeat is due
        final boolean sendHeartbeat = scheduler.isHeartbeatDue(startTime);
        HeartbeatResponse resp = null;
        if (sendHeartbeat) {
          //
          // All heartbeat messages include following info:
          // -- Datanode name
          // -- data transfer port
          // -- Total capacity
          // -- Bytes remaining
          //
          // fullBlockReportLeaseId == 0 means this BPServiceActor holds no lease;
          // if a full block report is due, request a block report lease.
          boolean requestBlockReportLease = (fullBlockReportLeaseId == 0) &&
                  scheduler.isBlockReportDue(startTime);
          if (!dn.areHeartbeatsDisabledForTests()) {
            // Send the heartbeat
            resp = sendHeartBeat(requestBlockReportLease);
            assert resp != null;
            if (resp.getFullBlockReportLeaseId() != 0) {
              if (fullBlockReportLeaseId != 0) {
                LOG.warn(nnAddr + " sent back a full block report lease " +
                        "ID of 0x" +
                        Long.toHexString(resp.getFullBlockReportLeaseId()) +
                        ", but we already have a lease ID of 0x" +
                        Long.toHexString(fullBlockReportLeaseId) + ". " +
                        "Overwriting old lease ID.");
              }
              // Store the block report lease ID
              fullBlockReportLeaseId = resp.getFullBlockReportLeaseId();
            }
            dn.getMetrics().addHeartbeat(scheduler.monotonicNow() - startTime);

            // If the state of this NN has changed (eg STANDBY->ACTIVE)
            // then let the BPOfferService update itself.
            //
            // Important that this happens before processCommand below,
            // since the first heartbeat to a new active might have commands
            // that we should actually process.
            bpos.updateActorStatesFromHeartbeat(
                this, resp.getNameNodeHaState());
            state = resp.getNameNodeHaState().getState();

            if (state == HAServiceState.ACTIVE) {
              handleRollingUpgradeStatus(resp);
            }

            long startProcessCommands = monotonicNow();
            if (!processCommand(resp.getCommands()))
              continue;
            long endProcessCommands = monotonicNow();
            if (endProcessCommands - startProcessCommands > 2000) {
              LOG.info("Took " + (endProcessCommands - startProcessCommands)
                  + "ms to process " + resp.getCommands().length
                  + " commands from NN");
            }
          }
        }
        // Incremental block report
        if (!dn.areIBRDisabledForTests() &&
            (ibrManager.sendImmediately()|| sendHeartbeat)) {
          ibrManager.sendIBRs(bpNamenode, bpRegistration,
              bpos.getBlockPoolId());
        }

        List<DatanodeCommand> cmds = null;
        boolean forceFullBr =
            scheduler.forceFullBlockReport.getAndSet(false);
        if (forceFullBr) {
          LOG.info("Forcing a full block report to " + nnAddr);
        }
        // Full block report
        if ((fullBlockReportLeaseId != 0) || forceFullBr) {
          cmds = blockReport(fullBlockReportLeaseId);
          fullBlockReportLeaseId = 0;
        }
        processCommand(cmds == null ? null : cmds.toArray(new DatanodeCommand[cmds.size()]));

        if (!dn.areCacheReportsDisabledForTests()) {
          DatanodeCommand cmd = cacheReport();
          processCommand(new DatanodeCommand[]{ cmd });
        }

        if (sendHeartbeat) {
          dn.getMetrics().addHeartbeatTotal(
              scheduler.monotonicNow() - startTime);
        }

        // There is no work to do;  sleep until heartbeat timer elapses,
        // or work arrives, and then iterate again.
        // Block here; the thread wakes up when the next heartbeat is due.
        ibrManager.waitTillNextIBR(scheduler.getHeartbeatWaitTime());
      } // (omitted)
    } // while (shouldRun())
  } // offerService

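Heartbeat timing: scheduler.isHeartbeatDue

The heartbeat interval is read from dfs.heartbeat.interval, which defaults to 3s. In other words, the DataNode sends a heartbeat to the NameNode every 3 seconds: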

boolean isHeartbeatDue(long startTime) {
      return (nextHeartbeatTime - startTime <= 0);
    }

long scheduleNextHeartbeat() {
      // Numerical overflow is possible here and is okay.
      nextHeartbeatTime = monotonicNow() + heartbeatIntervalMs;
      scheduleNextLifeline(nextHeartbeatTime);
      return nextHeartbeatTime;
    }
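
The "Numerical overflow is possible here and is okay" comment depends on isHeartbeatDue subtracting before it compares. The sketch below illustrates why, assuming a simplified stand-in for the scheduler that takes the current time as a parameter. The class HeartbeatClock and the isHeartbeatDueNaive method are hypothetical and exist only for contrast.

```java
/** Hypothetical stand-in for the heartbeat part of the scheduler (not Hadoop code). */
class HeartbeatClock {
    private final long heartbeatIntervalMs;
    private long nextHeartbeatTime;

    HeartbeatClock(long heartbeatIntervalMs, long nowMs) {
        this.heartbeatIntervalMs = heartbeatIntervalMs;
        this.nextHeartbeatTime = nowMs; // assumption: the first heartbeat is due at once
    }

    /** Subtract first, then compare: still correct after the sum wraps around. */
    boolean isHeartbeatDue(long nowMs) {
        return nextHeartbeatTime - nowMs <= 0;
    }

    /** Naive variant for contrast: wrong once nextHeartbeatTime has overflowed. */
    boolean isHeartbeatDueNaive(long nowMs) {
        return nextHeartbeatTime <= nowMs;
    }

    void scheduleNextHeartbeat(long nowMs) {
        nextHeartbeatTime = nowMs + heartbeatIntervalMs; // may overflow; that is fine
    }
}
```

If the clock value is within one interval of Long.MAX_VALUE, the sum wraps to a negative number. The subtraction form still yields the true distance (3000 ms for a 3s interval), while the naive comparison would report the heartbeat as due immediately.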

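Full block report timing: scheduler.isBlockReportDue

The full block report (FBR) interval is read from dfs.blockreport.intervalMsec and defaults to 6 hours. fullBlockReportLeaseId is the lease ID used for a full block report. FBR leases were introduced by HDFS-7923 to keep the NameNode from processing too many full block reports at once, which would inflate the callQueue and block the service. The NameNode limits both the number of leases (6 by default) and the lease length (5 minutes) to reduce its load. On clusters with many blocks, however, this also slows down FBRs:

  • dfs.namenode.max.full.block.report.leases=6
  • dfs.namenode.full.block.report.lease.length.ms=5L * 60L * 1000L

A non-zero fullBlockReportLeaseId means the actor already holds a lease whose full block report has not finished yet, so no new lease is requested: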

boolean requestBlockReportLease = (fullBlockReportLeaseId == 0) &&
                  scheduler.isBlockReportDue(startTime);

void scheduleNextBlockReport() {
      // If we have sent the first set of block reports, then wait a random
      // time before we start the periodic block reports.
      if (resetBlockReportTime) {
        nextBlockReportTime = monotonicNow() +
            ThreadLocalRandom.current().nextInt((int)(blockReportIntervalMs));
      }
      // (remainder of the method omitted)
}
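
To make the lease idea concrete, here is a simplified model of how a NameNode could cap concurrent full block reports with a lease count and a lease length. It is only a sketch under stated assumptions: the class LeaseLimiter, its methods, and its bookkeeping are hypothetical and much simpler than what Hadoop does. The only convention taken from the text above is that a lease ID of 0 means "no lease".

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified, hypothetical model of NameNode-side full-block-report leases. */
class LeaseLimiter {
    private final int maxLeases;
    private final long leaseLengthMs;
    // DataNode UUID -> time at which its lease expires
    private final Map<String, Long> expiryByDatanode = new HashMap<>();
    private long nextId = 1; // 0 is reserved for "no lease"

    LeaseLimiter(int maxLeases, long leaseLengthMs) {
        this.maxLeases = maxLeases;
        this.leaseLengthMs = leaseLengthMs;
    }

    /** Returns a non-zero lease ID, or 0 if every lease slot is taken. */
    synchronized long requestLease(String datanodeUuid, long nowMs) {
        // Expired leases free their slots.
        expiryByDatanode.values().removeIf(expiry -> expiry <= nowMs);
        if (!expiryByDatanode.containsKey(datanodeUuid)
                && expiryByDatanode.size() >= maxLeases) {
            return 0; // the DataNode asks again on a later heartbeat
        }
        expiryByDatanode.put(datanodeUuid, nowMs + leaseLengthMs);
        return nextId++;
    }

    /** Called once the DataNode's full block report has been processed. */
    synchronized void releaseLease(String datanodeUuid) {
        expiryByDatanode.remove(datanodeUuid);
    }
}
```

With maxLeases = 6 and leaseLengthMs = 5 * 60 * 1000, a seventh DataNode asking for a lease in this model gets 0 back until another report finishes or a lease expires, which is the trade-off described above: less NameNode pressure, slower FBRs on large clusters.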

sendHeartBeat: sending the heartbeat

As shown below, the DataNode sends its basic capacity and load information to the NameNode:

HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration,
    reports,
    dn.getFSDataset().getCacheCapacity(),
    dn.getFSDataset().getCacheUsed(),
    dn.getXmitsInProgress(),
    dn.getXceiverCount(),
    numFailedVolumes,
    volumeFailureSummary,
    requestBlockReportLease,
    slowPeers,
    slowDisks);

sendIBRs: sending incremental block reports

sendIBRs calls IncrementalBlockReportManager.generateIBRs, which collects the blocks awaiting report from pendingIBRs:

private synchronized StorageReceivedDeletedBlocks[] generateIBRs() {
    final List<StorageReceivedDeletedBlocks> reports
        = new ArrayList<>(pendingIBRs.size());
    for (Map.Entry<DatanodeStorage, PerStorageIBR> entry
        : pendingIBRs.entrySet()) {
      final PerStorageIBR perStorage = entry.getValue();

        // Send newly-received and deleted blockids to namenode
      final ReceivedDeletedBlockInfo[] rdbi = perStorage.removeAll();
      if (rdbi != null) {
        reports.add(new StorageReceivedDeletedBlocks(entry.getKey(), rdbi));
      }
    }

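Block entries are added to pendingIBRs after the DataNode finishes deleting a block, starts receiving a block, or finishes receiving a block, so that they can be sent to the NameNode: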

void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint,
      String storageUuid, boolean isOnTransientStorage) {
    notifyNamenodeBlock(block, BlockStatus.RECEIVED_BLOCK, delHint,
        storageUuid, isOnTransientStorage);
  }

  void notifyNamenodeReceivingBlock(ExtendedBlock block, String storageUuid) {
    notifyNamenodeBlock(block, BlockStatus.RECEIVING_BLOCK, null, storageUuid,
        false);
  }

  void notifyNamenodeDeletedBlock(ExtendedBlock block, String storageUuid) {
    notifyNamenodeBlock(block, BlockStatus.DELETED_BLOCK, null, storageUuid,
        false);
  }

These calls end up in IncrementalBlockReportManager.addRDBI, which puts the block information into pendingIBRs:

synchronized void addRDBI(ReceivedDeletedBlockInfo rdbi,
      DatanodeStorage storage) {
    // Make sure another entry for the same block is first removed.
    // There may only be one such entry.
    for (PerStorageIBR perStorage : pendingIBRs.values()) {
      if (perStorage.remove(rdbi.getBlock()) != null) {
        break;
      }
    }
    getPerStorageIBR(storage).put(rdbi);
  }
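
The key property of addRDBI is that a block has at most one pending entry across all storages, so only its latest status is reported. A minimal sketch of that bookkeeping follows, assuming blocks and storages can be identified by plain IDs. The class PendingIbrs and its methods are hypothetical simplifications, not the Hadoop classes.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical, simplified model of the pending incremental block reports. */
class PendingIbrs {
    enum Status { RECEIVING_BLOCK, RECEIVED_BLOCK, DELETED_BLOCK }

    // storage ID -> (block ID -> latest status)
    private final Map<String, Map<Long, Status>> pending = new HashMap<>();

    synchronized void add(String storageId, long blockId, Status status) {
        // A block may be queued under at most one storage: drop any older entry.
        for (Map<Long, Status> perStorage : pending.values()) {
            if (perStorage.remove(blockId) != null) {
                break;
            }
        }
        pending.computeIfAbsent(storageId, k -> new LinkedHashMap<>())
               .put(blockId, status);
    }

    /** Hands over everything queued so far and leaves the queue empty. */
    synchronized Map<String, Map<Long, Status>> drain() {
        Map<String, Map<Long, Status>> out = new HashMap<>();
        for (Map.Entry<String, Map<Long, Status>> e : pending.entrySet()) {
            if (!e.getValue().isEmpty()) {
                out.put(e.getKey(), e.getValue());
            }
        }
        pending.clear();
        return out;
    }
}
```

If a block is first queued as RECEIVING_BLOCK and then as RECEIVED_BLOCK before the next report goes out, only RECEIVED_BLOCK is drained in this model.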

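The interval between incremental block reports is read from dfs.blockreport.incremental.intervalMsec. In stock Apache Hadoop the default is 0, meaning IBRs are sent without an extra delay; a positive value makes the DataNode wait that many milliseconds between IBRs.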

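blockReport: sending the full block report

A DatanodeStorage can be thought of as a single disk of the DataNode. The DataNode collects the block list of each disk and reads dfs.blockreport.split.threshold (1,000,000 by default in stock Apache Hadoop). If the total block count is below the threshold, all storages are reported to the NameNode in a single RPC; otherwise the DataNode sends one RPC per storage: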

List<DatanodeCommand> blockReport(long fullBrLeaseId) throws IOException {
    final ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>();

    
    StorageBlockReport reports[] =
        new StorageBlockReport[perVolumeBlockLists.size()];
    //
    for(Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) {
      BlockListAsLongs blockList = kvPair.getValue();
      reports[i++] = new StorageBlockReport(kvPair.getKey(), blockList);
      totalBlockCount += blockList.getNumberOfBlocks();
    }

      if (totalBlockCount < dnConf.blockReportSplitThreshold) {
        // Below split threshold, send all reports in a single message.
        DatanodeCommand cmd = bpNamenode.blockReport(
            bpRegistration, bpos.getBlockPoolId(), reports,
            new BlockReportContext(1, 0, reportId, fullBrLeaseId));
        blockReportSizes.add(
            calculateBlockReportPBSize(useBlocksBuffer, reports));
        numRPCs = 1;
        numReportsSent = reports.length;
        if (cmd != null) {
          cmds.add(cmd);
        }
      } else {
        // Send one block report per message.
        for (int r = 0; r < reports.length; r++) {
          StorageBlockReport singleReport[] = { reports[r] };
          // Send this storage's block list to the NameNode in its own RPC
          DatanodeCommand cmd = bpNamenode.blockReport(
              bpRegistration, bpos.getBlockPoolId(), singleReport,
              new BlockReportContext(reports.length, r, reportId,
                  fullBrLeaseId));
          blockReportSizes.add(
              calculateBlockReportPBSize(useBlocksBuffer, singleReport));
          numReportsSent++;
          numRPCs++;
          if (cmd != null) {
            cmds.add(cmd);
          }
        }
      }
      success = true;
    // (remainder omitted)
  }
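
The split decision in the excerpt reduces to one comparison between the total block count and the threshold. The following sketch isolates that rule; the class BlockReportSplit and the method countRpcs are hypothetical names, and the threshold is passed in as a parameter instead of being read from configuration.

```java
/** Hypothetical helper (not Hadoop code) that isolates the split rule. */
class BlockReportSplit {
    /**
     * Number of blockReport RPCs needed for one full block report.
     *
     * @param blocksPerStorage block count of each storage (disk)
     * @param splitThreshold   value of the split threshold setting
     */
    static int countRpcs(long[] blocksPerStorage, long splitThreshold) {
        long total = 0;
        for (long n : blocksPerStorage) {
            total += n;
        }
        // Below the threshold: one RPC carrying every storage.
        // Otherwise: one RPC per storage.
        return total < splitThreshold ? 1 : blocksPerStorage.length;
    }
}
```

For a DataNode with three disks of 300,000 blocks each and a threshold of 1,000,000, this gives a single RPC; with 400,000 blocks per disk it gives three RPCs, one per storage.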

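In addition, BPServiceActor starts a health check (lifeline) thread that reports the DataNode's liveness to the NameNode. Its interval is read from dfs.datanode.lifeline.interval.seconds; the default is 3 * dfs.heartbeat.interval = 9s, so the DataNode sends its health status to the NameNode every 9 seconds: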

while (shouldRun()) {
        try {
          if (lifelineNamenode == null) {
            lifelineNamenode = dn.connectToLifelineNN(lifelineNnAddr);
          }
          sendLifelineIfDue();
          Thread.sleep(scheduler.getLifelineWaitTime());
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } catch (IOException e) {
          LOG.warn("IOException in LifelineSender for " + BPServiceActor.this,
              e);
        }
      }

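4. NameNode Handling of Heartbeats and Block Reports

When the NameNode handles DataNode requests, the calls end up in the registerDatanode, sendHeartbeat, blockReceivedAndDeleted, and blockReport methods of DatanodeProtocol, which is implemented by NameNodeRpcServer.

4.1 registerDatanode

registerDatanode is responsible for placing the DataNode into the network topology. It ends up calling DatanodeManager.addDatanode: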

/** Add a datanode. */
  void addDatanode(final DatanodeDescriptor node) {
    // To keep host2DatanodeMap consistent with datanodeMap,
    // remove  from host2DatanodeMap the datanodeDescriptor removed
    // from datanodeMap before adding node to host2DatanodeMap.
    
    // Add the node to datanodeMap and host2DatanodeMap
    synchronized(datanodeMap) {
      host2DatanodeMap.remove(datanodeMap.put(node.getDatanodeUuid(), node));
    }

    // Add the node to the network topology
    networktopology.add(node); // may throw InvalidTopologyException
    host2DatanodeMap.add(node);
    checkIfClusterIsNowMultiRack(node);

    if (LOG.isDebugEnabled()) {
      LOG.debug(getClass().getSimpleName() + ".addDatanode: "
          + "node " + node + " is added to datanodeMap.");
    }
  }

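4.2 sendHeartbeat

This call ends up in DatanodeManager.handleHeartbeat(). The heartbeat is handled as follows:

  1. Look up the DataNode and check whether it is allowed to connect (for example, it may be in the exclude list); if not, throw an exception.
  2. Check whether it has registered; if not, return a register command right away.
  3. Update the DataNode's information, mainly the fields of DatanodeDescriptor such as used space and remaining space.
  4. Check whether the NameNode is in safe mode.
  5. Check lease recovery.
  6. Generate replication commands.
  7. Generate deletion commands.
  8. Generate cache-related commands.
  9. Generate bandwidth-related commands.
  10. Return all commands.

/** Handle heartbeat from datanodes. */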
  public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
      StorageReport[] reports, final String blockPoolId,
      long cacheCapacity, long cacheUsed, int xceiverCount, 
      int maxTransfers, int failedVolumes,
      VolumeFailureSummary volumeFailureSummary) throws IOException {
    synchronized (heartbeatManager) {
      synchronized (datanodeMap) {
        DatanodeDescriptor nodeinfo = null;
        try {
          // Look up the DataNode
          nodeinfo = getDatanode(nodeReg);
        } catch(UnregisteredNodeException e) {
          return new DatanodeCommand[]{RegisterCommand.REGISTER};
        }
        // Is this DataNode allowed to connect?
        // Check if this datanode should actually be shutdown instead. 
        if (nodeinfo != null && nodeinfo.isDisallowed()) {
          setDatanodeDead(nodeinfo);
          throw new DisallowedDatanodeException(nodeinfo);
        }

        // Has it registered?
        if (nodeinfo == null || !nodeinfo.isRegistered()) {
          return new DatanodeCommand[]{RegisterCommand.REGISTER};
        }

        // Update the DataNode's information, such as used and remaining space
        heartbeatManager.updateHeartbeat(nodeinfo, reports,
                                         cacheCapacity, cacheUsed,
                                         xceiverCount, failedVolumes,
                                         volumeFailureSummary);

        // Is the NameNode in safe mode?
        // If we are in safemode, do not send back any recovery / replication
        // requests. Don't even drain the existing queue of work.
        if(namesystem.isInSafeMode()) {
          return new DatanodeCommand[0];
        }

        // Check lease state
        //check lease recovery
        BlockInfoContiguousUnderConstruction[] blocks = nodeinfo
            .getLeaseRecoveryCommand(Integer.MAX_VALUE);
        if (blocks != null) {
          BlockRecoveryCommand brCommand = new BlockRecoveryCommand(
              blocks.length);
           .................................
          return new DatanodeCommand[] { brCommand };
        }

       
        final List<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>();
        // Generate replication commands
        //check pending replication
        List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
              maxTransfers);
        if (pendingList != null) {
          cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
              pendingList));
        }
        // Look for invalid blocks and generate deletion commands
        //check block invalidation
        Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
        if (blks != null) {
          cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE,
              blockPoolId, blks));
        }
        // Generate cache-related commands
        boolean sendingCachingCommands = false;
        long nowMs = monotonicNow();
        if (shouldSendCachingCommands && 
            ((nowMs - nodeinfo.getLastCachingDirectiveSentTimeMs()) >=
                timeBetweenResendingCachingDirectivesMs)) {
          DatanodeCommand pendingCacheCommand =
              getCacheCommand(nodeinfo.getPendingCached(), nodeinfo,
                DatanodeProtocol.DNA_CACHE, blockPoolId);
          if (pendingCacheCommand != null) {
            cmds.add(pendingCacheCommand);
            sendingCachingCommands = true;
          }
          DatanodeCommand pendingUncacheCommand =
              getCacheCommand(nodeinfo.getPendingUncached(), nodeinfo,
                DatanodeProtocol.DNA_UNCACHE, blockPoolId);
          if (pendingUncacheCommand != null) {
            cmds.add(pendingUncacheCommand);
            sendingCachingCommands = true;
          }
          if (sendingCachingCommands) {
            nodeinfo.setLastCachingDirectiveSentTimeMs(nowMs);
          }
        }

        blockManager.addKeyUpdateCommand(cmds, nodeinfo);
        // Generate bandwidth-related commands
        // check for balancer bandwidth update
        if (nodeinfo.getBalancerBandwidth() > 0) {
          cmds.add(new BalancerBandwidthCommand(nodeinfo.getBalancerBandwidth()));
          // set back to 0 to indicate that datanode has been sent the new value
          nodeinfo.setBalancerBandwidth(0);
        }

        // Return all commands
        if (!cmds.isEmpty()) {
          return cmds.toArray(new DatanodeCommand[cmds.size()]);
        }
      }
    }

    return new DatanodeCommand[0];
  }
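
The order of the early returns in handleHeartbeat matters: an unregistered node only gets a register command, safe mode suppresses all work, and a lease recovery command is returned on its own. The sketch below models just that ordering, under the assumption that each check can be reduced to a boolean or a counter. The class HeartbeatDecision and the command name strings are hypothetical; the disallowed-node exception, caching commands, and key updates are left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the decision order only (not Hadoop code). */
class HeartbeatDecision {
    static List<String> commandsFor(boolean registered, boolean inSafeMode,
            boolean needsLeaseRecovery, int pendingReplications,
            int pendingInvalidations, long balancerBandwidth) {
        List<String> cmds = new ArrayList<>();
        if (!registered) {
            cmds.add("REGISTER");        // nothing else until the node registers
            return cmds;
        }
        if (inSafeMode) {
            return cmds;                 // no work is handed out in safe mode
        }
        if (needsLeaseRecovery) {
            cmds.add("RECOVER_BLOCK");   // returned alone, before other commands
            return cmds;
        }
        if (pendingReplications > 0) {
            cmds.add("TRANSFER");
        }
        if (pendingInvalidations > 0) {
            cmds.add("INVALIDATE");
        }
        if (balancerBandwidth > 0) {
            cmds.add("BALANCER_BANDWIDTH");
        }
        return cmds;
    }
}
```

One consequence visible in this model: while a DataNode has blocks awaiting lease recovery, replication and deletion work queued for it waits for a later heartbeat.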

4.3 blockReceivedAndDeleted: incremental block reports

NameNodeRpcServer.blockReceivedAndDeleted wraps each incremental block report operation in a Runnable and puts it on BlockManager's queue:

public void blockReceivedAndDeleted(final DatanodeRegistration nodeReg,
      String poolId, StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks)
          throws IOException {
   
    final BlockManager bm = namesystem.getBlockManager();
    for (final StorageReceivedDeletedBlocks r : receivedAndDeletedBlocks) {
      bm.enqueueBlockOp(new Runnable() {
        @Override
        public void run() {
          try {
            namesystem.processIncrementalBlockReport(nodeReg, r);
          } catch (Exception ex) {
            // usually because the node is unregistered/dead.  next heartbeat
            // will correct the problem
            blockStateChangeLog.error(
                "*BLOCK* NameNode.blockReceivedAndDeleted: "
                    + "failed from " + nodeReg + ": " + ex.getMessage());
          }
        }
      });
    }
  }

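BlockManager runs a BlockReportProcessingThread that keeps taking these Runnables off the queue and executing them asynchronously.

In FSNamesystem.processIncrementalBlockReport(), the NameNode applies different logic depending on each block's status: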
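
The enqueue-then-process pattern described here can be sketched with a BlockingQueue and a single worker thread. This is a generic illustration under stated assumptions, not the BlockManager implementation: the class BlockOpQueue and its methods are hypothetical names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical single-consumer operation queue (not Hadoop code). */
class BlockOpQueue {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final Thread worker = new Thread(this::drainLoop, "block-op-worker");
    private volatile boolean running = true;

    void start() {
        worker.start();
    }

    /** Called by RPC handlers; returns without waiting for the operation. */
    void enqueue(Runnable op) {
        queue.add(op);
    }

    /** Stops the worker after all previously queued operations have run. */
    void shutdown() {
        enqueue(() -> running = false);
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void drainLoop() {
        while (running) {
            try {
                queue.take().run(); // operations run one at a time, in order
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (RuntimeException e) {
                // A failing operation must not kill the worker thread.
            }
        }
    }
}
```

Because a single thread drains the queue, operations are applied in arrival order and never concurrently with each other, while the RPC handler threads are released as soon as they enqueue.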

for (ReceivedDeletedBlockInfo rdbi : srdb.getBlocks()) {
      switch (rdbi.getStatus()) {
      case DELETED_BLOCK:
        removeStoredBlock(storageInfo, rdbi.getBlock(), node);
        deleted++;
        break;
      case RECEIVED_BLOCK:
        addBlock(storageInfo, rdbi.getBlock(), rdbi.getDelHints());
        received++;
        break;
      case RECEIVING_BLOCK:
        receiving++;
        processAndHandleReportedBlock(storageInfo, rdbi.getBlock(),
                                      ReplicaState.RBW, null);
        break;
      default:
        String msg = 
          "Unknown block status code reported by " + node +
          ": " + rdbi;
        blockLog.warn(msg);
        assert false : msg; // if assertions are enabled, throw.
        break;
      }

Taking removeStoredBlock as an example, it removes the corresponding block information from blocksMap:

// Remove the block->metadata and block->datanode mappings from blocksMap.
      if (storedBlock == null || !blocksMap.removeNode(storedBlock, node)) {
        blockLog.debug("BLOCK* removeStoredBlock: {} has already been" +
            " removed from node {}", storedBlock, node);
        return;
      }

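4.4 processReport: full block reports

Full block reports end up in BlockManager.processReport. It uses reportDiff to find the blocks to add, the blocks to remove, the invalid blocks, and the corrupt blocks, then processes each group in turn.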

Collection<Block> processReport(
      final DatanodeStorageInfo storageInfo,
      final BlockListAsLongs report) throws IOException {
    // Normal case:
    // Modify the (block-->datanode) map, according to the difference
    // between the old and new block report.
    //
    Collection<BlockInfoToAdd> toAdd = new ArrayList<>();
    Collection<BlockInfo> toRemove = new HashSet<>();
    Collection<Block> toInvalidate = new ArrayList<>();
    Collection<BlockToMarkCorrupt> toCorrupt = new ArrayList<>();
    Collection<StatefulBlockInfo> toUC = new ArrayList<>();
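    // Diff the report against the stored state to collect the blocks to add,
    // remove, invalidate, and mark corrupt; each list is processed below.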
    reportDiff(storageInfo, report,
                 toAdd, toRemove, toInvalidate, toCorrupt, toUC);

    DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
    // Process the blocks on each queue
    // Record under-construction blocks
    for (StatefulBlockInfo b : toUC) { 
      addStoredBlockUnderConstruction(b, storageInfo);
    }
    // Remove blocks
    for (BlockInfo b : toRemove) {
      removeStoredBlock(b, node);
    }
    int numBlocksLogged = 0;
    for (BlockInfoToAdd b : toAdd) {
      // Add blocks
      addStoredBlock(b.stored, b.reported, storageInfo, null,
          numBlocksLogged < maxNumBlocksToLog);
      numBlocksLogged++;
    }
    if (numBlocksLogged > maxNumBlocksToLog) {
      blockLog.info("BLOCK* processReport: logged info for {} of {} " +
          "reported.", maxNumBlocksToLog, numBlocksLogged);
    }
    for (Block b : toInvalidate) {
      // Invalid blocks
      addToInvalidates(b, node);
    }
    for (BlockToMarkCorrupt b : toCorrupt) {
      // Corrupt blocks
      markBlockAsCorrupt(b, storageInfo, node);
    }

    return toInvalidate;
  }
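
The idea behind reportDiff can be illustrated with plain sets of block IDs. The sketch below is a deliberate simplification under stated assumptions: it ignores generation stamps, replica states, corrupt replicas, and under-construction blocks, and the class SimpleReportDiff with its three-set model is hypothetical.

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical, simplified diff of a full block report (not Hadoop code). */
class SimpleReportDiff {
    final Set<Long> toAdd = new TreeSet<>();
    final Set<Long> toRemove = new TreeSet<>();
    final Set<Long> toInvalidate = new TreeSet<>();

    /**
     * @param knownBlocks every block ID the NameNode knows about
     * @param onStorage   block IDs the NameNode currently maps to this storage
     * @param reported    block IDs the DataNode reported for this storage
     */
    static SimpleReportDiff diff(Set<Long> knownBlocks, Set<Long> onStorage,
            Set<Long> reported) {
        SimpleReportDiff d = new SimpleReportDiff();
        for (Long id : reported) {
            if (!knownBlocks.contains(id)) {
                d.toInvalidate.add(id); // unknown to the NameNode: delete on the DataNode
            } else if (!onStorage.contains(id)) {
                d.toAdd.add(id);        // known block, new location
            }
        }
        for (Long id : onStorage) {
            if (!reported.contains(id)) {
                d.toRemove.add(id);     // location recorded but no longer reported
            }
        }
        return d;
    }
}
```

For knownBlocks = {1, 2, 3, 4}, onStorage = {1, 2}, and reported = {2, 3, 9}, this model yields toAdd = {3}, toRemove = {1}, and toInvalidate = {9}.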

From: https://blog.51cto.com/u_15327484/8112749
