首页 > 其他分享 >Apache Ratis在Alluxio中应用

Apache Ratis在Alluxio中应用

时间:2023-11-09 23:00:54浏览次数:34  
标签:return LOG request Alluxio Ratis Raft Apache entry final

1. 背景

在alluxio1.8中,alluxio master只支持单节点部署,一旦挂掉,整个集群将不可用。alluxio 2.x后,提供了高可用方案:Alluxio组件中嵌入Apache Ratis代码,由Ratis负责选举leader,Alluxio的各个master在同步edit log时,由Ratis提供edit log的一致性传输。

Ratis服务基于Raft共识算法,该算法保证分布式集群中只有一个leader master对外提供服务,其他standby master在leader master退出时再竞争成为master。

本文会先后介绍Raft算法框架,不会详解琢磨细节,同时介绍Raft在Apache Ratis中的应用,最后介绍Alluxio中使用Apache Ratis进行选举和更新edit log流程。

2. Raft算法介绍

Raft算法起始于一篇论文:https://raft.github.io/raft.pdf,它有一个比较好的中文解读:

http://arthurchiao.art/blog/raft-paper-zh/#71-snapshot。有另外一种共识算法PAXOS,它比较复杂,难以实现,因此Apache Ratis项目选择基于Raft算法进行实现。

Raft算法是一致性算法,它主要负责两大功能:

  1. 在奇数个master中选举出来一个master作为leader。
  2. leader master接受客户端op操作,将op信息发送给standby master中保存。

2.1 Raft选举流程

在实现Raft算法的集群中,每个节点会有三种状态,出现不同的时间后,状态会进行转换。这种转换过程就是状态机:

Untitled.png

上述状态机表达的意思如下所示:

  1. Follower 只会响应来自其他节点的请求;如果一个 follower 某段时间内收不到 leader 的请求,它会变成一个 candidate 然后发起一轮选举。
  2. 获得大多数选票的 candidate 将成为新的 leader,其他candidate变成follower。
  3. 通常情况下,除非发生故障,否则在任的 leader 会持续担任下去。

在选举过程中,每个master都有一个任期(item),任期大的master会获得投票,超过半数票就转为leader。重新选举时,每个master都会增大iterm,以增加选举leader的成功率。

2.2 Raft日志传输流程

在分布式系统中,每次变更都会由leader发送给follower。每个变更都以log日志的方式持久化存储到各个master机器中。在Raft算法中,为了保证系统的性能,只要半数以上的节点中log一致,同步操作才算结束。

Raft算法会将log应用到内存中,使得master切换时,内存中已经存在大部分数据,减少切换时间。状态机会定义log应用到内存的接口。

Raft日志传输流程如下所示:

  1. 客户端向Leader发送请求,变更分布式系统中存储的值。
  2. Leader的共识模块负责将变更操作持久化到本地磁盘,发送给follower机器持久化到磁盘中。
  3. 当半数以上持久化操作成功后,leader将log应用到状态机中。
  4. 将结果返回给用户。

Untitled 1.png

每个master中都会保存每个log文件,为了避免文件数不断膨胀,follower master会定期从内存中生成snapshot保存到磁盘上,snapshot中有最新的事务ID,删除log文件列表中小于该事务ID的文件。

由于生成snapshot时,状态机无法更新,leader master一般不会进行snapshot,除非手动操作。正常情况下,leader都是从其他follower机器中下载snapshot文件的。

3. Raft算法和HDFS高可用区别

  1. HDFS高可用基于zookeeper的临时节点记录,而Raft算法则是每个master间直接进行选举。
  2. follower既接收log文件,又负责checkpoint,还负责将snapshot发送给leader,它的作用类似于Journal Node+Standby NameNode。Raft中的快照等价于HDFS中的fsimage,Raft中的log等价于HDFS中的edit log。

3. Apache Ratis介绍

3.1 Ratis选举流程

Ratis项目中,定义了RaftServerProtocol,用于定义RaftServer间投票的rpc接口:

public interface RaftServerProtocol {
  enum Op {REQUEST_VOTE, APPEND_ENTRIES, INSTALL_SNAPSHOT}

  RequestVoteReplyProto requestVote(RequestVoteRequestProto request) throws IOException;

  AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto request) throws IOException;

  InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto request) throws IOException;

  StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequestProto request) throws IOException;
}

RaftServerImpl类实现了该接口,当RaftServer接收到startLeaderElection时,开始选举:

class RaftServerImpl implements RaftServer.Division,
    RaftServerProtocol, RaftServerAsynchronousProtocol,
    RaftClientProtocol, RaftClientAsynchronousProtocol{

  public StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequestProto request) throws IOException {
    final RaftRpcRequestProto r = request.getServerRequest();
    final RaftPeerId leaderId = RaftPeerId.valueOf(r.getRequestorId());
    final RaftGroupId leaderGroupId = ProtoUtils.toRaftGroupId(r.getRaftGroupId());
    final TermIndex leaderLastEntry = TermIndex.valueOf(request.getLeaderLastEntry());

    CodeInjectionForTesting.execute(START_LEADER_ELECTION, getId(), leaderId, request);

    //省略
      //开始选举
      changeToCandidate(true);
      return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), true);
    }
  }
}

后续到RoleInfo.startLeaderElection启动LeaderElection线程:

void startLeaderElection(RaftServerImpl server, boolean force) {
    if (pauseLeaderElection.get()) {
      return;
    }
    updateAndGet(leaderElection, new LeaderElection(server, force)).start();
  }

选举时,先后调用askForVotes方法进行预投票和投票:

public void run() {
    //省略

    final Timer.Context electionContext = server.getLeaderElectionMetrics().getLeaderElectionTimer().time();
    try {
      if (skipPreVote || askForVotes(Phase.PRE_VOTE)) {
        if (askForVotes(Phase.ELECTION)) {
          server.changeToLeader();
        }
      }
    } //省略
  }

在LeaderElection.askForVotes中,如果状态为candidate,就无限进行投票:

