Kafka
前几天聊完绩效的时候问了下今年还有没有涨薪,组长的原话是"很难。。。我尽量帮大家争取。。。",我刚听完脑海的第一念头:"此处涨薪难,自有不难处!"。
冷静分析一波,今年整体大环境不行,还是苟着拿波年终吧,先不准备跳了,跟大家浅浅分享一下之前准备的kafka相关知识点
,等看机会的时候可以拿来复习复习。kafka也算是面试常考的组件,一些基本概念就不再写了,就写写面试里常考常问的一些点。
kafka的基本组件
Broker
:通俗理解成一台部署了kafka的服务器就是一个Broker,一个kafka集群由多个Broker组成,每个Broker包含多个TopicController
:broker的领导者,主写主读,它负责管理整个集群中所有分区和副本的状态Producer
:消息生产者,自己决定向哪个partaion发送数据,hash或轮询Consumer
:消息消费者,通过zookeeper维护offsetConsumer Group
:消费者组,同一个组内不同消费者负责消费不同的partation,也就是一个分区只能由一个组内消费者消费;消费者组之间互不影响。每条消息只能被Consumer Group中的一个Consumer消费;但是可以被多个Consumer Group组消费。Topic
:消息主题,一类消息的总称/消息队里,逻辑概念,真实数据存放在partation中,一个 topic 由多个 partions 组成Partation
:分区,真实存储数据的地方,负载均衡与扩展性考虑,一个Topic可以分为多个Partition,物理存储在Kafka集群中的多个Broker上。可靠性上考虑,每个Partition都会有备份Replica。partation保持分区顺序Replica副本
:Partition的副本,为了保证集群中的某个节点发生故障时,该节点上的Partition数据不会丢失,且Kafka仍能继续工作,所以Kafka提供了副本机制,一个Topic的每个Partition都有若干个副本,一个Leader和若干个Follower。Leader
:Replica的主角色,Producer与Consumer只跟Leader交互。Follwer
:Replica的从角色,实时从Leader中同步数据,保持和Leader数据的同步。Leader发生故障时,经过一系列选举算法,某个Follower会变成新的Leader。Offset
:每个分区日志都有一个offset来唯一的标记一条消息,offset的值为8个字节的数字,表示此消息在此partition中所处的起始位置
kafka整体架构
文字描述:
- 整体架构分为producer、broker、consumer三部分,3.0版本之前依赖zookeeper做集群管理,3.0版本之后通过KRaft进行集群管理。
- consumer有消费者组概念,同一个组内不同消费者负责消费不同的partation,一个分区只能由一个组内消费者消费;消费者组之间互不影响
- 集群中的broker会选举出一个leader作为Controller负责管理整个集群中所有分区和副本的状态
- 每个topic由多个partation组成,
partation为真实存储数据的地方,每个partation以文件夹的形式存储在文件系统中。每个对应的partation数据目录下存储*.index,*log ,*timeindex三个文件
- 每个partation都有对应的副本,分散在不同的broker中来实现分布式存储。
- 整体使用主写主读架构,通过partation分布不同的broker上,尽量保证每个broker既有replicas分区拉数据也有leader分区生产数据,实现负载
kafka replicas是如何管理的
- kafka为了保证数据安全性,在producer写入数据时会通过副本机制对当前数据进行复制备份,其他分区副本通过拉取的方式进行数据同步,依赖多副本机制进行故障转移。
- HW: 高水位,标识consumer可见的offset,取所有ISR中最小的那个,只有所有的副本都同步完成HW才会增加,消费者只能消费到HW之后的数据
- LEO: 每个partation的log最后一条message位置
- AR: 所有的分区副本集合
- ISR: 同步的分区集合队列,属于AR的一个子集,ISR中如果同步慢了或挂起会被t出ISR队列。
- OSR:从同步队列中被提出的分区集合、
- 当partation leader挂掉后由Controller在ISR集合中顺序查找出第一个选举新leader
kafka如何保证数据不丢失
- Producer保证发送数据不丢,生产者发送消息有三种模式,
发完即忘
、同步
和异步
,可以通过设置同步或异步的方式获取响应结果,失败做重试来保证消息在发送阶段不丢(broker接受produer数据做了幂等性保证) - Broker保证接收数据保证不丢失,当生产者向leader发数据时通过request.required.acks参数设置数据可靠性的级别。
- 1(默认): producer在ISR中的leader已成功收到的数据并得到确认后发送下一条message。如果leader宕机了,则会丢失数据。
- 0:producer无需等待来自broker的确认而继续发送下一批消息。这种情况下数据传输效率最高,但是数据可靠性确是最低的。
- -1或者all:producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。通过设置ack=1,broker内部做副本同步保证broker内部数据不丢失。
- Consumer保证消费数据不丢失,默认情况下,当消费者消费到消息后,会自动提交offse。但是如果消费者消费出错,没有进入真正的业务处理,那么就可能会导致这条消息消费失败,从而丢失。可以通过开启手动提交位移,等待业务正常处理完成后,再提交offset。
kafka为什么那么快,吞吐量高
- kafka生产消息时通过异步发送机制,首先通过main线程将数据缓存起来,sender线程批量搬运数据,broker定时去poll数据。
- 数据批量读写、批量压缩,消息发送到broker之前会压缩消息,达到一定数据量压缩一次性发送。
- 顺序写磁盘:新的消息顺序添加到日志文件末尾,而且磁盘上的 数据不会一直存着,后台会维护一个线程 来定期检测是否有数据该删除。
- PageCache页缓存:充分利用Linux操作系统对磁盘的访问优化,Cache层在内存种缓存了磁盘上的部分数据。(类似mysql的bufferpool)Broker收到数据后先将生产者的数据写入page cache,再定期刷到磁盘中
- 零拷贝技术:通过 NIO 的 transferTo/transferFrom 调用操作系统的 sendfile 实现零拷贝(高频考点)。
- 数据分区分段 + 稀疏索引:Kafka 的 message 消息实际上是分布式存储在一个一个小的segment中的,每次文件操作也是直接操作的 segment。为了进一步的查询优化,Kafka 又默认为分段后的数据文件建立了索引文件,就是文件系统上的 .index文件。这种分区分段+索引的设计,不仅提升了数据读取的效率,同时也提高了数据操作的并行度。
kafka数据存储原理
- partation为真实存储数据的地方,每个partation以文件夹的形式存储在文件系统中。每个对应的每个partation文件夹下的日志被分割成很多segment段。
- 日志分段名通过偏移量确定,比如segment1的段号是509,segment2的段号是1397,那么segment1就存储了偏移量509-1397的消息。
- 定位到段后通过稀疏索引的方式,也就是利用*.index文件。之所以成为稀疏索引是因为并没有维护所有数据的索引,定位数据的时候要通过二分查找的方式定位索引的位置,再通过索引对应的真实数据的位置回表查询。
- *.timeindex 和kafka清理数据有着密切的关系,kafka默认保留7天内的数据,对于超过7天的数据,会被清理掉,这里的清理逻辑主要根据timeindex时间索引文件里最大的时间来判断的,如果最大时间与当前时间差值超过7天,那么对应的数据段就会被清理掉
kafka rebalance
- consumer group多个消费者组成起来的一个组,它们共同消费 topic 的所有消息,并且一个 topic 的一个 partition 只能被一个 consumer 消费。reblance就是为了kafka对提升消费效率做的优化,规定了一个ConsumerGroup下的所有consumer均匀分配订阅 Topic 的每个分区。
- 触发时机:①新consumer加入consumer group ②组内consumer离开或崩溃
- 触发原因:生产环境一般出现rebalance现象大部分原因是
消费者心跳超时
、消费者消费数据超时
- 主要参数:
- session.timeout.ms 表示 consumer 向 broker 发送心跳的超时时间。例如 session.timeout.ms = 180000 表示在最长 180 秒内 broker 没收到 consumer 的心跳,那么 broker 就认为该 consumer 死亡了,会启动 rebalance。
- heartbeat.interval.msheart 表示 consumer 每次向 broker 发送心跳的时间间隔。heartbeat.interval.ms = 60000 表示 consumer 每 60 秒向 broker 发送一次心跳。一般来说,session.timeout.ms 的值是 heartbeat.interval.ms 值的 3 倍以上。
- max.poll.interval.ms
- max.poll.records 表示每次消费的时候,获取多少条消息。获取的消息条数越多,需要处理的时间越长。所以每次拉取的消息数不能太多,需要保证在 max.poll.interval.ms 设置的时间内能消费完,否则会发生 rebalance。
- 解决方案:
- 心跳超时就调整session.timeout.ms和heartbeat.interval.ms.
- 消费处理超时一般是增加消费者处理的时间(max.poll.interval.ms),减少每次处理的消息数(max.poll.records)
如何增加消费能力
- 可以考虑增加 topic 的分区数,并且同时提升消费组的消费者数量,消费者数=分区数。
- 如果是消费者消费不及时,可以采用多线程的方式进行消费,并且优化业务方法流程,同样的分区数,查看为什么并发那么高。
kafka数据倾斜怎么办
- 刚才提到kafka broker内部结构会出现随着topic数量不断增多,每个topic的分区数量又不一致,最终就会出现topic分区在Kafka集群内分配不均的情况。
- 比如:topic1是10个分区、topic2是15个分区、topic3是3个分区,集群有6台机器。那6台broker上总会有4台broker有两个topic1的分区,有3台broke上有3个topic3分区等等。这样就会导致分区多的broker上的出入流量可能要比其他broker上要高,最终导致资源问题。
- 出现这种情况如果仅仅知识新增broker扩展并不会起作用,要手动编辑内置副本迁移脚本
vi topic-reassignment.json
手动调整各broker与partation的关系。当然网上也有很多自动迁移工具。 - 最近很火的pulsar天然支持动态伸缩能力,就不用这么费劲
kafka支持读写分离吗
- kafka作为主写主读架构不支持读写分离
- 读写分离本质上通过另一个节点分担主节点负载压力,而kafka有独特的副本机制去实现负载功能
分区数越多越好吗
- 在一定条件下,分区数的数量是和吞吐量成正比的,分区数和性能也是成正比。
- 超过了一定限度,客户端和服务端需要使用的内存会激增
- 服务端在很多组件中都维护了分区级别的缓存,分区数越大,缓存成本也就越大。
- 消费端的消费线程数是和分区数挂钩的,分区数越大消费线程数也就越多,线程的开销成本也就越大
- 生产者发送消息有缓存的概念,会为每个分区缓存消息,当积累到一定程度或者时间时会将消息发送到分区,分区越多,这部分的缓存也就越大
- 文件句柄的开销,partation底层存储对应一个log文件,文件句柄数量增加
- 增加数据同步负担,降低高可用