首页 > 其他分享 >ES 集群选举的过程

ES 集群选举的过程

时间:2024-05-30 16:58:54浏览次数:22  
标签:选举 response cluster state 集群 节点 final ES

如果需要一个搜索工具,lucene 完全可以胜任,但是网上大火的却是 Elasticsearch,它对 lucene 进行了分布式的赋能,lucene 解决搜索底层的数据存储,Elasticsearch 提供接口和分布式能力。而集群则是分布式的基础,那么 Elasticsearch 是如何组建集群的呢?本文深入细节,详细分解 Elasticsearch 选举过程的每个关键步骤,结合文档和代码片段,来构建一个全面的理解框架。

1. 初始化与配置
启动时设置: 当 ES 节点启动时,会初始化 GatewayMetaState,包括设置当前任期、最后接受的配置等状态信息,节点的 uuid 等,节点选举相关的配置信息保存在 lucene 文件中(详见 LucenePersistedState)。


2. 节点发现与连接
SeedHostsResolver: 解析种子主机地址,帮助发现集群中的其他节点。
PeerFinder: 基于 elasticsearch.yml 中配置的种子节点信息,发现并连接到集群中的其他节点,为选举和状态传播准备通信渠道。


3. 引导主节点选举
ClusterBootstrapService: 解析 cluster.initial_master_nodes 配置,当发现的节点数超过 initial_master_nodes 一半,则节点满足了选举的条件。


4. 预投票(Pre-Voting)
PreVoteCollector: 在正式选举前,潜在的 master 节点通过 PreVoteCollector 向集群其他节点发送预投票请求,以评估是否有足够的支持成为领导者。
预投票请求: 通过 REQUEST_PRE_VOTE_ACTION_NAME 发起。
响应处理: 收集预投票响应,如果预投票结果表明有足够的支持,将触发正式选举。


5. 正式选举
JoinHelper: 发送正式的选举请求(JoinRequest),请求其他节点的投票。
投票逻辑: 其他节点根据当前集群状态、健康状况、任期号等因素决定是否投票给请求节点。
投票计数: 收集投票,依据 ElectionStrategy 判断是否达到多数(超过半数),选举出领导者。


6. 领导者确认与状态传播
CoordinationState: 成为领导者后,节点将通过 CoordinationState 处理客户端的集群状态更新请求,准备集群状态的发布。
PublicationTransportHandler: 负责将新集群状态发布到集群中的其他节点。
PublishRequest: 领导者向所有节点广播包含新状态的PublishRequest。
AckListener: 收集其他节点的确认响应,确保多数节点已经接受新状态。


7. 状态应用与确认
ClusterApplier: 应用新的集群状态,确保集群内所有节点状态一致。
ApplyCommitRequest: 领导者在收到足够数量的确认后,向所有节点发送ApplyCommitRequest,通知它们应用新状态。
CoordinatorPublication: 处理状态的提交和确认,确保状态最终被所有节点应用。


8. 健康监测与故障恢复
LeaderChecker: 监控领导者健康,一旦领导者不可用,触发重新选举。
FollowersChecker: 监控跟随者状态,确保集群稳定。


9. 特殊节点处理
VotingOnlyNodePlugin: 对于只参与投票的节点,有专门的处理逻辑,比如不参与数据发布等,确保资源优化和集群稳定性。

 

重要的配置项:

discovery.seed_hosts 种子节点的地址,集群中的节点通过种子节点,最终发现所有节点
cluster.initial_master_nodes

这个参数非常重要,如果 ES 集群初次启动,必须设置这个属性,如果没有设置,则整个集群无法完成选举,准确的说法是在预投票阶段无法满足选举人过半的要求,从而无法选举出 master。

一旦当集群成功完成选举后,再次启动时,则不再需要设置该属性了,因为此后,节点可以从磁盘配置文件中获取集群中的节点信息。

discovery.type 集群的类型,multi-node/single-node,默认是 multi-node

 

 

 

 

 

 

代码片段:

1. 

/**
 * Loads (and maybe upgrades) cluster metadata at startup, and persistently stores cluster metadata for future restarts.
 *
 * When started, ensures that this version is compatible with the state stored on disk, and performs a state upgrade if necessary. Note that
 * the state being loaded when constructing the instance of this class is not necessarily the state that will be used as {@link
 * ClusterState#metadata()} because it might be stale or incomplete. Master-eligible nodes must perform an election to find a complete and
 * non-stale state, and master-ineligible nodes receive the real cluster state from the elected master after joining the cluster.
 */
org.elasticsearch.gateway.GatewayMetaState

2. 

org.elasticsearch.discovery.PeerFinder#PeerFinder

org.elasticsearch.cluster.coordination.Coordinator.CoordinatorPeerFinder

3. 

