首页 > 其他分享 >反问面试官:如何实现集群内选主

反问面试官:如何实现集群内选主

时间:2024-09-27 18:13:27浏览次数:1  
标签:面试官 选主 内选主 ElectionMessage private 集群 投票 new 节点

面试官经常喜欢问什么zookeeper选主原理、什么CAP理论、什么数据一致性。经常都被问烦了,我就想问问面试官,你自己还会实现一个简单的集群内选主呢?估计大部分面试官自己也写不出来。

本篇使用 JavaNetty 实现简单的集群选主过程的示例。

这个示例展示了多个节点通过投票选举一个新的主节点的过程。Netty 用于节点间的通信,而每个节点则负责发起和响应选举消息。

集群选主流程

选主流程

咱们且不说zookeeper如何选主,单说人类选主,也是采用少数服从多数的原则。人类选主时,中间会经历如下过程:

(1)如果我没有熟悉的或者没找到能力比我强的,首先投给自己一票。

(2)随着时间推移,可能后面的人介绍了各自的特点和实力,那我可能会改投给别人。

(3)所有人将投票信息放入到统计箱中。

(4)最终票数最多的人是领导者。

同样的,zookeeper在选主时,也是这样的流程。假设有5个服务器

  1. 服务器1先给自身投票
  2. 后续起来的服务器2也会投自身一票,然后服务器1观察到服务器2的id比较大,则会改投服务器2
  3. 后续起来的服务器3也会投自身一票,然后服务1和服务器2发现服务器3的id比较大,则都会改投服务器3。服务器3被确定为领导者。
  4. 服务器4起来后也会投自身一票,然后发现服务器3已经有3票了,立马改投服务器3。
  5. 服务器5与服务器4的操作一样。

选主协议

在选主过程中采用的是超过半数的协议。在选主过程中,会需要如下几类消息:

  • 投票请求:节点发出自己的投票请求。
  • 接受投票:其余节点作出判断,如果觉得id较大,则接受投票。
  • 选举胜出:当选主节点后,广播胜出消息。

代码实现

下面模拟3个节点的选主过程,核心步骤如下:

1、定义消息类型、消息对象、节点信息

public enum MessageType {
        VOTE_REQUEST, // 投票请求
        VOTE,         // 投票
        ELECTED       // 选举完成后的胜出消息
}
    
public class ElectionMessage implements Serializable {
    private MessageType type;
    private int nodeId;   // 节点ID
    private long zxId;    // ZXID:类似于ZooKeeper中的逻辑时钟,用于比较
    private int voteFor;  // 投票给的节点ID
}

public class ElectionNode {
    private int nodeId; // 当前节点ID
    private long zxId;  // 当前节点的ZXID
    private volatile int leaderId; // 当前选举的Leader ID
    private String host;
    private int port;
    private ConcurrentHashMap<Integer, Integer> voteMap = new ConcurrentHashMap<>(); // 此节点对每个节点的投票情况
    private int totalNodes; // 集群总节点数
}

2、每个节点利用Netty启动Server

public void start() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(
                                    new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
                                    new ObjectEncoder(),
                                    new ElectionHandler(ElectionNode.this));
                        }
                    });

            ChannelFuture future = serverBootstrap.bind(port).sync();
            System.out.println("Node " + nodeId + " started on port " + port);

            // 启动后开始选举过程
            startElection();
//            future.channel().closeFuture().sync();


        } catch (Exception e) {

        } finally {
//            bossGroup.shutdownGracefully();
//            workerGroup.shutdownGracefully();
        }
    }

3、启动后利用Netty发送投票请求

public void sendVoteRequest(String targetHost, int targetPort) {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(
                                    new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
                                    new ObjectEncoder(),
                                    new ElectionHandler(ElectionNode.this));
                        }
                    });

            ChannelFuture future = bootstrap.connect(targetHost, targetPort).sync();
            ElectionMessage voteRequest = new ElectionMessage(ElectionMessage.MessageType.VOTE_REQUEST, nodeId, zxId, nodeId);
            future.channel().writeAndFlush(voteRequest);
//            future.channel().closeFuture().sync();
        } catch (Exception e) {

        } finally {
//            group.shutdownGracefully();
        }
    }

4、节点接受到投票请求后,做相关处理

节点在收到消息后,做相关逻辑处理:处理投票请求、处理确认投票、处理选主结果

