kafka面试题 1
-
简介
- kafka是一个分布式发布-订阅消息系统和一个强大的队列,可以处理大量的数据,并使你能够将消息从一个端点传递到另一个端点,kafka适合离线和在线消息消费,kafka消息保留在磁盘上,并在集群内复制以防止数据丢失,kafka构建在zookeeper同步服务上,他与Apache Storm和Spark非常好地集成,用于实时流数据分析
- kafka依赖于日志顺序写,因此支持消息回溯和支撑高性能读写
- 依赖于zookeeper
-
broker
- 包含多个Topic,Partition和Replica,负责协调Producer和Consumer
- 主从结构:主节点为Controller,kafka启动会往zookeeper中注册当前broker信息,谁先注册谁就是Controller,读取注册上来的节点的数据,(通过监听机制),生成集群的元数据信息,之后把这些信息都分发给其他服务器,让其他服务器能感知到集群中其他成员的存在
-
topic
- 标准MQ中的Queue,Kafka中一个topic的消息会保存在不同的Partition(不同的broker)来保证高可用
-
Partition分区
- 可以理解为将标准MQ的Queue的消息进行拆分,来实现高可用
- Producer发送的Message,根据key和partition数进行hash,然后继续宁投递
- 一个分区只能被同一个Consumer group中的一个Consumer消息,分区内消费有序
-
replica备份
- 每一个Partition的备份,Replica的小于等于broker的数量
- Leader:Replica领导节点,每一个Partition都有对应的Leader节点,Producer写数据时,只会往Leader中写,Consumer读数据也是从Leader中读
- Follower:是Replica跟随节点,用于复制领导节点的数据,复制Leader消息采用pull拉取模式
- broker设置副本数量,默认为3, default.replication.factor
- topic设置副本数量:replication-factor
-
ISR(In-Sync Replica)
- Leader维护一个与基本保持同步的Replica列表,每个partition都会有一个ISR,而且是由leader动态维护,如果一个follower比一个leader落后太多,或者超过一定时间未发起数据复制请求,则leader将其从ISR中移除,当ISR中所有的Replica都向Leader发送ACK时,leader才commit
- Leader宕机之后,会从ISR选择数据最新的Follower来当作leader,如果ISR全部宕机,则选择第一个回复的Replica当作Leader节点,(消息可能会丢失或者重复消费)
- replica.lag.time.max.ms=10000
- 如果leader发现follower超过10s没有向他发起fetch请求,那么leader考虑这个follower是不是程序出了问题,或者资源紧张调度不过来,他太慢了,不希望他拖慢后面的进度,就把他从ISR中移除
- replica.lag.max.messages=4000
- 相差4000条就移除
- follower慢的时候,保证高可用性,同时满足这两个条件后又加入ISR中
- 在可用性和一致性做了动态平衡(一个亮点)
- min.insync.replicas=1
- 需要保证ISR中至少有多少个replica
-
LEO和HW
- LEO(Last end offset):日志末端位移,记录了该副本对象底层日志文件中下一条消息的位移值,副本写入消息的时候,会自动更新LEO值,leader会保存两个LEO值,一个是自己的LEO值,另一个是remote的LEO值,Follower每次fetch请求都会携带当前LEO,leader会选择最小的LEO来更新HW
- HW(high watermaker):HW一定不会大于LEO值,小于HW值的消息被认为是已提交或已备份的消息,并对消费者可见
-
Message
- 标准MQ的Queue中的Message,即一条消息
-
Producer
- 标准MQ中的发送方,发送给broker使用push(推)模式
-
数据一致性保证(消息不丢失)
- request.required.acks=0
- 0:相当于异步的,不需要leader给予回复,producer立即返回,发送就是成功,那么发送消息网络超时或者broker crash(1:Partition的leader还没有commit消息,2:leader与follower数据不同步),即有可能会丢失数据也可能会重发
- 1:当leader接收到消息之后发送ACK,丢失会重发,丢失的概率很小
- -1:当所有的follower都同步消息成功后发送ack,不会丢失消息
- request.required.acks=0
-
consumer
- 标准MQ中的消费方,接受broker使用pull模式,默认100ms拉一次,consumer消费的是partition的数据
- 消息丢失:手动确认ack而不是自动提交
- 消息重复:消费端幂等处理
-
consumer group
- 在kafka中,一个topic是可以被一个消费者组消费,一个Topic分发给Consumer group中的Consumer进行消费,保证同一条message不会被不同的Consumer消费
- 当consumer group的consumer数量大于partition的数量时,超过partition的数量将会拿不到消息
-
分片规则
- kafka分配Replica的算法有两种:RangeAssignor和RoundRobinAssignor
- 默认为RangeAssignor
- 将所有broker和待分配的partition排序
- 将第i个partition分配到第(i mod n)个broker上
- 将第i个partition的第j个replica分配到第((i+j) mod n)个broker上
-
rebalance重平衡
- rebalance本质上是一种协议,规定了一个consumer group下的所有consumer如何达成一致,来分配订阅topic的每个分区
- rebalance发生时,所有的consumer group都停止工作,直到rebalance完成
-
coordinator
- group coordinator 是一个服务,每个broker在启动的时候都会启动一个该服务,
- group coordinator的作用是用来存储group的相关Meta信息,并将对应partition的offset信息记录到kafka内置topic(_consumer_offsets)中,
- kafka在0.9之前是基于zookeeper来存储partition的offset信息(consumer/{group}/offset/{topic}/{partition}),因为zookeeper并不适用于频繁的读写,所以0.9之后通过内置topic的方式来记录对应partition的offset
-
rebalance触发条件
- 组成员个数发生变化
- 新的消费者加入到消费者组
- 消费者主动退出消费组
- 消费者被动下线,比如消费者长时间的GC,网络延迟导致消费者长时间未向group coordinator发送心跳请求,均会被认为该消费者已经下线并踢出
- 订阅的topic的consumer group个数发生变化
- topic的分区数发生变化
- 组成员个数发生变化
-
rebalance流程
- join:加入组,这一步中,所有的成员都向coordinator发送JoinGroup请求,请求加入消费者组,一旦所有成员都发送了JoinGroup请求,coordinate会从中选择一个Consumer担任leader的角色,并把组成员信息以及订阅信息发送给consumer leader,consumer leader负责消费分配方案的确定
- sync:consumer leader开始分配消费方案,即哪个consumer负责消费哪些topic的哪些partition,一旦完成分配,leader会将这个方案封装进syncGroup请求中发送给coordinate·,非leader也会发送syncGroup请求,只是内容为空,coordinate接收到分配方案之后,会把方案塞进syncGroup的response中发送给各个consumer,这样组内的所有成员就都知道自己应该消费那些分区了。
-
如何避免rebalance
- 心跳相关
- session.timeout.ms=6s
- hearbeat.interval.ms=2s
- 消费时间
- max.poll.interval.ms
- 心跳相关
-
日志索引
- kafka能支撑TB级别数据,在日志级别有两个原因:顺序写和日志索引
- kafka在一个日志文件达到一定数量(1G)后,会生成新的日志文件,大数据情况下会有很多个日志文件,通过偏移量来确定到某行记录时,如果遍历所有的日志文件,那么效率自然是很差的,
- kafak在日志级别上抽出来一层日志索引,来方便根据offset快速定位到是某个日志文件
- 每一个partition对应多个log文件(最大1G),每个log文件又对应一个index文件
- 通过offset查找message流程
- 先根据offset(例如:368773);二分定位到最大,小于等于该offset的index文件(368769.index)
- 通过二分(368773-368769=4)定位到(368769.index),该offset的log文件偏移量(3497)
- 通过定位该文件的消息行(3497),然后再往后一行一行匹配(368773 830)
-
分区的原因
- 如果我们假设像标准 MQ 的 Queue, 为了保证一个消息只会被一个消费者消费, 那么我们第一想到的就是加锁. 对于发送者, 在多线程并且非顺序写环境下, 保证数据一致性, 我们同样也要加锁. 一旦考虑到加锁, 就会极大的影响性能. 我们再来看Kafka 的 Partition, Kafka 的消费模式和发送模式都是以 Partition 为分界. 也就是说对于一个 Topic 的并发量限制在于有多少个 Partition, 就能支撑多少的并发. 可以参考 Java 1.7 的 ConcurrentHashMap 的桶设计, 原理一样, 有多少桶, 支持多少的并发
-
顺序写
- 磁盘的顺序写的性能要比内存随机写的还要强. 磁盘顺序写和随机写的差距也是天壤之别
-
批量发送
- 批处理是一种常用的用于提高I/O性能的方式. 对Kafka而言, 批处理既减少了网络传输的Overhead, 又提高了写磁盘的效率. Kafka 0.82 之后是将多个消息合并之后再发送, 而并不是send一条就立马发送(之前支持)
- 批量发送的基本单位, 默认是16384Bytes, 即16kB
- batch.size
- 延迟时间
- linger.ms
- 两者满足其一便发送
-
数据压缩
- 数据压缩的一个基本原理是, 重复数据越多压缩效果越好. 因此将整个Batch的数据一起压缩能更大幅度减小数据量, 从而更大程度提高网络传输效率
- Broker接收消息后,并不直接解压缩,而是直接将消息以压缩后的形式持久化到磁盘, Consumer接受到压缩后的数据再解压缩
- Producer 到 Broker, 副本复制, Broker 到 Consumer 的数据都是压缩后的数据, 保证高效率的传输