private boolean askForVotes(Phase phase) throws InterruptedException, IOException {
    //shouldRun表示如果当前节点正在运行,并且状态为candidate,并且存活,就不断循环遍历投票
    for(int round = 0; shouldRun(); round++) {
      final long electionTerm;
      final RaftConfigurationImpl conf;
      synchronized (server) {
        if (!shouldRun()) {
          return false;
        }
        final ConfAndTerm confAndTerm = server.getState().initElection(phase);
        electionTerm = confAndTerm.getTerm();
        conf = confAndTerm.getConf();
      }

      LOG.info("{} {} round {}: submit vote requests at term {} for {}", this, phase, round, electionTerm, conf);
      //开始投票
      final ResultAndTerm r = submitRequestAndWaitResult(phase, conf, electionTerm);
      LOG.info("{} {} round {}: result {}", this, phase, round, r);
      //省略
  }

最后,在LeaderElection.submitRequests方法中,执行requestVote rpc请求,向其他节点开始投票:

private int submitRequests(Phase phase, long electionTerm, TermIndex lastEntry,
      Collection<RaftPeer> others, Executor voteExecutor) {
    int submitted = 0;
    for (final RaftPeer peer : others) {
      final RequestVoteRequestProto r = ServerProtoUtils.toRequestVoteRequestProto(
          server.getMemberId(), peer.getId(), electionTerm, lastEntry, phase == Phase.PRE_VOTE);
      //开始投票
      voteExecutor.submit(() -> server.getServerRpc().requestVote(r));
      submitted++;
    }
    return submitted;
  }

3.2 Ratis日志同步流程

RaftClientProtocol接口submitClientRequest方法定义客户端提交请求:

public interface RaftClientProtocol {
  RaftClientReply submitClientRequest(RaftClientRequest request) throws IOException;
}

RaftServerImpl.submitClientRequest实现了该方法,它中间会调用appendTransaction开始执行append操作:

private CompletableFuture<RaftClientReply> appendTransaction(
      RaftClientRequest request, TransactionContext context, CacheEntry cacheEntry) throws IOException {
    assertLifeCycleState(LifeCycle.States.RUNNING);
    CompletableFuture<RaftClientReply> reply;

    final PendingRequest pending;
    synchronized (this) {
      reply = checkLeaderState(request, cacheEntry, true);
      if (reply != null) {
        return reply;
      }

      // append the message to its local log
      final LeaderStateImpl leaderState = role.getLeaderStateNonNull();
      final PendingRequests.Permit permit = leaderState.tryAcquirePendingRequest(request.getMessage());
      //省略
      try {
        //持久化到本地
        state.appendLog(context);
      } //省略

      // put the request into the pending queue
      //构建appendEntry请求
      pending = leaderState.addPendingRequest(permit, request, context);
      if (pending == null) {
        cacheEntry.failWithException(new ResourceUnavailableException(
            getMemberId() + ": Failed to add a pending write request for " + request));
        return cacheEntry.getReplyFuture();
      }
      //发送appendEntry请求给candidate
      leaderState.notifySenders();
    }
    return pending.getFuture();
  }

本地持久化

ServerState.appendLog开始进行本地持久化,最终调用SegmentedRaftLog.appendEntryImpl方法落盘:

protected CompletableFuture<Long> appendEntryImpl(LogEntryProto entry) {
    final Timer.Context context = getRaftLogMetrics().getRaftLogAppendEntryTimer().time();
    checkLogState();
    if (LOG.isTraceEnabled()) {
      LOG.trace("{}: appendEntry {}", getName(), LogProtoUtils.toLogEntryString(entry));
    }
    try(AutoCloseableLock writeLock = writeLock()) {
      validateLogEntry(entry);
      final LogSegment currentOpenSegment = cache.getOpenSegment();
      if (currentOpenSegment == null) {
        //写到cache和log文件中
        cache.addOpenSegment(entry.getIndex());
        fileLogWorker.startLogSegment(entry.getIndex());
      } else if (isSegmentFull(currentOpenSegment, entry)) {
        cache.rollOpenSegment(true);
        fileLogWorker.rollLogSegment(currentOpenSegment);
      } else if (currentOpenSegment.numOfEntries() > 0 &&
          currentOpenSegment.getLastTermIndex().getTerm() != entry.getTerm()) {
        // the term changes
        final long currentTerm = currentOpenSegment.getLastTermIndex().getTerm();
        Preconditions.assertTrue(currentTerm < entry.getTerm(),
            "open segment's term %s is larger than the new entry's term %s",
            currentTerm, entry.getTerm());
        cache.rollOpenSegment(true);
        fileLogWorker.rollLogSegment(currentOpenSegment);
      }

      //TODO(runzhiwang): If there is performance problem, start a daemon thread to checkAndEvictCache
      checkAndEvictCache();

      // If the entry has state machine data, then the entry should be inserted
      // to statemachine first and then to the cache. Not following the order
      // will leave a spurious entry in the cache.
      CompletableFuture<Long> writeFuture =
          fileLogWorker.writeLogEntry(entry).getFuture();
      if (stateMachineCachingEnabled) {
        // The stateMachineData will be cached inside the StateMachine itself.
        cache.appendEntry(LogProtoUtils.removeStateMachineData(entry),
            LogSegment.Op.WRITE_CACHE_WITH_STATE_MACHINE_CACHE);
      } else {
        cache.appendEntry(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
      }
      return writeFuture;
    } catch (Exception e) {
      LOG.error("{}: Failed to append {}", getName(), LogProtoUtils.toLogEntryString(entry), e);
      throw e;
    } finally {
      context.stop();
    }
  }

发送给candidate

本地落盘执行完后,LeaderStateImpl.addPendingRequest开始构建请求,准备发送给candidate。构建完后,执行notifySenders通知LogAppender发送给candidate:

PendingRequest addPendingRequest(PendingRequests.Permit permit, RaftClientRequest request, TransactionContext entry) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("{}: addPendingRequest at {}, entry={}", this, request,
          LogProtoUtils.toLogEntryString(entry.getLogEntry()));
    }
    return pendingRequests.add(permit, request, entry);
  }

void notifySenders() {
    senders.forEach(LogAppender::notifyLogAppender);
  }

LogAppenderDefault.run方法发送entry:

public void run() throws InterruptedException, IOException {
    while (isRunning()) {
      //省略
          //发送Entry
          final AppendEntriesReplyProto r = sendAppendEntriesWithRetries();
          if (r != null) {
            handleReply(r);
          }
        }
      }
      if (isRunning() && !hasAppendEntries()) {
        getEventAwaitForSignal().await(getHeartbeatWaitTimeMs(), TimeUnit.MILLISECONDS);
      }
      getLeaderState().checkHealth(getFollower());
    }
  }

