1 Topic
Kafka消息分类的标签,是一个逻辑概念。2 Partion
主题作为消息的归类,可以细分为一个或多个分区,分区可以看做是对消息的二次归类。分区可以有一个至多个副本,每个副本对应一个日志文件,每个日志文件对应一个至多个日志分段(LogSegment),每个日志分段还可以细分为索引文件、日志存储文件、和快照文件等。
Kafka将Topic会分为多个Partion进行存储,这样做有两个方面的好处:
消息存储扩容:一个文件存储消息大小是有限的。在集群中通过多个文件存储可以大大增加一个topic能够保存的消息数量。
并行读写:通过多个partion文件存储消息,可以实现producer与consumer可以并行的读写一个topic。
对数据进行分区的主要原因是为了实现系统的高伸缩特性。不同的分区能够放置到不同节点的机器上,而数据的读写是针对分区进行的,这样每个机器都能独立地执行各自分区的读写请求,从而达到了负载均衡的效果。并且,可以通过增加新的节点机器来增加整个系统的吞吐量(读写并发性增强,消息存储能力增强)。最后利用分区可以实现一些业务中对消息要求保序的特性。
3 segment
Kafka日志组成(为了解释segment,所以没有指明分区):
1)第0号segment文件,表示其前没有消息。
00000000000000000000.index
00000000000000000000.log
2)第368769号segment文件,表示其前有368769条消息。
00000000000000368769.index
00000000000000368769.log
消息的顺序存放特性:kafka消息的顺序存放指的是一个segment中的消息存放的是有序的。
为了能够存放大量的消息,Kafka将消息直接存进了磁盘,由于磁盘是一种低俗IO设备,为了提高速度,就想将Broker中同一个topic的消息顺序存放在磁盘。但是一个topic的消息量很大而且在不断增长,为了连续磁盘空间的分配,就将一个topic下的消息写入到一个文件中,这个文件的最大大小通过配置可以固定。这个文件成为Segment.随着消息量的不断增长,segment文件越来越多,为了便于管理,将同一个topic的segment文件都存放到一个或多个目录中,这些目录就是Partion.
4 Replicas of partion
kafka副本定义:同一个分区下的所有副本保存有相同的消息序列,这些副本分散保存在不同的 Broker 上,从而能够对抗部分 Broker 宕机带来的数据不可用。kafka副本相当于起到了一个备份的作用。
副本角色是采用基于领导者(Leader-based)的副本机制,确保副本中所有的数据都是一致的
基于领导者的副本机制的工作原理如下图所示:
1、副本分成两类:领导者副本(Leader Replica)和追随者副本(Follower Replica)。每个分区在创建时都要选举1个副本,称为领导者副本,其余的副本自动称为追随者副本。
2、追随者副本是不对外提供服务的。这就是说,任何1个追随者副本都不能响应消费者和生产者的读写请求。所有的请求都必须由领导者副本来处理,或者说,所有的读写请求都必须发往领导者副本所在的 Broker,由该 Broker 负责处理。追随者副本不处理客户端请求,它唯一的任务就是从领导者副本异步拉取消息,并写入到自身的提交日志中,从而实现与领导者副本的同步。
3、当领导者副本挂掉了,或者说领导者副本所在的 Broker 宕机时,Kafka 依托于 ZooKeeper 提供的监控功能能够实时感知到,并立即开启新一轮的领导者选举,从追随者副本中选1个作为新的领导者。当 Leader 副本重启回来后,只能作为追随者副本加入到集群中。
如果领导者宕机了,有不一部分消息还没来得及同步到追随者副本,那么这些消息将丢失,要保证这些消息不丢失,可以在生产者端配置 acks=all 来确保
为什么追随者副本不对外提供服务呢?
方便实现Read-your-writes consistency(读你所写一致性):节点 A 更新一个数据后,它自身总是能访问到自身更新过的最新值,而不会看到旧值。
方便实现Monotonic read consistency(单调读一致性):单调读一致性是指如果一个节点从系统中读取出一个数据项的某个值后,那么系统对于该节点后续的任何数据访问都不应该返回更旧的值。摘取自:分布式事务基础理论
5 Partion Leader与 Partion follower,以及ISR
Partion Leader:他是broker Controller节点从ISR集合中的follower选取的。Partion Leader宕机后,ISR集合中的某个follower上位。Partion Leader与Partion follower之间是主备关系,而不是主从关系。
AR=ISR+OSR
ISR 不只是追随者副本集合,它必然包括 Leader 副本,它是一个动态调整的集合,而非静态不变的。在某些情况下,ISR甚至只有 Leader 这1个副本.
Kafka 判断 Follower 是否与 Leader 同步的标准,不是看相差的消息数,而是根据一个参数决定。这个标准就是 Broker 端参数 replica.lag.time.max.ms 参数值。这个参数的含义是 Follower 副本能够落后 Leader 副本的最大时间间隔,当前默认值是 10 秒。这就是说,只要某个 Follower 副本落后 Leader 副本的时间不连续超过 10 秒,那么 Kafka 就认为该Follower 副本与 Leader 是同步的,即使此时 Follower 副本中保存的消息明显少于 Leader 副本中的消息。
思考:光是依靠多副本机制能保证Kafka的高可靠性,但是能保证数据不丢失吗?NO
因为如果leader宕机,但是leader的数据还没同步到follower上去,此时即使选举了follower作为新的leader,当时刚才的数据已经丢失了。ISR是:in-sync replica,就是跟leader partition保持同步的follower partition的数量,只有处于ISR列表中的follower才可以在leader宕机之后被选举为新的leader,因为在这个ISR列表中代表他的数据跟leader是同步的。如果要保证写入kafka的数据不丢失,首先需要保证ISR中至少有1个follower,其次就是在1条数据写入了leader partition之后,要求必须复制给ISR中所有的follower partition,才能说代表这条数据已提交,绝对不会丢失,这是Kafka给出的承诺。(ack = all)
6 offest偏移量
Consumer消费消息时,通过指定的offset来定位下1条要读取的消息。值得注意的是,offset的维护是由Consumer全权控制的。当broker中配置的时间到达时,不论消息是否被消费,Kafka都会清理磁盘空间。因此需要保证消息要被及时的消费。
kafka0.8 版本之前offset保存在zookeeper上,0.8 版本之后offset保存在kafka集群上。kafka集群内部有1个默认的topic,名称叫 __consumer_offsets,它默认有50个分区,将 Consumer 的位移数据作为1条条普通的 Kafka 消息,提交到 __consumer_offsets 中进行保存。
为什么最初考虑将消息位移保存到zookeeper中,后面又保存到kafka集群中?
Apache ZooKeeper 是1个分布式的协调服务框架,Kafka 重度依赖它实现各种各样的协调管理。将位移保存在 ZooKeeper 外部系统的做法,最显而易见的好处就是减少了 Kafka Broker 端的状态保存开销。现在比较流行的提法是将服务器节点做成无状态的,这样可以自由地扩缩容,实现超强的伸缩性。Kafka 最开始也是基于这样的考虑,才将 Consumer Group 位移保存在独立于 Kafka 集群之外的框架中。
不过,慢慢地大家发现了1个问题,即 ZooKeeper 这类元框架其实并不适合进行频繁的写更新,而Consumer Group 的位移更新却是1个非常频繁的操作。这种大吞吐量的写操作会极大地拖慢 ZooKeeper 集群的性能,因此 Kafka 社区渐渐有了这样的共识:将 Consumer 位移保存在ZooKeeper 中是不合适的做法。在新版本的 Consumer Group 中,Kafka 社区重新设计了 Consumer Group 的位移管理方式,采用了将位移保存在 Kafka 内部主题的方法。即:__consumer_offsets。
7 reblance
Rebalance 本质上是1种协议,规定了1个 Consumer Group 下的所有 Consumer 如何达达成一致,来分配订阅 Topic 的每个分区。
触发条件:
1 消费者数量发生变更
2 订阅主题数发生变更
3 订阅主题的分区数发生变更
reblance导致的问题:
1 消费暂停
2 所有Consumer实例共同参与,重新分配所有的分区。
其实更高效的做法是尽量减少分配方案的变动。例如实例 A 之前负责消费分区 1、2、3,那么 Rebalance 之后,如果可能的话,最好还是让实例 A 继续消费分区 1、2、3,而不是被重新分配其他的分区。这样的话,实例 A 连接这些分区所在 Broker 的 TCP 连接就可以继续,不用重新创建连接其他 Broker 的 Socket 资源。(分区分配粘性算法实现,而不是RoundRobinAssign)
3 Rebalance 速度慢。
曾经,有个国外用户的 Group 内有几百个 Consumer 实例,成功 Rebalance 1次要几个小时!这完全是不能忍受的。最悲剧的是,目前社区对此无能为力,至少现在还没有特别好的解决方案。
8 broker controller
如何选举:broker controller由zk负责选举,Broker 在启动时,会尝试去 ZooKeeper 中创建 /controller 节点。第1个成功创建 /controller 节点的 Broker 会被指定为控制器。broker controller负责管理分区与副本controller的作用:
1 选举Leader和ISR:从分区副本列表中选出1个作为该分区的leader,并将该分区对应所有副本置于ISR列表。
2 同步元数据信息包括broker和分区的元数据信息:控制器架子ZK的/brokers/ids以及上1个步骤得到的topic下各分区leader和ISR将这些元数据信息同步到集群每个broker。当有broker或者分区发生变更时及时更新到集群保证集群每1台broker缓存的是最新元数据。
3 broker增删监听与处理:控制器启动时就起1个监视器监视ZK/brokers/ids/子节点。当存在broker启动加入集群后都会在ZK/brokers/ids/增加1个子节点brokerId,或者当1个broker崩溃时,该broker与ZK的会话失效导致ZK会删除该子节点,控制器的监视器发现这种变化后,控制器开始执行broker加入或者删除的相关流程并更新元数据信息到集群。
4 topic变化监听与处理:控制器启动时就起1个监视器监视ZK/brokers/topics/子节点。当通过脚本或者请求创建1个topic后,该topic对应的所有分区及其副本都会写入该目录下的1个子节点。控制器的监视器发现这种变化后,控制器开始执行topic创建的相关流程包括leader选举和ISR并同步元数据信息到集群;且新增1个监视器监视ZK/brokers/topics/<新增topic?节点内容>防止该topic内容变化。
控制器启动时就起1个监视器监视ZK/admin/delete_topics/子节点。当通过脚本或者请求删除1个topic后,该topic会写入该目录下的1个子节点。控制器的监视器发现这种变化后,控制器开始执行topic删除的相关流程包括通知该topic所有分区的所有副本停止运行;通知所有分区所有副本删除数据;删除ZK/admin/delete_topics/<待删除topic子节点>
5 分区变化监听与变化处理:分区重分配通过KAFKA管理员脚本执行完成1个topic下分区的副本重新分配broker。
分区扩展监听与处理:当创建1个topic后,控制器会增加1个监视器监视ZK/brokers/topics/<新增topic子节点内容>防止该topic内容变化。当通过脚本执行分扩展后会在该目录增加新的分区目录。控制器的监视器发现这种变化后,控制器开始执行分区扩展相应流程如选举leader和ISR并同步。
6 broker优雅退出:相比较broker机器直接宕机或强制kill,通过脚本关闭1个broker我们称为broker优雅退出。即将关闭的broker向控制器发送退出请求后一直阻塞。控制器接收到请求后,执行leader重选举和ISR后响应broker。broker接收后退出。这个比较特殊,不依赖ZK,直接通过broker和控制器RPC通信即可完成。
7 数据服务:控制器的最后一大类工作,就是向其他 Broker 提供数据服务。控制器上保存了最全的集群元数据信息,其他所有 Broker 会定期接收控制器发来的元数据更新请求,从而更新其内存中的缓存数据。
控制器保存的数据可以通过如下图所示进行概括:
补充知识:控制器故障转移
故障转移指的是,当运行中的控制器突然宕机或意外终止时,Kafka 能够快速地感知到,并立即启动备用控制器(选举新产生的)来代替之前失败的控制器。这个过程就被称为 Failover,该过程是自动完成的,无需人为手动干预。
9 zookeeper
Kafka 通过 zookeeper 来存储集群的meta元数据信息,Zookeeper负责维护和协调broker,负责Broker Controller的选举。
10 Group Coordinator
主要用于Consumer Group中的各个成员的offset位移管理和Rebalance。Group Coordinator同时管理着当前broker的所有消费者组。Coordinator作为每个consumer group都会选择1个broker作为它的coordinator,他是负责监控这个消费组内的各个消费者的心跳,以及判断是否宕机,然后开启rebalance。 根据内部的选择机制,会挑选1个对应的Broker,Kafka总会把各个消费组均匀分配给各个Broker作为coordinator来进行管理的。也就是说每个消费组对应的coordinator尽可能地均匀分配到各个Broker。 consumer group中的每个consumer刚刚启动就会跟选举出来的这个consumer group对应的coordinator所在的broker进行通信,然后由coordinator分配分区给你的这个consumer来进行消费。coordinator会尽可能均匀的分配分区给各个consumer来消费。 如何选择哪台是coordinator? 先对消费组的groupId进行hash,接着对consumer_offsets的分区数量取模,找到你的这个consumer group的offset要提交到consumer_offsets的哪个分区。比如说:groupId,"membership-consumer-group" -> hash值(数字)-> 对50取模 -> 就知道这个consumer group下的所有的消费者提交offset的时候是往哪个分区去提交offset,找到consumer_offsets的这个分区,consumer_offset的分区的副本数量默认来说1,只有1个leader,然后对这个分区就可以找到对应的leader所在的broker,这个broker就是这个consumer group的coordinator了,consumer接着就会维护1个Socket连接跟这个Broker进行通信。11 消息路由策略(生产者相关)
1) 若指定了partition,则直接写到指定的partition; 2) 若未指定partition但指定了key,则通过对key的hash值与partition数量取模,该取模结果就是要选出的partition索引; 3) 若partition和key都未指定,则使用轮询策略选出1个partition。12消费分区分配算法(消费者相关)
Kafka中提供了多重分区分配算法(PartitionAssignor)的实现:RangeAssignor、RoundRobinAssignor、StickyAssignor PartitionAssignor接口用于用户自定义实现分区分配算法,以实现Consumer之间的分区分配。Kafka默认采用RangeAssignor的分配算法。 RangeAssignor:RangeAssignor将对于每1个Topic,将消费组内所有订阅这个Topic的消费者按照名称的字典序排序,然后为每个消费者划分固定的分区范围。当消费者数量小于分区,那么字典序靠前的消费者会被多分配1个分区,而且这个问题是随着消费者订阅的Topic的数量的增加,不均衡的问题会越来越严重,【新增加的一个分区总是被分配到排名第一的】。此外当消费者数量大于分区,排名靠前的消费者先分到,靠后的就分不到。RangeAssignor的分配示例图如下: RoundRobinAssignor:此分配策略是将消费组内订阅的所有Topic的分区及所有消费者进行排序后尽量均衡的分配(RangeAssignor是针对单个Topic的分区进行排序分配的)。如果消费组内,消费者订阅的Topic列表是相同的(每个消费者都订阅了相同的Topic),那么分配结果是尽量均衡的(消费者之间分配到的分区数的差值不会超过1)【有点像大锅饭】。 StickyAssignor:Sticky是“粘性的”,可以理解为分配结果是带“粘性的”——每1次分配变更相对上1次分配做最少的变动(上1次的结果是有粘性的)其目标有两点:
1. 分区的分配尽量的均衡
2. 每1次重分配的结果尽量与上1次分配结果保持1致
StickyAssignor的分配结果如下图所示(增加RoundRobinAssignor分配作为对?):
C1消费者消失之后,RoundRobinAssignor再分配:
C1消费者消失之后,StickyAssignor再分配:
注意上面那个是错的,下面这一个才是对的:(从图中可以看出StickyAssignor在分配时,原来的C0与C2绑定的分区是不变的。原C1的分区现在分别分配给了C0与C2。并且在分配的时候,还确保了C0与C2分区数量的均衡)
13 消息写入算法:
生产者将消息发送给broker,并形成最终的可供消费者消费的log,是1个比较复杂的过程。
1) producer向broker集群提交连接请求,其所连接上的任意broker都会向其发送broker controller的
通信URL,即broker controller主机配置文件中的listeners地址
2) 当producer指定了要生产消息的topic后,其会向broker controller发送请求,请求当前topic中所有
partition的leader列表地址
3) broker controller在接收到请求后,会从zk中查找到指定topic的所有partition的leader,并返回
给producer
4) producer在接收到leader列表地址后,根据消息路由策略找到当前要发送消息所要发送的partition
leader,然后将消息发送给该leader
5) leader将消息写入本地log,并通知ISR中的followers
6) ISR中的followers从leader中同步消息后向leader发送ACK
7) leader收到所有ISR中的followers的ACK后,增加HW,表示消费者已经可以消费到该位置了
8) 若leader在等待的followers的ACK超时了,发现还有follower没有发送ACK,则会将该follower从ISR
中清除,然后增加HW。
大致过程可以用如下图表示:
14 HW机制
高水位的作用:HW,HighWatermark,高水位,表示Consumer可以消费到的最大partition偏移量。
1)定义消息可见性,即用来标识分区下的哪些消息是可以被消费者消费的。【位移值等于高水位的消息也属于未提交消息。也就是说,高水位上及后面的消息是不能被消费者消费的】
2) 帮助 Kafka 完成副本同步, 保证leader和follower之间的数据一致性,这样即使某个broker宕机,那么其它副本被选举为leader 副本后,也能保证消息的不丢失以及重复消费。LEO,日志末端位移,即 Log End Offset。它表示副本写入下一条消息的位移值。同一个副本对象,其高水位值不会大于 LEO 值。Kafka 所有副本都有对应的高水位和 LEO 值。分区的高水位值是learder 副本的高水位值。
远程副本(Remote副本)的作用:帮助 Leader 副本确定其高水位,也就是分区高水位
Leader 副本的HW更新原则:取当前leader副本的LEO和所有remote副本的LEO的最小值
Follower副本的HW更新原则:取leader副本发送的HW和自身的LEO中的最小值。可能这里有点疑惑,为什么不直接把当前leader副本的HW作为follower副本的HW呢,这里涉及到HW截断。
比如:
随后Leader A宕机了,那么这个时候B被选为了Leader
如果ack = all,那么此时B的6和A之前的6是一样的,如果为0或者1,那么就是不一样的。因此需要保证不丢消息原来的消息6,就需要将ack设置为all是最可靠的。
此后A又恢复了,于是就出现了如下尴尬的场景(ack=0或1, A与B的数据不一致了) :
B与C的HW都为5,但是A的HW为4,LEO为6,此时就要进行截断了,将LEO回退到宕机时的HW,并取leader副本B发送的HW 5和A自身的LEO 4中的最小值。随后A从B拉取消息5 和 6.【A截断了HW到LEO之间的数据】
注意:HW截断机制只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。
15 消费过程解析:
生产者将消息发送到topic中,消费者即可对其进行消费,其消费过程如下:
1) consumer向broker集群提交连接请求,其所连接上的任意broker都会向其发送broker controller的通信URL,即broker controller主机配置文件中的listeners地址。
2) 当consumer指定了要消费的topic后,其会向broker controller发送poll请求
3) broker controller会为consumer分配一个或几个partition leader,并将该partitioin的当前offset发
送给consumer
4) consumer会按照broker controller分配的partition对其中的消息进行消费
5) 当消费者消费完该条消息后,消费者会向broker发送1个该消息已被消费的反馈,即该消息的
offset
6) 当broker接到消费者的offset后,会更新到相应的__consumer_offset中
7) 以上过程一直重复,直到消费者停止请求消息
8) 消费者可以重置offset,从而可以灵活消费存储在broker上的消息
16 消息压缩:
压缩就是用时间去换空间的经典 trade-off 思想,具体来说就是利用 CPU 时间去换磁盘空间或网络I/O 传输量。什么时候压缩?
产者端和Broker端,生产者程序中配置compression.type参数即表示启用指定类型的压缩算法。在生产者端启用压缩是很自然的想法,那为什么说在Broker端也可能进行压缩呢?
两种例外情况就可能让Broker重新压缩消息
(1)Broker端指定了和Producer端不同的压缩算法
(2)Broker端发生了消息格式转换
Consumer 端解压缩。
各种压缩算法对比?
在吞吐量方面:LZ4 > Snappy > zstd / GZIP
在压缩方面:zstd > LZ4 > GZIP > Snappy
17 日志清理策略
kafka log的清理策略有两种:delete,compact,默认是delete,这个对应了kafka中每个topic对于record的管理模式
delete:一般是使用按照时间保留的策略,当不活跃的segment的时间戳是大于设置的时间的时候,当前segment就会被删除 。cleanup.policy: delete
delete采用两种配置方式删除一些旧的segment:
retention.bytes:总的segment的大小限制,达到这个限制后会删除旧的segment, 默认值为-1,就是不会删除
retention.ms:当前时间 - segment的最后写入record的时间 > retention.ms 的segment会被删除,默认是168h, 7天
delete相关一些辅助的配置:
og.retention.check.interval.ms: 每隔多久检查一次是否有可以删除的log,默认是300s,5分钟这个是broker级别的设置
file.delete.delay.ms: 在彻底删除文件前保留的时间,默认为1分钟 这个是broker级别的设置
compact: 日志不会被删除,会被去重清理,这种模式要求每个record都必须有key,然后kafka会按照一定的时机清理segment中的key,对于同1个key只保留最新的那个key.同样的,compact也只针对不活跃的segment.。 cleanup.policy: compact
compact清理使用场景:
日志清理的compact策略,对于那种需要留存一份全量数据的需求比较有用,什么意思呢,比如,用flink计算了所有用户的粉丝数,且每5分钟更新1次,结果都存储到kafka当中。这个时候kafka相当于是1个数据总线,任何需要用户粉丝数的业务部门都可以从kafka中拿到这个数据。这个时候如果数据的保存使用delete策略,为了保存所有用户的粉丝数,只能设置不删除,也就是这样的话,数据会无限膨胀,并且,很多数据是无意义的,因为业务在从kafka中消费数据的时候,实际上只是想知道用户的当前粉丝数是多少,不关注一个月前这个用户有多少粉丝数,但是这些数据都在kafka中存储,会造成无意义的消费。
compact相关的参数配置:
min.cleanable.dirty.ratio: 可以进行compact的脏数据的比例,dirtyRatio = dirtyBytes /(cleanBytes + dirtyBytes) 其中dirtyBytes表示可清理部分的日志大小,cleanBytes表示已清理部分的日志大小。这个配置也是为了提升清理的性价比设置的,因为清理数据需要对磁盘进行读写,开销并不低,如果你的数据只有很小的重复比例,实际上是没有清理的必要的。这个值默认是0.5 也就是脏了的数据达到了总数据的50%才会清理,一般情况下我如果开启了compact策略,都会将这个值设置为0.1,感觉这样对于想要消费当前topic的业务更加友好。
min.compaction.lag.ms: 这个设置了在一条消息在被produer发送到kafka当中之后,多久时间以内不会被compact,为了满足有些想要获取一定时间内的历史快照的业务,默认是0,就是不会根据消息投递的时间来决定消息是否应该被compacted。
标签:术语,副本,分区,基础,broker,kafka,topic,消息,leader From: https://www.cnblogs.com/mtjb1dd/p/16591173.html