// org.elasticsearch.cluster.coordination.ClusterBootstrapService#onFoundPeersUpdated
@Override
public void onFoundPeersUpdated() {
    final Set<DiscoveryNode> nodes = getDiscoveredNodes();
    if (bootstrappingPermitted.get()
        && transportService.getLocalNode().isMasterNode()
        && bootstrapRequirements.isEmpty() == false
        && isBootstrappedSupplier.getAsBoolean() == false) {

        final Tuple<Set<DiscoveryNode>, List<String>> requirementMatchingResult;
        try {
            requirementMatchingResult = checkRequirements(nodes);
        } catch (IllegalStateException e) {
            logger.warn("bootstrapping cancelled", e);
            bootstrappingPermitted.set(false);
            return;
        }

        final Set<DiscoveryNode> nodesMatchingRequirements = requirementMatchingResult.v1();
        final List<String> unsatisfiedRequirements = requirementMatchingResult.v2();
        logger.trace(
            "nodesMatchingRequirements={}, unsatisfiedRequirements={}, bootstrapRequirements={}",
            nodesMatchingRequirements,
            unsatisfiedRequirements,
            bootstrapRequirements
        );

        if (nodesMatchingRequirements.contains(transportService.getLocalNode()) == false) {
            logger.info(
                "skipping cluster bootstrapping as local node does not match bootstrap requirements: {}",
                bootstrapRequirements
            );
            bootstrappingPermitted.set(false);
            return;
        }

        if (nodesMatchingRequirements.size() * 2 > bootstrapRequirements.size()) {
            startBootstrap(nodesMatchingRequirements, unsatisfiedRequirements);
        }
    }
}

4.

// org.elasticsearch.cluster.coordination.PreVoteCollector#handlePreVoteRequest
private PreVoteResponse handlePreVoteRequest(final PreVoteRequest request) {
    updateMaxTermSeen.accept(request.getCurrentTerm());

    Tuple<DiscoveryNode, PreVoteResponse> state = this.state;
    assert state != null : "received pre-vote request before fully initialised";

    final DiscoveryNode leader = state.v1();
    final PreVoteResponse response = state.v2();

    final StatusInfo statusInfo = nodeHealthService.getHealth();
    if (statusInfo.getStatus() == UNHEALTHY) {
        String message = "rejecting " + request + " on unhealthy node: [" + statusInfo.getInfo() + "]";
        logger.debug(message);
        throw new NodeHealthCheckFailureException(message);
    }

    if (leader == null) {
        return response;
    }

    if (leader.equals(request.getSourceNode())) {
        // This is a _rare_ case where our leader has detected a failure and stepped down, but we are still a follower. It's possible
        // that the leader lost its quorum, but while we're still a follower we will not offer joins to any other node so there is no
        // major drawback in offering a join to our old leader. The advantage of this is that it makes it slightly more likely that the
        // leader won't change, and also that its re-election will happen more quickly than if it had to wait for a quorum of followers
        // to also detect its failure.
        return response;
    }

    throw new CoordinationStateRejectedException("rejecting " + request + " as there is already a leader");
}


// org.elasticsearch.cluster.coordination.PreVoteCollector.PreVotingRound#handlePreVoteResponse
private void handlePreVoteResponse(final PreVoteResponse response, final DiscoveryNode sender) {
    if (isClosed.get()) {
        logger.debug("{} is closed, ignoring {} from {}", this, response, sender);
        return;
    }

    updateMaxTermSeen.accept(response.getCurrentTerm());

    if (response.getLastAcceptedTerm() > clusterState.term()
        || (response.getLastAcceptedTerm() == clusterState.term() && response.getLastAcceptedVersion() > clusterState.version())) {
        logger.debug("{} ignoring {} from {} as it is fresher", this, response, sender);
        return;
    }

    preVotesReceived.put(sender, response);

    // create a fake VoteCollection based on the pre-votes and check if there is an election quorum
    final VoteCollection voteCollection = new VoteCollection();
    final DiscoveryNode localNode = clusterState.nodes().getLocalNode();
    final PreVoteResponse localPreVoteResponse = getPreVoteResponse();

    preVotesReceived.forEach(
        (node, preVoteResponse) -> voteCollection.addJoinVote(
            new Join(
                node,
                localNode,
                preVoteResponse.getCurrentTerm(),
                preVoteResponse.getLastAcceptedTerm(),
                preVoteResponse.getLastAcceptedVersion()
            )
        )
    );

    if (electionStrategy.isElectionQuorum(
        clusterState.nodes().getLocalNode(),
        localPreVoteResponse.getCurrentTerm(),
        localPreVoteResponse.getLastAcceptedTerm(),
        localPreVoteResponse.getLastAcceptedVersion(),
        clusterState.getLastCommittedConfiguration(),
        clusterState.getLastAcceptedConfiguration(),
        voteCollection
    ) == false) {
        logger.debug("{} added {} from {}, no quorum yet", this, response, sender);
        return;
    }

    if (electionStarted.compareAndSet(false, true) == false) {
        logger.debug("{} added {} from {} but election has already started", this, response, sender);
        return;
    }

    logger.debug("{} added {} from {}, starting election", this, response, sender);
    startElection.run();
}

 