sendAppendEntriesWithRetries开始发送请求:

private AppendEntriesReplyProto sendAppendEntriesWithRetries()
      throws InterruptedException, InterruptedIOException, RaftLogIOException {
    int retry = 0;
    AppendEntriesRequestProto request = null;
    while (isRunning()) { // keep retrying for IOException
      //省略
        final AppendEntriesReplyProto r = getServerRpc().appendEntries(request);
        //省略
  }

它最终执行的是RaftServerProtocol接口中的appendEntries方法,用于向其他candidate发送appendEntry请求:

public interface RaftServerProtocol {
  enum Op {REQUEST_VOTE, APPEND_ENTRIES, INSTALL_SNAPSHOT}

  RequestVoteReplyProto requestVote(RequestVoteRequestProto request) throws IOException;

  AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto request) throws IOException;

  InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto request) throws IOException;

  StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequestProto request) throws IOException;
}

更新statemachine

candidate更新成功后,leader执行onFollowerSuccessAppendEntries准备更新状态机:

public void onFollowerSuccessAppendEntries(FollowerInfo follower) {
    if (isAttendingVote(follower)) {
      submitUpdateCommitEvent();
    } else {
      eventQueue.submit(checkStagingEvent);
    }
  }

半数以上更新成功,开始updateCommitIndex:

private void updateCommit(long majority, long min) {
    final long oldLastCommitted = raftLog.getLastCommittedIndex();
    if (majority > oldLastCommitted) {
      // Get the headers before updating commit index since the log can be purged after a snapshot
      final LogEntryHeader[] entriesToCommit = raftLog.getEntries(oldLastCommitted + 1, majority + 1);

      if (server.getState().updateCommitIndex(majority, currentTerm, true)) {
        updateCommit(entriesToCommit);
      }
    }
    watchRequests.update(ReplicationLevel.ALL, min);
  }

updateCommitIndex通知stateMachineUpdater处理log:

boolean updateCommitIndex(long majorityIndex, long curTerm, boolean isLeader) {
    if (log.updateCommitIndex(majorityIndex, curTerm, isLeader)) {
      stateMachineUpdater.notifyUpdater();
      return true;
    }
    return false;
  }

StateMachineUpdater线程死循环,等待signal信息,获取log中的entry并应用到statemachine中:

public void run() {
    for(; state != State.STOP; ) {
      try {
        waitForCommit();

        if (state == State.RELOAD) {
          reload();
        }
        //将log应用到状态机中
        final MemoizedSupplier<List<CompletableFuture<Message>>> futures = applyLog();
        checkAndTakeSnapshot(futures);

        if (shouldStop()) {
          checkAndTakeSnapshot(futures);
          stop();
        }
      } catch (Throwable t) {
        if (t instanceof InterruptedException && state == State.STOP) {
          LOG.info("{} was interrupted.  Exiting ...", this);
        } else {
          state = State.EXCEPTION;
          LOG.error(this + " caught a Throwable.", t);
          server.close();
        }
      }
    }
  }

takeSnapshot方法中,follower可以直接创建snapshot,但是master只能通过命令手动建snapshot,否则就从follower下载snapshot:

