Raft 相关模块
必须要注意的一点
当新Leader当选之后,需要追加一条空日志,使其了解当前集群的共识状态(也就是CommitIndex)
模块概览
特异功能
LeaderTransfer
- 终止之前 正在 Transfer 的流程,并重新设置 Transfer 为新的
- 停止追加日志
- 如果新Leader 有足够新的日志,发送 MsgTimeOutNow 让其立即进行选举
- 如果没有则同步没有的日志
Config Change
这里是单个成员的配置变更,所以不会有两个不相交的合法集群。
const (
ConfChangeType_AddNode ConfChangeType = 0
ConfChangeType_RemoveNode ConfChangeType = 1
)
type ConfChange struct {
ChangeType ConfChangeType `protobuf:"varint,1,opt,name=change_type,json=changeType,proto3,enum=eraftpb.ConfChangeType" json:"change_type,omitempty"`
// node will be add/remove
NodeId uint64 `protobuf:"varint,2,opt,name=node_id,json=nodeId,proto3" json:"node_id,omitempty"`
StoreId uint64 `protobuf:"varint,4,opt,name=store_id,json=storeId,proto3" json:"store_id,omitempty"`
}
// 并在 接受 配置变更之前 进行 如下判断
func (r *Raft) CanChangeConf() bool {
if r.RaftLog.applied >= r.PendingConfIndex {
log.Infof("%s apply conf change", r.Info())
} else {
log.Warnf("%s previous change is not applied { applied: %d } { pendingConfIndex: %d }", r.Info(), r.RaftLog.applied, r.PendingConfIndex)
return false
}
return true
}
待 Node 同步后增加相应的 Peers ,然后持久化到 Storage 中。
Region Split
SnapShot
当 Follower 差的日志我们已经 打了快照,我们需要给他们发送 当前的快照。
// 截断日志
// CutDown cut down the log entries to (index,LastLogIndex]
func (l *RaftLog) CutDown(index, term uint64) {
var cp []pb.Entry
log.Infof("cut down: %d, %d, %d, %d", index, l.LastIndex(), len(l.entries), len(cp))
if l.LastIndex() < index {
cp = make([]pb.Entry, 1)
} else {
cp = make([]pb.Entry, l.LastIndex()-index+1)
}
cp[0].Index, cp[0].Term = index, term
if index+1 < l.LastIndex() {
copy(cp[1:], l.entries[index+1-l.start:]) // 用 copy 的话,是因为想让(go gc) 释放前面日志
}
l.entries = cp
l.start = index
l.committed = max(l.committed, index)
l.applied = max(l.applied, index)
l.stabled = max(l.stabled, index)
}
分布式事务 Percolator
解决 Commit 阶段的冲突
核心: 检查主锁的状态
如果锁冲突会返回锁的相关信息
type LockInfo struct {
PrimaryLock []byte `protobuf:"bytes,1,opt,name=primary_lock,json=primaryLock,proto3" json:"primary_lock,omitempty"`
LockVersion uint64 `protobuf:"varint,2,opt,name=lock_version,json=lockVersion,proto3" json:"lock_version,omitempty"`
Key []byte `protobuf:"bytes,3,opt,name=key,proto3" json:"key,omitempty"`
LockTtl uint64 `protobuf:"varint,4,opt,name=lock_ttl,json=lockTtl,proto3" json:"lock_ttl,omitempty"`
}
type CheckTxnStatusResponse struct {
RegionError *errorpb.Error `protobuf:"bytes,1,opt,name=region_error,json=regionError,proto3" json:"region_error,omitempty"`
// Three kinds of txn status:
// locked: lock_ttl > 0
// committed: commit_version > 0
// rolled back: lock_ttl == 0 && commit_version == 0
LockTtl uint64 `protobuf:"varint,2,opt,name=lock_ttl,json=lockTtl,proto3" json:"lock_ttl,omitempty"`
CommitVersion uint64 `protobuf:"varint,3,opt,name=commit_version,json=commitVersion,proto3" json:"commit_version,omitempty"`
// The action performed by TinyKV in response to the CheckTxnStatus request.
Action Action `protobuf:"varint,4,opt,name=action,proto3,enum=kvrpcpb.Action" json:"action,omitempty"`
}
调用 KvCheckTxnStatus 获取锁的主锁信息。
- 锁已经提交
- 如果有提交,返回 committs(这里也可能是 已经回滚 )
- 如果没有提交
没有锁,返回锁不存在, 插入回滚
有锁,判断是否超时, 超时 删除preWrite 插入的记录, 插入回滚
之后调用 KvResolveLock 并携带 startTs (事务标识), commitTs( 表示是否提交 为 0 表示没有 )
Node接受后如果 commitTs != 0 ,将这个事务 剩余未提交的 提交, 否则 回滚。
peer 的获取
store 有 peers [region -> RaftNode ]
leader 收到一条 get 请求之后, 我们会通过 raftPeers 进行将这条消息 同步到集群, 如何应用之后通过 callback 告知 当前请求已经完成,并会有相应的response。
那么 peers 怎么来的
- 初始化时 从 当前 store 底层的存储获取的
- 当新加入节点是放入存储引擎中的
修改的时候,也就是 配置更新 Add / Remove 都会更新这个 key 的数据
RegionMetaMinKey = []byte{LocalPrefix, RegionMetaPrefix}
RegionMetaMaxKey = []byte{LocalPrefix, RegionMetaPrefix + 1}
func makeRegionKey(regionID uint64, suffix byte, subID uint64) []byte {
key := make([]byte, 19)
key[0] = LocalPrefix
key[1] = RegionRaftPrefix
binary.BigEndian.PutUint64(key[2:], regionID)
key[10] = suffix
binary.BigEndian.PutUint64(key[11:], subID)
return key
}
当初始化的时候,遍历这个区间的所有key即可。
当 Add Node 时, 我们Raft 其实也需要做一个事情,那就是修改里面的 peers。
Get / Scan
也就是一个读事务,获取一个 db 的一个快照。
write
也是大致的流程