首页 > 其他分享 >TinyKv


时间:2023-07-23 17:45:09浏览次数:32  
标签:opt index name TinyKv json key omitempty

Raft 相关模块






  1. 终止之前 正在 Transfer 的流程,并重新设置 Transfer 为新的
  2. 停止追加日志
    1. 如果新Leader 有足够新的日志,发送 MsgTimeOutNow 让其立即进行选举
    2. 如果没有则同步没有的日志

Config Change


const (
	ConfChangeType_AddNode    ConfChangeType = 0
	ConfChangeType_RemoveNode ConfChangeType = 1
type ConfChange struct {
	ChangeType ConfChangeType `protobuf:"varint,1,opt,name=change_type,json=changeType,proto3,enum=eraftpb.ConfChangeType" json:"change_type,omitempty"`
	// node will be add/remove
	NodeId               uint64   `protobuf:"varint,2,opt,name=node_id,json=nodeId,proto3" json:"node_id,omitempty"`
	StoreId              uint64   `protobuf:"varint,4,opt,name=store_id,json=storeId,proto3" json:"store_id,omitempty"`

// 并在 接受 配置变更之前 进行 如下判断
func (r *Raft) CanChangeConf() bool {
	if r.RaftLog.applied >= r.PendingConfIndex {
		log.Infof("%s apply conf change", r.Info())
	} else {
		log.Warnf("%s previous change is not applied { applied: %d } { pendingConfIndex: %d }", r.Info(), r.RaftLog.applied, r.PendingConfIndex)
		return false
	return true

待 Node 同步后增加相应的 Peers ,然后持久化到 Storage 中。

Region Split


当 Follower 差的日志我们已经 打了快照,我们需要给他们发送 当前的快照。

// 截断日志
// CutDown cut down the log entries to (index,LastLogIndex]
func (l *RaftLog) CutDown(index, term uint64) {
	var cp []pb.Entry
	log.Infof("cut down: %d, %d, %d, %d", index, l.LastIndex(), len(l.entries), len(cp))
	if l.LastIndex() < index {
		cp = make([]pb.Entry, 1)
	} else {
		cp = make([]pb.Entry, l.LastIndex()-index+1)
	cp[0].Index, cp[0].Term = index, term
	if index+1 < l.LastIndex() {
		copy(cp[1:], l.entries[index+1-l.start:]) // 用 copy 的话,是因为想让(go gc) 释放前面日志

	l.entries = cp
	l.start = index

	l.committed = max(l.committed, index)
	l.applied = max(l.applied, index)
	l.stabled = max(l.stabled, index)

分布式事务 Percolator

解决 Commit 阶段的冲突

核心: 检查主锁的状态

type LockInfo struct {
	PrimaryLock          []byte   `protobuf:"bytes,1,opt,name=primary_lock,json=primaryLock,proto3" json:"primary_lock,omitempty"`
	LockVersion          uint64   `protobuf:"varint,2,opt,name=lock_version,json=lockVersion,proto3" json:"lock_version,omitempty"`
	Key                  []byte   `protobuf:"bytes,3,opt,name=key,proto3" json:"key,omitempty"`
	LockTtl              uint64   `protobuf:"varint,4,opt,name=lock_ttl,json=lockTtl,proto3" json:"lock_ttl,omitempty"`
type CheckTxnStatusResponse struct {
	RegionError *errorpb.Error `protobuf:"bytes,1,opt,name=region_error,json=regionError,proto3" json:"region_error,omitempty"`
	// Three kinds of txn status:
	// locked: lock_ttl > 0
	// committed: commit_version > 0
	// rolled back: lock_ttl == 0 && commit_version == 0
	LockTtl       uint64 `protobuf:"varint,2,opt,name=lock_ttl,json=lockTtl,proto3" json:"lock_ttl,omitempty"`
	CommitVersion uint64 `protobuf:"varint,3,opt,name=commit_version,json=commitVersion,proto3" json:"commit_version,omitempty"`
	// The action performed by TinyKV in response to the CheckTxnStatus request.
	Action               Action   `protobuf:"varint,4,opt,name=action,proto3,enum=kvrpcpb.Action" json:"action,omitempty"`

调用 KvCheckTxnStatus 获取锁的主锁信息。

  1. 锁已经提交
    1. 如果有提交,返回 committs(这里也可能是 已经回滚 )
  2. 如果没有提交

没有锁,返回锁不存在, 插入回滚
有锁,判断是否超时, 超时 删除preWrite 插入的记录, 插入回滚

之后调用 KvResolveLock 并携带 startTs (事务标识), commitTs( 表示是否提交 为 0 表示没有 )
Node接受后如果 commitTs != 0 ,将这个事务 剩余未提交的 提交, 否则 回滚。

peer 的获取

store 有 peers [region -> RaftNode ]
leader 收到一条 get 请求之后, 我们会通过 raftPeers 进行将这条消息 同步到集群, 如何应用之后通过 callback 告知 当前请求已经完成,并会有相应的response。

那么 peers 怎么来的

  1. 初始化时 从 当前 store 底层的存储获取的
  2. 当新加入节点是放入存储引擎中的

修改的时候,也就是 配置更新 Add / Remove 都会更新这个 key 的数据

RegionMetaMinKey = []byte{LocalPrefix, RegionMetaPrefix}
RegionMetaMaxKey = []byte{LocalPrefix, RegionMetaPrefix + 1}

func makeRegionKey(regionID uint64, suffix byte, subID uint64) []byte {
	key := make([]byte, 19)
	key[0] = LocalPrefix
	key[1] = RegionRaftPrefix
	binary.BigEndian.PutUint64(key[2:], regionID)
	key[10] = suffix
	binary.BigEndian.PutUint64(key[11:], subID)
	return key

当 Add Node 时, 我们Raft 其实也需要做一个事情,那就是修改里面的 peers。

Get / Scan

也就是一个读事务,获取一个 db 的一个快照。



From: https://www.cnblogs.com/jgjg/p/17575191.html
