- Feature Name: Implementing Flexible Raft with NWR
- Author: yzw [email protected]
- Date: 2023-06-28
- Issue Link: https://github.com/sofastack/sofa-jraft/issues/1003
Summary
我们希望在原始RAFT算法的基础上,让Leader选举和日志复制除了能需要获得多数派确认模型的支持之外,还可以接入NWRQuorum模型,用于动态调整一致性强度。
Motivation
在原始的RAFT算法中,Leader选举和日志复制都需要获得多数派成员的支持。而NWR模型则可以在动态调整一致性强度的场景中使用,它需要满足W+R>N,以保证强一致性。
JRaft将RAFT和NWR结合起来,使得用户可以根据不同的业务需求来动态调整Quorum的数量。例如,在一个写多读少的场景中,用户可以将多数派的数量从3调整为2,以降低达成共识的条件,从而提高写请求的效率。同时,为了保证RAFT的正确性,写Quorum的调整需要付出代价,即读Quorum的数量也需要相应调整。
JRaft支持成员变更,因此用户可以配置(0,1]范围内的小数来计算W和R的具体值。通过使用JRaft,用户可以根据自己的业务需求来灵活地调整一致性强度,使得分布式系统在不同场景下都可以获得最佳的性能和正确性。
Key design
下图为NWR模型Quroum的设计思路:
- 抽离出抽象父类Quorum作为MajorityQuorum(多数派确认模型,原Ballot类)和NWRQuorum(NWR模型)的模板。
- 用户首先需要在NodeOptions类中决定是否开启NWR模式,默认为多数派模型,用户手动设置读写Factor因子后,则视为开启了NWR模式。
- NodeImpl#init方法进行逻辑处理,通过判断是否开启了NWR模式,构造对应的Quorum选票实例,例如MajorityQuorum与NWRQuorum。
- 在构建好选票实例之后,调用对应的方法可以进行选票的初始化(init)、投票(grant)等操作。
该项目涉及代码变更的地方可以划分为如下四个模块:
- Leader选举模块: 一个节点想要成为leader,会经过以下几个阶段:预投票、正式投票、当选leader。所以对于preVote、electSelf、becomeLeader等等与多数派模型相关的方法都会涉及NWR模型的有关代码变更。
- 日志复制模块: 当leader收到客户端的事务请求或者follower与leader数据存在差距时,会调用 Replicator#sendEntries 去发送日志复制消息(事务消息);而心跳消息和探测消息,则是由 Replicator#sendEmptyEntries 发送的。日志复制中,NodeImpl#executeApplyingTasks 和 NodeImpl#unsafeApplyConfiguration 方法会涉及到多数派确认。在执行这些方法的时候,都会使用 BallotBox#appendPendingTask 方法来构造一个待投票的Ballot(现在叫MajorityQuorum/NWRQuorum)并放置到投票箱中。
- 一致性读模块: 对于一致性读模块,在raft共识算法中,读取R个节点其实体现在R个节点的心跳响应。通过R个节点的心跳,能保证这个节点一定是leader,一定拥有最新的数据,我们并不是真正需要从R个节点里面读取数据。NodeImpl#ReadIndexHeartbeatResponseClosure 这样的方法,可以看到执行了心跳消息的多数派确认模型的逻辑,ReadIndexHeartbeatResponseClosure构造器里面传入了quorum的值,这里我们需要对应修改为NWR模型的逻辑。
- 成员变更模块: 对于JRaft成员变更来讲,核心逻辑是采用单成员变更的方式,即使需要同时变更多个成员时,也是会先整理出新add与新remove的成员,再逐个进行单成员变更。其核心方法 addPeer、removePeer、changePeers、resetPeers 等等都会涉及NWR模型的适配。
Detailed Design
NodeOptions
在NodeOptions类中,我们新增了如下三个参数:readQuorumFactor、writeQuorumFactor与enableNWRMode,分别表示读因子、写因子以及是否开启NWR模型(true),默认不开启,表示多数派确认模型(false)。
/**
* Read Quorum's factor
*/
private double readQuorumFactor;
/**
* Write Quorum's factor
*/
private double writeQuorumFactor;
/**
* Enable NWRMode or Not
*/
private boolean enableNWRMode = false;
对于readQuorumFactor和writeQuorumFactor两个属性,在NodeOptions类里提供了setter和getter方法便于用户自定义配置。对于enableNWRMode属性,提供了isEnableNWRModeI()来判断是否开启NWR模型,而enableNWRMode()方法表示开启NWR模式。
public double getReadQuorumFactor() {
return readQuorumFactor;
}
public void setReadQuorumFactor(double readQuorumFactor) {
this.readQuorumFactor = readQuorumFactor;
enableNWRMode();
}
public double getWriteQuorumFactor() {
return writeQuorumFactor;
}
public void setWriteQuorumFactor(double writeQuorumFactor) {
this.writeQuorumFactor = writeQuorumFactor;
enableNWRMode();
}
public boolean isEnableNWRMode() {
return enableNWRMode;
}
private void enableNWRMode() {
this.enableNWRMode = true;
}
Node Init
在NodeImpl#init时,我们首先会对NodeOptions内部的readFactor和writeFactor进行校验并且进行参数同步,如果用户只设置了readFactor和writeFactor两个参数的其中之一,那么我们需要同步这两个参数的值。
在init方法初始化node时,会首先对NWR模式下的factor进行校验与同步。
if(options.isEnableNWRMode() && !checkAndResetFactor(options.getWriteQuorumFactor(),
options.getReadQuorumFactor())){
return false;
}
校验与同步方法在checkAndResetFactor里:
private boolean checkAndResetFactor(Double writeFactor, Double readFactor){
if (Objects.nonNull(readFactor) && Objects.nonNull(writeFactor)) {
if (readFactor + writeFactor != 1) {
LOG.error("The sum of readFactor and writeFactor should be 1");
return false;
}
return true;
}
if (Objects.nonNull(readFactor)) {
if (readFactor > 0 && readFactor <= 1) {
options.setWriteQuorumFactor(1 - readFactor);
return true;
}
LOG.error("Fail to set quorum_nwr read_factor because {} is not between (0,1]", readFactor);
}
if (Objects.nonNull(writeFactor)) {
if (writeFactor > 0 && writeFactor <= 1) {
options.setReadQuorumFactor(1 - writeFactor);
return true;
}
LOG.error("Fail to set quorum_nwr write_factor because {} is not between (0,1]", writeFactor);
}
return false;
}
在之前node初始化时,生成Ballot对象是通过关键字直接new出来的,如下所示:
private final Ballot voteCtx = new Ballot();
private final Ballot prevVoteCtx = new Ballot();
添加NWR模型后,我们需要判断,到底是生成MajorityQuorum还是NWRQuorum。所以在对节点进行初始化时(NodeImpl#init),会根据NodeOptions判断是否开启NWR模型,进而构造对应实例。
prevVoteCtx = options.isEnableNWRMode() ? new NWRQuorum(opts.getReadQuorumFactor(), opts.getWriteQuorumFactor())
: new MajorityQuorum();
voteCtx = options.isEnableNWRMode() ? new NWRQuorum(opts.getReadQuorumFactor(), opts.getWriteQuorumFactor())
: new MajorityQuorum();
Quorum Detail
Quoroum
Quorum作为NWRQuorum与MajorityQuorum的抽象父类,持有peers、oldPeers、quorum、oldQuorum几个公共属性。
protected final List<Quorum.UnfoundPeerId> peers = new ArrayList<>()
protected int quorum;
protected final List<Quorum.UnfoundPeerId> oldPeers = new ArrayList<>();
protected int oldQuorum;
Quorum提供了grant和init两个抽象方法,子类实现该抽象方法的具体业务逻辑。
public abstract boolean init(final Configuration conf, final Configuration oldConf);
public abstract void grant(final PeerId peerId);
Quorum还定义了findPeer、isGranted、grant这三个包含方法体的父类方法。
public PosHint grant(final PeerId peerId, final PosHint hint){
//此处省略方法体
}
public boolean isGranted() {
//此处省略方法体
}
private UnfoundPeerId findPeer(final PeerId peerId, final List<UnfoundPeerId> peers, final int posHint){
//此处省略方法体
}
NWRQuorum
NWRQuorum作为NWR模型选票实现类,持有readFactor、writeFactor、oldReadFactor、oldWriteFactor、quorumType几个属性,他们代表读写因子与QuoroumType类型(读quorum、写quorum)。
protected Double readFactor; ---读因子
protected Double writeFactor; ---写因子
另外,我们提供了一个NWRQuorum的构造器用于构造NWRQuorum实例,需要传入writeFactor, readFactor, quorumType三个参数。
public NWRQuorum(Double writeFactor, Double readFactor) {
this.writeFactor = writeFactor;
this.readFactor = readFactor;
}
我们也实现了抽象父类的init与grant方法,
public boolean init(Configuration conf, Configuration oldConf) ---初始化选票
public void grant(final PeerId peerId) ---节点投票
对于NWRQuorum的init()方法来讲,他对于quorum的计算与以往有所不同,代码如下:
@Override
public boolean init(Configuration conf, Configuration oldConf) {
peers.clear();
oldPeers.clear();
quorum = oldQuorum = 0;
int index = 0;
if (conf != null) {
for (final PeerId peer : conf) {
peers.add(new UnfoundPeerId(peer, index++, false));
}
}
quorum = new Double(Math.ceil(writeFactor * peers.size())).intValue();
if (oldConf == null) {
return true;
}
index = 0;
for (final PeerId peer : oldConf) {
oldPeers.add(new UnfoundPeerId(peer, index++, false));
}
oldQuorum = new Double(Math.ceil(writeFactor * oldPeers.size())).intValue();
return true;
}
MajorityQuorum
MajorityQuorum实现了Quorum抽象父类的两个方法,init方法初始化选票需要参数,grant方法用于投票。
public boolean init(final Configuration conf, final Configuration oldConf) ---初始化选票
public void grant(final PeerId peerId) ---节点投票
Module Detail
Leader-election Module
一个节点成为leader会经过以下几个阶段:预投票、正式投票、当选leader。
首先我们来看预投票NodeImpl#preVote()方法,大概经历以下几个过程:
- 校验是否可以开启预投票,安装快照或者集群配置不包含本节点都不可以开启预投票。
- 初始化预投票-投票箱。
- 遍历,给除了本节点之外的所有其他节点发起RequestVoteRequest--RPC请求。
- 给自己投票,并判断是否已经达到多数派。
其中预投票有以下几处核心代码涉及投票:
- 在NodeImpl#preVote()中,调用Quorum#init()方法初始化预投票-投票箱
prevVoteCtx.init(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf());
- 在NodeImpl#preVote()中,本节点自己投票,在判断投票箱达到多数派后开启正式选举,调用electSelf()
prevVoteCtx.grant(this.serverId);
if (prevVoteCtx.isGranted()) {
doUnlock = false;
electSelf();
}
- 在NodeImpl#handlePreVoteResponse中,该方法用来处理预投票响应:首先根据响应判断对方节点是否为本节点投票,在判断为true后,Quorum(即prevVoteCtx)调用grant()对该节点进行授权投票。最后通过isGranted()判断是否大多数节点已经确认,如果符合条件,则开启正式选举模式,调用electSelf()方法。
// check granted quorum?
if (response.getGranted()) {
prevVoteCtx.grant(peerId);
if (prevVoteCtx.isGranted()) {
doUnlock = false;
electSelf();
}
}
- 接下来多数派确认后,执行NodeImpl#electSelf()方法,它做了以下几件事:
- 检验当前节点是否存在集群配置里面,不存在不进行选举。
- 关闭预选举定时器。
- 清空leader,增加任期,修改状态为candidate,votedId设置为当前本节点。
- 启动投票定时器voteTimer,因为可能投票失败需要循环发起投票,voteTimer里面会根据当前的CANDIDATE状态调用electSelf进行选举。
- 初始化投票箱。
- 遍历所有节点,向其他集群节点发送RequestVoteRequest--RPC请求,请求被RequestVoteRequestProcessor处理器处理的。
- 如果多数派确认,则调用NodeImpl#becomeLeader晋升为leader。
voteCtx.grant(this.serverId);
if (voteCtx.isGranted()) {
becomeLeader();
}
- 在NodeImpl#handleRequestVoteResponse中,该方法用来处理投票请求的响应。只要收到投票的反馈,就会在投票箱中对多数派进行确认,如果已经达成多数派确认的共识,那么本节点就调用NodeImpl#becomeLeader方法成为leader。投票请求处理器NodeImpl#handleRequestVoteResponse方法对选票处理的核心逻辑如下:
// check granted quorum?
if (response.getGranted()) {
voteCtx.grant(peerId);
if (voteCtx.isGranted()) {
becomeLeader();
}
}
- 在多数派确认后,会调用NodeImpl#becomeLeader方法正式被选举为leader:
- 首先会停止选举定时器。
- 设置当前的状态为leader。
- 设值任期。
- 遍历所有的节点将节点加入到复制集群中。
- 最后将stepDownTimer打开,定时对leader进行校验是不是又半数以上的节点响应当前的leader。
Log-replication Module
当leader收到客户端的事务请求或者follower与leader数据存在差距时,会调用Replicator#sendEntries去复制日志,日志复制消息属于事务消息;而心跳消息和探测消息,则是由Replicator#sendEmptyEntries发送的。
日志复制的流程如下:
- leader将日志项追加到本地日志
- leader将日志广播给follower
- follower追加到本地日志
- follower返回执行结果
- leader收到多数派响应后提交日志
- 返回执行结果给客户端
在JRaft中,我阅读了日志复制模块的源码部分,然后总结出下图来直观的反应整个日志复制从leader到follower再回到leader的整个过程,详细的方法调用链路过程如下所示:
在日志复制中,以下方法会涉及到多数派确认:NodeImpl#executeApplyingTasks 和NodeImpl#unsafeApplyConfiguration,也就是执行应用任务和应用配置变更所使用到的日志复制。在执行这些方法的时候,都会使用BallotBox#appendPendingTask方法来构造一个待投票的Quorum并放置到投票箱中。
场景一:应用任务
我们首先分析一下NodeImpl#executeApplyingTasks方法:
- 检查当前节点是否是 Leader 节点。如果节点不是 Leader 节点,则将所有任务的状态设置为错误并执行相应的回调方法;如果节点正在进行领导权转移,则将所有任务的状态设置为繁忙并执行相应的回调方法。
- 遍历任务列表,对于每个任务执行以下操作:a. 检查任务的 expectedTerm 是否与当前任期相同,如果不同则将任务的状态设置为错误并执行相应的回调方法。b. 将任务添加到 BallotBox 中。c. 将任务的日志条目信息添加到一个列表中,并将任务重置为默认状态。
- 将任务列表中的所有日志条目追加到当前节点的日志中,并将追加操作封装为 LeaderStableClosure 回调方法。
- 检查并更新配置信息,如果需要更新则执行相应的更新操作。
注意:在executeApplyingTasks方法中,根据当前节点配置,生成了一个待投票Quorum,并放置到投票箱BallotBox的pendingMetaQueue中。所以我们需要在这里构造待投票Quorum时,修改quorum为NWR模式,而不是之前的多数派。
private void executeApplyingTasks(final List<LogEntryAndClosure> tasks) {
// 省略部分代码...
if (!this.ballotBox.appendPendingTask(this.conf.getConf(),
this.conf.isStable() ? null : this.conf.getOldConf(), task.done,options.isEnableNWRMode() ?
QuorumFactory.createNWRQuorumConfiguration(options.getWriteQuorumFactor(), options.getReadQuorumFactor()):
QuorumFactory.createMajorityQuorumConfiguration())) {
ThreadPoolsFactory.runClosureInThread(this.groupId, task.done, new Status(RaftError.EINTERNAL, "Fail to append task."));
task.reset();
continue;
}
// 省略部分代码...
this.logManager.appendEntries(entries, new LeaderStableClosure(entries));
// 省略部分代码...
}
场景二:
接下来看看NodeImpl#unsafeApplyConfiguration是如何构建选票Ballot的:
这段代码主要用于将新的配置信息封装成一个日志条目,并追加到当前节点的日志中,从而实现配置变更的操作。其实逻辑和增加普通日志类似,主要需要注意的还是ballotBox.appendPendingTask方法,也就是生成一个待投票Quorum的逻辑。
private void unsafeApplyConfiguration(final Configuration newConf, final Configuration oldConf,
final boolean leaderStart) {
// 省略部分代码...
if (!this.ballotBox.appendPendingTask(newConf, oldConf, configurationChangeDone,options.isEnableNWRMode() ?
QuorumFactory.createNWRQuorumConfiguration(options.getWriteQuorumFactor(), options.getReadQuorumFactor()):
QuorumFactory.createMajorityQuorumConfiguration())) {
ThreadPoolsFactory.runClosureInThread(this.groupId, configurationChangeDone, new Status(
RaftError.EINTERNAL, "Fail to append task."));
return;
}
// 省略部分代码...
this.logManager.appendEntries(entries, new LeaderStableClosure(entries));
checkAndSetConfiguration(false);
}
QuorumFactory
在上面的executeApplyingTasks与unsafeApplyConfiguration方法中使用到了QuorumFactory这个工厂类的方法。为了更方便配置一个Quorum的属性,可以将factor因子和NWR开关整合到QuorumConfiguration类中,以便于快速构建一个QuorumConfiguration。实现代码如下:
public final class QuorumFactory {
public static QuorumConfiguration createNWRQuorumConfiguration(Double writeFactor,Double readFactor) {
boolean isEnableNWR = true;
QuorumConfiguration quorumConfiguration = new QuorumConfiguration();
quorumConfiguration.setReadFactor(readFactor);
quorumConfiguration.setWriteFactor(writeFactor);
quorumConfiguration.setEnableNWR(isEnableNWR);
return quorumConfiguration;
}
public static QuorumConfiguration createMajorityQuorumConfiguration(){
boolean isEnableNWR = false;
QuorumConfiguration quorumConfiguration = new QuorumConfiguration();
quorumConfiguration.setEnableNWR(isEnableNWR);
return quorumConfiguration;
}
}
对于BallotBox#CommitAt来说,在进行确认时,只需要从pendingMetaQueue获取Quorum再进行grant授权投票即可,之后再判断是否已经达到(多数派/NWR)确认。
public boolean commitAt(final long firstLogIndex, final long lastLogIndex, final PeerId peer) {
// 省略部分代码...
Quorum.PosHint hint = new Quorum.PosHint();
for (long logIndex = startAt; logIndex <= lastLogIndex; logIndex++) {
final Quorum quorum = this.pendingMetaQueue.get((int) (logIndex - this.pendingIndex));
hint = quorum.grant(peer, hint);
if (quorum.isGranted()) {
lastCommittedIndex = logIndex;
}
}
// 省略部分代码...
this.waiter.onCommitted(lastCommittedIndex);
return true;
}
Consistent-reading Module
对于ReadIndexHeartbeatResponseClosure类来讲,他的run方法执行了心跳消息多数派确认逻辑。它的构造器里面传入的quorum值需要进行NWR模型适配并且failPeersThreshold属性也需要重新适配计算逻辑。
原有获取ReadQuorum数值的多数派确认逻辑是:
private int getQuorum() {
final Configuration c = this.conf.getConf();
if (c.isEmpty()) {
return 0;
}
return c.getPeers().size() / 2 + 1;
}
如今我们需要修改该方法,额外对NWR模型进行判断:
private int getQuorum(QuorumConfiguration quorumConfiguration) {
final Configuration c = this.conf.getConf();
if (c.isEmpty()) {
return 0;
}
int size = c.getPeers().size();
if(!options.isEnableNWRMode()){
return size / 2 + 1;
}
return size - new Double(Math.ceil(c.getPeers().size() * options.getWriteQuorumFactor())).intValue() + 1;
}
failPeersThreshold原有计算逻辑:
this.failPeersThreshold = peersCount % 2 == 0 ? (quorum - 1) : quorum;
修改后:
this.failPeersThreshold = options.isEnableNWRMode() ? peersCount - quorum + 1 :
(peersCount % 2 == 0 ? (quorum - 1) : quorum);
标签:Quorum,NWR,OSPP,节点,投票,new,Raft,final
From: https://www.cnblogs.com/akai-chi/p/17519285.html