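If all you need is a search tool, Lucene can do the job on its own, yet the tool that has become hugely popular is Elasticsearch. Elasticsearch adds distribution on top of Lucene: Lucene handles the low-level data storage for search, while Elasticsearch provides the APIs and the distributed capabilities. A cluster is the foundation of any distributed system, so how does Elasticsearch form one? This article goes into the details, breaking down each key step of the Elasticsearch election process and combining the documentation with code snippets to build a complete picture.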
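1. Initialization and configuration
Startup: when an ES node starts, it initializes GatewayMetaState, which loads state such as the current term, the last accepted configuration and the node's UUID. The election-related state is persisted in Lucene files on disk (see LucenePersistedState).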
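To make "election state persisted on disk" concrete, here is a minimal sketch. It assumes a hypothetical `FilePersistedState` class that keeps only the current term and the last accepted voting configuration in a `java.util.Properties` file. The real `LucenePersistedState` writes this data into a Lucene index, and the class name, file format and property keys below are illustrative only. The point is that the term survives a restart and can never move backwards.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Properties;
import java.util.Set;

// Hypothetical, simplified stand-in for the election state ES persists.
// The real LucenePersistedState stores it in a Lucene index, not a properties file.
class FilePersistedState {
    private final Path file;
    private final Properties props = new Properties();

    FilePersistedState(Path file) throws IOException {
        this.file = file;
        if (Files.exists(file)) {
            try (InputStream in = Files.newInputStream(file)) {
                props.load(in); // reload the state written before a restart
            }
        }
    }

    long getCurrentTerm() {
        return Long.parseLong(props.getProperty("current_term", "0"));
    }

    Set<String> getLastAcceptedConfiguration() {
        String raw = props.getProperty("last_accepted_config", "");
        Set<String> ids = new LinkedHashSet<>();
        if (!raw.isEmpty()) {
            ids.addAll(Arrays.asList(raw.split(",")));
        }
        return ids;
    }

    // Terms only move forward: a node must never "forget" a term it has already seen.
    void setCurrentTerm(long term) throws IOException {
        if (term < getCurrentTerm()) {
            throw new IllegalArgumentException("term went backwards: " + term);
        }
        props.setProperty("current_term", Long.toString(term));
        flush();
    }

    void setLastAcceptedConfiguration(Set<String> nodeIds) throws IOException {
        props.setProperty("last_accepted_config", String.join(",", nodeIds));
        flush();
    }

    // Write every change straight to disk so it survives a crash or restart.
    private void flush() throws IOException {
        try (OutputStream out = Files.newOutputStream(file)) {
            props.store(out, "election state");
        }
    }
}
```

Creating a second `FilePersistedState` on the same path plays the role of a node restart. The reloaded instance sees the same term and configuration.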
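2. Node discovery and connection
SeedHostsResolver: resolves the seed host addresses, which the node uses to find the other nodes in the cluster.
PeerFinder: starts from the seed hosts configured in elasticsearch.yml (discovery.seed_hosts), then discovers and connects to the other nodes in the cluster. This sets up the communication channels needed for the election and for state publication.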
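Finding "all nodes through the seed nodes" amounts to computing a transitive closure. A node contacts the seed addresses, learns which peers each of them knows, and keeps contacting newly learned peers until nothing new appears. Below is a minimal sketch. It assumes a hypothetical in-memory map `knownPeers` in place of the real transport requests that PeerFinder sends, and it treats an address with no entry in the map as unreachable.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of PeerFinder-style discovery: each reachable address maps to the
// peers that node would report if asked. Real ES does this over the transport layer.
class PeerDiscoverySketch {
    static Set<String> discover(List<String> seedHosts, Map<String, List<String>> knownPeers) {
        Set<String> discovered = new LinkedHashSet<>();
        Deque<String> toContact = new ArrayDeque<>(seedHosts);
        while (!toContact.isEmpty()) {
            String address = toContact.poll();
            if (discovered.contains(address) || !knownPeers.containsKey(address)) {
                continue; // already contacted, or unreachable in this model
            }
            discovered.add(address);
            // Ask the peer which other nodes it knows and queue the ones not seen yet.
            for (String peer : knownPeers.get(address)) {
                if (!discovered.contains(peer)) {
                    toContact.add(peer);
                }
            }
        }
        return discovered;
    }
}
```

Even with a single working seed host, the node ends up knowing every reachable node, as long as the cluster's "knows about" graph is connected.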
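3. Bootstrapping the first master election
ClusterBootstrapService: parses the cluster.initial_master_nodes setting. The node must itself be listed there. Once the discovered master-eligible nodes that match entries in cluster.initial_master_nodes are more than half of the listed entries, the node has met the conditions for an election and bootstraps the initial voting configuration.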
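The "more than half" rule is the expression `nodesMatchingRequirements.size() * 2 > bootstrapRequirements.size()` in the `onFoundPeersUpdated` snippet further down. Below is a minimal sketch of that decision. It assumes requirements are matched by node name only; the real `checkRequirements` also accepts addresses and reports unsatisfied entries. `BootstrapCheck` is a hypothetical name.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical simplification of ClusterBootstrapService's decision: match discovered
// master-eligible node names against the cluster.initial_master_nodes entries.
class BootstrapCheck {
    static boolean canBootstrap(String localNode, Set<String> discoveredMasterNodes,
                                List<String> initialMasterNodes) {
        if (!initialMasterNodes.contains(localNode)) {
            return false; // the local node itself must be one of the bootstrap requirements
        }
        Set<String> matching = discoveredMasterNodes.stream()
                .filter(initialMasterNodes::contains)
                .collect(Collectors.toCollection(HashSet::new));
        matching.add(localNode); // the local node always counts as discovered
        // Strictly more than half of the listed nodes must have been found.
        return matching.size() * 2 > initialMasterNodes.size();
    }
}
```

With three entries in `cluster.initial_master_nodes`, one node alone cannot bootstrap (1 × 2 is not greater than 3), but two matching nodes can (2 × 2 > 3).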
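4. Pre-voting
PreVoteCollector: before the real election, a potential master uses PreVoteCollector to send pre-vote requests to the other nodes. This tells it whether it has enough support to become leader.
Pre-vote request: sent under the action name REQUEST_PRE_VOTE_ACTION_NAME. A receiving node refuses if it is unhealthy, or if it already has a leader other than the requesting node (see handlePreVoteRequest below).
Response handling: the candidate collects pre-vote responses. If they show enough support (a quorum), it triggers the real election.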
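The core of `handlePreVoteResponse`, quoted in full below, reduces to three steps:

1. Ignore responses from nodes whose last accepted state is fresher than ours.
2. Record every other response.
3. Start the real election once the recorded senders form a quorum, at most once per round.

Below is a simplified sketch. It assumes a single voting configuration given as a set of node IDs, and `PreVoteRound` and `PreVote` are hypothetical names. The real code checks both the last committed and the last accepted configurations via `ElectionStrategy`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified pre-voting round modelled on PreVoteCollector.PreVotingRound.
class PreVoteRound {
    record PreVote(long currentTerm, long lastAcceptedTerm, long lastAcceptedVersion) {}

    private final long localAcceptedTerm;
    private final long localAcceptedVersion;
    private final Set<String> votingConfiguration;
    private final Map<String, PreVote> received = new HashMap<>();
    private boolean electionStarted = false;

    PreVoteRound(long localAcceptedTerm, long localAcceptedVersion, Set<String> votingConfiguration) {
        this.localAcceptedTerm = localAcceptedTerm;
        this.localAcceptedVersion = localAcceptedVersion;
        this.votingConfiguration = votingConfiguration;
    }

    /** Returns true only for the response that triggers the election. */
    boolean onResponse(String sender, PreVote response) {
        boolean fresher = response.lastAcceptedTerm() > localAcceptedTerm
                || (response.lastAcceptedTerm() == localAcceptedTerm
                    && response.lastAcceptedVersion() > localAcceptedVersion);
        if (fresher) {
            return false; // the sender has newer state, so it should win rather than us
        }
        received.put(sender, response);
        Set<String> votes = new HashSet<>(received.keySet());
        votes.retainAll(votingConfiguration); // only voters in the configuration count
        if (votes.size() * 2 <= votingConfiguration.size()) {
            return false; // no quorum yet
        }
        if (electionStarted) {
            return false; // election already started for this round
        }
        electionStarted = true;
        return true;
    }
}
```

In ES the candidate's own pre-vote response is collected like any other, so callers of this sketch should also feed in the local node's response.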
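5. The election
JoinHelper: the candidate sends a StartJoinRequest for a new, higher term to the discovered nodes. Each node that accepts it replies with a JoinRequest carrying its vote (a Join).
Voting logic: each node decides whether to vote for the candidate based on its current cluster state, its health, the term number and similar factors. A node votes at most once per term.
Vote counting: the candidate collects the votes, and ElectionStrategy decides whether they form a majority (more than half) of the voting configuration. If they do, the candidate becomes leader.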
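Two rules sit behind the election step.

- On the voter side, a node grants at most one vote per term. It only joins a candidate if the proposed term is greater than its current term, and accepting moves its own term forward.
- On the candidate side, the default `ElectionStrategy` requires a majority of both the last committed and the last accepted voting configurations. The two differ while a configuration change is in flight.

Below is a minimal sketch of both rules. `Voter` and `ElectionQuorum` are hypothetical names, and persistence of the term, health checks and the network messages are omitted.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical voter: grants at most one vote (Join) per term.
class Voter {
    private long currentTerm;

    Voter(long currentTerm) {
        this.currentTerm = currentTerm;
    }

    /** Handles a start-join request for the proposed term; returns true if a vote is granted. */
    boolean handleStartJoin(long proposedTerm) {
        if (proposedTerm <= currentTerm) {
            return false; // already voted in, or moved past, this term
        }
        currentTerm = proposedTerm; // bump the term, so a second request for it is refused
        return true;
    }

    long currentTerm() {
        return currentTerm;
    }
}

// Hypothetical quorum check modelled on the default ElectionStrategy.
class ElectionQuorum {
    static boolean isQuorum(Set<String> votes, Set<String> configuration) {
        Set<String> intersection = new HashSet<>(configuration);
        intersection.retainAll(votes);
        return intersection.size() * 2 > configuration.size(); // an empty configuration never has a quorum
    }

    // A majority of BOTH the last committed and the last accepted configuration is required.
    static boolean isElectionQuorum(Set<String> votes, Set<String> lastCommitted, Set<String> lastAccepted) {
        return isQuorum(votes, lastCommitted) && isQuorum(votes, lastAccepted);
    }
}
```

The empty-configuration case is also why a brand-new cluster cannot elect anyone until `cluster.initial_master_nodes` has produced an initial voting configuration.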
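6. Leader confirmation and state publication
CoordinationState: once elected, the leader uses CoordinationState to handle cluster state update requests and to prepare the new state for publication.
PublicationTransportHandler: sends the new cluster state to the other nodes in the cluster.
PublishRequest: the leader broadcasts a PublishRequest containing the new state to all nodes.
AckListener: collects acknowledgements from the other nodes, so that the leader knows when a majority has accepted the new state.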
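The publish phase is the first half of a two-phase commit. The leader sends the new state to every node, but it only needs acknowledgements from a majority of the voting configuration before it may commit. Below is a minimal sketch. It assumes a hypothetical `PublicationTracker` that is fed publish acknowledgements as they arrive and ignores timeouts and failures. In ES this bookkeeping is spread over `Publication`, `CoordinationState.handlePublishResponse` and the transport layer.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical tracker for a single publication round (one term and version).
class PublicationTracker {
    private final Set<String> votingConfiguration;
    private final Set<String> acked = new HashSet<>();
    private boolean committed = false;

    PublicationTracker(Set<String> votingConfiguration) {
        this.votingConfiguration = votingConfiguration;
    }

    /** Records a publish ack; returns true exactly once, when the commit can be sent. */
    boolean onPublishResponse(String node) {
        acked.add(node);
        Set<String> votes = new HashSet<>(acked);
        votes.retainAll(votingConfiguration); // only voters count towards the quorum
        if (!committed && votes.size() * 2 > votingConfiguration.size()) {
            committed = true; // at this point the leader would broadcast ApplyCommitRequest
            return true;
        }
        return false;
    }
}
```

Acknowledgements from nodes outside the voting configuration, such as data-only nodes, are still recorded, but they do not help reach the commit quorum.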
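7. Applying and committing the state
ClusterApplier: applies the new cluster state on each node, keeping the state consistent across the cluster.
ApplyCommitRequest: once the leader has received enough acknowledgements, it sends an ApplyCommitRequest to all nodes, telling them to apply the new state.
CoordinatorPublication: drives the commit and acknowledgement of the state, so that the state is eventually applied on every node.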
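On the receiving side, the two phases look like this. A node first accepts (stores) the published state, and only applies it, that is, hands it to `ClusterApplier`, after the matching `ApplyCommitRequest` arrives.

Below is a minimal sketch. It assumes a hypothetical `FollowerState` in which a cluster state is reduced to a term, a version and a string payload. It is loosely modelled on the checks in `CoordinationState`. Real nodes also persist the accepted state before acknowledging it.

```java
// Hypothetical follower handling publish and commit, loosely modelled on CoordinationState.
class FollowerState {
    record State(long term, long version, String payload) {}

    private final long currentTerm;
    private State lastAccepted;
    private State applied;

    FollowerState(long currentTerm, State initial) {
        this.currentTerm = currentTerm;
        this.lastAccepted = initial;
        this.applied = initial;
    }

    /** Phase 1: accept the published state; returning true means "send a publish ack". */
    boolean handlePublish(State published) {
        if (published.term() != currentTerm) {
            return false; // published by a leader from a different term
        }
        if (published.term() == lastAccepted.term() && published.version() <= lastAccepted.version()) {
            return false; // not newer than what was already accepted
        }
        lastAccepted = published; // real nodes persist this before acknowledging
        return true;
    }

    /** Phase 2: apply only the exact state that was accepted. */
    boolean handleCommit(long term, long version) {
        if (term != lastAccepted.term() || version != lastAccepted.version()) {
            return false; // commit does not match the accepted state
        }
        applied = lastAccepted; // in ES this is where ClusterApplier takes over
        return true;
    }

    State applied() {
        return applied;
    }
}
```

Between the two phases a node holds a state it has accepted but not yet applied. This is why the leader must wait for a quorum of acks before committing.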
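8. Health monitoring and failure recovery
LeaderChecker: runs on the other nodes and monitors the leader's health. Once the leader becomes unavailable, it triggers a new election.
FollowersChecker: runs on the leader and monitors the followers. It removes nodes that stop responding, which keeps the cluster stable.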
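Both checkers follow the same pattern. They check periodically and only declare a failure after several consecutive failed checks, so that a single lost packet does not trigger an election.

Below is a simplified sketch of that counting logic. It assumes a hypothetical `ConsecutiveFailureDetector`. Its threshold plays the role of the real `cluster.fault_detection.leader_check.retry_count` setting, whose default is 3. Scheduling, timeouts, the network call itself and any failure types that ES treats as immediately fatal are left out.

```java
// Hypothetical failure detector: counts consecutive failed checks against one peer.
class ConsecutiveFailureDetector {
    private final int retryCount;
    private int consecutiveFailures = 0;
    private boolean failed = false;

    ConsecutiveFailureDetector(int retryCount) {
        this.retryCount = retryCount;
    }

    /** Records one check result; returns true exactly once, when the peer is declared failed. */
    boolean onCheckResult(boolean success) {
        if (failed) {
            return false; // already reported
        }
        if (success) {
            consecutiveFailures = 0; // any successful check resets the counter
            return false;
        }
        consecutiveFailures++;
        if (consecutiveFailures >= retryCount) {
            failed = true; // a follower would now consider the leader gone and start pre-voting
            return true;
        }
        return false;
    }
}
```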
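9. Special node types
VotingOnlyNodePlugin: voting-only nodes are master-eligible nodes that take part in elections but never become master. The plugin gives them dedicated handling; for example, they do not publish cluster states. This saves resources and keeps the cluster stable.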
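The defining property of a voting-only node is that its vote counts, but it never becomes master itself. Below is a minimal sketch of just that rule. `VotingOnlyRule`, its `Role` enum and `Node` record are hypothetical. The real `VotingOnlyNodePlugin` enforces this through its own `ElectionStrategy` and further hooks, whose details are more involved than shown here.

```java
import java.util.Set;

// Hypothetical model of the voting-only rule: votes count, but the node can never win.
class VotingOnlyRule {
    enum Role { MASTER, VOTING_ONLY }

    record Node(String id, Role role) {}

    /** Votes from any master-eligible node in the configuration count towards the quorum. */
    static boolean hasQuorum(Set<Node> voters, Set<String> votingConfiguration) {
        long counted = voters.stream()
                .map(Node::id)
                .filter(votingConfiguration::contains)
                .distinct()
                .count();
        return counted * 2 > votingConfiguration.size();
    }

    /** A voting-only candidate is never elected, whatever votes it collects. */
    static boolean canWin(Node candidate, Set<Node> voters, Set<String> votingConfiguration) {
        return candidate.role() != Role.VOTING_ONLY && hasQuorum(voters, votingConfiguration);
    }
}
```

A common layout is two full master-eligible nodes plus one voting-only node. The voting-only node acts as a tie-breaker: it can provide the second vote a full master needs, but it never becomes master itself.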
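Important settings:

| Setting | Description |
| --- | --- |
| discovery.seed_hosts | Addresses of the seed nodes. Nodes start from the seed nodes and eventually discover every node in the cluster. |
| cluster.initial_master_nodes | This setting is critical. When an ES cluster starts for the very first time, it must be set, or the cluster cannot complete an election. More precisely, without it no candidate can gather support from more than half of the voters during pre-voting, so no master is elected. Once the cluster has formed successfully, the setting is no longer needed on later restarts. From then on, nodes recover the cluster's node information from the state persisted on disk. The official documentation recommends removing the setting after the first successful formation. |
| discovery.type | Cluster type: multi-node or single-node. Defaults to multi-node. |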
Code snippets:
1.
```java
/**
 * Loads (and maybe upgrades) cluster metadata at startup, and persistently stores cluster metadata for future restarts.
 *
 * When started, ensures that this version is compatible with the state stored on disk, and performs a state upgrade if necessary. Note that
 * the state being loaded when constructing the instance of this class is not necessarily the state that will be used as {@link
 * ClusterState#metadata()} because it might be stale or incomplete. Master-eligible nodes must perform an election to find a complete and
 * non-stale state, and master-ineligible nodes receive the real cluster state from the elected master after joining the cluster.
 */
// org.elasticsearch.gateway.GatewayMetaState
```
2.
```java
// org.elasticsearch.discovery.PeerFinder#PeerFinder
// org.elasticsearch.cluster.coordination.Coordinator.CoordinatorPeerFinder
```
3.
```java
// org.elasticsearch.cluster.coordination.ClusterBootstrapService#onFoundPeersUpdated
@Override
public void onFoundPeersUpdated() {
    final Set<DiscoveryNode> nodes = getDiscoveredNodes();
    if (bootstrappingPermitted.get()
        && transportService.getLocalNode().isMasterNode()
        && bootstrapRequirements.isEmpty() == false
        && isBootstrappedSupplier.getAsBoolean() == false) {

        final Tuple<Set<DiscoveryNode>, List<String>> requirementMatchingResult;
        try {
            requirementMatchingResult = checkRequirements(nodes);
        } catch (IllegalStateException e) {
            logger.warn("bootstrapping cancelled", e);
            bootstrappingPermitted.set(false);
            return;
        }

        final Set<DiscoveryNode> nodesMatchingRequirements = requirementMatchingResult.v1();
        final List<String> unsatisfiedRequirements = requirementMatchingResult.v2();
        logger.trace(
            "nodesMatchingRequirements={}, unsatisfiedRequirements={}, bootstrapRequirements={}",
            nodesMatchingRequirements,
            unsatisfiedRequirements,
            bootstrapRequirements
        );

        if (nodesMatchingRequirements.contains(transportService.getLocalNode()) == false) {
            logger.info(
                "skipping cluster bootstrapping as local node does not match bootstrap requirements: {}",
                bootstrapRequirements
            );
            bootstrappingPermitted.set(false);
            return;
        }

        if (nodesMatchingRequirements.size() * 2 > bootstrapRequirements.size()) {
            startBootstrap(nodesMatchingRequirements, unsatisfiedRequirements);
        }
    }
}
```
4.
```java
// org.elasticsearch.cluster.coordination.PreVoteCollector#handlePreVoteRequest
private PreVoteResponse handlePreVoteRequest(final PreVoteRequest request) {
    updateMaxTermSeen.accept(request.getCurrentTerm());

    Tuple<DiscoveryNode, PreVoteResponse> state = this.state;
    assert state != null : "received pre-vote request before fully initialised";

    final DiscoveryNode leader = state.v1();
    final PreVoteResponse response = state.v2();

    final StatusInfo statusInfo = nodeHealthService.getHealth();
    if (statusInfo.getStatus() == UNHEALTHY) {
        String message = "rejecting " + request + " on unhealthy node: [" + statusInfo.getInfo() + "]";
        logger.debug(message);
        throw new NodeHealthCheckFailureException(message);
    }

    if (leader == null) {
        return response;
    }

    if (leader.equals(request.getSourceNode())) {
        // This is a _rare_ case where our leader has detected a failure and stepped down, but we are still a follower. It's possible
        // that the leader lost its quorum, but while we're still a follower we will not offer joins to any other node so there is no
        // major drawback in offering a join to our old leader. The advantage of this is that it makes it slightly more likely that the
        // leader won't change, and also that its re-election will happen more quickly than if it had to wait for a quorum of followers
        // to also detect its failure.
        return response;
    }

    throw new CoordinationStateRejectedException("rejecting " + request + " as there is already a leader");
}

// org.elasticsearch.cluster.coordination.PreVoteCollector.PreVotingRound#handlePreVoteResponse
private void handlePreVoteResponse(final PreVoteResponse response, final DiscoveryNode sender) {
    if (isClosed.get()) {
        logger.debug("{} is closed, ignoring {} from {}", this, response, sender);
        return;
    }

    updateMaxTermSeen.accept(response.getCurrentTerm());

    if (response.getLastAcceptedTerm() > clusterState.term()
        || (response.getLastAcceptedTerm() == clusterState.term() && response.getLastAcceptedVersion() > clusterState.version())) {
        logger.debug("{} ignoring {} from {} as it is fresher", this, response, sender);
        return;
    }

    preVotesReceived.put(sender, response);

    // create a fake VoteCollection based on the pre-votes and check if there is an election quorum
    final VoteCollection voteCollection = new VoteCollection();
    final DiscoveryNode localNode = clusterState.nodes().getLocalNode();
    final PreVoteResponse localPreVoteResponse = getPreVoteResponse();

    preVotesReceived.forEach(
        (node, preVoteResponse) -> voteCollection.addJoinVote(
            new Join(
                node,
                localNode,
                preVoteResponse.getCurrentTerm(),
                preVoteResponse.getLastAcceptedTerm(),
                preVoteResponse.getLastAcceptedVersion()
            )
        )
    );

    if (electionStrategy.isElectionQuorum(
        clusterState.nodes().getLocalNode(),
        localPreVoteResponse.getCurrentTerm(),
        localPreVoteResponse.getLastAcceptedTerm(),
        localPreVoteResponse.getLastAcceptedVersion(),
        clusterState.getLastCommittedConfiguration(),
        clusterState.getLastAcceptedConfiguration(),
        voteCollection
    ) == false) {
        logger.debug("{} added {} from {}, no quorum yet", this, response, sender);
        return;
    }

    if (electionStarted.compareAndSet(false, true) == false) {
        logger.debug("{} added {} from {} but election has already started", this, response, sender);
        return;
    }

    logger.debug("{} added {} from {}, starting election", this, response, sender);
    startElection.run();
}
```
From: https://www.cnblogs.com/allenwas3/p/18222681