摘要
主要的是的针对于的kafka的面试的问题进行分析和总结
Partition Rebalance分区再均衡
1)消费者组中新添加消费者读取到原本是其他消费者读取的消息,(2)消费者关闭或崩溃之后离开群组,原本由他读取的partition将由群组里其他消费者读取,(3)当向一个Topic添加新的partition,会发生partition在消费者中的重新分配
以上三种现象会使partition的所有权在消费者之间转移,这样的行为叫作再均衡。
再均衡的优点:给消费者组带来了高可用性和伸缩性
再均衡的缺点:(1)再均衡期间消费者无法读取消息,整个群组有一小段时间不可用。(2)partition被重新分配给一个消费者时,消费者当前的读取状态会丢失,有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢应用程序。因此需要进行安全的再均衡和避免不必要的再均衡。
那么消费者组是怎么知道一个消费者可不可用呢?
消费者通过向被指派为群组协调器的Broker发送信息来维持它们和群组的从属关系以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息。消费者会在轮询消息或提交偏移量时发送心跳。如果消费者停止发送心跳的时间足够长,会话就会过期,群组协调器认为它已经死亡,就会触发一次再均衡。还有一点需要注意的是,当发生再均衡时,需要做一些清理工作,具体的操作方法可以通过在调用subscribe()方法时传入一个ConsumerRebalanceListener实例即可。如何创建消费者创建Kafka的消费者对象的过程与创建生产者的过程是类似的,需要传入必要的属性。在创建消费者的时候以下以下三个选项是必选的:
bootstrap.servers :指定 broker 的地址清单,清单里不需要包含所有的 broker 地址,生产者会从给定的 broker 里查找 broker 的信息。不过建议至少要提供两个 broker 的信息作为容错;
key.deserializer :指定键的反序列化器;
value.deserializer :指定值的反序列化器。
Kafka 与传统消息系统之间有三个关键区别?
(1).Kafka 持久化日志,这些日志可以被重复读取和无限期保留
(2).Kafka 是一个分布式系统:它以集群的方式运行,可以灵活伸缩,在内部通过复制数据,提升容错能力和高可用性
(3).Kafka 支持实时的流式处理
Kafka的用途有哪些?使用场景如何?
总结下来就几个字:异步处理、日常系统解耦、削峰、提速、广播。如果再说具体一点例如:消息,网站活动追踪,监测指标,日志聚合,流处理,事件采集,提交日志等
Kafka中的ISR、AR又代表什么?ISR的伸缩又指什么?
ISR:In-Sync Replicas 副本同步队列
AR:Assigned Replicas 所有副本
ISR是由leader维护,follower从leader同步数据有一些延迟(包括延迟时间replica.lag.time.max.ms和延迟条数replica.lag.max.messages两个维度, 当前最新的版本0.10.x中只支持replica.lag.time.max.ms这个维度),任意一个超过阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。AR=ISR+OSR。
Kafka中的HW、LEO、LSO、LW等分别代表什么?
HW:High Watermark 高水位,取一个partition对应的ISR中最小的LEO作为HW,consumer最多只能消费到HW所在的位置上一条信息。
LEO:LogEndOffset 当前日志文件中下一条待写信息的offset
HW/LEO这两个都是指最后一条的下一条的位置而不是指最后一条的位置。
LSO:Last Stable Offset 对未完成的事务而言,LSO 的值等于事务中第一条消息的位置(firstUnstableOffset),对已完成的事务而言,它的值同 HW 相同
LW:Low Watermark 低水位, 代表 AR 集合中最小的 logStartOffset 值
Kafka中是怎么体现消息顺序性的?
kafka每个partition中的消息在写入时都是有序的,消费时,每个partition只能被每一个group中的一个消费者消费,保证了消费时也是有序的。整个topic不保证有序。如果为了保证topic整个有序,那么将partition调整为1.
Kafka中的分区器、序列化器、拦截器是否了解?它们之间的处理顺序是什么?
拦截器->序列化器->分区器
Kafka生产者客户端中使用了几个线程来处理?分别是什么?
2个,主线程和Sender线程。主线程负责创建消息,然后通过分区器、序列化器、拦截器作用之后缓存到累加器RecordAccumulator中。Sender线程负责将RecordAccumulator中消息发送到kafka中.
“消费组中的消费者个数如果超过topic的分区,那么就会有消费者消费不到数据”这句话是否正确?如果不正确,那么有没有什么hack的手段??
不正确,通过自定义分区分配策略,可以将一个consumer指定消费所有partition。
消费者提交消费位移时提交的是:当前消费到的最新消息的offset还是offset+1?
offset+1
有哪些情形会造成重复消费?
消费者消费后没有commit offset(程序崩溃/强行kill/消费耗时/自动提交偏移情况下unscrible)
哪些情景下会造成消息漏消费?
消费者没有处理完消息 提交offset(自动提交偏移 未处理情况下程序异常结束)
KafkaConsumer是非线程安全的,那么怎么样实现多线程消费?
1.在每个线程中新建一个KafkaConsumer
2.单线程创建KafkaConsumer,多个处理线程处理消息(难点在于是否要考虑消息顺序性,offset的提交方式)
简述消费者与消费组之间的关系
消费者从属与消费组,消费偏移以消费组为单位。每个消费组可以独立消费主题的所有数据,同一消费组内消费者共同消费主题数据,每个分区只能被同一消费组内一个消费者消费。
当你使用kafka-topics.sh创建(删除)了一个topic之后,Kafka背后会执行什么逻辑?
创建:在zk上/brokers/topics/下节点 kafkabroker会监听节点变化创建主题
删除:调用脚本删除topic会在zk上将topic设置待删除标志,kafka后台有定时的线程会扫描所有需要删除的topic进行删除
为什么Kafka中的分区数只能增加不能减少?
当一个主题被创建之后,依然允许我们对其做一定的修改,比如修改分区个数、修改配置等,这个修改的功能就是由kafka-topics.sh脚本中的alter指令所提供。我们首先来看如何增加主题的分区数。以前面的主题topic-config为例,当前分区数为1,修改为3,示例如下:
[root@node1 kafka_2.11-2.0.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic topic-config --partitions 3
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
[root@node1 kafka_2.11-2.0.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic topic-config
Topic:topic-config PartitionCount:3 ReplicationFactor:1 Configs:
Topic: topic-config Partition: 0 Leader: 2 Replicas: 2 Isr: 2
Topic: topic-config Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: topic-config Partition: 2 Leader: 1 Replicas: 1 Isr: 1
注意上面提示的告警信息:当主题中的消息包含有key时(即key不为null),根据key来计算分区的行为就会有所影响。当topic-config的分区数为1时,不管消息的key为何值,消息都会发往这一个分区中;当分区数增加到3时,那么就会根据消息的key来计算分区号,原本发往分区0的消息现在有可能会发往分区1或者分区2中。如此还会影响既定消息的顺序,所以在增加分区数时一定要三思而后行。对于基于key计算的主题而言,建议在一开始就设置好分区数量,避免以后对其进行调整。目前Kafka只支持增加分区数而不支持减少分区数。比如我们再将主题topic-config的分区数修改为1,就会报出InvalidPartitionException的异常,示例如下:
[root@node1 kafka_2.11-2.0.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic topic-config --partitions 1
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Error while executing topic command : The number of partitions for a topic can only be increased. Topic topic-config currently has 3 partitions, 1 would not be an increase.
[2018-09-10 19:28:40,031] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: The number of partitions for a topic can only be increased. Topic topic-config currently has 3 partitions, 1 would not be an increase.
(kafka.admin.TopicCommand$)
为什么不支持减少分区?
按照Kafka现有的代码逻辑而言,此功能完全可以实现,不过也会使得代码的复杂度急剧增大。实现此功能需要考虑的因素很多,比如删除掉的分区中的消息该作何处理?如果随着分区一起消失则消息的可靠性得不到保障;如果需要保留则又需要考虑如何保留。直接存储到现有分区的尾部,消息的时间戳就不会递增,如此对于Spark、Flink这类需要消息时间戳(事件时间)的组件将会受到影响;如果分散插入到现有的分区中,那么在消息量很大的时候,内部的数据复制会占用很大的资源,而且在复制期间,此主题的可用性又如何得到保障?与此同时,顺序性问题、事务性问题、以及分区和副本的状态机切换问题都是不得不面对的。反观这个功能的收益点却是很低,如果真的需要实现此类的功能,完全可以重新创建一个分区数较小的主题,然后将现有主题中的消息按照既定的逻辑复制过去即可。
创建topic时如何选择合适的分区数?
根据集群的机器数量和需要的吞吐量来决定适合的分区数
Kafka目前有哪些内部topic,它们都有什么特征?各自的作用又是什么?
__consumer_offsets 以下划线开头,保存消费组的偏移
优先副本是什么?它有什么特殊的作用?
优先副本选举主要是为了让所有的分区尽可能分布在不同的broker上的一种机制,那么什么是优先副本?优先副本是AR种的第一个副本,这个副本通常为leader,假如这个副本不是leader,那么这个副本就是非优先副本。
假如我们通过以下方式创建一个topic_a,分区数为3,副本因子也为3.
/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 3 --replication-factor 1 --topic topic_a
最终创建的的结果如下
此时0号broker上的leader数量为2个,这个节点负载此时最高,1号节点的负载最低,之前的负载均衡变成了现在的负载失衡,理想状态下优先副本应该就是该分区的leader副本,而优先副本的选举就是尽可能的将每个分区的leader副本作为优先副本,从而促进集群的负载均衡,但是真正的负载均衡还得参考每个leader的数据传输负载
在kafka中默认有一个参数控制自动优先副本的选举auto.leader.rebalance.enable=true,这个选举机制的间隔时间通过leader.imbalance.check.interval.seconds参数进行控制,默认是5分钟(300s)一次,这个选举机制的衡量标准是通过“broker不平衡率”,broker不平衡率=非优先副本的leader个数/总分区数,假如该值超过leader.imbalance.per.broker.percentage,默认为10%,那么就开始进行优先副本的选举。上面案例的不平衡率=1/3=33%。记住,在实际上产中是不会开启优先副本选举的,因为可能引起负面的性能问题,比如客户端请求阻塞,所以一般还是通过监控+手动控制优先副本的方式来控制集群负载,手动控制通过bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181/kafka --path-to-json-file election.json命令进行。
#election.json文件内容示例如下。选举操作应该避开业务高峰期进行,这个--path-to-json-file选项是可选的,但是强烈建议加上这个配置。
{
"partitions":[
{
"partition":0,
"topic":"topic-partitions"
},
{
"partition":1,
"topic":"topic-partitions"
},
{
"partition":2,
"topic":"topic-partitions"}
]
}
分区重分配?
分区重新分配,主要是在需要进行横向扩展Broker的时候或者有计划下线Broker的时候使用,假如在Kafka集群中新加入了一个Broker,那么只有新增的topic对应的分区才会分配到该节点上,之前的topic数据是不会分配给该节点的,这就导致了新增节点与原先节点数据的严重不均衡,所以kafka提供了kafka-reassign-partitions.sh脚本,在节点扩展、Broker下线的时候通过数据复制的方式重新分配分区,该脚本的使用分3个步骤:
- 创建一个包含主题清单的json文件
- 根据主题清单与broker清单生成一份重新分配方案
- 根据方案执行重分配动作
假设现在有3个节点,0,1,2,我们想要下线Broker1,然后再重新分配。
第一步,创建一个主题清单的json文件:reassign.json
{
"topics":[
{
"topic": "topic-a"
}
],
"version": 1
}
第二步,根据主题清单创建一个重新分配的方案,这个方案可以自己通过json的schema创建,这样第一步、第二步操作就省去了。
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/kafka --generate --topics-to-move-json-file reassign.json --broker-list 0,2
生成的清单格式为,可以将Current partition replica assignment保存下来用来做操作的回滚。
#Current partition replica assignment
{"version":1,"partitions":[]}
#Proposed partition reassignment configuration
{"version":1,"partitions":[]}
第三步,执行重新分配方案
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/kafka --execute --reassign-json-file project.json
#校验改重新方案的执行情况
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/kafka --varify --reassign-json-file project.json
如果需要将某个Broker下线,那么在执行重新分配方案的时候,最好先关闭或者重启Broker,这样Broker就不再是任何分区的leader节点了,这样它的分区就可以被分配给集群中的其它Broker,这样可以减少集群间的流量复制,提升重新分配的性能,以减少对整个集群的影响。
数据复制限流(配合分区重新分配使用)
主题分区的重新分配是通过数据复制来实现的,首先创建新副本、同步数据、删除旧副本;
数据复制会占用额外的资源,如果复制的量太大,势必会影响整体的性能,尤其是在业务高峰期的时候,减小重分配的粒度、以小批次的方式可行,如果集群中某个主题或某个分区 的流量在某段时间内特别大,那么只靠减小粒度是不足以应对的,这时就需要有个限流的机制,可以对副本之间的复制流量加以限制,保证整体不受太大的影响。
副本间的复制限流有2种方式:
kafka-config.sh
该脚本主要用来实现动态限流的目的,相关的参数有2个
- follower.replication.throttled.rate(限制follower副本复制的速度)
- leader.replication.throttled.rate(限制副本传输的速度,B/s)
#添加配置
bin/kafka-configs.sh
--zookeeper localhost:2181/kafka
--entity-type brokers
--entity-name 1
--alter
--add-config
follower.replication.throttled.rate=l024 , leader.replication.throttled.rate=l024
#查看配置
bin/kafka-configs.sh
--zookeeper localhost:2181/kafka
--entity-type brokers
--entity-name 1
--describe
#删除配置
bin/kafka-configs.sh
--zookeeper localhost:2181/kafka
--entity-type brokers
--entity-name 1
--alter
--delete-config
follower.replication.throttled.rate=l024 , leader.replication.throttled.rate=l024
在topic级别也有2个参数来限制复制的速度
- follower.replication.throttled.replicas
- leader.replication.throttled.replicas
#1、首先创建一个主题topic-throttled,partitions=3,replication-factor=2
bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 3 --replication-factor 2 --topic topic-throttled
#2、查看创建的topic的详情
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic topic-throttled
#3、添加限制复制流量的配置
bin/kafka-config.sh
--zookeeper localhost:2181/kafka
--entity-type topics
--entity-name topic-throttled
--alter
--add-config
leader.replication.throttled.replicas=[0:0,1:1,2:2],follower.replication.throttled.replicas=[0:1,1:2,2:0] #其中0:0等代表分区号与代理的映射关系
kafka-reassign-partitions.sh
该脚本也提供了限流的功能,只需要传入一个throttled参数就行,具体用法如下:
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/kafka --execute --reassign-json-file project.json --throttled 10 # 这里的10表示10B/S
Kafka有哪几处地方有分区分配的概念?简述大致的过程及原理
创建主题时
如果不手动指定分配方式 有两种分配方式
消费组内分配
简述Kafka的日志目录结构
每个partition一个文件夹,包含四类文件.index/ .log /.timeindex /leader-epoch-checkpoint。.index .log .timeindex 三个文件成对出现 前缀为上一个segment的最后一个消息的偏移
- log文件中保存了所有的消息
- index文件中保存了稀疏的相对偏移的索引
- timeindex保存的则是时间索引,
- leader-epoch-checkpoint中保存了每一任leader开始写入消息时的offset 会定时更新。follower被选为leader时会根据这个确定哪些消息可用。
如果我指定了一个offset,Kafka怎么查找到对应的消息?
1.通过文件名前缀数字x找到该绝对offset 对应消息所在文件
2.offset-x为在文件中的相对偏移
3.通过index文件中记录的索引找到最近的消息的位置
4.从最近位置开始逐条寻找
如果我指定了一个timestamp,Kafka怎么查找到对应的消息?
1.通过文件名前缀数字x找到该绝对offset 对应消息所在文件
2.offset-x为在文件中的相对偏移
3.通过index文件中记录的索引找到最近的消息的位置
4.从最近位置开始逐条寻找
但是时间的因为消息体中不带有时间戳 所以不精确
聊一聊你对Kafka的Log Retention的理解
kafka留存策略包括 删除和压缩两种
- 删除: 根据时间和大小两个方式进行删除 大小是整个partition日志文件的大小,超过的会从老到新依次删除 时间指日志文件中的最大时间戳而非文件的最后修改时间。
- 压缩: 相同key的value只保存一个 压缩过的是clean 未压缩的dirty 压缩之后的偏移量不连续 未压缩时连续
Kafka有哪些指标需要着重关注?
生产者关注MessagesInPerSec、BytesOutPerSec、BytesInPerSec 消费者关注消费延迟Lag
怎么计算Lag?(注意read_uncommitted和read_committed状态下的不同)
参考 如何监控kafka消费Lag情况
Kafka的那些设计让它有如此高的性能?
零拷贝,页缓存,顺序写(高效性),分区(扩展性)
Kafka中的幂等是怎么实现的?
Producer在生产发送消息时,难免会重复发送消息。Producer进行retry时会产生重试机制,发生消息重复发送。而引入幂等性后,重复发送只会生成一条有效的消息。Kafka作为分布式消息系统,它的使用场景常见与分布式系统中,比如消息推送系统、业务平台系统(如物流平台、银行结算平台等)。以银行结算平台来说,业务方作为上游把数据上报到银行结算平台,如果一份数据被计算、处理多次,那么产生的影响会很严重。
影响Kafka幂等性的因素有哪些?
在使用Kafka时,需要确保Exactly-Once语义。分布式系统中,一些不可控因素有很多,比如网络、OOM、FullGC等。在Kafka Broker确认Ack时,出现网络异常、FullGC、OOM等问题时导致Ack超时,Producer会进行重复发送。可能出现的情况如下:
Kafka的幂等性是如何实现的?
Kafka为了实现幂等性,它在底层设计架构中引入了ProducerID和SequenceNumber。那这两个概念的用途是什么呢?
- ProducerID:在每个新的Producer初始化时,会被分配一个唯一的ProducerID,这个ProducerID对客户端使用者是不可见的。
- SequenceNumber:对于每个ProducerID,Producer发送数据的每个Topic和Partition都对应一个从0开始单调递增的SequenceNumber值。
当Producer发送消息(x2,y2)给Broker时,Broker接收到消息并将其追加到消息流中。此时,Broker返回Ack信号给Producer时,发生异常导致Producer接收Ack信号失败。对于Producer来说,会触发重试机制,将消息(x2,y2)再次发送,但是,由于引入了幂等性,在每条消息中附带了PID(ProducerID)和SequenceNumber。相同的PID和SequenceNumber发送给Broker,而之前Broker缓存过之前发送的相同的消息,那么在消息流中的消息就只有一条(x2,y2),不会出现重复发送的情况。
Kafka中的事务是怎么实现的?
与幂等性有关的另外一个特性就是事务。Kafka中的事务与数据库的事务类似,Kafka中的事务属性是指一系列的Producer生产消息和消费消息提交Offsets的操作在一个事务中,即原子性操作。对应的结果是同时成功或者同时失败。这里需要与数据库中事务进行区别,操作数据库中的事务指一系列的增删查改,对Kafka来说,操作事务是指一系列的生产和消费等原子性操作。
Kafka引入事务的用途?
在事务属性引入之前,先引入Producer的幂等性,它的作用为:
- Producer多次发送消息可以封装成一个原子性操作,即同时成功,或者同时失败;
- 消费者&生产者模式下,因为Consumer在Commit Offsets出现问题时,导致重复消费消息时,Producer重复生产消息。需要将这个模式下Consumer的Commit Offsets操作和Producer一系列生产消息的操作封装成一个原子性操作。
产生的场景有:
比如,在Consumer中Commit Offsets时,当Consumer在消费完成时Commit的Offsets为100(假设最近一次Commit的Offsets为50),那么执行触发Balance时,其他Consumer就会重复消费消息(消费的Offsets介于50~100之间的消息)。
Kafka事务提供了哪些可使用的API?
Producer提供了五种事务方法,它们分别是:initTransactions()、beginTransaction()、sendOffsetsToTransaction()、commitTransaction()、abortTransaction(),代码定义在org.apache.kafka.clients.producer.Producer<K,V>接口中,具体定义接口如下:
// 初始化事务,需要注意确保transation.id属性被分配
void initTransactions();
// 开启事务
void beginTransaction() throws ProducerFencedException;
// 为Consumer提供的在事务内Commit Offsets的操作
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws ProducerFencedException;
// 提交事务
void commitTransaction() throws ProducerFencedException;
// 放弃事务,类似于回滚事务的操作
void abortTransaction() throws ProducerFencedException;
事务的实际应用场景有哪些?
在Kafka事务中,一个原子性操作,根据操作类型可以分为3种情况。情况如下:
- 只有Producer生产消息,这种场景需要事务的介入;
- 消费消息和生产消息并存,比如Consumer&Producer模式,这种场景是一般Kafka项目中比较常见的模式,需要事务介入;
- 只有Consumer消费消息,这种操作在实际项目中意义不大,和手动Commit Offsets的结果一样,而且这种场景不是事务的引入目的。
聊一聊你对Kafka的Log Compaction的理解?
为什么Kafka不支持读写分离?
在 Kafka 中,生产者写入消息、消费者读取消息的操作都是与 leader 副本进行交互的,从而实现的是一种主写主读的生产消费模型。数据库、Redis等都具备主写主读的功能,与此同时还支持主写从读的功能。主写从读也就是读写分离,为了与主写主读对应,这里就以主写从读来称呼。从代码层面上来说,虽然增加了代码复杂度,但在 Kafka 中这种功能完全可以支持。
1、数据一致性问题
数据从主节点转到从节点,必然会有一个延时的时间窗口,这个时间窗口会导致主从节点之间的数据不一致。某一时刻,在主节点和从节点中A数据的值都为X, 之后将主节点中 A 的值修改为 Y。那么在这个变更通知到从节点之前,应用读取从节点中的 A 数据的值并不为最新的Y,由此便产生了数据不一致的问题。
2、延时问题
类似Redis这种组件,数据从写入主节点到同步至从节点中的过程,需要经历网络→主节点内存→网络→从节点内存这几个阶段,整个过程会耗费一定的时间。而在 Kafka 中,主从同步会比 Redis 更加耗时,它需要经历网络→主节点内存→主节点磁盘→网络→从节点内存→从节点磁盘这几个阶段。对延时敏感的应用而言,主写从读的功能并不太适用。主写从读可以均摊一定的负载,却不能做到完全的负载均衡。比如对于数据写压力很大而读压力很小的情况,从节点只能分摊很少的负载压力,而绝大多数压力还是在主节点上。而在 Kafka 中却可以达到很大程度上的负载均衡,而且这种均衡是在主写主读的架构上实现的。
有以下几种情况(包含但不仅限于),会造成一定程度上的负载不均衡:
- broker 端的分区分配不均。当创建主题的时候可能会出现某些 broker 分配到的分区数 多而其他 broker 分配到的分区数少,那么自然而然地分配到的 leader 副本也就不均。
- 生产者写入消息不均。生产者可能只对某些 broker 中的 leader 副本进行大量的写入操 作,而对其他 broker 中的 leader 副本不闻不问。
- 消费者消费消息不均。消费者可能只对某些 broker 中的 leader 副本进行大量的拉取操 作,而对其他 broker 中的 leader 副本不闻不问。
- leader 副本的切换不均。在实际应用中可能会由于 broker 宕机而造成主从副本的切换, 或者分区副本的重分配等,这些动作都有可能造成各个 broker 中 leader 副本的分配不均。
针对第一种情况,在主题创建的时候尽可能使分区分配得均衡,好在 Kafka 中相应的分配算法也是在极力地追求这一目标,如果是开发人员自定义的分配,则需要注意这方面的内容。
对于第二和第三种情况,主写从读也无法解决。
对于第四种情况,Kafka 提供了优先副本的选举来达到 leader 副本的均衡。与此同时,也可以配合相应的监控、告警和运维平台来实现均衡的优化。
在实际应用中,配合监控、告警、运维相结合的生态平台,在绝大多数情况下 Kafka 都能做到很大程度上的负载均衡。
kafka follower如何与leader数据同步?
kafka使用 ISR 的方式很好的均衡了确保数据不丢失以及吞吐率。Follower可以批量的从Leader复制数据,而且Leader充分利用磁盘顺序读以及 send file(zero copy) 机制,这样极大的提高复制性能,内部批量写磁盘,大幅减少了Follower与Leader的消息量差。(kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制。完全同步复制要求 All Alive Follower 都复制完,这条消息才会被认为commit,这种复制方式极大的影响了吞吐率。一步复制方式下,Follower异步的从Leader复制数据,数据只要被Leader写入log就被认为已经commit,这种情况下,如果leader挂掉,会丢失数据;)
聊一聊你对Kafka底层存储的理解(页缓存、内核层、块层、设备层)?
聊一聊Kafka的延时操作的原理?
聊一聊Kafka控制器的作用?
消费再均衡的原理是什么?(提示:消费者协调器和消费组协调器)
Kafka中有哪些地方需要选举?这些地方的选举策略又有哪些?
失效副本是指什么?有哪些应对措施?
多副本下,各个副本中的HW和LEO的演变过程
Kafka在可靠性方面做了哪些改进?(HW, LeaderEpoch)
Kafka中怎么实现死信队列和重试队列?
Kafka中的延迟队列怎么实现(这题被问的比事务那题还要多!!!听说你会Kafka,那你说说延迟队列怎么实现?)
Kafka中怎么做消息审计?
Kafka中怎么做消息轨迹?
Kafka中有哪些配置参数比较有意思?聊一聊你的看法
Kafka中有哪些命名比较有意思?聊一聊你的看法
kafka 可以脱离 zookeeper 单独使用吗?为什么?
kafka 不能脱离 zookeeper 单独使用,因为 kafka 使用 zookeeper 管理和协调 kafka 的节点服务器。
kafka 有几种数据保留的策略?
kafka 有两种数据保存策略:按照过期时间保留和按照存储的消息大小保留
什么情况会导致 kafka 运行变慢?
- cpu 性能瓶颈
- 磁盘读写瓶颈
- 网络瓶颈
使用 kafka 集群需要注意什么?
- 集群的数量不是越多越好,最好不要超过 7 个,因为节点越多,消息复制需要的时间就越长,整个群组的吞吐量就越低。
- 集群数量最好是单数,因为超过一半故障集群就不能用了,设置为单数容错率更高。
Kafka 判断一个节点是否还活着有那两个条件?
- (1)节点必须可以维护和 ZooKeeper 的连接,Zookeeper 通过心跳机制检查每个节点的连接
- (2)如果节点是个 follower,他必须能及时的同步 leader 的写操作,延时不能太久
Kafka 消费者是否可以消费指定分区消息?
Kafa consumer消费消息时,向broker发出fetch请求去消费特定分区的消息,consumer指定消息在日志中的偏移量(offset),就可以消费从这个位置开始的消息,customer拥有了offset的控制权,可以向后回滚去重新消费之前的消息,这是很有意义的。
消费者负载均衡策略?
一个消费者组中的一个分片对应一个消费者成员,他能保证每个消费者成员都能访问,如果组中成员太多会有空闲的成员。
Kafka 的消费者如何消费数据?
- 1、指定多主题消费:consumer.subscribe(Arrays.asList(“t4”,“t5”));
- 2、指定分区消费: consumer.assign(list);
- 3、手动修改偏移量: consumer.commitSync();//提交当前消费偏移量 consumer.commitSync(Map<TopicPartition, OffsetAndMetadata>) //提交指定偏移量 consumer.assign(Arrays.asList(tp));
- 4、seek,修改偏移量搜索指针,顺序读取数据: consumer.assign(Arrays.asList(tp)); consumer.seek(tp,0);
partition 的数据如何保存到硬盘
- topic中的多个partition以文件夹的形式保存到broker,每个分区序号从0递增,且消息有序
- Partition文件下有多个segment(xxx.index,xxx.log)
- segment 文件里的 大小和配置文件大小一致可以根据要求修改 默认为1g。如果大小大于1g时,会滚动一个新的segment并且以上一个segment最后一条消息的偏移量命名
kafka如何保证数据数据有序?
一个消费者组里它的内部是有序的,消费者组与消费者组之间是无序的。
kafka数据一致性保证?
假设分区的副本为3,其中副本0是 Leader,副本1和副本2是 follower,并且在 ISR 列表里面。虽然副本0已经写入了 Message4,但是 Consumer 只能读取到 Message2。因为所有的 ISR 都同步了 Message2,只有 High Water Mark 以上的消息才支持 Consumer 读取,而 High Water Mark 取决于 ISR 列表里面偏移量最小的分区,对应于上图的副本2,这个很类似于木桶原理。
这样做的原因是还没有被足够多副本复制的消息被认为是“不安全”的,如果 Leader 发生崩溃,另一个副本成为新 Leader,那么这些消息很可能丢失了。如果我们允许消费者读取这些消息,可能就会破坏一致性。试想,一个消费者从当前 Leader(副本0) 读取并处理了 Message4,这个时候 Leader 挂掉了,选举了副本1为新的 Leader,这时候另一个消费者再去从新的 Leader 读取消息,发现这个消息其实并不存在,这就导致了数据不一致性问题。
当然,引入了 High Water Mark 机制,会导致 Broker 间的消息复制因为某些原因变慢,那么消息到达消费者的时间也会随之变长(因为我们会先等待消息复制完毕)。延迟时间可以通过参数 replica.lag.time.max.ms 参数配置,它指定了副本在复制消息时可被允许的最大延迟时间。
Kafka存储在硬盘上的消息格式是什么?
消息由一个固定长度的头部和可变长度的字节数组组成。头部包含了一个版本号和CRC32校验码。
- 消息长度: 4 bytes (value: 1+4+n)
- 版本号: 1 byte
- CRC校验码: 4 bytes
- 具体的消息: n bytes
Kafka高效文件存储设计特点:
- (1).Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。
- (2).通过索引信息可以快速定位message和确定response的最大大小。
- (3).通过index元数据全部映射到memory,可以避免segment file的IO磁盘操作。
- (4).通过索引文件稀疏存储,可以大幅降低index文件元数据占用空间大小。