首页 > 其他分享 >zookeeper 选主核心代码

zookeeper 选主核心代码

时间:2024-03-02 17:11:55浏览次数:35  
标签:zxid 选主 代码 zookeeper 选票 epoch sid logicalclock leader

选主的核心代码是在org.apache.zookeeper.server.quorum.FastLeaderElection#lookForLeader方法下。

选主逻辑的核心代码如下:

public Vote lookForLeader() throws InterruptedException {
        //无关代码部分忽略
    
        self.start_fle = Time.currentElapsedTime();
        try {
            
            //存储本轮选举收到的有效选票,用于判断是否有多数派的选票支持同一成员为Leader
            Map<Long, Vote> recvset = new HashMap<>();
            //用户加快Leader收敛,当成员加入集群时推测哪个成员为Leader,并且在广播选票之前对Logicalclock自增1
            Map<Long, Vote> outofelection = new HashMap<>();
            int notTimeout = minNotificationInterval;
            synchronized (this) {
                //自增生成logicalclock
                logicalclock.incrementAndGet();
                //更新最新选票内容
                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
            }

            //向其他节点发送提议请求
            sendNotifications();

            SyncedLearnerTracker voteSet = null;

            //当前节点处于查找状态时,循环读取接收队列里的消息
            while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
                //从队列中取出消息
                Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
                if (n == null) {
                    //无消息接收逻辑代码省略
                } else if (validVoter(n.sid) && validVoter(n.leader)) {
                    //选票消息的节点状态
                    switch (n.state) {
                    case LOOKING:
                        //省略zxid校验代码
                        if (n.electionEpoch > logicalclock.get()) {
                            //选票的所处的轮次大于自己的logicalclock则说明自己所处的选举轮次是落后的,应更新自己的logicalclock,清空选票池,并重新广播自己的选票
                            //更新当前节点epoch
                            logicalclock.set(n.electionEpoch);
                            //清空选票池
                            recvset.clear();
                            //检测本次notification的leader是否赢得选举,包含epoch、sid、zxid比较
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
                            } else {
                                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                            }
                            //重新广播自己的选票
                            sendNotifications();
                        } else if (n.electionEpoch < logicalclock.get()) {
                            // 选票轮次小于自己的logicalclock,则忽略
                                LOG.debug(
                                    "Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x{}, logicalclock=0x{}",
                                    Long.toHexString(n.electionEpoch),
                                    Long.toHexString(logicalclock.get()));
                            break;
                        } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                            //选票所处轮次等于自己的logicalclock,然后进行检测是否赢得选票,如果选票获胜,则更新自己选票并广播
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                            //重新广播选票
                            sendNotifications();
                        }

                        //记录选票
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                        //获取选票集合,用于判断自己的选票是否获得多数派,以此结束本轮选举
                        voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));
                        //如果已经获得多数派选票
                        if (voteSet.hasAllQuorums()) {
                            //如果还存在一些未处理的选票请求,则遍历判断,如果有选票在比较中胜出,则重新入队,并结束此次选举判断(选举获得多数派也不作数,即不会更新节点状态)
                            //如果没有选票在比较中胜出,则修改状态
                            // Verify if there is any change in the proposed leader
                            while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                                if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                                    recvqueue.put(n);
                                    break;
                                }
                            }
                            //如果在指定时间内还没有收到新的请求,那么则可以对节点状态进行更新
                            if (n == null) {
                                //节点状态变更,如果proposedLeader是当前节点,则将当前节点状态标记为LEADING
                                setPeerState(proposedLeader, voteSet);
                                Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }
                        break;
                    case OBSERVING:
                        LOG.debug("Notification from observer: {}", n.sid);
                        break;
                    case FOLLOWING:
                        /*
                        * To avoid duplicate codes
                        * */
                        Vote resultFN = receivedFollowingNotification(recvset, outofelection, voteSet, n);
                        if (resultFN == null) {
                            break;
                        } else {
                            return resultFN;
                        }
                    case LEADING:
                        Vote resultLN = receivedLeadingNotification(recvset, outofelection, voteSet, n);
                        if (resultLN == null) {
                            break;
                        } else {
                            return resultLN;
                        }
                    default:
                        LOG.warn("Notification state unrecognized: {} (n.state), {}(n.sid)", n.state, n.sid);
                        break;
                    }
                } else {
                    if (!validVoter(n.leader)) {
                        LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
                    }
                    if (!validVoter(n.sid)) {
                        LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
                    }
                }
            }
            return null;
        } finally {
            //省略部分代码
        }
    }

其中totalOrderPredicate方法的源码如下:

protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
        if (self.getQuorumVerifier().getWeight(newId) == 0) {
            return false;
        }

        /*
         * We return true if one of the following three cases hold:
         * 1- New epoch is higher
         * 2- New epoch is the same as current epoch, but new zxid is higher
         * 3- New epoch is the same as current epoch, new zxid is the same
         *  as current zxid, but server id is higher.
         */

        return ((newEpoch > curEpoch)
                || ((newEpoch == curEpoch)
                    && ((newZxid > curZxid)
                        || ((newZxid == curZxid)
                            && (newId > curId)))));
    }

核心逻辑就是,先比较epoch大小,然后是比较zxid大小最后是比较serverId大小。主要判断当前接收的投票是否是有效的,如果不满足代码里的逻辑判断则认为是无效的。

标签:zxid,选主,代码,zookeeper,选票,epoch,sid,logicalclock,leader
From: https://www.cnblogs.com/bibibao/p/18048905

相关文章

  • 代码随想录算法训练营第三十四天| ● 860.柠檬水找零 ● 406.根据身高重建队列 ●
    柠檬水找零 题目链接:860.柠檬水找零-力扣(LeetCode)思路:注意对于20元的情况,有两种找零方式,            头一次见到这种情况,随便加一个标准输出才能通过的样例。classSolution{public:boollemonadeChange(vector<int>&bills){in......
  • C++填坑系列——新手写代码易错点
    c++新手写代码的几个易错点学习自https://www.youtube.com/watch?v=i_wDa2AS_8w1.限制usingnamespacestd的作用范围如果你自己定义了一个和std空间内同名的函数,当你把std放到globalnamespace(也就是直接usingnamespacestd)中,就会出现函数冲突;usingnamespacestd的使......
  • 代码随想录算法训练营day11 | leetcode 20. 有效的括号、1047. 删除字符串中的所有相
    目录题目链接:20.有效的括号-简单题目链接:1047.删除字符串中的所有相邻重复项-简单题目链接:150.逆波兰表达式求值-中等题目链接:20.有效的括号-简单题目描述:给定一个只包括'(',')','{','}','[',']'的字符串s,判断字符串是否有效。有效字符串需满足:左括号必须用相同类型的右......
  • 代码随想录 第十天 | ● 理论基础 ● 232.用栈实现队列 ● 225. 用队列实现栈
    队列的方法:添加元素:add(Ee):将指定的元素添加到队列的尾部,如果队列已满则抛出异常。offer(Ee):将指定的元素添加到队列的尾部,如果队列已满则返回false。移除元素:remove():移除并返回队列的头部元素,如果队列为空则抛出异常。poll():移除并返回队列的头部元素,如果......
  • 《手动学习深度学习》3.2和3.3的代码对比
    3.2线性回归的从零开始这是我的第一个代码,也算是属于自己的helloworld了,特此纪念,希望继续努力。代码中引入了3.1中的计时模块,用来对比训练时间。importrandomimporttorchfromd2limporttorchasd2limportsyssys.path.append("..")fromtimerimport......
  • 掌握C语言指针,轻松解锁代码高效性与灵活性(中)
    ✨✨欢迎大家来到贝蒂大讲堂✨✨......
  • 直播系统源代码,iOS端截屏时隐藏内容
    直播系统源代码,iOS禁止截屏,手机截屏时隐藏内容,或自定义截屏后的内容。核心是利用UITextField的secureTextEntry属性隐藏内容,注意该功能仅iOS13.2及以上支持。UITextField在开启密码模式后,在截屏录屏时隐藏一个子视图。注:模拟器左上角的截图不支持UITextField的密码隐藏。如果要......
  • 代码随想录 第九天 | 烤馍片(kmp)算法 ●28. 实现 strStr() ●459.重复的子字符串
    烤馍片算法(kmp):为了不让遍历的指针回退,每次不相等的时候找不相等之前的字符串的最长相等前后缀。i表示目标字符串,j表示需要在目标找到的字符串的指针。最长相等前后缀的长度就是之前有多少个与needle字符串相同,直接将j跳到上一元素位置记录的最长相等前后缀长度(next数组),这样i就可以......
  • Python贝叶斯回归分析住房负担能力数据集|附代码数据
    原文链接:http://tecdat.cn/?p=11664最近我们被客户要求撰写关于贝叶斯回归的研究报告,包括一些图形和统计输出。我想研究如何使用pymc3在贝叶斯框架内进行线性回归。根据从数据中学到的知识进行推断 贝叶斯规则是什么? 本质上,我们必须将已经知道的知识与世界上的事实相结合。......
  • 用SPSS估计HLM多层(层次)线性模型模型|附代码数据
    原文链接:http://tecdat.cn/?p=3230作为第一步,从一个不包含协变量的空模型开始 ( 点击文末“阅读原文”获取完整代码数据******** )。每所学校的截距,β0J,然后设置为平均,γ00,和随机误差ü0J。将(2)代入(1)产生要在SPSS中进行估算,请转至分析→混合模型→线性...相关视频**......