public long takeSnapshot() {
    long index;
    StateLockManager stateLockManager = mStateLockManagerRef.get();
    if (!mIsLeader) {
      //follower直接创建snapshot
      index = takeLocalSnapshot(false);
    } else if (stateLockManager != null) {
      // the leader has been allowed to take a local snapshot by being given a non-null
      // StateLockManager through the #allowLeaderSnapshots method
      try (LockResource stateLock = stateLockManager.lockExclusive(StateLockOptions.defaults())) {
        //通过命令强制构建snapshot
        index = takeLocalSnapshot(true);
      } catch (Exception e) {
        return RaftLog.INVALID_LOG_INDEX;
      }
    } else {
      RaftGroup group;
      try (LockResource ignored = new LockResource(mGroupLock)) {
        if (mServerClosing) {
          return RaftLog.INVALID_LOG_INDEX;
        }
        // These calls are protected by mGroupLock and mServerClosing
        // as they will access the lock in RaftServerProxy.java
        // which is also accessed during raft server shutdown which
        // can cause a deadlock as the shutdown takes the lock while
        // waiting for this thread to finish
        Preconditions.checkState(mServer.getGroups().iterator().hasNext());
        group = mServer.getGroups().iterator().next();
      } catch (IOException e) {
        SAMPLING_LOG.warn("Failed to get raft group info: {}", e.getMessage());
        return RaftLog.INVALID_LOG_INDEX;
      }
      if (group.getPeers().size() < 2) {
        SAMPLING_LOG.warn("No follower to perform delegated snapshot. Please add more masters to "
            + "the quorum or manually take snapshot using 'alluxio fsadmin journal checkpoint'");
        return RaftLog.INVALID_LOG_INDEX;
      } else {
        //从其他Follower中获取snapshot
        index = mSnapshotManager.maybeCopySnapshotFromFollower();
      }
    }
    // update metrics if took a snapshot
    if (index != RaftLog.INVALID_LOG_INDEX) {
      mSnapshotLastIndex = index;
      mLastCheckPointTime = System.currentTimeMillis();
    }
    return index;
  }

Ratis会判断是否需要创建snapshot,每隔40w条entry就会创建一次snapshot:

private boolean shouldTakeSnapshot() {
    if (autoSnapshotThreshold == null) {
      return false;
    } else if (shouldStop()) {
      return getLastAppliedIndex() - snapshotIndex.get() > 0;
    }
    return state == State.RUNNING && getLastAppliedIndex() - snapshotIndex.get() >= autoSnapshotThreshold;
  }

4. Apache Ratis在Alluxio中的应用

4.1 Raft选举流程

Alluxio Master启动时,启动JournalSystem线程,实际上是RaftJournalSystem类:

public class AlluxioMasterProcess extends MasterProcess {
  protected final JournalSystem mJournalSystem;
  public void start() throws Exception {
    mJournalSystem.start();
  }
}

后续执行RaftJournalSystem.startInternal方法,joinQuorum开始选举:

private RaftServer mServer;

