简介
Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。
特点
- 可靠性:具有副本及容错机制。
- 可扩展性:kafka无需停机即可扩展节点及节点上线。
- 持久性:数据存储到磁盘上,持久性保存。
- 性能:kafka具有高吞吐量。达到TB级的数据,也有非常稳定的性能。
- 速度快:顺序写入和零拷贝技术使得kafka延迟控制在毫秒级。
- 异步通信:消息队列允许用户把消息放入队列但不立即处理它。
使用场景
- 消息队列,服务解耦
- 日志收集
- 流处理
设计及原理
架构
img
「名词解释」
序号 | 名词 | 含义 |
01 | 「Producer」 | 生产者,发送消息的一方。生产者负责创建消息,然后将其发送到 Kafka。 |
02 | 「Consumer」 | 消费者,接受消息的一方。消费者连接到 Kafka 上并接收消息,进而进行相应的业务逻辑处理。 |
03 | 「Consumer Group」 | 一个消费者组可以包含一个或多个消费者。使用多分区 + 多消费者方式可以极大提高数据下游的处理速度,同一消费组中的消费者不会重复消费消息,同样的,不同消费组中的消费者消息消息时互不影响。Kafka 就是通过消费组的方式来实现消息 P2P 模式和广播模式。 |
04 | 「Broker」 | 服务代理节点。Broker 是 Kafka 的服务节点,即 Kafka 的服务器。 |
05 | 「Topic」 | Kafka 中的消息以 Topic 为单位进行划分,生产者将消息发送到特定的 Topic,而消费者负责订阅 Topic 的消息并进行消费。 |
06 | 「Partition」 | Topic 是一个逻辑的概念,它可以细分为多个分区,每个分区只属于单个主题。同一个主题下不同分区包含的消息是不同的,分区在存储层面可以看作一个可追加的日志(Log)文件,消息在被追加到分区日志文件的时候都会分配一个特定的偏移量(offset)。 |
07 | 「segment」 | 一个partition当中存在多个segment文件段,每个segment分为两部分,.log文件和 .index 文件,其中 .index 文件是索引文件,主要用于快速查询, .log 文件当中数据的偏移量位置; |
08 | 「Offset」 | offset 是消息在分区中的唯一标识,Kafka 通过它来保证消息在分区内的顺序性,不过 offset 并不跨越分区,也就是说,Kafka 保证的是分区有序性而不是主题有序性。 |
09 | 「Replication」 | 副本,是 Kafka 保证数据高可用的方式,Kafka 同一 Partition 的数据可以在多 Broker 上存在多个副本,通常只有主副本对外提供读写服务,当主副本所在 broker 崩溃或发生网络异常,Kafka 会在 Controller 的管理下会重新选择新的 Leader 副本对外提供读写服务。 |
10 | 「Record」 | 实际写入 Kafka 中并可以被读取的消息记录。每个 record 包含了 key、value 和 timestamp。 |
11 | 「ZooKeeper」 | Kafka 集群能够正常工作,需要依赖于 ZooKeeper,ZooKeeper 帮助 Kafka 存储和管理集群信息。 |
ZooKeeper作用
Zookeeper 是一个成熟的分布式协调服务,它可以为分布式服务提供分布式配置服、同步服务和命名注册等能力.。对于任何分布式系统,都需要一种协调任务的方法。Kafka 是使用 ZooKeeper 而构建的分布式系统。
zookeeper
- Broker 注册:Broker 是分布式部署并且之间相互独立,Zookeeper 用来管理注册到集群的所有 Broker 节点。
- Topic 注册:在 Kafka 中,同一个 Topic 的消息会被分成多个分区并将其分布在多个 Broker 上,这些分区信息及与 Broker 的对应关系也都是由 Zookeeper 在维护
- 生产者负载均衡:由于同一个 Topic 消息会被分区并将其分布在多个 Broker 上,因此,生产者需要将消息合理地发送到这些分布式的 Broker 上。
- 消费者负载均衡:与生产者类似,Kafka 中的消费者同样需要进行负载均衡来实现多个消费者合理地从对应的 Broker 服务器上接收消息,每个消费者分组包含若干消费者,每条消息都只会发送给分组中的一个消费者,不同的消费者分组消费自己特定的 Topic 下面的消息,互不干扰。
Controller 是从 Broker 中选举出来的,负责分区 Leader 和 Follower 的管理。当某个分区的 leader 副本发生故障时,由 Controller 负责为该分区选举新的 leader 副本。当检测到某个分区的 ISR(In-Sync Replica)集合发生变化时,由控制器负责通知所有 broker 更新其元数据信息。当使用kafka-topics.sh脚本为某个 topic 增加分区数量时,同样还是由控制器负责分区的重新分配。
Kafka 中 Contorller 的选举的工作依赖于 Zookeeper,成功竞选为控制器的 broker 会在 Zookeeper 中创建/controller这个临时(EPHEMERAL)节点。
生产模型
分区策略
分区原因:
- 方便在集群中扩展,每个 Partition 可以通过调整以适应它所在的机器,而一个 Topic 又可以有多个 Partition 组成,因此可以以 Partition 为单位读写了。
- 可以提高并发,因此可以以 Partition 为单位读写了。
分区原则:我们需要将 Producer 发送的数据封装成一个 ProducerRecord 对象。
该对象需要指定一些参数:
- topic:string 类型,NotNull。
- partition:int 类型,可选。
- timestamp:long 类型,可选。
- key:string 类型,可选。
- value:string 类型,可选。
- headers:array 类型,Nullable。
- 指明 Partition 的情况下,直接将给定的 Value 作为 Partition 的值。
- 没有指明 Partition 但有 Key 的情况下,将 Key 的 Hash 值与分区数取余得到 Partition 值。
- 既没有 Partition 有没有 Key 的情况下,第一次调用时随机生成一个整数(后面每次调用都在这个整数上自增),将这个值与可用的分区数取余,得到 Partition 值,也就是常说的 Round-Robin 轮询算法。
数据可靠性保证
为保证 Producer 发送的数据,能可靠地发送到指定的 Topic,Topic 的每个 Partition 收到 Producer 发送的数据后,都需要向 Producer 发送 ACK(ACKnowledge 确认收到)。
如果 Producer 收到 ACK,就会进行下一轮的发送,否则重新发送数据。
6.png
「副本数据同步策略」
何时发送 ACK?确保有 Follower 与 Leader 同步完成,Leader 再发送 ACK,这样才能保证 Leader 挂掉之后,能在 Follower 中选举出新的 Leader 而不丢数据。
多少个 Follower 同步完成后发送 ACK?全部 Follower 同步完成,再发送 ACK。
7.png
「ISR」
采用第二种方案,所有 Follower 完成同步,Producer 才能继续发送数据,设想有一个 Follower 因为某种原因出现故障,那 Leader 就要一直等到它完成同步。
这个问题怎么解决?Leader维护了一个动态的 in-sync replica set(ISR):和 Leader 保持同步的 Follower 集合。
当 ISR 集合中的 Follower 完成数据的同步之后,Leader 就会给 Follower 发送 ACK。
如果 Follower 长时间未向 Leader 同步数据,则该 Follower 将被踢出 ISR 集合,该时间阈值由 replica.lag.time.max.ms 参数设定。Leader 发生故障后,就会从 ISR 中选举出新的 Leader。
「ACK 应答机制」
对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等 ISR 中的 Follower 全部接受成功。
所以 Kafka 为用户提供了三种可靠性级别,用户根据可靠性和延迟的要求进行权衡,选择以下的配置。
8.png
ACK 参数配置:
- 0:Producer 不等待 Broker 的 ACK,这提供了最低延迟,Broker 一收到数据还没有写入磁盘就已经返回,当 Broker 故障时有可能丢失数据。
- 1:Producer 等待 Broker 的 ACK,Partition 的 Leader 落盘成功后返回 ACK,如果在 Follower 同步成功之前 Leader 故障,那么将会丢失数据。
- -1(all):Producer 等待 Broker 的 ACK,Partition 的 Leader 和 Follower 全部落盘成功后才返回 ACK。但是在 Broker 发送 ACK 时,Leader 发生故障,则会造成数据重复。
「故障处理细节」
9.png
LEO:每个副本最大的 Offset。HW:消费者能见到的最大的 Offset,ISR 队列中最小的 LEO。
Follower 故障:Follower 发生故障后会被临时踢出 ISR 集合,待该 Follower 恢复后,Follower 会 读取本地磁盘记录的上次的 HW,并将 log 文件高于 HW 的部分截取掉,从 HW 开始向 Leader 进行同步数据操作。
等该 Follower 的 LEO 大于等于该 Partition 的 HW,即 Follower 追上 Leader 后,就可以重新加入 ISR 了。
Leader 故障:Leader 发生故障后,会从 ISR 中选出一个新的 Leader,之后,为保证多个副本之间的数据一致性,其余的 Follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 Leader 同步数据。
注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。
Exactly Once 语义
将服务器的 ACK 级别设置为 -1,可以保证 Producer 到 Server 之间不会丢失数据,即 At Least Once 语义。
相对的,将服务器 ACK 级别设置为 0,可以保证生产者每条消息只会被发送一次,即 At Most Once 语义。
At Least Once 可以保证数据不丢失,但是不能保证数据不重复;相对的,At Most Once 可以保证数据不重复,但是不能保证数据不丢失。
但是,对于一些非常重要的信息,比如交易数据,下游数据消费者要求数据既不重复也不丢失,即 Exactly Once 语义。
0.11 版本的 Kafka,引入了幂等性:Producer 不论向 Server 发送多少重复数据,Server 端都只会持久化一条。
即:
At Least Once + 幂等性 = Exactly Once
要启用幂等性,只需要将 Producer 的参数中 enable.idompotence 设置为 true 即可。
开启幂等性的 Producer 在初始化时会被分配一个 PID,发往同一 Partition 的消息会附带 Sequence Number。
而 Borker 端会对 <PID,Partition,SeqNumber> 做缓存,当具有相同主键的消息提交时,Broker 只会持久化一条。
但是 PID 重启后就会变化,同时不同的 Partition 也具有不同主键,所以幂等性无法保证跨分区会话的 Exactly Once。
消费模型
Kafka 有消费组的概念,每个消费者只能消费所分配到的分区的消息,每一个分区只能被一个消费组中的一个消费者所消费,所以同一个消费组中消费者的数量如果超过了分区的数量,将会出现有些消费者分配不到消费的分区。消费组与消费者关系如下图所示:
consumer group
Kafka Consumer Client 消费消息通常包含以下步骤:
- 配置客户端,创建消费者
- 订阅主题
- 拉取消息并消费
- 提交消费位移
- 关闭消费者实例
过程
因为 Kafka 的 Consumer 客户端是线程不安全的,为了保证线程安全,并提升消费性能,可以在 Consumer 端采用类似 Reactor 的线程模型来消费数据。
消费模型
Kafka consumer 参数
- bootstrap.servers:连接 broker 地址,
host:port
格式。 - group.id:消费者隶属的消费组。
- key.deserializer:与生产者的
key.serializer
对应,key 的反序列化方式。 - value.deserializer:与生产者的
value.serializer
对应,value 的反序列化方式。 - session.timeout.ms:coordinator 检测失败的时间。默认 10s 该参数是 Consumer Group 主动检测 (组内成员 comsummer) 崩溃的时间间隔,类似于心跳过期时间。
- auto.offset.reset:该属性指定了消费者在读取一个没有偏移量后者偏移量无效(消费者长时间失效当前的偏移量已经过时并且被删除了)的分区的情况下,应该作何处理,默认值是 latest,也就是从最新记录读取数据(消费者启动之后生成的记录),另一个值是 earliest,意思是在偏移量无效的情况下,消费者从起始位置开始读取数据。
- enable.auto.commit:否自动提交位移,如果为
false
,则需要在程序中手动提交位移。对于精确到一次的语义,最好手动提交位移 - fetch.max.bytes:单次拉取数据的最大字节数量
- max.poll.records:单次 poll 调用返回的最大消息数,如果处理逻辑很轻量,可以适当提高该值。但是
max.poll.records
条数据需要在在 session.timeout.ms 这个时间内处理完 。默认值为 500 - request.timeout.ms:一次请求响应的最长等待时间。如果在超时时间内未得到响应,kafka 要么重发这条消息,要么超过重试次数的情况下直接置为失败。
Kafka Rebalance
rebalance 本质上是一种协议,规定了一个 consumer group 下的所有 consumer 如何达成一致来分配订阅 topic 的每个分区。比如某个 group 下有 20 个 consumer,它订阅了一个具有 100 个分区的 topic。正常情况下,Kafka 平均会为每个 consumer 分配 5 个分区。这个分配的过程就叫 rebalance。
「什么时候 rebalance?」
这也是经常被提及的一个问题。rebalance 的触发条件有三种:
- 组成员发生变更(新 consumer 加入组、已有 consumer 主动离开组或已有 consumer 崩溃了——这两者的区别后面会谈到)
- 订阅主题数发生变更
- 订阅主题的分区数发生变更
「如何进行组内分区分配?」
Kafka 默认提供了两种分配策略:Range 和 Round-Robin。当然 Kafka 采用了可插拔式的分配策略,你可以创建自己的分配器以实现不同的分配策略。
常见问题
为什么Kafka不支持读写分离?
在 Kafka 中,生产者写入消息、消费者读取消息的操作都是与 leader 副本进行交互的,从 而实现的是一种主写主读的生产消费模型。
Kafka 并不支持主写从读,因为主写从读有 2 个很明 显的缺点:
(1)数据一致性问题。数据从主节点转到从节点必然会有一个延时的时间窗口,这个时间 窗口会导致主从节点之间的数据不一致。某一时刻,在主节点和从节点中 A 数据的值都为 X, 之后将主节点中 A 的值修改为 Y,那么在这个变更通知到从节点之前,应用读取从节点中的 A 数据的值并不为最新的 Y,由此便产生了数据不一致的问题。
(2)延时问题。类似 Redis 这种组件,数据从写入主节点到同步至从节点中的过程需要经 历网络→主节点内存→网络→从节点内存这几个阶段,整个过程会耗费一定的时间。而在 Kafka 中,主从同步会比 Redis 更加耗时,它需要经历网络→主节点内存→主节点磁盘→网络→从节 点内存→从节点磁盘这几个阶段。对延时敏感的应用而言,主写从读的功能并不太适用。
Kafka中是怎么体现消息顺序性的?
kafka每个partition中的消息在写入时都是有序的,消费时,每个partition只能被每一个group中的一个消费者消费,保证了消费时也是有序的。
整个topic不保证有序。如果为了保证topic整个有序,那么将partition调整为1.
Kafka中的消息是否会丢失和重复消费?
要确定Kafka的消息是否丢失或重复,从两个方面分析入手:消息发送和消息消费。
1、消息发送
Kafka消息发送有两种方式:同步(sync)和异步(async),默认是同步方式,可通过producer.type属性进行配置。Kafka通过配置request.required.acks属性来确认消息的生产:
0---表示不进行消息接收是否成功的确认;
1---表示当Leader接收成功时确认;
-1---表示Leader和Follower都接收成功时确认;
综上所述,有6种消息生产的情况,下面分情况来分析消息丢失的场景:
(1)acks=0,不和Kafka集群进行消息接收确认,则当网络异常、缓冲区满了等情况时,消息可能丢失;
(2)acks=1、同步模式下,只有Leader确认接收成功后但挂掉了,副本没有同步,数据可能丢失;
2、消息消费
Kafka消息消费有两个consumer接口,Low-level API和High-level API:
Low-level API:消费者自己维护offset等值,可以实现对Kafka的完全控制;
High-level API:封装了对parition和offset的管理,使用简单;
如果使用高级接口High-level API,可能存在一个问题就是当消息消费者从集群中把消息取出来、并提交了新的消息offset值后,还没来得及消费就挂掉了,那么下次再消费时之前没消费成功的消息就“诡异”的消失了;
解决办法:
针对消息丢失:同步模式下,确认机制设置为-1,即让消息写入Leader和Follower之后再确认消息发送成功;异步模式下,为防止缓冲区满,可以在配置文件设置不限制阻塞超时时间,当缓冲区满时让生产者一直处于阻塞状态;
针对消息重复:将消息的唯一标识保存到外部介质中,每次消费时判断是否处理过即可。
Kafka 如何保证高可用?
kafka 0.8以前,是没有HA机制的,就是任何一个broker宕机了,那个broker上的partition就废了,没法写也没法读,没有什么高可用性可言。
kafka 0.8以后,提供了HA机制,replica副本机制。每个partition的数据都会同步到其他机器上,形成自己的多个replica副本。然后所有replica会选举一个leader出来,那么生产和消费都跟这个leader打交道,然后其他replica就是follower。写的时候,leader会负责把数据同步到所有follower上去,读的时候就直接读leader上数据即可。只能读写leader?很简单,要是你可以随意读写每个follower,那么就要care数据一致性的问题,系统复杂度太高,很容易出问题。kafka会均匀的将一个partition的所有replica分布在不同的机器上,这样才可以提高容错性。
kafka的这种机制,就有所谓的高可用性了,因为如果某个broker宕机了,也没事儿,因为那个broker上面的partition在其他机器上都有副本的,那么此时会重新选举一个新的leader出来,大家继续读写那个新的leader即可。
1)写过程
写数据的时候,生产者就写leader,然后leader将数据落地写本地磁盘,接着其他follower自己主动从leader来pull数据。一旦所有follower同步好数据了,就会发送ack给leader,leader收到所有follower的ack之后,就会返回写成功的消息给生产者。(当然,这只是其中一种模式,还可以适当调整这个行为)
2)读过程
消费的时候,只会从leader去读,但是只有当一个消息已经被所有follower都同步成功并返回ack的时候,这个消息才能够被消费者读到。
Kafka 为什么快?
顺序写mmap
因为硬盘是机械结构,每次读写都会「寻址」,「写入」,其中寻址是一个“机械动作”,它是最耗时的。
所以随机I/O会让硬盘重复机械动作比较耗时,顺序I/O的寻址速度就比较快了。
为了提高读写硬盘的速度,Kafka就是使用「顺序I/O」。每条消息都被append到该Partition中,属于「顺序写磁盘」,因此效率非常高。
img
即便是顺序写入硬盘,硬盘的访问速度还是不可能追上内存。所以「Kafka的数据并不是实时的写入硬盘,它充分利用了现代操作系统分页存储来利用内存提高I/O效率。」
零拷贝
Kafka服务器在响应客户端读取数据的时候,底层使用的是「ZeroCopy」技术,也就是数据只在内核空间传输,数据不会到达用户空间。
常规的I/O操作一般是这样的:
img
- 文件在磁盘中的数据被拷贝到内核缓冲区
- 从内核缓冲区拷贝到用户缓冲区
- 用户缓冲区拷贝到内核与Socket相关的缓冲区
- 数据从Socket缓冲区拷贝到相关协议引擎发送出去
这样的操作与用户空间有关,效率不高,Kafka底层使用的零拷贝是这样的:
- 文件在磁盘中的数据被拷贝到内核缓冲区
- 从内核缓冲区拷贝到与Socket相关的缓冲区
- 数据从Socket缓冲区拷贝到相关协议引擎发送出去
整个处理过程没有用到用户空间,效率提升了,这种就是零拷贝。