Kafka问题收集
kafka分区分配策略
生产者发送消息分区分配策略
kafka 里的 partitioner (分区器)来负责客户端生产层面的负载均衡。
- 如果提供 key 值
partitioner 会根据 key 的哈希值(采用Murmur2Hash算法)对 partition 数量取模,根据该值决定把消息发送到哪个 partition 上,(hash(key) % numpartitons)。 - 如果没有提供 key 值
key 为 空(null,无值)时,kafka 2.4 之前有一种策略,轮询算法, 2. 4 之后,又提供了因为一种算法 黏性分区策略。
key 为 null 时,第一次调用时会随机生成一个整数,后面每次在这个整数上自增,然后这个值 对 partition 数量取模,这个就是轮询算法 - roundrobin。
kafka 2.4 之前默认的策略就是这个轮询主题的所有分区,将消息以轮询的方式发送到每一个分区上。
kafka 2.4 之后,社区又引进了 Sticky Partitioning Strategy(黏性分区策略),该策略能显著降低指定分区过程中的延时。具体信息看这里 KIP-480: Sticky Partitioner - 如果提供了 partition:
如果你指定了 partition分区,那么就用指定的这个分区,不用 hash(key) 的分区算法。
消费者消费分区分配策略
Range Strategy
Range策略是针对topic而言的,在进行分区分配时,为了尽可能保证所有consumer均匀的消费分区,会对同一个topic中的partition按照序号排序,并对consumer按照字典顺序排序。
然后为每个consumer划分固定的分区范围,如果不够平均分配,那么排序靠前的消费者会被多分配分区。具体就是将partition的个数除于consumer线程数来决定每个consumer线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多分配分区。
通过下面公式更直观:
假设n = 分区数 / 消费者数量,m = 分区数 % 消费者线程数量,那么前m个消费者每个分配n+1个分区,后面的(消费者线程数量 - m)个消费者每个分配n个分区。
举个例子:
一个消费组CG1中有C0和C1两个consumer,消费Kafka中的主题t1。t1的分区数为10,并且C1的num.streams为1,C2的num.streams为2。
经过排序后,分区为:0, 1, 2, 3, 4, 5, 6, 7, 8, 9;CG1中消费者线程为C0-0、C1-0、C1-1。然后因为 10除3除不尽,那么消费者线程C0-0将会多分配分区,所以分区分配之后结果如下:
C0-0 将消费0、1、2、3分区
C1-0 将消费4、5、6分区
C1-1 将消费7、8、9分区
当存在有2个Kafka topic(t1和t2),它们都有有10个partition,那么最后分区结果为:
C0-0 将消费t1主题的0、1、2、3分区以及t2主题的0、1、2、3分区
C1-0 将消费t1主题的4、5、6分区以及t2主题的4、5、6分区
C2-1 将消费t1主题的7、8、9分区以及t2主题的7、8、9分区
如上场景,随着topic的增多,那么针对每个topic,消费者C0-0都将多消费1个分区,topic越多比如为N个,C0-0消费的分区会比其他消费者明显多消费N个分区。
可以明显的看到这样的分配并不均匀,有可能会出现部分消费者过载的情况,这就是Range分区策略的一个很明显的弊端。
num.streams为消费端处理任务线程数,默认为1
RoundRobin Strategy
如果同一个消费组内所有的消费者的订阅信息都是相同的,那么RoundRobinAssignor策略的分区分配会是均匀的。举例,假设消费组中有2个消费者C0和C1,都订阅了主题t0和t1,并且每个主题都有3个分区,那么所订阅的所有分区可以标识为:t0p0、t0p1、t0p2、t1p0、t1p1、t1p2。最终的分配结果为:
消费者C0:t0p0、t0p2、t1p1
消费者C1:t0p1、t1p0、t1p2
如果同一个消费组内的消费者所订阅的信息是不相同的,那么在执行分区分配的时候就不是完全的轮询分配,有可能会导致分区分配的不均匀。如果某个消费者没有订阅消费组内的某个topic,那么在分配分区的时候此消费者将分配不到这个topic的任何分区。
举例,假设消费组内有3个消费者C0、C1和C2,它们共订阅了3个主题:t0、t1、t2,这3个主题分别有1、2、3个分区,即整个消费组订阅了t0p0、t1p0、t1p1、t2p0、t2p1、t2p2这6个分区。具体而言,消费者C0订阅的是主题t0,消费者C1订阅的是主题t0和t1,消费者C2订阅的是主题t0、t1和t2,那么最终的分配结果为:
消费者C0:t0p0
消费者C1:t1p0
消费者C2:t1p1、t2p0、t2p1、t2p2
可以看到RoundRobinAssignor策略也不是十分完美,这样分配其实并不是最优解,因为完全可以将分区t1p1分配给消费者C1。
使用RoundRobin策略必须满足以下条件:
-
同一个Consumer Group里面的所有consumer的num.streams必须相等
-
每个consumer订阅的topic必须相同
Sticky Strategy
sticky
这个单词可以翻译为“粘性的”,Kafka从0.11.x版本开始引入这种分配策略,它主要有两个目的:
-
分区的分配要尽可能的均匀;
-
分区的分配尽可能的与上次分配的保持相同。
当两者发生冲突时,第一个目标优先于第二个目标。鉴于这两个目标,StickyAssignor策略的具体实现要比RangeAssignor和RoundRobinAssignor这两种分配策略要复杂很多。
Kafka分区分配策略(2)——RoundRobinAssignor和StickyAssignor
broker端的分区分配
这里的分区分配是指创建主题时的分区副本分配方案,即在哪个broker中创建哪些分区的副本。分区分配是否均衡会影响到Kafka整体的负载均衡,具体还会牵涉到优先副本等概念。
在创建主题时,如果使用了replica-assignment参数,那么就按照指定的方案来进行分区副本的创建;如果没有使用replica-assignment参数,那么就需要按照内部的逻辑来计算分配方案了。使用kafka-topics.sh脚本创建主题时的内部分配逻辑按照机架信息划分成两种策略:未指定机架信息和指定机架信息。如果集群中所有的broker节点都没有配置broker.rack参数,或者使用disable-rack-aware参数来创建主题,那么采用的就是未指定机架信息的分配策略,否则采用的就是指定机架信息的分配策略。
Replica副本
Kafa为了提高分区的可靠性,又设计了副本机制。我们创建Topic的时候,通过指定replication-factor副本因子,来确定Topic的副本数。当然,副本因子数必须小于等于节点数,否则会报错。这样就可以保证,绝对不会有一个分区的两个副本分布在同一个节点上,不然副本机制也失去了备份的意义了。
如图所示,创建了一个3个分区3个副本的Topic a3part3rep,被均匀分布到了3个Broker节点上,每个Broker节点互为备份。
这些所有的副本分为两种角色,Leader对外提供读写服务。Follower唯一的任务就是从Leader异步拉取数据,图中红色的副本为Leade,也被均匀分布在各个节点上,可以保证读写均匀,这样的设计也称为单调读一致性。
Segment分段
Kakfa为了防止Log不断追加导致文件过大,导致检索消息效率变低,一个Partition超出一定大小的时候,就被切割为多个Segment来组织数据。在磁盘上,每个Segment由一个log文件和2个index文件组成。
如图所示,这三个文件是成套出现的。其中.index是用来存储Consumer的Offset偏移量的索引文件,.timeindex是用来存储消息时间戳的索引文件,log文件就是用来存储具体的数据文件。
以切割时记录的Offset值作为文件的名字。它的文件结构是这样的,如图所示:
Index索引
前面我们讲到Kafka设计了两种索引,一种是偏移量索引文件,记录的是Offset和消息在Log文件中的位置映射关系。一种是时间戳索引文件,记录的是时间戳和Offset的关系。为了提高检索效率Kafka并不会为每一条消息都会建立索引,而是采用稀疏索引。也就是说隔一批消息才产生一条索引记录。如图所示:
Kafka工作流程
到目前为止,我们讨论了Kafka的核心该概念。现在让我们将目光转向Kafka的工作流程上。Kafka是由分裂为一个或多个partition的topic的集合。 Kafka中的partition可以认为是消息的线性排序序列,其中每个消息由它们的索引(称为offset)来标识。 Kafka集群中的所有数据是每个partition数据分区的并集。 新写入的消息写在分区的末尾,消息由消费者顺序读取。通过将消息复制到不同的Broker来提供持久性。Kafka以快速,可靠,持久,容错和零停机的方式提供基于pub-sub
和队列模型的消息系统。 在这两种情况下,生产者只需将消息发送到topic,消费者可以根据自己的需要选择任何一种类型的消息传递系统。
Pub-Sub 消息模型工作流程
下面是 Pub-Sub 消息模型的工作流程
- 生产者定期向topic发送消息。
- Kafka broker 根据配置将topic的消息存储到指定的partition上。Kafka确保所有的消息均匀分布在topic的所有partition上。如果producer发送了两条消息,并且该topic有两个partition,则每个partition会有一条消息。
- Consumer 订阅指定的topic。
- 一旦消费者订阅了topic,Kafka将向消费者提供topic的当前
offset
,并且还将offset
保存在Zookeeper中。 - 消费者将定期请求Kafka(如100 Ms)新消息。
- 一旦Kafka从生产者接收到消息,它将这些消息转发给消费者。
- 消费者将收到消息并进行处理。
- 一旦消息被处理,消费者将向Kafka broker发送确认。
- 一旦Kafka收到确认,它将
offset
更改为新值,并在Zookeeper中更新它。 由于offset
在Zookeeper中被维护,消费者可以正确地读取下一条消息,即使服务器宕机后重启。 - 以上流程将重复,直到消费者停止请求。
- 消费者可以随时回退/跳转到某个topic的期望
offset
处,并读取所有后续消息。
队列消息模型工作流程 & Consumer Group
在基于队列的消息系统中,取代单个消费者的是订阅了相同topic的一群拥有相同Group ID
的消费者集群。简单来说,订阅具有相同“组ID”的主题的消费者被认为是单个组,并且消息在它们之间共享。 让我们检查这个系统的实际工作流程。
- 生产者定期向topic发送消息。
- Kafka broker 根据配置将topic的消息存储到指定的partition上。
- 单个consumer以名为
Group-1
的Group ID
订阅名为Topic-01
的topic。 - Kafka 会以和Pub-Sub消息模型相同的方式和consumer进行交互知道新的消费者以同样的Group ID加入到消费者分组中。
- 一旦新的消费者加入后,Kafka将操作切换到共享模式,将所有topic的消息在两个消费者间进行均衡消费。这种共享行为直到加入的消费者结点数目达到该topic的分区数。
- 一旦消费者的数目大于topic的分区数,则新的消费者不会收到任何消息直到已经存在的消费者取消订阅。出现这种情况是因为Kafka中的每个消费者将被分配至少一个分区,并且一旦所有分区被分配给现有消费者,新消费者将必须等待。
该功能被称为 “Consumer Group”。以同样的方式,Kafka将以非常简单和高效的方式提供这两种系统功能。
ZooKeeper的角色
Apache Kafka的一个关键依赖是Apache Zookeeper,它是一个分布式配置和同步服务。 Zookeeper是Kafka代理和消费者之间的协调接口。 Kafka服务器通过Zookeeper集群共享信息。 Kafka在Zookeeper中存储基本元数据,例如关于主题,代理,消费者偏移(队列读取器)等的信息。由于所有关键信息存储在Zookeeper中,并且它通常在其整个集群上复制此数据,因此Zookeeper的故障不会影响Kafka集群的状态。一旦Zookeeper重新启动, Kafka将恢复状态。 这为Kafka带来了零停机时间。 Kafka代理之间的领导者选举也通过使用Zookeeper在领导失败的情况下完成。
要了解更多关于Zookeeper,请参考http://www.tutorialspoint.com/zookeeper/
Kafka中的控制器Controller ()
Broker 在启动时,会尝试去 ZooKeeper 中创建 /controller 节点。Kafka 当前选举控制器的规则是:第一个成功创建 /controller 节点的 Broker 会被指定为控制器。
Controller Broker的主要职责有很多,主要是一些管理行为,主要包括以下几个方面:
- 创建、删除主题,增加分区并分配leader分区
- 集群Broker管理(新增 Broker、Broker 主动关闭、Broker 故障)
- preferred leader选举
- 分区重分配
具体职责
具备控制器身份的broker需要比其他普通的broker多一份职责,具体细节如下:
- 监听partition相关的变化。为Zookeeper中的/admin/reassign_partitions节点注册PartitionReassignmentListener,用来处理分区重分配的动作。为Zookeeper中的/isr_change_notification节点注册IsrChangeNotificetionListener,用来处理ISR集合变更的动作。为Zookeeper中的/admin/preferred-replica-election节点添加PreferredReplicaElectionListener,用来处理优先副本的选举动作。
- 监听topic相关的变化。为Zookeeper中的/brokers/topics节点添加TopicChangeListener,用来处理topic增减的变化;为Zookeeper中的/admin/delete_topics节点添加TopicDeletionListener,用来处理删除topic的动作。
- 监听broker相关的变化。为Zookeeper中的/brokers/ids/节点添加BrokerChangeListener,用来处理broker增减的变化。
- 从Zookeeper中读取获取当前所有与topic、partition以及broker有关的信息并进行相应的管理。对于所有topic所对应的Zookeeper中的/brokers/topics/[topic]节点添加PartitionModificationsListener,用来监听topic中的分区分配变化。
- 启动并管理分区状态机和副本状态机。
- 更新集群的元数据信息。
- 如果参数auto.leader.rebalance.enable设置为true,则还会开启一个名为“auto-leader-rebalance-task”的定时任务来负责维护分区的优先副本的均衡。
控制器选举
Kafka中的控制器选举的工作依赖于Zookeeper,成功竞选为控制器的broker会在Zookeeper中创建/controller这个临时(EPHEMERAL)节点,此临时节点的内容参考如下:
{"version":1,"brokerid":0,"timestamp":"1529210278988"}
其中version在目前版本中固定为1,brokerid表示称为控制器的broker的id编号,timestamp表示竞选称为控制器时的时间戳。
在任意时刻,集群中有且仅有一个控制器。每个broker启动的时候会去尝试去读取/controller节点的brokerid的值,如果读取到brokerid的值不为-1,则表示已经有其它broker节点成功竞选为控制器,所以当前broker就会放弃竞选;如果Zookeeper中不存在/controller这个节点,或者这个节点中的数据异常,那么就会尝试去创建/controller这个节点,当前broker去创建节点的时候,也有可能其他broker同时去尝试创建这个节点,只有创建成功的那个broker才会成为控制器,而创建失败的broker则表示竞选失败。每个broker都会在内存中保存当前控制器的brokerid值,这个值可以标识为activeControllerId。
Zookeeper中还有一个与控制器有关的/controller_epoch节点,这个节点是持久(PERSISTENT)节点,节点中存放的是一个整型的controller_epoch值。controller_epoch用于记录控制器发生变更的次数,即记录当前的控制器是第几代控制器,我们也可以称之为“控制器的纪元”。controller_epoch的初始值为1,即集群中第一个控制器的纪元为1,当控制器发生变更时,没选出一个新的控制器就将该字段值加1。每个和控制器交互的请求都会携带上controller_epoch这个字段,如果请求的controller_epoch值小于内存中的controller_epoch值,则认为这个请求是向已经过期的控制器所发送的请求,那么这个请求会被认定为无效的请求。如果请求的controller_epoch值大于内存中的controller_epoch值,那么则说明已经有新的控制器当选了。由此可见,Kafka通过controller_epoch来保证控制器的唯一性,进而保证相关操作的一致性。****
处理集群中下线的Broker
当某个Broker节点由于故障离开Kafka群集时,则存在于该Broker的leader分区将不可用(由于客户端仅对leader分区进行读写操作)。为了最大程度地减少停机时间,需要快速找到替代的leader分区。
Controller Broker可以对失败的Broker做出响应,Controller Broker可以从Zookeeper监听(zookeeper watch)中获取通知信息,ZooKeeper 赋予客户端监控 znode 变更的能力,即所谓的 Watch 通知功能。一旦 znode 节点被创建、删除,子节点数量发生变化,抑或是 znode 所存的数据本身变更,ZooKeeper 会通过节点变更监听器 (ChangeHandler) 的方式显式通知客户端。
每个 Broker 启动后,会在zookeeper的 /Brokers/ids 下创建一个临时 znode。当 Broker 宕机或主动关闭后,该 Broker 与 ZooKeeper 的会话结束,这个 znode 会被自动删除。同理,ZooKeeper 的 Watch 机制将这一变更推送给控制器,这样控制器就能知道有 Broker 关闭或宕机了,从而进行后续的协调操作。
Controller将收到通知并对此采取行动,决定哪些Broker上的分区成为leader分区,然后,它会通知每个相关的Broker,要么将Broker上的主题分区变成leader,要么通过LeaderAndIsr
请求从新的leader分区中复制数据。
处理新加入到集群中的Broker
通过将Leader分区副本均匀地分布在集群的不同Broker上,可以保障集群的负载均衡。在Broker发生故障时,某些Broker上的分区副本会被选举为leader,会造成一个Broker上存在多个leader分区副本的情况,由于客户端只与leader分区副本交互,所以这会给Broker增加额外的负担,并损害集群的性能和运行状况。因此,尽快恢复平衡对集群的健康运行是有益的。
Kafka认为leader分区副本最初的分配(每个节点都处于活跃状态)是均衡的。这些被最初选中的分区副本就是所谓的首选领导者(preferred leaders)。由于Kafka还支持机架感知的leader选举(rack-aware leader election) ,即尝试将leader分区和follower分区放置在不同的机架上,以增加对机架故障的容错能力。因此,leader分区副本的存在位置会对集群的可靠性产生影响。
默认情况下auto.leader.rebalance.enabled为true,表示允许 Kafka 定期地对一些 Topic 分区进行Leader 重选举。大部分情况下,Broker的失败很短暂,这意味着Broker通常会在短时间内恢复。所以当节点离开群集时,与其相关联的元数据并不会被立即删除。
当Controller注意到Broker已加入集群时,它将使用Broker ID来检查该Broker上是否存在分区,如果存在,则Controller通知新加入的Broker和现有的Broker,新的Broker上面的follower分区再次开始复制现有leader分区的消息。为了保证负载均衡,Controller会将新加入的Broker上的follower分区选举为leader分区。
上面提到的选Leader分区,严格意义上是换Leader分区,为了达到负载均衡,可能会造成原来正常的Leader分区被强行变为follower分区。换一次 Leader 代价是很高的,原本向 Leader分区A(原Leader分区) 发送请求的所有客户端都要切换成向 B (新的Leader分区)发送请求,建议你在生产环境中把这个参数设置成 false。
同步副本(in-sync replica ,ISR)列表
ISR中的副本都是与Leader进行同步的副本,所以不在该列表的follower会被认为与Leader是不同步的. 那么,ISR中存在是什么副本呢?首先可以明确的是:Leader副本总是存在于ISR中。 而follower副本是否在ISR中,取决于该follower副本是否与Leader副本保持了“同步”。
始终保证拥有足够数量的同步副本是非常重要的。要将follower提升为Leader,它必须存在于同步副本列表中。每个分区都有一个同步副本列表,该列表由Leader分区和Controller进行更新。
选择一个同步副本列表中的分区作为leader 分区的过程称为clean leader election。注意,这里要与在非同步副本中选一个分区作为leader分区的过程区分开,在非同步副本中选一个分区作为leader的过程称之为unclean leader election。由于ISR是动态调整的,所以会存在ISR列表为空的情况,通常来说,非同步副本落后 Leader 太多,因此,如果选择这些副本作为新 Leader,就可能出现数据的丢失。毕竟,这些副本中保存的消息远远落后于老 Leader 中的消息。在 Kafka 中,选举这种副本的过程可以通过Broker 端参数 **unclean.leader.election.enable **控制是否允许 Unclean 领导者选举。开启 Unclean 领导者选举可能会造成数据丢失,但好处是,它使得分区 Leader 副本一直存在,不至于停止对外提供服务,因此提升了高可用性。反之,禁止 Unclean Leader 选举的好处在于维护了数据的一致性,避免了消息丢失,但牺牲了高可用性。分布式系统的CAP理论说的就是这种情况。
不幸的是,unclean leader election的选举过程仍可能会造成数据的不一致,因为同步副本并不是完全同步的。由于复制是异步完成的,因此无法保证follower可以获取最新消息。比如Leader分区的最后一条消息的offset是100,此时副本的offset可能不是100,这受到两个参数的影响:
- replica.lag.time.max.ms:同步副本滞后与leader副本的时间
- zookeeper.session.timeout.ms:与zookeeper会话超时时间
脑裂
如果controller Broker 挂掉了,Kafka集群必须找到可以替代的controller,集群将不能正常运转。这里面存在一个问题,很难确定Broker是挂掉了,还是仅仅只是短暂性的故障。但是,集群为了正常运转,必须选出新的controller。如果之前被取代的controller又正常了,他并不知道自己已经被取代了,那么此时集群中会出现两台controller。
其实这种情况是很容易发生。比如,某个controller由于GC而被认为已经挂掉,并选择了一个新的controller。在GC的情况下,在最初的controller眼中,并没有改变任何东西,该Broker甚至不知道它已经暂停了。因此,它将继续充当当前controller,这是分布式系统中的常见情况,称为脑裂。
假如,处于活跃状态的controller进入了长时间的GC暂停。它的ZooKeeper会话过期了,之前注册的/controller
节点被删除。集群中其他Broker会收到zookeeper的这一通知。
由于集群中必须存在一个controller Broker,所以现在每个Broker都试图尝试成为新的controller。假设Broker 2速度比较快,成为了最新的controller Broker。此时,每个Broker会收到Broker2成为新的controller的通知,由于Broker3正在进行”stop the world”的GC,可能不会收到Broker2成为最新的controller的通知。
等到Broker3的GC完成之后,仍会认为自己是集群的controller,在Broker3的眼中好像什么都没有发生一样。
现在,集群中出现了两个controller,它们可能一起发出具有冲突的命令,就会出现脑裂的现象。如果对这种情况不加以处理,可能会导致严重的不一致。所以需要一种方法来区分谁是集群当前最新的Controller。
Kafka是通过使用epoch number(纪元编号,也称为隔离令牌)来完成的。epoch number只是单调递增的数字,第一次选出Controller时,epoch number值为1,如果再次选出新的Controller,则epoch number将为2,依次单调递增。
每个新选出的controller通过Zookeeper 的条件递增操作获得一个全新的、数值更大的epoch number 。其他Broker 在知道当前epoch number 后,如果收到由controller发出的包含较旧(较小)epoch number的消息,就会忽略它们,即Broker根据最大的epoch number来区分当前最新的controller。
上图,Broker3向Broker1发出命令:让Broker1上的某个分区副本成为leader,该消息的epoch number值为1。于此同时,Broker2也向Broker1发送了相同的命令,不同的是,该消息的epoch number值为2,此时Broker1只听从Broker2的命令(由于其epoch number较大),会忽略Broker3的命令,从而避免脑裂的发生。
Kafka中的ReplicaManager
分区数是不是越多越好?
客户端/服务器端需要使用的内存就越多
Kafka0.8.2之后,在客户端producer有个参数batch.size,默认是16KB。它会为每个分区缓存消息,一旦满了就打包将消息批量发出。看上去这是个能够提升性能的设计。不过很显然,因为这个参数是分区级别的,如果分区数越多,这部分缓存所需的内存占用也会更多。假设你有10000个分区,按照默认设置,这部分缓存需要占用约157MB的内存。而consumer端呢?我们抛开获取数据所需的内存不说,只说线程的开销。如果还是假设有10000个分区,同时consumer线程数要匹配分区数(大部分情况下是最佳的消费吞吐量配置)的话,那么在consumer client就要创建10000个线程,也需要创建大约10000个Socket去获取分区数据。这里面的线程切换的开销本身已经不容小觑了。
服务器端的开销也不小,如果阅读Kafka源码的话可以发现,服务器端的很多组件都在内存中维护了分区级别的缓存,比如controller,FetcherManager等,因此分区数越多,这种缓存的成本就越大。
文件句柄的开销
每个分区在底层文件系统都有属于自己的一个目录。该目录下通常会有两个文件: base_offset.log和base_offset.index。Kafak的controller和ReplicaManager会为每个broker都保存这两个文件句柄(file handler)。很明显,如果分区数越多,所需要保持打开状态的文件句柄数也就越多,最终可能会突破你的ulimit -n的限制。
降低高可用性
Kafka通过副本(replica)机制来保证高可用。具体做法就是为每个分区保存若干个副本(replica_factor指定副本数)。每个副本保存在不同的broker上。期中的一个副本充当leader 副本,负责处理producer和consumer请求。其他副本充当follower角色,由Kafka controller负责保证与leader的同步。如果leader所在的broker挂掉了,contorller会检测到然后在zookeeper的帮助下重选出新的leader——这中间会有短暂的不可用时间窗口,虽然大部分情况下可能只是几毫秒级别。但如果你有10000个分区,10个broker,也就是说平均每个broker上有1000个分区。此时这个broker挂掉了,那么zookeeper和controller需要立即对这1000个分区进行leader选举。比起很少的分区leader选举而言,这必然要花更长的时间,并且通常不是线性累加的。如果这个broker还同时是controller情况就更糟了。
kafka为什么不支持分区减少
按照Kafka现有的代码逻辑而言,此功能完全可以实现,不过也会使得代码的复杂度急剧增大。实现此功能需要考虑的因素很多,比如删除掉的分区中的消息该作何处理?如果随着分区一起消失则消息的可靠性得不到保障;如果需要保留则又需要考虑如何保留。直接存储到现有分区的尾部,消息的时间戳就不会递增,如此对于Spark、Flink这类需要消息时间戳(事件时间)的组件将会受到影响;如果分散插入到现有的分区中,那么在消息量很大的时候,内部的数据复制会占用很大的资源,而且在复制期间,此主题的可用性又如何得到保障?与此同时,顺序性问题、事务性问题、以及分区和副本的状态机切换问题都是不得不面对的。反观这个功能的收益点却是很低,如果真的需要实现此类的功能,完全可以重新创建一个分区数较小的主题,然后将现有主题中的消息按照既定的逻辑复制过去即可。
虽然分区数不可以减少,但是分区对应的副本数是可以减少的,这个其实很好理解,你关闭一个副本时就相当于副本数减少了。不过正规的做法是使用kafka-reassign-partition.sh脚本来实现。
kakfa消费消息
Kafka 是 Pull 模式的消息队列,即 Consumer 连到消息队列服务上,主动请求新消息,如果要做到实时性,需要采用长轮询,Kafka 在0.8的时候已经支持长轮询模式。
杂项
上周客串了一下面试官,在这里就简单记录一下期间我问到的一些关于 Kafka 的面试题目,这些都是我平时在学习 Kafka 的一些总结要点。
- 谈谈你对 kafka 的整体认识?
问这个问题主要是想知道面试者对 Kafka 的整体认识如何,能够大致了解清楚面试者对 Kafka 的相关概念的熟悉程度,比如消息、topic、partition、replica、offset、重平衡、leader/follower、ISR 等等。
- 谈谈 Kafka 吞吐量为何如此高?
多分区、batch send、kafka Reator 网络模型、pagecache、sendfile 零拷贝、数据压缩。
- 谈谈你对生产者储水池机制的理解
sender 线程工作机制、ByteBuffer 缓冲区的作用等等:
- 如何提高kafka吞吐量?
生产端调整 batch.size、linger.ms 参数,以及主题分区数合理分配等。
- 生产者producer是线程安全的吗?多线程实例还是单线程实例优缺点?
- 消费者 consumer 是线程安全的吗?多线程实例、单线程实例、单 consumer + 多 worker 线程的优缺点?
- 消息拉取时,什么情况下会造成消息重复消费?谈谈你对位移提交的理解?
理解消息交付语义:
最多一次(atmostonce):消息可能丢失也可能被处理,但最多只会被处理一次;
至少一次(atleastonce):消息不会丢失,但可能被处理多次;
精确一次(exactlyonce):消息被处理且只会被处理一次。
假若消费者在消费前提交位移,那么就是“最多一次”,若在消费后提交位移,那么就是“最少一次”,如果能够保证消费和提交位移同在一个事务中执行,就可保证“精确一次”。__consumer_offsets
的一些理解。
- 什么时候会产生消费组重平衡以及重平衡会涉及到哪些相关参数、频繁重平衡会造成哪些后果?
消费组成员变更、主题数量变更、订阅信息变更;session.timeout.ms、max.poll.interval.ms、hearbeat.interval.ms;
相关文章:Kafka重平衡机制
- kafka默认不支持自动分区重分配,那么如果让你来执行分区重分配,有哪几个步骤,以及在重分配过程中kafka会有哪些动作?
RAR、OAR、AR、RAR-OAR、OAR-RAR 相关概念,
相关文章:记一次 Kafka 线上扩容、Kafka 分区重分配源码分析
- 谈谈你对 Preferred leader 选举的理解?
在 broker 挂掉之后,分区 leader 会变更,久而久之就会变得不均衡,Kafka 默认序号最小的副本为 Preferred leader,在 broker 重启回来后,Kafka 会重新调整分区的 Preferred leader 成为 leader,Preferred leader 选举分为手动选举和自动选举,涉及参数 auto.leader.rebalance.enable,还有个默认允许 10% 不均衡策略等等。
- 谈谈你对 ISR 副本同步的理解?ISR副本同步的缺陷有哪些?
相关文章:Kafka ISR 副本同步机制
- 谈谈你对水印备份机制的理解?LEO 更新机制、HW 更新机制?
相关文章:图解:Kafka 水印备份机制
- 水印备份机制的一些缺陷?数据丢失、数据离散?如何解决的(leader epoch)
相关文章:图解:Kafka 水印备份机制
- 谈谈你对 controller 机制的理解?controller 主要有哪些功能?
更新集群元数据信息、创建主题、删除主题、分区重分配、preferred leader 副本选举、主题分区扩展、broker 加入集群、broker 崩溃、受控关闭、controller leader 选举。
- Kafka 的日志存储机制?
每个分区拥有单独的日志(partition log)、顺序写、到一定大小分成日志段文件(log segment file)、每个 log 文件对应一个索引文件(.index .timeindex)等等。
- Kafka 分区数越多性能就越好吗?为什么?
我的理解:
- 每个分区数都对应一个 log 文件,log 文件是顺序写的,但如果有非常多分区同时刷盘,就会变相成乱序写了,我猜想这也是为什么 RocketMQ 一个 broker 只会拥有一个 CommitLog 的原因之一吧;
- 客户端会为每个分区调用一条线程处理,多线程并发地处理分区消息,分区越多,意味着处理的线程数也就越多,到一定程度后,会造成线程切换开销大;
- 其中一个 broker 挂掉后,如果此时分区特别多,Kafka 分区 leader 重新选举的时间大大增加;
- 每个分区对应都有文件句柄,分区越多,系统文件句柄就越多;
- 客户端在会为每个分区分配一定的缓冲区,如果分区过多,分配的内存也越大。
Kafka时间轮
Kafka 延时队列,重试队列,死信队列
延时队列
在发送延时消息的时候并不是先投递到要发送的真实主题(real_topic)中,而是先投递到一些 Kafka 内部的主题(delay_topic)中,这些内部主题对用户不可见,
然后通过一个自定义的服务拉取这些内部主题中的消息,并将满足条件的消息再投递到要发送的真实的主题中,消费者所订阅的还是真实的主题。
如果采用这种方案,那么一般是按照不同的延时等级来划分的,比如设定5s、10s、30s、1min、2min、5min、10min、20min、30min、45min、1hour、2hour这些按延时时间递增的延时等级,延时的消息按照延时时间投递到不同等级的主题中,投递到同一主题中的消息的延时时间会被强转为与此主题延时等级一致的延时时间,这样延时误差控制在两个延时等级的时间差范围之内(比如延时时间为17s的消息投递到30s的延时主题中,之后按照延时时间为30s进行计算,延时误差为13s)。虽然有一定的延时误差,但是误差可控,并且这样只需增加少许的主题就能实现延时队列的功能。
发送到内部主题(delay*topic**)中的消息会被一个独立的 DelayService 进程消费,这个 DelayService 进程和 Kafka broker 进程以一对一的配比进行同机部署(参考下图),以保证服务的可用性。
针对不同延时级别的主题,在 DelayService 的内部都会有单独的线程来进行消息的拉取,以及单独的 DelayQueue(这里用的是 JUC 中 DelayQueue)进行消息的暂存。与此同时,在 DelayService 内部还会有专门的消息发送线程来获取 DelayQueue 的消息并转发到真实的主题中。从消费、暂存再到转发,线程之间都是一一对应的关系。如下图所示,DelayService 的设计应当尽量保持简单,避免锁机制产生的隐患。
为了保障内部 DelayQueue 不会因为未处理的消息过多而导致内存的占用过大,DelayService 会对主题中的每个分区进行计数,当达到一定的阈值之后,就会暂停拉取该分区中的消息。
因为一个主题中一般不止一个分区,分区之间的消息并不会按照投递时间进行排序,DelayQueue的作用是将消息按照再次投递时间进行有序排序,这样下游的消息发送线程就能够按照先后顺序获取最先满足投递条件的消息。
实现delay service 里需要注意:
- 因为consumer是单线程的,所以理论上有多少个延迟主题,就需要创建多少个线程。像上面topic-delay-1s,topic-delay-5s,topic-delay-5m,topic-delay-30m 这4种主题,就需要4个线程来处理。延迟主题少问题不大,但延迟主题如果比较多的话,还是比较难受的。
- 需要手动提交偏移量,因为delay service 可能会因为升级或者故障,导致重启,这个时候消息是不能漏的,所以一定要消息已经转发到业务主题后,再提交偏移量,防止漏消息。
- 重复消息的处理,因为delay service 需要先把消息发送到业务主题,再提交偏移量,就有可能出现 消息发送到业务主题后,还没来的及发送偏移量,delay service 就因电力故障无法正常服务了,下次重启后,就可能继续发送已经发送过的消息。所以业务上建议是要做到幂等,以实现容错。
重试队列 与 死信队列
我们在 延时队列的基础上实现重试队列就比较简单了,当某个消息处理失败时,就把这个消息发送到 延时队列。等时间到了就会重试处理。如果处理成功,则结束。如果处理失败则重试次数加1,再一次进入延时队列。而如果超过了重试次数,则写入死信队列,作为记录。
这里说的重试队列,死信队列都是概念上的东西,kafka本身并不提供。我们是可以在使用层实现这一类概念。下面的时序图,是一个示例。我们假设有一个订单支付成功了,有积分逻辑需要处理,但重试三次后依然失败了。时序过程描述如下:
Tombstone消息
翻译过来就是墓碑消息,Tombstone消息的Value 为 null。Tombstone消息在注册消息和位移消息中都可能出现。如果在注册消息中出现,表示Kafka可以将该消费者组元数据从位移主题中删除;如果在位移消息中出现了,则表示Kafka能够将该消费者组在某主题分区上的位移提交数据删除。这很好的保证了,内部位移主题不会持续增加磁盘占用空间。
Kafka 机架的分配策略
Kafka的消息结构
Kafka的消息堆积(Lag)
- LogStartOffset:表示一个Partition的起始位移,初始为0,虽然消息的增加以及日志清除策略的影响,这个值会阶段性的增大。
- ConsumerOffset:消费位移,表示Partition的某个消费者消费到的位移位置。
- HighWatermark:简称HW,代表消费端所能“观察”到的Partition的最高日志位移,HW大于等于ConsumerOffset的值。
- LogEndOffset:简称LEO, 代表Partition的最高日志位移,其值对消费者不可见。比如在ISR(In-Sync-Replicas)副本数等于3的情况下(如下图所示),消息发送到Leader A之后会更新LEO的值,Follower B和Follower C也会实时拉取Leader A中的消息来更新自己,HW就表示A、B、C三者同时达到的日志位移,也就是A、B、C三者中LEO最小的那个值。由于B、C拉取A消息之间延时问题,所以HW必然不会一直与Leader的LEO相等,即LEO>=HW。
Lag=HW - ConsumerOffset。对于这里大家有可能有个误区,就是认为Lag应该是LEO与ConsumerOffset之间的差值。但是LEO是对消费者不可见的,既然不可见何来消费滞后。
ConsumerOffset位移获取
ConsumerOffset,Kafka中有两处可以存储,一个是Zookeeper,而另一个是"__consumer_offsets这个内部topic中,前者是0.8.x版本中的使用方式,但是随着版本的迭代更新,现在越来越趋向于后者。就拿1.0.0版本来说,虽然默认是存储在"__consumer_offsets"中,但是保不齐用于就将其存储在了Zookeeper中了。这个问题倒也不难解决,针对两种方式都去拉取,然后哪个有值的取哪个。不过这里还有一个问题,对于消费位移来说,其一般不会实时的更新,而更多的是定时更新,这样可以提高整体的性能。那么这个定时的时间间隔就是ConsumerOffset的误差区间之一。
Kafka日志清除策略
Kafka将消息存储在磁盘中,为了控制磁盘占用空间的不断增加就需要对消息做一定的清理操作。Kafka中每一个分区partition都对应一个日志文件,而日志文件又可以分为多个日志分段文件,这样也便于日志的清理操作。Kafka提供了两种日志清理策略:
日志删除(Log Deletion):按照一定的保留策略来直接删除不符合条件的日志分段。
日志压缩(Log Compaction):针对每个消息的key进行整合,对于有相同key的的不同value值,只保留最后一个版本。
我们可以通过broker端参数log.cleanup.policy来设置日志清理策略,此参数默认值为“delete”,即采用日志删除的清理策略。如果要采用日志压缩的清理策略的话,就需要将log.cleanup.policy设置为“compact”,并且还需要将log.cleaner.enable(默认值为true)设定为true。通过将log.cleanup.policy参数设置为“delete,compact”还可以同时支持日志删除和日志压缩两种策略。日志清理的粒度可以控制到topic级别,比如与log.cleanup.policy对应的主题级别的参数为cleanup.policy,为了简化说明,本文只采用broker端参数做陈述,如若需要topic级别的参数可以查看官方文档。
Log Deletion
日志删除
Kafka日志管理器中会有一个专门的日志删除任务来周期性检测和删除不符合保留条件的日志分段文件,这个周期可以通过broker端参数log.retention.check.interval.ms来配置,默认值为300,000,即5分钟。当前日志分段的保留策略有3种:基于时间的保留策略、基于日志大小的保留策略以及基于日志起始偏移量的保留策略。
基于时间
日志删除任务会检查当前日志文件中是否有保留时间超过设定的阈值retentionMs来寻找可删除的的日志分段文件集合deletableSegments,参考下图所示。retentionMs可以通过broker端参数log.retention.hours、log.retention.minutes以及log.retention.ms来配置,其中log.retention.ms的优先级最高,log.retention.minutes次之,log.retention.hours最低。默认情况下只配置了log.retention.hours参数,其值为168,故默认情况下日志分段文件的保留时间为7天。
查找过期的日志分段文件,并不是简单地根据日志分段的最近修改时间lastModifiedTime来计算,而是根据日志分段中最大的时间戳largestTimeStamp来计算。因为日志分段的lastModifiedTime可以被有意或者无意的修改,比如执行了touch操作,或者分区副本进行了重新分配,lastModifiedTime并不能真实地反映出日志分段在磁盘的保留时间。要获取日志分段中的最大时间戳largestTimeStamp的值,首先要查询该日志分段所对应的时间戳索引文件,查找时间戳索引文件中最后一条索引项,若最后一条索引项的时间戳字段值大于0,则取其值,否则才设置为最近修改时间lastModifiedTime。
若待删除的日志分段的总数等于该日志文件中所有的日志分段的数量,那么说明所有的日志分段都已过期,但是该日志文件中还要有一个日志分段来用于接收消息的写入,即必须要保证有一个活跃的日志分段activeSegment,在此种情况下,会先切分出一个新的日志分段作为activeSegment,然后再执行删除操作。
删除日志分段时,首先会从日志文件对象中所维护日志分段的跳跃表中移除待删除的日志分段,以保证没有线程对这些日志分段进行读取操作。然后将日志分段文件添加上“.deleted”的后缀,当然也包括日志分段对应的索引文件。最后交由一个以“delete-file”命名的延迟任务来删除这些“.deleted”为后缀的文件,这个任务的延迟执行时间可以通过file.delete.delay.ms参数来设置,默认值为60000,即1分钟。
基于日志大小
日志删除任务会检查当前日志的大小是否超过设定的阈值retentionSize来寻找可删除的日志分段的文件集合deletableSegments,参考下图所示。retentionSize可以通过broker端参数log.retention.bytes来配置,默认值为-1,表示无穷大。注意log.retention.bytes配置的是日志文件的总大小,而不是单个的日志分段的大小,一个日志文件包含多个日志分段。
基于日志大小的保留策略与基于时间的保留策略类似,其首先计算日志文件的总大小size和retentionSize的差值diff,即计算需要删除的日志总大小,然后从日志文件中的第一个日志分段开始进行查找可删除的日志分段的文件集合deletableSegments。查找出deletableSegments之后就执行删除操作,这个删除操作和基于时间的保留策略的删除操作相同,这里不再赘述。
基于日志起始偏移量
一般情况下日志文件的起始偏移量logStartOffset等于第一个日志分段的baseOffset,但是这并不是绝对的,logStartOffset的值可以通过DeleteRecordsRequest请求、日志的清理和截断等操作修改。
基于日志起始偏移量的删除策略的判断依据是某日志分段的下一个日志分段的起始偏移量baseOffset是否小于等于logStartOffset,若是则可以删除此日志分段。参考上图,假设logStartOffset等于25,日志分段1的起始偏移量为0,日志分段2的起始偏移量为11,日志分段3的起始偏移为23,那么我们通过如下动作收集可删除的日志分段的文件集合deletableSegments:
- 从头开始遍历每个日志分段,日志分段1的下一个日志分段的起始偏移量为11,小于logStartOffset的大小,将日志分段1加入到deletableSegments中;
- 日志分段2的下一个日志偏移量的起始偏移量为23,也小于logStartOffset的大小,将日志分段2页加入到deletableSegments中;
- 日志分段3的下一个日志偏移量在logStartOffset的右侧,故从日志分段3开始的所有日志分段都不会被加入到deletableSegments中。
收集完可删除的日志分段的文件集合之后的删除操作同基于日志大小的保留策略和基于时间的保留策略相同,这里不再赘述。
Log Compaction
Kafka中的Log Compaction是指在默认的日志删除(Log Deletion)规则之外提供的一种清理过时数据的方式。Log Compaction对于有相同key的的不同value值,只保留最后一个版本。如果应用只关心key对应的最新value值,可以开启Kafka的日志清理功能,Kafka会定期将相同key的消息进行合并,只保留最新的value值。
Log Compaction执行前后,日志分段中的每条消息的偏移量和写入时的保持一致。Log Compaction会生成新的日志分段文件,日志分段中每条消息的物理位置会重新按照新文件来组织。Log Compaction执行过后的偏移量不再是连续的,不过这并不影响日志的查询。
Kafka的Log Compaction可以类比于Redis的SNAPSHOTTING的持久化模式。试想一下,如果一个系统使用Kafka来保存状态,每次有状态变更都会将其写入Kafka中。在某一时刻此系统异常崩溃,进而在恢复时通过读取Kafka中的消息来恢复其应有的状态,那么此系统关心的是它原本的最新状态而不是历史时刻中的每一个状态。如果Kafka的日志保存策略是日志删除(Log Deletion),那么系统势必要一股脑的读取Kafka中的所有数据来恢复,而如果日志保存策略是Log Compaction,那么可以减少数据的加载量进而加快系统的恢复速度。Log Compaction在某些应用场景下可以简化技术栈,提高系统整体的质量。
我们知道可以通过配置log.dir或者log.dirs参数来设置Kafka日志的存放目录,而对于每一个日志目录下都有一个名为“cleaner-offset-checkpoint”的文件,这个文件就是清理检查点文件,用来记录每个主题的每个分区中已清理的偏移量。通过清理检查点文件可以将日志文件(Log)分成两个部分,参考下图,通过检查点cleaner checkpoint来划分出一个已经清理过的clean部分和一个还未清理过的dirty部分。在日志清理的同时,客户端也会读取日志。dirty部分的消息偏移量是逐一递增的,而clean部分的消息偏移量是断续的,如果客户端总能赶上dirty部分,它就能读取到日志的所有消息,反之,就不可能读到全部的消息。
上图中firstDirtyOffset(与cleaner checkpoint相等)表示dirty部分的起始偏移量,而firstUncleanableOffset为dirty部分的截止偏移量,整个dirty部分的偏移量范围为[firstDirtyOffset, firstUncleanableOffset),注意这里是左闭右开区间。为了避免当前活跃的日志分段activeSegment成为热点文件,activeSegment不会参与Log Compaction的操作。同时Kafka支持通过参数log.cleaner.min.compaction.lag.ms(默认值为0)来配置消息在被清理前的最小保留时间,默认情况下firstUncleanableOffset等于activeSegment的baseOffset。
注意Log Compaction是针对key的,所以在使用时应注意每个消息的key值不为null。每个broker会启动log.cleaner.thread(默认值为1)个日志清理线程负责执行清理任务,这些线程会选择“污浊率”最高的日志文件进行清理。用cleanBytes表示clean部分的日志占用大小,dirtyBytes表示dirty部分的日志占用大小,那么这个日志的污浊率(dirtyRatio)为:
dirtyRatio = dirtyBytes / (cleanBytes + dirtyBytes)
为了防止日志不必要的频繁清理操作,Kafka还使用了参数log.cleaner.min.cleanable.ratio(默认值为0.5)来限定可进行清理操作的最小污浊率。
Kafka中用于保存消费者消费位移的主题“__consumer_offsets”使用的就是Log Compaction策略。
这里我们已经知道了怎样选择合适的日志文件做清理操作,然而我们怎么对日志文件中消息的key进行筛选操作呢?Kafka中的每个日志清理线程会使用一个名为“SkimpyOffsetMap”的对象来构建key与offset的映射关系的哈希表。
日志清理需要遍历两次日志文件,第一次遍历把每个key的哈希值和最后出现的offset都保存在SkimpyOffsetMap中,映射模型如下图所示。
第二次遍历检查每个消息是否符合保留条件,如果符合就保留下来,否则就会被清理掉。假设一条消息的offset为O1,这条消息的key在SkimpyOffsetMap中所对应的offset为O2,如果O1>=O2即为满足保留条件。
默认情况下SkimpyOffsetMap使用MD5来计算key的哈希值,占用空间大小为16B,根据这个哈希值来从SkimpyOffsetMap中找到对应的槽位,如果发生冲突则用线性探测法处理。为了防止哈希冲突过于频繁,我们也可以通过broker端参数log.cleaner.io.buffer.load.factor(默认值为0.9)来调整负载因子。偏移量占用空间大小为8B,故一个映射项占用大小为24B。每个日志清理线程的SkimpyOffsetMap的内存占用大小为log.cleaner.dedupe.buffer.size / log.cleaner.thread,默认值为 = 128MB/1 = 128MB。所以默认情况下SkimpyOffsetMap可以保存128MB * 0.9 /24B ≈ 5033164个key的记录。假设每条消息的大小为1KB,那么这个SkimpyOffsetMap可以用来映射4.8GB的日志文件,而如果有重复的key,那么这个数值还会增大,整体上来说SkimpyOffsetMap极大的节省了内存空间且非常高效。
“SkimpyOffsetMap”这个取名也很有意思,“Skimpy”可以直译为“不足的”,可以看出它最初的设计者也认为这种实现不够严谨。如果遇到两个不同的key但哈希值相同的情况,那么其中一个key所对应的消息就会丢失。虽然说MD5这类摘要算法的冲突概率非常小,但根据墨菲定律,任何一个事件,只要具有大于0的几率,就不能假设它不会发生,所以在使用Log Compaction策略时要注意这一点。
Log Compaction会为我们保留key相应的最新value值,那么当我们需要删除一个key怎么办?Kafka中提供了一个墓碑消息(tombstone)的概念,如果一条消息的key不为null,但是其value为null,那么此消息就是墓碑消息。日志清理线程发现墓碑消息时会先进行常规的清理,并保留墓碑消息一段时间。墓碑消息的保留条件是当前墓碑消息所在的日志分段的最近修改时间lastModifiedTime大于deleteHorizonMs,参考图2,这个deleteHorizonMs的计算方式为clean部分中最后一个日志分段的最近修改时间减去保留阈值deleteRetionMs(通过broker端参数log.cleaner.delete.retention.ms配置,默认值为86400000,即24小时)的大小,即:
deleteHorizonMs = clean部分中最后一个LogSegment的lastModifiedTime - deleteRetionMs
所以墓碑消息的保留条件为:
所在LogSegment的lastModifiedTime > deleteHorizonMs
=> 所在LogSegment的lastModifiedTime > clean部分中最后一个LogSegment的lastModifiedTime - deleteRetionMs
=> 所在LogSegment的lastModifiedTime + deleteRetionMs > clean部分中最后一个LogSegment的lastModifiedTime
(可以对照图2中的deleteRetionMs所标记的位置去理解)
Log Compaction执行过后的日志分段的大小会比原先的日志分段的要小,为了防止出现太多的小文件,Kafka在实际清理过程中并不对单个的日志分段进行单独清理,而是会将日志文件中offset从0至firstUncleanableOffset的所有日志分段进行分组,每个日志分段只属于一组,分组策略为:按照日志分段的顺序遍历,每组中日志分段的占用空间大小之和不超过segmentSize(可以通过broker端参数log.segments.bytes设置,默认值为1GB),且对应的索引文件占用大小之和不超过maxIndexSize(可以通过broker端参数log.index.interval.bytes设置,默认值为10MB)。同一个组的多个日志分段清理过后,只会生成一个新的日志分段。
参考上图,假设所有的参数配置都为默认值,在Log Compaction之前checkpoint的初始值为0。执行第一次Log Compaction之后,每个非活跃的日志分段的大小都有所缩减,checkpoint的值也有所变化。执行第二次Log Compaction时会将组队成[0.4GB, 0.4GB]、[0.3GB, 0.7GB]、[0.3GB]、[1GB]这4个分组,并且从第二次Log Compaction开始还会涉及墓碑消息的清除。同理,第三次Log Compaction过后的情形可参考上图尾部。Log Compaction过程中会将对每个日志分组中需要保留的消息拷贝到一个以“.clean”为后缀的临时文件中,此临时文件以当前日志分组中第一个日志分段的文件名命名,例如:00000000000000000000.log.clean。Log Compaction过后将“.clean”的文件修改为以“.swap”后缀的文件,例如:00000000000000000000.log.swap,然后删除掉原本的日志文件,最后才把文件的“.swap”后缀去掉,整个过程中的索引文件的变换也是如此,至此一个完整Log Compaction操作才算完成。
Kafka各个参数配置
生产者配置:
acks
:指定了必须有多少个分区副本收到消息,生产者才会认为消息写入是成功的。默认为acks=1
acks=0
如果设置为 0,则 Producer 不会等待服务器的反馈。该消息会被立刻添加到 socket buffer 中并认为已经发送完成。在这种情况下,服务器是否收到请求是没法保证的,并且参数retries
也不会生效(因为客户端无法获得失败信息)。每个记录返回的 offset 总是被设置为-1。acks=1
如果设置为 1,leader 节点会将记录写入本地日志,并且在所有 follower 节点反馈之前就先确认成功。在这种情况下,如果 leader 节点在接收记录之后,并且在 follower 节点复制数据完成之前产生错误,则这条记录会丢失。acks=all
如果设置为 all,这就意味着 leader 节点会等待所有同步中的副本(ISR)确认之后再确认这条记录是否发送完成。只要至少有一个同步副本存在,记录就不会丢失。这种方式是对请求传递的最有效保证。acks=-1 与 acks=all 是等效的。
buffer.memory
:用来设置 Producer 缓冲区大小。compression.type
:Producer 生成数据时可使用的压缩类型。默认值是 none(即不压缩)。可配置的压缩类型包括:none
、gzip
、snappy
、lz4
或zstd
。压缩是针对批处理的所有数据,所以批处理的效果也会影响压缩比(更多的批处理意味着更好的压缩)。retries
:用来设置发送失败的重试次数。batch.size
:用来设置一个批次可占用的内存大小。linger.ms
:用来设置 Producer 在发送批次前的等待时间。client.id
:Kafka 服务器用它来识别消息源,可以是任意字符串。max.in.flight.requests.per.connection
:用来设置 Producer 在收到服务器响应前可以发送多少个消息。timeout.ms
:用来设置 Broker 等待同步副本返回消息确认的时间,与acks
的配置相匹配。request.timeout.ms
:Producer 在发送数据时等待服务器返回响应的时间。metadata.fetch.timeout.ms
:Producer 在获取元数据时(如:分区的 Leader 是谁)等待服务器返回响应的时间。max.block.ms
:该配置控制KafkaProducer.send()
和KafkaProducer.partitionsFor()
允许被阻塞的时长。这些方法可能因为缓冲区满了或者元数据不可用而被阻塞。用户提供的序列化程序或分区程序的阻塞将不会被计算到这个超时。max.request.size
:请求的最大字节数。receieve.buffer.bytes
:TCP 接收缓冲区的大小。send.buffer.bytes
:TCP 发送缓冲区的大小。
消费者配置
bootstrap.servers
- Broker 集群地址,格式:ip1:port,ip2:port...,不需要设定全部的集群地址,设置两个或者两个以上即可。group.id
- 消费者隶属的消费者组名称,如果为空会报异常,一般而言,这个参数要有一定的业务意义。fetch.min.bytes
- 消费者获取记录的最小字节数。Kafka 会等到有足够的数据时才返回消息给消费者,以降低负载。fetch.max.wait.ms
- Kafka 需要等待足够的数据才返回给消费者,如果一直没有足够的数据,消费者就会迟迟收不到消息。所以需要指定 Broker 的等待延迟,一旦超时,直接返回数据给消费者。max.partition.fetch.bytes
- 指定了服务器从每个分区返回给消费者的最大字节数。默认为 1 MB。session.timeout.ms
- 指定了消费者的心跳超时时间。如果消费者没有在有效时间内发送心跳给群组协调器,协调器会视消费者已经消亡,从而触发分区再均衡。默认为 3 秒。auto.offset.reset
- 指定了消费者在读取一个没有偏移量的分区或偏移量无效的情况下,该如何处理。latest
- 表示在偏移量无效时,消费者将从最新的记录开始读取分区记录。earliest
- 表示在偏移量无效时,消费者将从起始位置读取分区记录。
enable.auto.commit
- 指定了是否自动提交消息偏移量,默认开启。partition.assignment.strategy
- 消费者的分区分配策略。Range
- 表示会将主题的若干个连续的分区分配给消费者。RoundRobin
- 表示会将主题的所有分区按照轮询方式分配给消费者。
client.id
- 客户端标识。max.poll.records
- 用于控制单次能获取到的记录数量。receive.buffer.bytes
- 用于设置 Socket 接收消息缓冲区(SO_RECBUF)的大小,默认值为 64KB。如果设置为-1,则使用操作系统的默认值。send.buffer.bytes
- 用于设置 Socket 发送消息缓冲区(SO_SNDBUF)的大小,默认值为 128KB。与 receive.buffer.bytes 参数一样,如果设置为-1,则使用操作系统的默认值。
Broker配置参数
replication.factor
的作用是设置每个分区的副本数。replication.factor
是主题级别配置; default.replication.factor
是 broker 级别配置。
副本数越多,数据可靠性越高;但由于副本数增多,也会增加同步副本的开销,可能会降低集群的可用性。一般,建议设为 3,这也是 Kafka 的默认值。
不完全的选主
unclean.leader.election.enable
用于控制是否支持不同步的副本参与选举 Leader。unclean.leader.election.enable
是 broker 级别(实际上是集群范围内)配置,默认值为 true 。
- 如果设为 true,代表着允许不同步的副本成为主副本(即不完全的选举),那么将面临丢失消息,消息不一致的风险;
- 如果设为 false,就要等待原先的主副本重新上线,从而降低了可用性。
从Kafka 0.11.0.0版本开始
unclean.leader.election.enable
参数的默认值由原来的true
改为false
最少同步副本
min.insync.replicas
控制的是消息至少要被写入到多少个副本才算是“已提交”。min.insync.replicas
是主题级别和 broker 级别配置。
尽管可以为一个主题配置 3 个副本,但还是可能会出现只有一个同步副本的情况。如果这个同步副本变为不可用,则必须在可用性和数据一致性之间做出选择。Kafka 中,消息只有被写入到所有的同步副本之后才被认为是已提交的。但如果只有一个同步副本,那么在这个副本不可用时,则数据就会丢失。
如果要确保已经提交的数据被已写入不止一个副本,就需要把最小同步副本的设置为大一点的值。
注意:要确保
replication.factor
>min.insync.replicas
。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成replication.factor = min.insync.replicas + 1
。
auto.create.topics.enable
默认值为true,生产者向一个尚未创建的topic发送消息时,会自动创建一个num.partitions(默认值为1)个分区和default.replication.factor(默认值为1)个副本的对应topic。不过我们一般不建议将auto.create.topics.enable
参数设置为true,因为这个参数会影响topic的管理与维护
log.retention.check.interval.ms
周期性检测,删除不符合保留条件的日志分段文件的间隔时间,默认值为300,000,即5分钟。
log.retention.bytes
默认值是-1,表示无穷大,参数配置的是日志文件的总大小,而不是单个的日志分段的大小,一个日志文件可以包含多个日志分段。