Node接口
Node是raft应用模块在节点上的抽象,也是应用模块和算法模块交互的入口
应用模块持有Node作为算法模块的引用,通过调用Node接口的API与算法模块通信,通信方式是通过若干个Channel异步完成的。
// Node represents a node in a raft cluster.
type Node interface {
// 告知算法模块时间
Tick()
// 告知需要参与竞选
Campaign(ctx context.Context) error
// 发送写请求
Propose(ctx context.Context, data []byte) error
// 发送配置变更请求
ProposeConfChange(ctx context.Context, cc pb.ConfChange) error
// ...
Ready() <-chan Ready
// Ready任务之后,通知算法模块
Advance()
// 应用模块感知到应用变更被集群认可
ApplyConfChange(cc pb.ConfChange) *pb.ConfState
// ...
// 读请求
ReadIndex(ctx context.Context, rctx []byte) error
// ...
}
ReadyOnly
读请求全部由leader处理
type readIndexStatus struct {
//封装读请求的消息体
req pb.Message
//leader已提交日志索引
index uint64
//Leader收到的节点响应,实际上是个Set类型
//为了防止网络同步时失误而存在多个leader的结果
//leader需要向其他follower告知自己身份,以期得到多数派响应,证明leader身份是合法的
//acks就是用来收集其他follower响应
acks map[uint64]struct{}
}
type readOnly struct {
option ReadOnlyOption
//使用entry数据当作key,保存读请求队列中的读请求的状态
pendingReadIndex map[string]*readIndexStatus
//将多个读请求放入队列,依次处理
readIndexQueue []string
}
startRaft方法
func (rc *raftNode) startNode() {
//...
// 获取集群其他raft信息
rpeers := make([]raft.Peer, len(rc.peers))
for i := range rpeers {
rpeers[i] = raft.Peer{ID: uint64(i + 1)}
}
// 构造raft配置的实例给算法层
c := &raft.Config{
ID: uint64, // 当前节点ID
ElectionTick: 10, // 选举时间 10个tick加上随机扰动值后发起选举
HeartbeatTick: 1, // 广播心跳时间
Storage: rc.raftStore // 应用层给算法层的存储接口 算法层可通过此查询存储
}
// 集群信息
startPeers := rpeers
//启动算法层的node
rc.node = raft.StartNode(c, startPeers)
// ...
rc.transport = &rafthttp.Transport{
ID: types.ID(rc.id),
ClusterID: 0x1000,
Raft: rc,
ServerStats: ss,
LeaderStats: stats.NewLeaderStats(strconv.Itoa(rc.id)),
ErrorC: make(chan error),
}
// raft节点之间的通信
rc.transport.Start()
for i := range rc.peers {
if i+1 != rc.id {
rc.transport.AddPeer(types.ID(i+1), []string{rc.peers[i]})
}
}
// 启动通行
go rc.serveRaft()
// 异步开启raftNoide的主循环 用于与算法层的goroutine建立持续通信关系
go rc.serveChannel
}
serveChannel方法
serveChannel方法有两处阻塞监听
func (rc *raftNode) serveChannels() {
// ...
// 这里定义一个ticker 为 100ms
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
// 接受客户端请求 传送到算法层
go func() {
var confChangeCount uint64 = 0
for rc.proposeC != nil && rc.confChangeC != nil {
select {
// 客户端写请求
case prop, ok := <-rc.proposeC:
if !ok {
rc.proposeC = nil
} else {
rc.node.Propose(context.TODO(), []byte(prop))
}
// 客户端配置变更请求
case cc, ok := <-rc.confChangeC:
if !ok {
rc.confChangeC = nil
} else {
confChangeCount += 1
cc.ID = confChangeCount
rc.node.ProposeConfChange(context.TODO(), cc)
}
}
}
// client closed channel; shutdown raft if not already
close(rc.stopc)
}()
// 接受算法层关于日志持久化的信息
for {
select {
// 监听ticker 并发送给算法模块 让算法模块感知时间
case <-ticker.C:
rc.node.Tick()
// 接受算法层要求
case rd := <-rc.node.Ready():
// 预写日志持久化
rc.raftStorage.Append(rd.Entries)
// f调用模块 例如发送邮件信息
rc.transport.Send(rd.Messages)
// 将预写日志应用到状态机
if okI := rc.publishEntries(rc.entriesToApply(rd.CommittedEnttries)); ok {
rc.stop()
return
}
// 告知算法模块 已完成预写日志持久化
rc.node.Advance()
//...
}
}
}
标签:Node,etcd,算法,模块,rc,raft,ID
From: https://www.cnblogs.com/ling-2945/p/18171121