**处理投票请求:**判断是否是否接受投票信息。只有在主节点没确定并且zxId较大时,才发送投票消息。如果接受了投票请求的话,则更新本地的投票逻辑,然后给投票节点发送接受投票的消息

处理确认投票:如果投票消息被接受了,则更新本地的投票逻辑。

处理选主结果:如果收到了选主结果的消息,则更新本地的主节点。

public class ElectionHandler extends ChannelInboundHandlerAdapter {
    private final ElectionNode node;

    public ElectionHandler(ElectionNode node) {
        this.node = node;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ElectionMessage electionMessage = (ElectionMessage) msg;
        System.out.println("Node " + node.getNodeId() + " received: " + electionMessage);

        if (electionMessage.getType() == ElectionMessage.MessageType.VOTE_REQUEST) {
            // 判断是否是否接受投票信息。只有在主节点没确定并且zxId较大时,才发送投票消息
            // 如果接受了投票请求的话,则更新本地的投票逻辑,然后给投票节点发送接受投票的消息
            if (electionMessage.getZxId() >= node.getZxId() && node.getLeaderId() == 0) {
                node.receiveVote(electionMessage.getNodeId());
                ElectionMessage voteMessage = new ElectionMessage(ElectionMessage.MessageType.VOTE, electionMessage.getNodeId(), electionMessage.getZxId(), electionMessage.getNodeId());
                ctx.writeAndFlush(voteMessage);
            } else {
                // 如果已经确定主节点了,直接发送ELECTED消息
                sendLeaderInfo(ctx);
            }
        } else if (electionMessage.getType() == ElectionMessage.MessageType.VOTE) {
            // 如果投票消息被接受了,则更新本地的投票逻辑。
            if (electionMessage.getZxId() >= node.getZxId() && node.getLeaderId() == 0) {
                node.receiveVote(electionMessage.getNodeId());
            } else {
                // 如果已经确定主节点了,直接发送ELECTED消息
                sendLeaderInfo(ctx);
            }
        } else if (electionMessage.getType() == ElectionMessage.MessageType.ELECTED) {
            if (node.getLeaderId() == 0) {
                node.setLeaderId(electionMessage.getVoteFor());
            }
        }
    }

5、接受别的节点的投票

这里是比较关键的一步,当确定接受某个节点时,则更新本地的投票数,然后判断投票数是否超过半数,超过半数则确定主节点。同时,再将主节点广播出去。

此时,其余节点接收到选主确认的消息后,都会更新自己的本地的主节点信息。

public void receiveVote(int nodeId) {
    voteMap.merge(nodeId, 1, Integer::sum);
    // 比较出votes里值,取出最大的那个对应的key
    int currentVotes = voteMap.values().stream().max(Integer::compareTo).get();

    if (currentVotes > totalNodes / 2 && leaderId == 0) {
        setLeaderId(nodeId);
        broadcastElected();
    }
}

6、广播选主结果

/**
 * 广播选举结果
 */
private void broadcastElected() {
    for (int i = 1; i <= totalNodes; i++) {
        if (i != nodeId) {
            sendElectedMessage(host, 9000 + i);
        }
    }
}

/**
 * 发送选举结果
 *
 * @param targetHost
 * @param targetPort
 */
public void sendElectedMessage(String targetHost, int targetPort) {
    EventLoopGroup group = new NioEventLoopGroup();
    try {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ch.pipeline().addLast(
                                new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
                                new ObjectEncoder(),
                                new ElectionHandler(ElectionNode.this));
                    }
                });

        ChannelFuture future = bootstrap.connect(targetHost, targetPort).sync();
        ElectionMessage electedMessage = new ElectionMessage(ElectionMessage.MessageType.ELECTED, leaderId, zxId, leaderId);
        future.channel().writeAndFlush(electedMessage);
//            future.channel().closeFuture().sync();
    } catch (Exception e) {

    } finally {
//            group.shutdownGracefully();
    }
}

7、完整代码

完整代码:https://gitee.com/yclxiao/specialty/blob/master/javacore/src/main/java/com/ycl/election/ElectionHandler.java

总结

本文主要演示了一个简易的多Server的选主过程,以上代码是一个简单的基于Netty实现的集群选举过程的示例。在实际场景中,选举逻辑远比这个复杂,需要处理更多的网络异常、重复消息、并发问题等。

希望对你有帮助,如遇问题可加V交流。

本篇完结!欢迎 关注、加V(yclxiao)交流、全网可搜(程序员半支烟)