标签:选举,response,cluster,state,集群,节点,final,ES
From: https://www.cnblogs.com/allenwas3/p/18222681

相关文章

  • SEC504.1 Incident Response and Cyber Investigations 事件响应和网络调查 (LAB 6)
    注意,不建议“勤奋”练习并自我感动,比“怎么做”更重要的是“怎么查”。Bootcamp:LinuxOlympics训练营:Linux奥运会ï使用您的SlingshotLinuxVM完成Bootcamp模块LinuxOlympics–交互式、自定进度的活动,带有集成提示,可帮助您在Linux终端中完成常见任务ï培养......
  • 在kubernetes里使用seccomp限制容器的系统调用
    目录一.系统环境二.前言三.系统调用简介四.使用seccomp限制docker容器系统调用五.在kubernetes里使用seccomp限制容器的系统调用5.1配置seccomp允许pod进行所有系统调用5.2配置seccomp禁止pod进行所有系统调用5.3配置seccomp允许pod进行50个系统调用六.总结一.系统环境本文主......
  • 论文阅读笔记(十)——CRISPR-GPT: An LLM Agent for Automated Design of Gene-Editin
    论文阅读笔记(十)——CRISPR-GPT:AnLLMAgentforAutomatedDesignofGene-EditingExperiments目录论文阅读笔记(十)——CRISPR-GPT:AnLLMAgentforAutomatedDesignofGene-EditingExperimentsAbstract简介名词解释问题CRISPR-GPT概述MethodToolProvider......
  • 【稳定检索】2024年核能科学与材料、物理应用国际会议(NESMPA 2024)
    2024年核能科学与材料、物理应用国际会议2024InternationalConferenceonNuclearEnergyScienceandMaterials,PhysicalApplications【1】会议简介        2024年核能科学与材料、物理应用国际会议即将拉开帷幕,这是一场汇聚全球核能科学、材料研究及物理......
  • nuxt3中使用nprogress
    下载依赖npminprogressnpmi--save-dev@types/nprogress//引入ts类型声明,未使用ts可不用创建plugins文件夹plugins目录下创建nprogress.client.ts文件(新版本nuxt3中目录顶层文件会自动注册.client代表只在客户端执行)importNProgressfrom'nprogress';import......
  • Mistral 发布 Codestral,它的第一个代码生成人工智能模型,精通 80 多种编程语言
    Mistral是一家由微软支持、估值60亿美元的法国人工智能初创公司,它发布了第一个用于编码的生成式人工智能模型,名为Codestral。与其他代码生成模型一样,Codestral旨在帮助开发人员编写代码并与代码交互。Mistral在博客文章中解释说,它接受了80多种编程语言的培训,包括Py......
  • Gym-100520A Andrew Stankevich Contest 45 A 题解
    AnalogousSetsGym-100520ASol1.集合生成函数将可重集合\(M\)映射为生成函数:\[F(M)=\sum_{m\inM}(\#m)\cdotx^m\]如果\(M\)的元素在\(\mathbbN\)上取值,那么,\(F(M)\)是多项式。2.\(\theta\)算子\[\theta(F)=x\cdotF'\]其中\(F'=\frac{dF}{dx}\)......
  • 一个前后端都有的后台管理系统,使用nest.js和vue3
    今天介绍一个新的Vue后台管理框架,相比其他后台功能丰富管理系统,这个后台管理系统可以用干净简洁来形容——Nova-adminNova-adminNova-admin是一个基于Vue3、Vite5等最新技术的后台管理平台。用简单的方式实现完整功能,并尽可能的考虑代码规范,易读易理解无过度封装,方便二次开发......
  • PostgreSQL 小课推广-20240529
    目前PostgreSQL小课在持续更新中,PostgreSQL小课专栏新人优惠券到2024年6月1日到期,有需要的伙伴还请关注下。优惠券马上到期,领取券后,也就只需要30元(也就一杯咖啡的钱)就可以解锁专栏,先到先得。目前专栏的50元/年,后续到期不需要续费,等到专栏完成,会有一个......
  • 自己实现dubbo参数校验(类似RestFul 参数校验)
    1.场景:因为工作中经常需要做参数校验,在springboot项目中使用@Valid+@NotNull、@NotBlank…注解开发API接口非常丝滑,相反在开发RPC接口时却还是需要编写大量的参数判断,严重影响主业务流程的开发(公司目前用的是Dubbo2.7.2)且代码整洁度、风格都受到了挑战。基于以上原因萌生了写一......