public synchronized void startInternal() {
    LOG.info("Initializing Raft Journal System");
    mPeerId = RaftJournalUtils.getPeerId(mLocalAddress);
    Set<RaftPeer> peers = mClusterAddresses.stream()
        .map(addr -> RaftPeer.newBuilder()
                .setId(RaftJournalUtils.getPeerId(addr))
                .setAddress(addr)
                .build()
        )
        .collect(Collectors.toSet());
    mRaftGroup = RaftGroup.valueOf(RAFT_GROUP_ID, peers);
    LOG.info("Starting Raft journal system. Cluster addresses: {}. Local address: {}",
        mClusterAddresses, mLocalAddress);
    try {
      initServer();
      long startTime = System.currentTimeMillis();
      mServer.start();
      LOG.info("Started Raft Journal System in {}ms", System.currentTimeMillis() - startTime);
    //省略
    //开始选举
    joinQuorum();
  }

joinQuorum方法构建选举请求并发送:

private void joinQuorum() {
    // Send a request to join the quorum.
    // If the server is already part of the quorum, this operation is a noop.
    AddQuorumServerRequest request = AddQuorumServerRequest.newBuilder()
        .setServerAddress(NetAddress.newBuilder()
            .setHost(mLocalAddress.getHostString())
            .setRpcPort(mLocalAddress.getPort()))
        .build();
    RaftClient client = createClient();
    client.async().sendReadOnly(Message.valueOf(
        UnsafeByteOperations.unsafeWrap(
            JournalQueryRequest
                .newBuilder()
                .setAddQuorumServerRequest(request)
                .build().toByteArray()
        ))).whenComplete((reply, t) -> {
          if (t != null) {
            LogUtils.warnWithException(LOG, "Exception occurred while joining quorum", t);
          }
          if (reply != null && reply.getException() != null) {
            LogUtils.warnWithException(LOG,
                "Received an error while joining quorum", reply.getException());
          }
          try {
            client.close();
          } catch (IOException e) {
            LogUtils.warnWithException(LOG, "Exception occurred closing raft client", e);
          }
        });
  }

4.2 日志更新流程

以Alluxio执行逻辑操作为例,DefaultBlockMaster.removeBlocks会讲op操作封装成entry记录到context中:

public void removeBlocks(Collection<Long> blockIds, boolean delete) throws UnavailableException {
    try (JournalContext journalContext = createJournalContext()) {
      for (long blockId : blockIds) {
        Set<Long> workerIds;
        try (LockResource r = lockBlock(blockId)) {
          Optional<BlockMeta> block = mBlockMetaStore.getBlock(blockId);
          if (!block.isPresent()) {
            continue;
          }
          List<BlockLocation> locations = mBlockMetaStore.getLocations(blockId);
          workerIds = new HashSet<>(locations.size());
          for (BlockLocation loc : locations) {
            workerIds.add(loc.getWorkerId());
          }
          if (delete) {
            // Make sure blockId is removed from mLostBlocks when the block metadata is deleted.
            // Otherwise blockId in mLostBlock can be dangling index if the metadata is gone.
            mLostBlocks.remove(blockId);
            mBlockMetaStore.removeBlock(blockId);
            JournalEntry entry = JournalEntry.newBuilder()
                .setDeleteBlock(DeleteBlockEntry.newBuilder().setBlockId(blockId)).build();
            //将entry加入到context中
            journalContext.append(entry);
          }
        }
   //省略
    }
  }

MasterJournalContext 会讲entry放到AsyncJournalWriter的mQueue中:

public long appendEntry(JournalEntry entry) {
    // TODO(gpang): handle bounding the queue if it becomes too large.
    mCounter.incrementAndGet();
    mQueue.offer(entry);
    return mCounter.get();
  }

AsyncJournalWriter线程执行doFlush方法,调用Ratis接口发送entry:

private void doFlush() {
    // Runs the loop until ::stop() is called.
    while (!mStopFlushing) {

      while (mQueue.isEmpty() && !mStopFlushing) {
        //省略

      try {
        long startTime = System.nanoTime();

        // Write pending entries to journal.
        while (!mQueue.isEmpty()) {
          // Get, but do not remove, the head entry.
          JournalEntry entry = mQueue.peek();
          if (entry == null) {
            // No more entries in the queue. Break write session.
            break;
          }
          mJournalWriter.write(entry);
          JournalUtils.sinkAppend(mJournalSinks, entry);
          // Remove the head entry, after the entry was successfully written.
          mQueue.poll();
          mWriteCounter++;

          if (((System.nanoTime() - startTime) >= mFlushBatchTimeNs) && !mStopFlushing) {
            // This thread has been writing to the journal for enough time. Break out of the
            // infinite while-loop.
            break;
          }
        }

        // Either written new entries or previous flush had been failed.
        if (mFlushCounter.get() < mWriteCounter) {
          try (Timer.Context ctx = MetricsSystem
              .timer(MetricKey.MASTER_JOURNAL_FLUSH_TIMER.getName()).time()) {
             //调用Ratis接发送entry接口
             mJournalWriter.flush();
          }
          JournalUtils.sinkFlush(mJournalSinks);
          mFlushCounter.set(mWriteCounter);
        }

        // Notify tickets that have been served to wake up.
        Iterator<FlushTicket> ticketIterator = mTicketSet.iterator();
        while (ticketIterator.hasNext()) {
          FlushTicket ticket = ticketIterator.next();
          if (ticket.getTargetCounter() <= mFlushCounter.get()) {
            ticket.setCompleted();
            ticketIterator.remove();
          }
        }
      } //省略
    }
  }

4.3 Alluxio状态机

RaftJournalSystem中RaftServer就是服务端Ratis rpc处理类,将Alluxio自定义的状态机JournalStateMachine放入RaftServer中,后续Ratis修改的状态机就是JournalStateMachine:

private JournalStateMachine mStateMachine;
RaftServer mServer = RaftServer.newBuilder()
        .setServerId(mPeerId)
        .setGroup(mRaftGroup)
        .setStateMachine(mStateMachine)
        .setProperties(properties)
        .setParameters(parameters)
        .build();

如下所示:JournalStateMachine实现了状态机接口:

public class JournalStateMachine extends BaseStateMachine {}

标签:return,LOG,request,Alluxio,Ratis,Raft,Apache,entry,final
From: https://blog.51cto.com/u_15327484/8286269

相关文章

  • Apache JMeter压力测试工具使用
    JMeter是Apache组织开发的基于Java的压力测试工具,用于对软件做压力测试。软件下载下载地址:https://jmeter.apache.org/download_jmeter.cgi最新版本5.6.2用浏览器下载发现慢得很,用迅雷下载非常快哟。测试使用在使用前需要先安装jdk下载完后将文件解压缩,找到bin目录下的jmeter.b......
  • 与创新者同行,Apache Doris in 2023
    在刚刚过去的DorisSummitAsia2023峰会上,ApacheDorisPMC成员、飞轮科技技术副总裁衣国垒带来了“与创新者同行”的主题演讲,回顾了ApacheDoris在过去一年所取得的技术突破与社区发展,重新思考了在面对海量数据实时分析上的挑战与机遇,全面介绍了ApacheDoris在未来的迭代......
  • Apache php配置
    window下配置apache+php+mysql运行环境一准备 1下载apachehttp://httpd.apache.org/download.cgi#apache24httpd-2.2.22-win32-x86-openssl-0.9.8t.msi openssl表示带有openssl模块,利用openssl可给Apache配置SSL安全链接 2下载phphttp://windows.php.net/downloads/relea......
  • org.apache.log4j.Logger 详解
    org.apache.log4j.Logger 详解1. 概述1.1. 背景  在应用程序中添加日志记录总的来说基于三个目的 :监视代码中变量的变化情况,周期性的记录到文件中供其他应用进行统计分析工作;跟踪代码运行时轨迹,作为日后审计的依据;担当集成开发环境中的调试器的作用,向文件或控制台打......
  • Linux安装配置apache
    1.获取软件: http://httpd.apache.org/ httpd-2.2.21.tar.gz2.安装步骤:解压源文件:1tarzvxfhttpd-2.2.21.tar.gz2cdhttpd-2.2.213./configure--prefix=/usr/local/apache2--enable-so--enable-rewrite4make5makeinstall运行./configure命令进行编译源代......
  • Apache DolphinScheduler PMC代立冬荣获中关村U30青年创业者荣誉
    北京,[2023年11月3日]—在中关村举行的U30年度优胜者见面交流会上,白鲸开源科技的联合创始人代立冬先生荣幸被选为年度优胜者之一。这是对代先生及白鲸开源科技在云原生DataOps平台领域创新成就的高度认可。中关村U30是由中国科协科学技术传播中心、共青团北京市委员会、北京市科......
  • mac os13上安装apache\php\mysql
    macos13上安装1,下载并安装brew,brew是macos上的软件安装工具;2,安装apache2brewinstallhttpd 安装成功后提示:工程文件根目录DocumentRootis/usr/local/var/www配置文件Thedefaultportshavebeensetin/usr/local/etc/httpd/httpd.confto8080andin/usr/local/e......
  • HttpClient报错 org.apache.http.NoHttpResponseException : 10.1.1.0:13001 failed t
    一、问题描述使用HttpClient并发调用http接口,并发量稍微大一点就会报错org.apache.http.NoHttpResponseException:10.1.1.0:13001failedtorespond 二、排查过程最开始怀疑是服务端连接过多,拒绝请求了,监控发现服务端并没有多少连接找运维搭建了一个新环(只有我们请求服务端),......
  • Apache, service httpd stop, Address already in use:
    servicehttpdstopStoppinghttpd:                                           [FAILED][root@testtestapache_logs]#servicehttpdstartStartinghttpd:(98)Addressalreadyinuse:make_sock:couldnotbindtoaddress[::......
  • Apache Paimon 实时数据湖 Streaming Lakehouse 的存储底座
    摘要:本文整理自阿里云开源大数据表存储团队负责人,阿里巴巴高级技术专家李劲松(之信),在StreamingLakehouseMeetup的分享。内容主要分为四个部分:流计算邂逅数据湖PaimonCDC实时入湖Paimon不止CDC入湖总结与生态一、流计算邂逅数据湖流计算1.0实时预处理流计算1.0架构截止......