Kafka 常见问题
一年将尽夜,万里未归人。
1、Kafka 简介
Apache Kafka是一个分布式发布 - 订阅消息系统和一个强大的队列, 可以处理大量的数据, 并使您能够将消息从一个端点传递到另一个端点。 Kafka适合离线和在线消息消费,Kafka消息保留在磁盘上, 并在群集内复制以防止数据丢失。 Kafka构建在ZooKeeper同步服务之上,依赖 Zookeeper,它与Apache Storm和Spark非常好地集成, 用于实时流式数据分析。 Kafka 依赖于日志顺序写, 因此支持消息回溯和支撑高性能读写。2、Kafka 的 Broker 基本概念
Kafka的 Server包含多个 Topic 、Partition 和 Replica,负责协调 Producer 和 Consumer。主从结构为: 主节点为 Controller, 从节点为从节点 Kafka 启动是会往 Zookeeper 中注册当前Broker 信息,谁先注册谁就是 Controller,读取注册上来的从节点的数据(通过监听机制), 生成集群的元数据信息, 之后把这些信息都分发给其他的服务器, 让其他服务器能感知到集群中其它成员的存在3、Kafka 的 Topic 基本概念
标准 MQ 中的 Queue,Kafka 中一个 Topic 的消息会保存在不同的 Partition (不同的 Broker)来保证高可用。4、Kafka 的 Partition (分区) 基本概念
- 可以理解为将标准 MQ 的 Queue 的消息进行拆分, 来实现高可用。
- Producer 发送的 Message, 根据 key 和 partition 数进行 hash, 然后进行投递。
- 一个分区只能被同一个 Consumer Group 中的一个 Consumer 消费,分区内消费有序。
5、Replica (备份)
每一个 Partition 的备份。 Replica 的小于等于 Broker 的数量。 Leader: Replica领导节点, 每一个 Partition 都有对应的 Leader 节点(Broker),Producer 写数据时, 只会往 Leader 中写,Consumer 读数据也是从 Leader 中读。 Follower: Replica跟随节点, 用于复制领导节点的数据,复制 Leader 消息采用 pull (拉)模式。、# Broker 设置副本数量 默认为 3
default.replication.factor
# Topic 设置副本数量
replication-factor
6、ISR (In-Sync Replica)
Leader维护一个与其基本保持同步的Replica列表, 每个Partition都会有一个ISR, 而且是由leader动态维护。如果一个flower比一个leader落后太多, 或者超过一定时间未发起数据复制请求, 则leader将其重ISR中移除。当ISR中所有Replica都向Leader发送ACK时, leader才commit。 Leader 宕机之后, 会从 ISR 选择数据最新的 Follower 来当做 Leader 如果 ISR 全部宕机, 则选择第一个回复的 Replica 当做 Leader 节点 (消息可能会丢失或者重复消费)。7、水印备份机制
水印备份机制即 LEO (last end offffset),日志末端位移, 记录了该副本对象底层日志文件中下一条消息的位移值, 副本写入消息的时候, 会自动更新 LEO 值 Leader 会保存两个 LEO 值, 一个是自己的 LEO 值, 另外一个是 remote 的 LEO 值。Follower 每次 fetch 请求都会携带当前 LEO, Leader 会选择最小的 LEO来更新 HW HW (high watermark): 从名字可以知道, 该值叫高水印值, HW 一定不会大于 LEO 值, 小于 HW 值的消息被认为是"已提交"或"已备份"的消息, 并对消费者可见。8、Message
标准 MQ 的 Queue 中的 Message,即一条消息。9、Producer
标准 MQ 中的发送方,发送给 Broker 使用push (推)模式。10、数据一致性保证 (消息不丢失)
request.required.asks=0
- 0: 相当于异步的, 不需要leader给予回复, producer立即返回, 发送就是成功,那么发送消息网络超时或broker crash(1.Partition的Leader还没有commit消息 2.Leader与Follower数据不同步), 既有可能丢失也可能会重发。
- 1:当leader接收到消息之后发送ack, 丢会重发, 丢的概率很小。
- -1:当所有的follower都同步消息成功后发送ack. 不会丢失消息。
11、Consumer
标准 MQ 中的消费方,接受 Broker 使用 pull (拉)模式, 默认 100ms 拉一次,Consumer 消费的是Partition 的数据。 消息丢失: 手动确认 ack 而不是自动提交。 消息重复: 消费端幂等处理。12、Consumer Group
在 Kafka 中, 一个 Topic 是可以被一个消费组消费, 一个Topic 分发给 Consumer Group 中的Consumer 进行消费, 保证同一条 Message 不会被不同的 Consumer 消费。 注意: 当Consumer Group的 Consumer 数量大于 Partition 的数量时, 超过 Partition 的数量将会拿不到消息。13、分片规则
Kafka分配Replica的算法有两种: RangeAssignor 和 RoundRobinAssignor 默认为RangeAssignor: 1. 将所有Broker(假设共n个Broker)和待分配的Partition排序 2. 将第i个Partition分配到第(i mod n)个Broker上 3. 将第i个Partition的第j个Replica分配到第((i + j) mod n)个Broker上14、Rebalance (重平衡)
Rebalance 本质上是一种协议, 规定了一个 Consumer Group 下的所有 consumer 如何达成一致,来分配订阅 Topic 的每个分区。 Rebalance 发生时, 所有的 Consumer Group 都停止工作, 直到 Rebalance 完成。15、Coordinator
Group Coordinator 是一个服务, 每个 Broker 在启动的时候都会启动一个该服务 Group Coordinator 的作用是用来存储 Group 的相关 Meta 信息, 并将对应 Partition 的 Offset 信息记录到 Kafka 内置 Topic(__consumer_offsets)中 Kafka 在0.9之前是基于 Zookeeper 来存储Partition的 offset 信息(consumers/{group}/offsets/{topic}/{partition}), 因为 Zookeeper 并不适用于频繁的写操作, 所以在0.9之后通过内置 Topic 的方式来记录对应 Partition 的 offset。16、Rebalace 流程
Rebalance 过程分为两步:Join 和 Sync 1. Join: 顾名思义就是加入组。这一步中, 所有成员都向 Coordinator 发送 JoinGroup 请求, 请求加入消费组,一旦所有成员都发送了 JoinGroup 请求, Coordinator 会从中选择一个Consumer 担任 Leader 的角色, 并把组成员信息以及订阅信息发给 Consumer Leader ,注意Consumer Leader 和 Coordinator不是一个概念。Consumer Leader负责消费分配方案的制定。 2. Sync: Consumer Leader 开始分配消费方案, 即哪个 Consumer 负责消费哪些 Topic 的哪些Partition。一旦完成分配, Leader 会将这个方案封装进 SyncGroup 请求中发给 Coordinator,非 Leader 也会发 SyncGroup 请求, 只是内容为空。Coordinator 接收到分配方案之后会把方案塞进SyncGroup的Response中发给各个Consumer,这样组内的所有成员就都知道自己应该消费哪些分区了。17、日志索引
Kafka 能支撑 TB 级别数据, 在日志级别有两个原因: 顺序写和日志索引。 Kafka 在一个日志文件达到一定数据量 (1G) 之后, 会生成新的日志文件, 大数据情况下会有多个日志文件, 通过偏移量来确定到某行纪录时, 如果遍历所有的日志文件, 那效率自然是很差的。Kafka在日志级别上抽出来一层日志索引, 来方便根据 offset 快速定位到是某个日志文件。 每一个 partition 对应多个个 log 文件(最大 1G), 每一个 log 文件又对应一个 index 文件。18、Kafka 高性能、高吞吐 的原因?
分区、顺序写、批发送和数据压缩等。19、分区的原因
如果我们假设像标准 MQ 的 Queue, 为了保证一个消息只会被一个消费者消费, 那么我们第一想到的就是加锁。对于发送者, 在多线程并且非顺序写环境下, 保证数据一致性, 我们同样也要加锁。一旦考虑到加锁, 就会极大的影响性能。我们再来看Kafka 的 Partition, Kafka 的消费模式和发送模式都是以 Partition 为分界,也就是说对于一个 Topic 的并发量限制在于有多少个 Partition, 就能支撑多少的并发,可以参考 Java 1.7 的 ConcurrentHashMap 的桶设计, 原理一样, 有多少桶, 支持多少的并发。20、顺序写
磁盘的顺序写的性能要比内存随机写的还要强。21、批发送
批处理是一种常用的用于提高I/O性能的方式。对Kafka而言, 批处理既减少了网络传输的Overhead, 又提高了写磁盘的效率。Kafka 0.82 之后是将多个消息合并之后再发送, 而并不是send一条就立马发送(之前支持)。# 批量发送的基本单位, 默认是16384Bytes, 即16kB batch.size # 延迟时间 linger.ms # 两者满足其一便发送