1. Background
In Alluxio 1.8, the Alluxio master only supports single-node deployment: once it goes down, the whole cluster becomes unavailable. Starting with Alluxio 2.x, a high-availability scheme is provided: Apache Ratis code is embedded in the Alluxio masters, Ratis is responsible for electing the leader, and when the Alluxio masters synchronize the edit log, Ratis provides consistent replication of the edit log.
The Ratis service is based on the Raft consensus algorithm, which guarantees that only one leader master serves requests in the distributed cluster; the other standby masters compete to become leader only after the current leader master exits.
This article first introduces the Raft algorithm framework without digging into every detail, then describes how Raft is used in Apache Ratis, and finally walks through how Alluxio uses Apache Ratis for leader election and edit-log updates.
2. Overview of the Raft Algorithm
The Raft algorithm originates from a paper, https://raft.github.io/raft.pdf, and a good Chinese-language walkthrough is available at http://arthurchiao.art/blog/raft-paper-zh/#71-snapshot. Another consensus algorithm, Paxos, is considerably more complex and hard to implement, which is why the Apache Ratis project chose to implement Raft.
Raft is a consensus algorithm with two main responsibilities:
- Elect one master among an odd number of masters to be the leader.
- Have the leader master accept client ops and send the op information to the standby masters for storage.
2.1 Raft Election Flow
In a cluster implementing the Raft algorithm, each node is in one of three states, and different events trigger transitions between them. This transition process forms a state machine, which expresses the following:
- A follower only responds to requests from other nodes; if a follower receives no requests from the leader for some period of time, it becomes a candidate and starts an election.
- The candidate that receives votes from the majority becomes the new leader, and the other candidates revert to followers.
- Normally, unless a failure occurs, the incumbent leader keeps serving.
During an election, each master has a term. The master with the more up-to-date term wins votes, and once a candidate has collected more than half of the votes it becomes leader. When re-electing, each master increments its term, which raises the odds of successfully electing a leader.
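To make the voting rule concrete, below is a minimal, purely illustrative Java sketch (the class and field names are invented; this is not Ratis source code): a node grants its vote only if the candidate's term is not stale, it has not already voted for someone else in that term, and the candidate's log is at least as up to date as its own.
// Illustrative only: simplified Raft vote-granting rule, not Apache Ratis code.
class VoteState {
  long currentTerm;   // highest term this node has seen
  String votedFor;    // candidate voted for in currentTerm, or null

  synchronized boolean grantVote(String candidateId, long candidateTerm,
                                 long lastLogTerm, long lastLogIndex,
                                 long myLastLogTerm, long myLastLogIndex) {
    if (candidateTerm < currentTerm) {
      return false; // stale candidate
    }
    if (candidateTerm > currentTerm) {
      currentTerm = candidateTerm; // observe the newer term
      votedFor = null;
    }
    // The candidate's log is "up to date" if its last entry has a higher term,
    // or the same term and at least as high an index.
    boolean logUpToDate = lastLogTerm > myLastLogTerm
        || (lastLogTerm == myLastLogTerm && lastLogIndex >= myLastLogIndex);
    if (logUpToDate && (votedFor == null || votedFor.equals(candidateId))) {
      votedFor = candidateId;
      return true;
    }
    return false;
  }
}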
2.2 Raft Log Replication Flow
In a distributed system, every change is sent by the leader to the followers, and every change is persisted as a log entry on each master machine. For performance, Raft considers a replication round complete as soon as the logs are consistent on more than half of the nodes, rather than waiting for all of them.
Raft also applies the log to in-memory state, so that most of the data is already in memory when a master switchover happens, shortening the switchover time. The state machine defines the interface through which log entries are applied to memory.
The Raft log replication flow is as follows:
- A client sends a request to the leader to change a value stored in the distributed system.
- The leader's consensus module persists the change operation to its local disk and sends it to the follower machines, which persist it to their disks.
- Once more than half of the persistence operations have succeeded, the leader applies the log entry to its state machine.
- The result is returned to the client.
Every master keeps all of the log files. To keep the number of files from growing without bound, a follower master periodically generates a snapshot from memory and saves it to disk; the snapshot records the latest transaction ID, and log files below that transaction ID are deleted from the log file list.
Because the state machine cannot apply updates while a snapshot is being generated, the leader master normally does not take snapshots unless triggered manually. Under normal operation, the leader downloads snapshot files from one of the followers instead.
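As a rough sketch of the purge step (the file naming scheme and class names here are invented for illustration; this is not Ratis code), deleting log segments fully covered by the snapshot could look like:
import java.io.File;
import java.util.Arrays;

// Illustrative only: purge log segment files whose entries are all covered
// by the snapshot's last included transaction ID.
class LogPurger {
  // Assumes segment files are named "log_<firstTxId>_<lastTxId>".
  static void purge(File logDir, long snapshotLastTxId) {
    File[] segments = logDir.listFiles((d, name) -> name.startsWith("log_"));
    if (segments == null) {
      return;
    }
    Arrays.stream(segments)
        .filter(f -> lastTxIdOf(f) <= snapshotLastTxId) // fully covered by the snapshot
        .forEach(File::delete);
  }

  static long lastTxIdOf(File segment) {
    String[] parts = segment.getName().split("_");
    return Long.parseLong(parts[2]);
  }
}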
3. Differences Between Raft and HDFS High Availability
- HDFS high availability relies on ephemeral nodes recorded in ZooKeeper, whereas with Raft the masters elect a leader among themselves directly.
- A Raft follower receives the log files, performs checkpointing, and also sends snapshots to the leader, so its role is similar to a JournalNode plus a Standby NameNode. A Raft snapshot is the equivalent of an HDFS fsimage, and the Raft log is the equivalent of the HDFS edit log.
4. Overview of Apache Ratis
4.1 Ratis Election Flow
The Ratis project defines RaftServerProtocol, the RPC interface between RaftServers that covers voting, log appending, and snapshot installation:
public interface RaftServerProtocol {
enum Op {REQUEST_VOTE, APPEND_ENTRIES, INSTALL_SNAPSHOT}
RequestVoteReplyProto requestVote(RequestVoteRequestProto request) throws IOException;
AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto request) throws IOException;
InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto request) throws IOException;
StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequestProto request) throws IOException;
}
The RaftServerImpl class implements this interface. When a RaftServer receives a startLeaderElection request, it starts an election:
class RaftServerImpl implements RaftServer.Division,
    RaftServerProtocol, RaftServerAsynchronousProtocol,
    RaftClientProtocol, RaftClientAsynchronousProtocol {
  public StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequestProto request) throws IOException {
    final RaftRpcRequestProto r = request.getServerRequest();
    final RaftPeerId leaderId = RaftPeerId.valueOf(r.getRequestorId());
    final RaftGroupId leaderGroupId = ProtoUtils.toRaftGroupId(r.getRaftGroupId());
    final TermIndex leaderLastEntry = TermIndex.valueOf(request.getLeaderLastEntry());
    CodeInjectionForTesting.execute(START_LEADER_ELECTION, getId(), leaderId, request);
    // ... (omitted)
    // start the election
    changeToCandidate(true);
    return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), true);
  }
}
Control then reaches RoleInfo.startLeaderElection, which starts the LeaderElection thread:
void startLeaderElection(RaftServerImpl server, boolean force) {
if (pauseLeaderElection.get()) {
return;
}
updateAndGet(leaderElection, new LeaderElection(server, force)).start();
}
During the election, askForVotes is called twice in succession, first for the pre-vote and then for the actual vote:
public void run() {
// ... (omitted)
final Timer.Context electionContext = server.getLeaderElectionMetrics().getLeaderElectionTimer().time();
try {
if (skipPreVote || askForVotes(Phase.PRE_VOTE)) {
if (askForVotes(Phase.ELECTION)) {
server.changeToLeader();
}
}
} // ... (omitted)
}
In LeaderElection.askForVotes, the node keeps submitting vote requests round after round for as long as it remains a candidate:
private boolean askForVotes(Phase phase) throws InterruptedException, IOException {
  // shouldRun: keep looping as long as this node is running, alive, and still a candidate
  for(int round = 0; shouldRun(); round++) {
    final long electionTerm;
    final RaftConfigurationImpl conf;
    synchronized (server) {
      if (!shouldRun()) {
        return false;
      }
      final ConfAndTerm confAndTerm = server.getState().initElection(phase);
      electionTerm = confAndTerm.getTerm();
      conf = confAndTerm.getConf();
    }
    LOG.info("{} {} round {}: submit vote requests at term {} for {}", this, phase, round, electionTerm, conf);
    // submit the vote requests
    final ResultAndTerm r = submitRequestAndWaitResult(phase, conf, electionTerm);
    LOG.info("{} {} round {}: result {}", this, phase, round, r);
    // ... (omitted)
  }
  // ... (omitted)
}
Finally, LeaderElection.submitRequests issues the requestVote RPC to each of the other nodes:
private int submitRequests(Phase phase, long electionTerm, TermIndex lastEntry,
Collection<RaftPeer> others, Executor voteExecutor) {
int submitted = 0;
for (final RaftPeer peer : others) {
final RequestVoteRequestProto r = ServerProtoUtils.toRequestVoteRequestProto(
server.getMemberId(), peer.getId(), electionTerm, lastEntry, phase == Phase.PRE_VOTE);
// submit the vote request to the peer
voteExecutor.submit(() -> server.getServerRpc().requestVote(r));
submitted++;
}
return submitted;
}
4.2 Ratis Log Replication Flow
The RaftClientProtocol interface defines the submitClientRequest method, through which a client submits a request:
public interface RaftClientProtocol {
RaftClientReply submitClientRequest(RaftClientRequest request) throws IOException;
}
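For context, this is roughly how a caller would drive that path through Ratis's public RaftClient API (a minimal sketch against recent Ratis versions; the RaftGroup setup is assumed to exist elsewhere):
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroup;

// Minimal sketch: submit a request to the Raft leader via RaftClient.
// 'group' is assumed to be the RaftGroup describing the cluster peers.
public class ClientExample {
  public static RaftClientReply submit(RaftGroup group, String payload) throws Exception {
    try (RaftClient client = RaftClient.newBuilder()
        .setProperties(new RaftProperties())
        .setRaftGroup(group)
        .build()) {
      // Blocking API; the client routes the request to the current leader,
      // which drives the submitClientRequest path described below.
      return client.io().send(Message.valueOf(payload));
    }
  }
}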
RaftServerImpl.submitClientRequest implements this method; along the way it calls appendTransaction to carry out the append:
private CompletableFuture<RaftClientReply> appendTransaction(
RaftClientRequest request, TransactionContext context, CacheEntry cacheEntry) throws IOException {
assertLifeCycleState(LifeCycle.States.RUNNING);
CompletableFuture<RaftClientReply> reply;
final PendingRequest pending;
synchronized (this) {
reply = checkLeaderState(request, cacheEntry, true);
if (reply != null) {
return reply;
}
// append the message to its local log
final LeaderStateImpl leaderState = role.getLeaderStateNonNull();
final PendingRequests.Permit permit = leaderState.tryAcquirePendingRequest(request.getMessage());
// ... (omitted)
try {
// persist to the local log
state.appendLog(context);
} // ... (omitted)
// put the request into the pending queue
// build the appendEntries request
pending = leaderState.addPendingRequest(permit, request, context);
if (pending == null) {
cacheEntry.failWithException(new ResourceUnavailableException(
getMemberId() + ": Failed to add a pending write request for " + request));
return cacheEntry.getReplyFuture();
}
// notify the log senders to ship the appendEntries request to the followers
leaderState.notifySenders();
}
return pending.getFuture();
}
Local persistence
ServerState.appendLog starts the local persistence, which ultimately calls SegmentedRaftLog.appendEntryImpl to write the entry to disk:
protected CompletableFuture<Long> appendEntryImpl(LogEntryProto entry) {
final Timer.Context context = getRaftLogMetrics().getRaftLogAppendEntryTimer().time();
checkLogState();
if (LOG.isTraceEnabled()) {
LOG.trace("{}: appendEntry {}", getName(), LogProtoUtils.toLogEntryString(entry));
}
try(AutoCloseableLock writeLock = writeLock()) {
validateLogEntry(entry);
final LogSegment currentOpenSegment = cache.getOpenSegment();
if (currentOpenSegment == null) {
// write into the cache and start a new log segment file
cache.addOpenSegment(entry.getIndex());
fileLogWorker.startLogSegment(entry.getIndex());
} else if (isSegmentFull(currentOpenSegment, entry)) {
cache.rollOpenSegment(true);
fileLogWorker.rollLogSegment(currentOpenSegment);
} else if (currentOpenSegment.numOfEntries() > 0 &&
currentOpenSegment.getLastTermIndex().getTerm() != entry.getTerm()) {
// the term changes
final long currentTerm = currentOpenSegment.getLastTermIndex().getTerm();
Preconditions.assertTrue(currentTerm < entry.getTerm(),
"open segment's term %s is larger than the new entry's term %s",
currentTerm, entry.getTerm());
cache.rollOpenSegment(true);
fileLogWorker.rollLogSegment(currentOpenSegment);
}
//TODO(runzhiwang): If there is performance problem, start a daemon thread to checkAndEvictCache
checkAndEvictCache();
// If the entry has state machine data, then the entry should be inserted
// to statemachine first and then to the cache. Not following the order
// will leave a spurious entry in the cache.
CompletableFuture<Long> writeFuture =
fileLogWorker.writeLogEntry(entry).getFuture();
if (stateMachineCachingEnabled) {
// The stateMachineData will be cached inside the StateMachine itself.
cache.appendEntry(LogProtoUtils.removeStateMachineData(entry),
LogSegment.Op.WRITE_CACHE_WITH_STATE_MACHINE_CACHE);
} else {
cache.appendEntry(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
}
return writeFuture;
} catch (Exception e) {
LOG.error("{}: Failed to append {}", getName(), LogProtoUtils.toLogEntryString(entry), e);
throw e;
} finally {
context.stop();
}
}
Sending to the followers
After the local disk write completes, LeaderStateImpl.addPendingRequest builds the request to be sent to the followers. Once it is built, notifySenders tells the LogAppenders to ship it to the followers:
PendingRequest addPendingRequest(PendingRequests.Permit permit, RaftClientRequest request, TransactionContext entry) {
if (LOG.isDebugEnabled()) {
LOG.debug("{}: addPendingRequest at {}, entry={}", this, request,
LogProtoUtils.toLogEntryString(entry.getLogEntry()));
}
return pendingRequests.add(permit, request, entry);
}
void notifySenders() {
senders.forEach(LogAppender::notifyLogAppender);
}
The LogAppenderDefault.run method sends the entries:
public void run() throws InterruptedException, IOException {
  while (isRunning()) {
    // ... (omitted)
    // send the entries
    final AppendEntriesReplyProto r = sendAppendEntriesWithRetries();
    if (r != null) {
      handleReply(r);
    }
    if (isRunning() && !hasAppendEntries()) {
      getEventAwaitForSignal().await(getHeartbeatWaitTimeMs(), TimeUnit.MILLISECONDS);
    }
    getLeaderState().checkHealth(getFollower());
  }
}
sendAppendEntriesWithRetries then issues the request:
private AppendEntriesReplyProto sendAppendEntriesWithRetries()
    throws InterruptedException, InterruptedIOException, RaftLogIOException {
  int retry = 0;
  AppendEntriesRequestProto request = null;
  while (isRunning()) { // keep retrying for IOException
    // ... (omitted)
    final AppendEntriesReplyProto r = getServerRpc().appendEntries(request);
    // ... (omitted)
  }
}
What ultimately executes is the appendEntries method of the RaftServerProtocol interface shown in section 4.1, which sends the appendEntries request to the other nodes, i.e. the followers.
Updating the state machine
Once the followers have appended successfully, the leader runs onFollowerSuccessAppendEntries to prepare the state-machine update:
public void onFollowerSuccessAppendEntries(FollowerInfo follower) {
if (isAttendingVote(follower)) {
submitUpdateCommitEvent();
} else {
eventQueue.submit(checkStagingEvent);
}
}
When more than half of the nodes have appended successfully, updateCommit advances the commit index:
private void updateCommit(long majority, long min) {
final long oldLastCommitted = raftLog.getLastCommittedIndex();
if (majority > oldLastCommitted) {
// Get the headers before updating commit index since the log can be purged after a snapshot
final LogEntryHeader[] entriesToCommit = raftLog.getEntries(oldLastCommitted + 1, majority + 1);
if (server.getState().updateCommitIndex(majority, currentTerm, true)) {
updateCommit(entriesToCommit);
}
}
watchRequests.update(ReplicationLevel.ALL, min);
}
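Conceptually, the `majority` index passed in above is the highest log index already replicated on a majority of peers. The textbook way to obtain it (a sketch of the idea, not necessarily Ratis's exact computation) is the median of the sorted match indices:
import java.util.Arrays;

// Illustrative only: compute the highest log index replicated on a majority.
// matchIndex[i] is the last log index known to be stored on peer i
// (including the leader's own last index).
class CommitIndexCalculator {
  static long majorityIndex(long[] matchIndex) {
    long[] sorted = matchIndex.clone();
    Arrays.sort(sorted);
    // With n peers, the entry at position (n - 1) / 2 of the ascending sort
    // is present on at least a majority (n / 2 + 1) of them.
    return sorted[(sorted.length - 1) / 2];
  }
}
For example, with match indices {7, 5, 9} on a three-node cluster, the median is 7: entries up to index 7 exist on two of the three nodes, so the commit index can advance to 7.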
updateCommitIndex then notifies the stateMachineUpdater to process the log:
boolean updateCommitIndex(long majorityIndex, long curTerm, boolean isLeader) {
if (log.updateCommitIndex(majorityIndex, curTerm, isLeader)) {
stateMachineUpdater.notifyUpdater();
return true;
}
return false;
}
The StateMachineUpdater thread loops forever, waiting for the commit signal, then fetches the entries from the log and applies them to the state machine:
public void run() {
for(; state != State.STOP; ) {
try {
waitForCommit();
if (state == State.RELOAD) {
reload();
}
// apply the committed log entries to the state machine
final MemoizedSupplier<List<CompletableFuture<Message>>> futures = applyLog();
checkAndTakeSnapshot(futures);
if (shouldStop()) {
checkAndTakeSnapshot(futures);
stop();
}
} catch (Throwable t) {
if (t instanceof InterruptedException && state == State.STOP) {
LOG.info("{} was interrupted. Exiting ...", this);
} else {
state = State.EXCEPTION;
LOG.error(this + " caught a Throwable.", t);
server.close();
}
}
}
}
In the takeSnapshot method (this code is from Alluxio's JournalStateMachine), a follower can create a snapshot directly, but the leader only builds a snapshot when forced manually through a command; otherwise the snapshot is downloaded from a follower:
public long takeSnapshot() {
long index;
StateLockManager stateLockManager = mStateLockManagerRef.get();
if (!mIsLeader) {
// a follower creates the snapshot directly
index = takeLocalSnapshot(false);
} else if (stateLockManager != null) {
// the leader has been allowed to take a local snapshot by being given a non-null
// StateLockManager through the #allowLeaderSnapshots method
try (LockResource stateLock = stateLockManager.lockExclusive(StateLockOptions.defaults())) {
// forced snapshot via a manual command
index = takeLocalSnapshot(true);
} catch (Exception e) {
return RaftLog.INVALID_LOG_INDEX;
}
} else {
RaftGroup group;
try (LockResource ignored = new LockResource(mGroupLock)) {
if (mServerClosing) {
return RaftLog.INVALID_LOG_INDEX;
}
// These calls are protected by mGroupLock and mServerClosing
// as they will access the lock in RaftServerProxy.java
// which is also accessed during raft server shutdown which
// can cause a deadlock as the shutdown takes the lock while
// waiting for this thread to finish
Preconditions.checkState(mServer.getGroups().iterator().hasNext());
group = mServer.getGroups().iterator().next();
} catch (IOException e) {
SAMPLING_LOG.warn("Failed to get raft group info: {}", e.getMessage());
return RaftLog.INVALID_LOG_INDEX;
}
if (group.getPeers().size() < 2) {
SAMPLING_LOG.warn("No follower to perform delegated snapshot. Please add more masters to "
+ "the quorum or manually take snapshot using 'alluxio fsadmin journal checkpoint'");
return RaftLog.INVALID_LOG_INDEX;
} else {
// fetch the snapshot from one of the followers
index = mSnapshotManager.maybeCopySnapshotFromFollower();
}
}
// update metrics if took a snapshot
if (index != RaftLog.INVALID_LOG_INDEX) {
mSnapshotLastIndex = index;
mLastCheckPointTime = System.currentTimeMillis();
}
return index;
}
Ratis decides whether a snapshot needs to be created; by default a snapshot is taken once every 400,000 entries:
private boolean shouldTakeSnapshot() {
if (autoSnapshotThreshold == null) {
return false;
} else if (shouldStop()) {
return getLastAppliedIndex() - snapshotIndex.get() > 0;
}
return state == State.RUNNING && getLastAppliedIndex() - snapshotIndex.get() >= autoSnapshotThreshold;
}
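If the threshold needs tuning, it can be set on the RaftProperties when building the server (a sketch; the setters below come from org.apache.ratis.server.RaftServerConfigKeys, assuming a recent Ratis version):
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.server.RaftServerConfigKeys;

// Sketch: enable automatic snapshots and set the trigger threshold.
public class SnapshotConfigExample {
  public static RaftProperties snapshotProperties() {
    RaftProperties properties = new RaftProperties();
    // Take a snapshot automatically once this many entries have been applied
    // since the last snapshot (400,000 matches the default mentioned above).
    RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(properties, true);
    RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold(properties, 400_000L);
    return properties;
  }
}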
5. How Apache Ratis Is Used in Alluxio
5.1 Raft Election Flow
When the Alluxio master starts, it starts the JournalSystem thread, which is in fact a RaftJournalSystem:
public class AlluxioMasterProcess extends MasterProcess {
protected final JournalSystem mJournalSystem;
public void start() throws Exception {
mJournalSystem.start();
}
}
It then executes RaftJournalSystem.startInternal, where joinQuorum starts the election process:
private RaftServer mServer;
public synchronized void startInternal() {
LOG.info("Initializing Raft Journal System");
mPeerId = RaftJournalUtils.getPeerId(mLocalAddress);
Set<RaftPeer> peers = mClusterAddresses.stream()
.map(addr -> RaftPeer.newBuilder()
.setId(RaftJournalUtils.getPeerId(addr))
.setAddress(addr)
.build()
)
.collect(Collectors.toSet());
mRaftGroup = RaftGroup.valueOf(RAFT_GROUP_ID, peers);
LOG.info("Starting Raft journal system. Cluster addresses: {}. Local address: {}",
mClusterAddresses, mLocalAddress);
try {
initServer();
long startTime = System.currentTimeMillis();
mServer.start();
LOG.info("Started Raft Journal System in {}ms", System.currentTimeMillis() - startTime);
//... (omitted)
// start the election (join the quorum)
joinQuorum();
}
The joinQuorum method builds the quorum-join request and sends it:
private void joinQuorum() {
// Send a request to join the quorum.
// If the server is already part of the quorum, this operation is a noop.
AddQuorumServerRequest request = AddQuorumServerRequest.newBuilder()
.setServerAddress(NetAddress.newBuilder()
.setHost(mLocalAddress.getHostString())
.setRpcPort(mLocalAddress.getPort()))
.build();
RaftClient client = createClient();
client.async().sendReadOnly(Message.valueOf(
UnsafeByteOperations.unsafeWrap(
JournalQueryRequest
.newBuilder()
.setAddQuorumServerRequest(request)
.build().toByteArray()
))).whenComplete((reply, t) -> {
if (t != null) {
LogUtils.warnWithException(LOG, "Exception occurred while joining quorum", t);
}
if (reply != null && reply.getException() != null) {
LogUtils.warnWithException(LOG,
"Received an error while joining quorum", reply.getException());
}
try {
client.close();
} catch (IOException e) {
LogUtils.warnWithException(LOG, "Exception occurred closing raft client", e);
}
});
}
5.2 Journal Update Flow
Take a logical operation in Alluxio as an example: DefaultBlockMaster.removeBlocks wraps the op into a journal entry and records it in the context:
public void removeBlocks(Collection<Long> blockIds, boolean delete) throws UnavailableException {
try (JournalContext journalContext = createJournalContext()) {
for (long blockId : blockIds) {
Set<Long> workerIds;
try (LockResource r = lockBlock(blockId)) {
Optional<BlockMeta> block = mBlockMetaStore.getBlock(blockId);
if (!block.isPresent()) {
continue;
}
List<BlockLocation> locations = mBlockMetaStore.getLocations(blockId);
workerIds = new HashSet<>(locations.size());
for (BlockLocation loc : locations) {
workerIds.add(loc.getWorkerId());
}
if (delete) {
// Make sure blockId is removed from mLostBlocks when the block metadata is deleted.
// Otherwise blockId in mLostBlock can be dangling index if the metadata is gone.
mLostBlocks.remove(blockId);
mBlockMetaStore.removeBlock(blockId);
JournalEntry entry = JournalEntry.newBuilder()
.setDeleteBlock(DeleteBlockEntry.newBuilder().setBlockId(blockId)).build();
// append the entry to the journal context
journalContext.append(entry);
}
}
// ... (omitted)
}
}
MasterJournalContext then puts the entry onto the AsyncJournalWriter's mQueue:
public long appendEntry(JournalEntry entry) {
// TODO(gpang): handle bounding the queue if it becomes too large.
mCounter.incrementAndGet();
mQueue.offer(entry);
return mCounter.get();
}
The AsyncJournalWriter thread runs the doFlush method, which calls the Ratis interface to send the entries:
private void doFlush() {
// Runs the loop until ::stop() is called.
while (!mStopFlushing) {
while (mQueue.isEmpty() && !mStopFlushing) {
// ... (omitted)
try {
long startTime = System.nanoTime();
// Write pending entries to journal.
while (!mQueue.isEmpty()) {
// Get, but do not remove, the head entry.
JournalEntry entry = mQueue.peek();
if (entry == null) {
// No more entries in the queue. Break write session.
break;
}
mJournalWriter.write(entry);
JournalUtils.sinkAppend(mJournalSinks, entry);
// Remove the head entry, after the entry was successfully written.
mQueue.poll();
mWriteCounter++;
if (((System.nanoTime() - startTime) >= mFlushBatchTimeNs) && !mStopFlushing) {
// This thread has been writing to the journal for enough time. Break out of the
// infinite while-loop.
break;
}
}
// Either written new entries or previous flush had been failed.
if (mFlushCounter.get() < mWriteCounter) {
try (Timer.Context ctx = MetricsSystem
.timer(MetricKey.MASTER_JOURNAL_FLUSH_TIMER.getName()).time()) {
// call the Ratis interface that sends the entries
mJournalWriter.flush();
}
JournalUtils.sinkFlush(mJournalSinks);
mFlushCounter.set(mWriteCounter);
}
// Notify tickets that have been served to wake up.
Iterator<FlushTicket> ticketIterator = mTicketSet.iterator();
while (ticketIterator.hasNext()) {
FlushTicket ticket = ticketIterator.next();
if (ticket.getTargetCounter() <= mFlushCounter.get()) {
ticket.setCompleted();
ticketIterator.remove();
}
}
} // ... (omitted)
}
}
5.3 The Alluxio State Machine
In RaftJournalSystem, the RaftServer is the server-side Ratis RPC handler. Alluxio's custom state machine, JournalStateMachine, is put into the RaftServer, so the state machine that Ratis subsequently updates is JournalStateMachine:
private JournalStateMachine mStateMachine;
RaftServer mServer = RaftServer.newBuilder()
.setServerId(mPeerId)
.setGroup(mRaftGroup)
.setStateMachine(mStateMachine)
.setProperties(properties)
.setParameters(parameters)
.build();
As shown below, JournalStateMachine implements the state machine interface:
public class JournalStateMachine extends BaseStateMachine {}
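To give a feel for what implementing that interface involves, here is a minimal toy state machine (a hedged sketch, not Alluxio's real logic, which replays journal entries into master metadata; this one merely counts applied entries):
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.statemachine.impl.BaseStateMachine;

// Toy example: a state machine that simply counts applied entries.
// Ratis invokes applyTransaction for every committed log entry.
public class CountingStateMachine extends BaseStateMachine {
  private final AtomicLong mApplied = new AtomicLong();

  @Override
  public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
    final LogEntryProto entry = trx.getLogEntry();
    // Record progress so Ratis knows what has been applied
    // (also consulted when deciding whether a snapshot is due).
    updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex());
    long count = mApplied.incrementAndGet();
    return CompletableFuture.completedFuture(
        Message.valueOf("applied " + count + " entries"));
  }
}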