原文链接:https://mp.weixin.qq.com/s/Lxt1ujFicJm-8KYBlVptZQ

标签:面试官,选主,内选主,ElectionMessage,private,集群,投票,new,节点
From: https://www.cnblogs.com/mangod/p/18436320

相关文章

  • 构建读写分离的数据库集群
    三、构建读写分离的数据库集群1、案例目标(1)了解Mycat提供的读写分离功能。(2)了解MySQL数据库的主从架构。(3)构建以Mycat为中间件的读写分离数据库集群。2、案例分析读写分离的基本原理:让主数据库处理事务性增、改、删除操作,而从库处理查询操作2.1准备三台虚拟机设置如下:......
  • redis集群增加减少节点
    redis集群故障切换方案步骤:1、增加临时节点并加入集群2、将故障节点槽位移动到新节点3、剔除故障节点4、备份故障节点配置下架更换5、更换后按照1-3步骤将临时节点剔除,将原故障节点重新加入集群#测试环境为3主3从,有5个keyredis-cli-c-p8102-h172.17.0.89-a123456d......
  • semi-sync原主库加入集群阻塞问题分析
    问题现象客户在一个一主两从的半同步复制环境下做了手工切换,然后尝试把原主库加入集群中,结果发现新集群中的数据一直无法同步到slave(原主库)中来,查看slave(原主库)同步状态,IO线程和SQL线程都是YES状态,但是Seconds_Behind_Master大于0.查看showprocesslist状态,发现SQL线程一......
  • 【Golang】双节点集群etcd未组成集群vip切换时序
    目录1、背景2、时序图1、背景在vip(虚拟ip)可以从一个节点切换到另一个节点,但etcd未组成集群的环境中,与etcd的连接会断开重连,但同一个key在不同节点的版本可能会不一样导致etcd服务器不向客户端推送数据,也就是客户端watch失效了,具体解决方法参考之前的重连方法。下面......
  • 面试官:谈谈你对IO多路复用的理解?
    “IO多路复用”是编程中常见的技术词汇,使用这种技术的框架有很多,如,Redis、Kafka、Netty、Nginx中都用到了此技术。那问题来了,什么是IO多路复用?它的具体实现技术有哪些?这些技术之间有什么区别?今天我们就来简单的探讨一下。1.什么是IO多路复用?IO多路复用技术是一种允许单个......
  • Java单体服务和集群分布式SpringCloud微服务的理解
    单体应用存在的问题1.随着业务的发展开发变得越来越复杂。2.修改或者新增,需要对整个系统进行测试、重新部署。3.一个模块出现问题,很可能导致整个系统崩溃。4.多个开发团队同时对数据进行管理,容易产生安全漏洞。5.各个模块使用同一种技术进行开发,各个模块很难根据实际情况......
  • 南大通用GBase 8s高可用集群架构介绍
    对一些行业关键核心业务应用来说,必须满足7×24小时的不间断服务。这就要求数据库必须具备高可用性以避免数据库节点的单点故障。数据库的高可用架构选择直接影响到业务应用服务可用性的等级。一般行业的关键核心业务要求具备极高可用性,即99.999%的可用性级别。Oracle在高可用架构上......
  • Flink(二)集群安装
    集群安装Standalone模式安装解压缩[user@hadoop102software]$tar-zxvfflink-1.10.1-bin-scala_2.12.tgz-C/opt/module/修改flink/conf/flink-conf.yaml文件jobmanager.rpc.address:hadoop102修改/conf/slaves文件hadoop103hadoop104分发给其他两台虚拟......
  • HDFS数据(跨集群)迁移
    一、数据迁移使用场景1.冷热集群数据同步、分类存储2.整体数据整体搬迁3.数据准实时同步(备份)二、考量因素1.网络传输带宽及时间,是否会影响现有业务2.性能,单机?多线程?分布式?3.是否正常增量同步4.数据迁移的同步性(同步单位时间数据超过单位时间)三、DistCp工具使用3.1简介dis......
  • Hadoop高可用集群搭建
    一、HDFSHA简介1.1 QJM简介1.QuorumJournalManager(仲裁日志管理器),是Hadoop官方推荐的HDFSHA解决方案之一2.使用zookeeper中ZKFC来实现主备切换;3.使用JournalNode(JN)集群实现editslog的共享以达到数据同步的目的1.2主备切换问题解决方案--ZKFailoverController(zkfc)ZKFailo......