首页 > 其他分享 >Kafka原理-分区leader选举

Kafka原理-分区leader选举

时间:2023-02-13 21:56:46浏览次数:39  
标签:副本 val 选举 分区 partition Kafka topicAndPartition leader

0.说明

kafka源码版本为1.0

 

1.分区状态

kafka源码定义了4种状态

NewPartition: 表示正在创建新的分区, 是一个中间状态,只是在Controller的内存中存了状态信息

OnlinePartition: 表示在线状态, 只有在线的分区才能提供服务.

OfflinePartition: 表示下线状态, 分区可能因为Broker宕机或者删除Topic等原因流转到这个状态, 下线后不能提供服务

NonExistentPartition: 表示分区不存在

 

2.选举源码分析

源码入口PartitionStateMachine#electLeaderForPartition

注释说明leader选举发生在OfflinePartition,OnlinePartition->OnlinePartition状态变更的时候

 /**
   * Invoked on the OfflinePartition,OnlinePartition->OnlinePartition state change.
   * It invokes the leader election API to elect a leader for the input offline partition
   * @param topic               The topic of the offline partition
   * @param partition           The offline partition
   * @param leaderSelector      Specific leader selector (e.g., offline/reassigned/etc.)
   */
  def electLeaderForPartition(topic: String, partition: Int, leaderSelector: PartitionLeaderSelector) {
    val topicAndPartition = TopicAndPartition(topic, partition)
    val stateChangeLog = stateChangeLogger.withControllerEpoch(controller.epoch)
    // handle leader election for the partitions whose leader is no longer alive
    stateChangeLog.trace(s"Started leader election for partition $topicAndPartition")
    try {
      var zookeeperPathUpdateSucceeded: Boolean = false
      var newLeaderAndIsr: LeaderAndIsr = null
      var replicasForThisPartition: Seq[Int] = Seq.empty[Int]
      while(!zookeeperPathUpdateSucceeded) {
// 01:从zk种获取分区元数据 val currentLeaderIsrAndEpoch = getLeaderIsrAndEpochOrThrowException(topic, partition) val currentLeaderAndIsr = currentLeaderIsrAndEpoch.leaderAndIsr val controllerEpoch = currentLeaderIsrAndEpoch.controllerEpoch
// 02:这里表示其他controller成为新的首领,旧的请求抛异常 if (controllerEpoch > controller.epoch) { val failMsg = s"Aborted leader election for partition $topicAndPartition since the LeaderAndIsr path was " + s"already written by another controller. This probably means that the current controller $controllerId went " + s"through a soft failure and another controller was elected with epoch $controllerEpoch." stateChangeLog.error(failMsg) throw new StateChangeFailedException(stateChangeLog.messageWithPrefix(failMsg)) } // 03:选举的实现 val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topicAndPartition, currentLeaderAndIsr) val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partition, leaderAndIsr, controller.epoch, currentLeaderAndIsr.zkVersion) newLeaderAndIsr = leaderAndIsr.withZkVersion(newVersion) zookeeperPathUpdateSucceeded = updateSucceeded replicasForThisPartition = replicas } val newLeaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(newLeaderAndIsr, controller.epoch) // 04:更新ControllerContext的leader信息(内存中的缓存) controllerContext.partitionLeadershipInfo.put(TopicAndPartition(topic, partition), newLeaderIsrAndControllerEpoch) stateChangeLog.trace(s"Elected leader ${newLeaderAndIsr.leader} for Offline partition $topicAndPartition") val replicas = controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition)) // 05:向broker添加LeaderAndIsr请求 brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition, newLeaderIsrAndControllerEpoch, replicas) } catch { //省略一些异常处理 } debug(s"After leader election, leader cache for $topicAndPartition is updated to ${controllerContext.partitionLeadershipInfo(topicAndPartition)}") }

实际选举leaderSelector.selectLeader(topicAndPartition, currentLeaderAndIsr)策略来自PartitionLeaderSelector,具体有以下四种

(1)OfflinePartitionLeaderSelector

触发场景:

  • Controller 重新加载
  • 脚本执行脏选举
  • Controller 监听到有Broker启动了
  • Controller 监听 LeaderAndIsrResponseReceived请求:
  • Controller 监听 UncleanLeaderElectionEnable请求:unclean.leader.election.enable设置为 true时

选举规则

找AR中第一个在线且在isr中的副本,若找不到,且unclean.leader.election.enable为true,找AR中第一个在线的副本。

 

(2)ReassignedPartitionLeaderSelector

触发场景:

  • 分区副本重分配:只有当之前的Leader副本在经过重分配之后不存在了,或者故障下线了才会触发

选举规则

找AR中第一个在线且在isr中的副本

 

(3)PreferredReplicaPartitionLeaderSelector

触发场景:

  • 自动定时执行优先副本选举任务:auto.leader.rebalance.enable=true (源码KafkaController#checkAndTriggerAutoLeaderRebalance)
  • Controller 重新加载的时候:先执行OfflinePartitionLeaderSelector再执行PreferredReplicaPartitionLeaderSelector (源码KafkaController#onControllerFailover)
  • 手动执行优先副本选举脚本:kafka-leader-election.sh 并且选择的模式是 PREFERRED (先写zk节点,后controller触发PreferredReplicaElectionListener)

选举规则

只有满足以下条件才会选举成功:是第一个副本 && 副本在线 && 副本在ISR列表中。

代码如下

class PreferredReplicaPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {

  logIdent = "[PreferredReplicaPartitionLeaderSelector]: "

  def selectLeader(topicAndPartition: TopicAndPartition,
                   currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
// 从内存中(AR)拿到第一个副本,就是首选副本 val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition) val preferredReplica = assignedReplicas.head // 判断实际的leader是不是首选副本,若是就不需要选举了 val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader if (currentLeader == preferredReplica) { throw new LeaderElectionNotNeededException("Preferred replica %d is already the current leader for partition %s" .format(preferredReplica, topicAndPartition)) } else { info("Current leader %d for partition %s is not the preferred replica.".format(currentLeader, topicAndPartition) + " Triggering preferred replica leader election") // 否则,首选副本在线并且在isr中,就选举其为新leader if (controllerContext.isReplicaOnline(preferredReplica, topicAndPartition) && currentLeaderAndIsr.isr.contains(preferredReplica)) { val newLeaderAndIsr = currentLeaderAndIsr.newLeader(preferredReplica) (newLeaderAndIsr, assignedReplicas) } else { throw new StateChangeFailedException(s"Preferred replica $preferredReplica for partition $topicAndPartition " + s"is either not alive or not in the isr. Current leader and ISR: [$currentLeaderAndIsr]") } } } }

 

 

(4)ControlledShutdownLeaderSelector

触发场景:

  • Broker关机的时候:当Broker关机的过程中,会向Controller发起一个请求, 让它重新发起一次选举, 把在所有正在关机(也就是发起请求的那个Broker,或其它同时正在关机的Broker) 的Broker里面的副本给剔除掉。

选举规则

在AR中找到第一个满足条件的副本:副本在线 && 副本在ISR中 && 副本所在的Broker不在正在关闭的Broker集合中 。

 

标签:副本,val,选举,分区,partition,Kafka,topicAndPartition,leader
From: https://www.cnblogs.com/ouym/p/17117963.html

相关文章

  • kafka 常见命令以及增加topic的分区数
    基础命令1.创建topickafka-topics.sh--bootstrap-server${kafkaAddress}--create--topic${topicName}--partitions${partipartions}--replication-factor${rep......
  • kafka删除topic清空数据
    一般情况下,是不会删除数据的。到达一定时间后,kafka会自动删除。如果一定要删除可以删除topic在重建topic了No.1:如果需要被删除topic此时正在被程序produce和consum......
  • kafka笔记
    1、概念:kafka是一个用scala语言编写的,分布式、支持分区(partition)、多副本(replica),基于zookeeper协调分布式消息系统,它最大的特性就是可以实时处理大量数据以满足各种需......
  • go连接kafka
    Part1前言本文主要介绍如何通过go语言连接kafka。这里采用的是sarama库。​​https://github.com/Shopify/sarama​​Part2库的安装goget-ugithub.com/Shopify/saramago......
  • kafka如何开启kerberos认证
    参考:     https://www.cnblogs.com/wuyongyin/p/15624452.html kerberos基本原理       https://www.cnblogs.com/wuyongyin/p/15634397.html kerb......
  • 05-KafkaConsumer
    1.工作流程1.1消费者组概述ConsumerGroup(CG):由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupId相同。消费者与消费组这种模型可以让整体的消费......
  • 04-KafkaBroker
    1.工作流程1.1Zk存储的信息1.2总体工作流程step1~6:step7~11:模拟broker上下线,Zk中的数据变化:(1)查看/kafka/brokers/ids路径上的节点[zk:localhost:2181(......
  • 02-KafkaProducer
    1.发送消息流程1.1整体架构整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和Sender线程(发送线程)。在主线程中由KafkaProducer创建消息,然后通过可能的......
  • 搭建kafka
    1、安装jdkjava-version查看jdk版本yum-ylistjava*查看已安装和可安装的软件包yum-yinstalljava安装新版本 2、安装zookeeper(单机模式)a) 下载地址http......
  • rocketMq和kafka对比
    为什么在RocketMQ和kafka中选型在单机同步发送的场景下,Kafka>RocketMQ,Kafka的吞吐量高达17.3w/s,RocketMQ吞吐量在11.6w/s。kafka高性能原因生产者Kafka会把收到的消息......