Kafka
一、Kafka介绍及基本原理
kafka是一个分布式的、支持分区的、多副本、基于zookeeper的分布式消息系统/中间件。
kafka一般不会删除消息,不管这些消息有没有被消费。只会根据配置的日志保留时间(log.retention.hours)确认消息多久被删除,默认保留最近一周的日志消息(日志消息就是消息)。
kafka消息消费是以组方式进行的。Kafka同一条消息只能被同一个消费组下的某一个消费者消费。
每个consumer维护各自的消费offset。我们可以通过指定offset来重复消费某些消息,或者跳过某些消息。
Kafka的使用场景
1.日志收集(即可以整合到ELK)
一个公司可以用Kafka收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
2.消息系统
解耦和生产者和消费者、缓存消息等。
3.用户活动跟踪
Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
4.运营指标
Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
Kafka集群架构图
Kafka基本概念
1.Broker
一个Kafka节点就是一个broker,一个或者多个Broker可以组成一个Kafka集群。
2.Topic
Kafka根据topic对消息进行归类,发布到Kafka集群的每条消息都需要指定一个topic。
3.Producer
消息生产者,向Broker发送消息的客户端
4.Consumer
消息消费者,从Broker读取消息的客户端
5.ConsumerGroup
每个Consumer属于一个特定的Consumer Group,一条消息可以被多个不同的Consumer Group消费,但是一个Consumer Group中只能有一个Consumer能够消费该消息。
6.Partition
物理上的概念,一个topic可以分为多个partition,每个partition内部消息是有序的。
消费消息的时候可以消费多个主题。
Kafka 消费者可以同时订阅并消费多个主题中的消息。
consumer.subscribe(Arrays.asList(“topic1”, “topic2”));
消费模式可以实现为单播消费、多播消费
(即通过新建一个消费者摆放到一个消费组还是新建多个消费者摆放到多个消费组来决定消费是被一个消费者消费还是多个消费者消费,而不是说一条消息能被一个消费组下的多个消费者消费)
1.单播消费:
一条消息只能被某一个消费者消费的模式,类似queue模式,只需让所有消费者在同一个消费组里即可。
2.多播消费:
一条消息能被多个消费者消费的模式,类似publish-subscribe模式,针对Kafka同一条消息只能被同一个消费组下的某一个消费者消费的特性,要实现多播只要保证这些消费者属于不同的消费组即可。
一个kafka节点表示一个broker
每个分区Partition都会选一个kafka节点(broker)作为其leader节点。leader处理所有的针对该partition分区的读写请求。
kafka集群中每个broker都有topic各分区的数据?不一定,一般是
可以这么来理解Topic,Partition和Broker
一个topic,代表逻辑上的一个业务数据集,比如按数据库里不同表的数据操作消息进行区分来放入不同topic,订单相关操作消息放入订单topic,用户相关操作消息放入用户topic,对于大型网站来说,后端数据都是海量的,订单消息很可能是非常巨量的,比如有几百个G甚至达到TB级别,如果把这么多数据都放在一台机器上可定会有容量限制问题,那么就可以在topic内部划分多个partition来分片存储数据,不同的partition可以位于不同的机器上,每台机器上都运行一个Kafka的进程Broker。
一个topic下的某个分区,其Leader副本和每个Follower副本的数据是一样的。不同分区比如Partition0,Partition1,他们的数据是不一样的。
如果每个broker都有topic的所有分区,其他broker都有分区的部分。那么相当于有多少个broker就有几倍的分区总数据,比如所以分区是10个g的大小,那么如果有10个broker,相当于有100g数据了。当然可以设置备份数来调整其大小。
比如下面Patition0分区,其有一个Leader副本(位于broker0)两个Follower副本(一个follower位于broker1,一个位于broker2)。
为什么要对Topic下数据进行分区存储?
1、commit log文件会受到所在机器的文件系统大小的限制,分区之后可以将不同的分区放在不同的机器上,相当于对
数据做了分布式存储,理论上一个topic可以处理任意数量的数据。
2、为了提高并行度。
kafka将很多集群关键信息记录在zookeeper里,包括kafka集群节点元数据信息(比如节点配置信息Configs:isr、replicas、leader等,但是不包括kafka消息),保证自己的无状态,从而在水平扩容时非常方便。其它几个消息中间件也是不保存节点信息,不把节点信息存放在本地(无状态)。
kafka节点元数据信息除了会在zookeeper中保存有,在其它的broker节点也会缓存有。
kafka分区才有leader、follower概念,Broker(节点)没有这个概念。不像zookeeper、nacos这些集群节点有leader、follower概念。但是Broker有Controller的概念,会在所有Broker中选一个Broker作为核心总控制器。
集群消费
针对每个partition,都有一个broker起到“leader”的作用,0个或多个其他的broker作为“follwer”的作用。
leader处理所有的针对这个partition的读写请求,而follower不提供读写(主要是为了保证多副本数据与消费的一致性)。如果这个leader失效了,其中的一个follower将会自动的变成新的leader。
kafka对节点数没有要求,不一定要像zookeeper一样对节点数有要求。
消费顺序
1.一个partition同一个时刻在一个consumer group中只能有一个consumer instance在消费,从而保证消费顺序。
consumer group中的consumer instance的数量不能比一个Topic中的partition的数量多,否则,多出来的consumer消费不到消息。(因为根据kafka的特性一条消息只能被同一个消费组下的某一个消费者消费)
即一个Topic有多个分区,那么一个Topic可以被多个消费者消费。
在 Kafka 中,一个分区的所有消息只能被同一个消费者组内的一个消费者实例消费。换句话说,一个分区的消息不能同时被同一个消费者组内的多个消费者实例消费。
2.Kafka只在partition的范围内保证消息消费的局部顺序性,不能在同一个topic中的多个partition中保证总的消费顺序性。因为同一个消费者,对于有多个分区partition发送过来的消息,kafka不知道谁先谁后。只能保证单个分区发送过来的不同消息的顺序性。
3.kafka的顺序消费很少用:如果有在总体上保证消费顺序的需求,那么我们可以通过将topic的partition数量设置为1,将consumer group中的consumer instance数量也设置为1,但是这样会影响性能,所以kafka的顺序消费很少用。
因为kafka不适合顺序消费,所以其使用场景一般为不对消息有顺序要求的场景。如上面说的Kafka使用场景。
发送kafka消息时如果未指定发送分区,具体发送的分区计算公式:hash(key)%partitionNum。
如果指定发送分区,则发往指定分区。
–
kafka 单播消费、多播消费、集群消费的区别?
Kafka 中的单播消费、多播消费和集群消费是针对消息消费的不同方式,它们有以下区别:
1.单播消费(Single Consumer):单播消费是指一个消费者实例订阅一个主题的所有分区,并且每个分区只会被一个消费者实例消费。这意味着每个消息只会被一个消费者实例处理,适用于一些场景下要求消息处理的顺序性。
2.多播消费(Multiple Consumers):多播消费是指多个消费者实例同时订阅同一个主题的所有分区,在这种情况下,每个分区的消息会被所有的消费者实例中的一个消费(不是一个分区的消息被一个组内多个消费组消费)。这种方式适用于需要提高消息处理的并发能力和容错性的场景。
3.集群消费(Consumer Groups,和消费组有关):集群消费是指多个消费者实例以消费者组的形式共同消费一个主题的所有分区。每个分区只能被消费者组中的一个消费者实例消费,而且一个消费者组可以包含多个消费者实例。这种方式适用于横向扩展和负载均衡,通过增加消费者实例来提高整体的消费能力。多播消费是强调的是一个消费组消费多个分区,而集群消费强调的是多个消费组消费多个分区。
总的来说,单播消费适合一些特定的业务场景,多播消费适合提高消息处理的并发能力和容错性,而集群消费则适用于横向扩展和负载均衡的需求。开发人员可以根据具体的业务需求选择合适的消费方式来处理 Kafka 中的消息。
二、kafka发送消费核心参数与设计原理
kafka主题默认是自动创建的,也可以设置不默认创建。
在 Kafka 中,主题(topic)是在使用时动态创建的。当生产者向一个尚不存在的主题发送消息时,Kafka 会自动创建该主题(如果配置允许的话)。这种自动创建主题的行为是 Kafka 的默认行为。然而,你也可以通过 Kafka 的配置来禁用自动创建主题。
架构图组件Broker、Topic、Partition、Segement(分区文件夹中包含了多个分段文件)关系。
Broker:代表整个 Kafka 集群中的一个节点。
Topic:代表一个消息的逻辑类别,可以包含多个 Partition。
Partition:代表一个 Topic 的物理分区,包含多个 Segment。可以存储在不同的broker。
Segment:代表一个 Partition 的物理存储单元,是实际的日志文件。
Kafka Controller:负责管理集群所有分区和副本的状态。
Kafka设计原理
1.Kafka核心总控制器Controller作用
在Kafka集群中会有一个或者多个broker,其中有一个broker会被选举为控制器(Kafka Controller),它负责管理整个集群中所有分区和副本的状态。
a.当某个分区的leader副本出现故障时,由控制器负责为该分区选举新的leader副本。
b.当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有broker更新其元数据信息。
c.当使用kafka-topics.sh脚本为某个topic增加分区数量时,同样还是由控制器负责让新分区被其他节点感知
到。
2.Controller选举机制(即判断选哪台broker作为controller)
在kafka集群启动的时候,会自动选举一台broker作为controller来管理整个集群,选举的过程是集群中每个broker都会
尝试在zookeeper上创建一个 /controller 临时节点,zookeeper会保证有且仅有一个broker能创建成功,这个broker
就会成为集群的总控器controller。
当这个controller角色的broker宕机了,此时zookeeper临时节点会消失,集群里其他broker会一直监听这个临时节
点,发现临时节点消失了,就竞争再次创建临时节点,就是我们上面说的选举机制,zookeeper又会保证有一个broker
成为新的controller。
具备控制器身份的broker需要比其他普通的broker多一份职责,具体细节如下:
a. 监听broker相关的变化。为Zookeeper中的/brokers/ids/节点添加BrokerChangeListener,用来处理broker增减的变化。
b. 监听topic相关的变化。为Zookeeper中的/brokers/topics节点添加TopicChangeListener,用来处理topic增减的变化;为Zookeeper中的/admin/delete_topics节点添加TopicDeletionListener,用来处理删除topic的动作。
c. 从Zookeeper中读取获取当前所有与topic、partition以及broker有关的信息并进行相应的管理。对于所有topic所对应的Zookeeper中的/brokers/topics/[topic]节点添加PartitionModificationsListener,用来监听topic中的分区分配变化。
d. 更新集群的元数据信息,同步到其他普通的broker节点中。
3.Partition副本选举Leader机制
controller感知到分区leader所在的broker挂了(controller监听了很多zk节点可以感知到broker存活),controller会从
ISR列表(参数unclean.leader.election.enable=false的前提下)里挑第一个broker作为leader(第一个broker最先放进ISR
列表,可能是同步数据最多的副本),如果参数unclean.leader.election.enable为true,代表在ISR列表里所有副本都挂
了的时候可以在ISR列表以外的副本中选leader(ISR列表以外的副本是指不在ISR列表,但是在replicas副本列表),这种设置,可以提高可用性,但是选出的新leader有可能数据少很多。
isr (in-sync replica)是replicas的一个子集,它只列出当前还存活着的,并且已同步备份了该partition的节点。
副本进入ISR列表有两个条件:
a. 副本节点必须能与zookeeper保持会话以及跟leader副本网络连通
b. 副本能复制leader上的所有写操作,并且不能落后太多。(与leader副本同步滞后的副本,是由
replica.lag.time.max.ms 配置决定的,超过这个时间都没有跟leader同步过的一次的副本会被移出ISR列表)
第一行是所有分区的概要信息
Topic:test1 表示这个主题名称是test1
PartitionCount:2 表示分区数量有2个
ReplicationFactor:1 表示每个分区只有一个副本。即这个副本既是 leader 副本也是唯一的副本。没有额外的 follower 副本用来复制数据或提供冗余。
第二行表示每个partition的信息
Topic:test1 表示这个主题名称是test1
Partition:0 表示分区的编号
Leader:0 表示当前分区的Leader节点broker的编号/id,即编号为0的broker
Replicas:0 即存在副本的节点broker的编号,即副本只存在编号为0的broker上,即只有一个副本,即leader副本。
只要有备份信息,不管这个节点是不是leader,甚至是否宕机,也会列出在这里。
Isr:0 表示存活的副本,且已经同步备份了被partition的节点broker的编号。它是replicas的一个子集。
leader:0、replicas(备份)=0的意思?
leader:0的0表示broker实例的id(即kafka节点的id)。
每个分区都会选一个kafka节点作为其leader节点。leader处理所有的针对该partition分区的读写请求。
replicas 表示某个partition在哪几个broker上存在备份
isr (in-sync replica)是replicas的一个子集,它只列出当前还存活着的,并且已同步备份了该partition的节点。
ISR即存活着&已同步备份了该partition
4.消费者消费消息的offset记录机制
每个consumer会定期将自己消费分区的offset提交给kafka内部topic:__consumer_offsets
提交过去的时候,key是consumerGroupId+topic+分区号,value就是当前offset的值,kafka会定期清理topic里的消息,最后就保留最新的那条数据。
因为__consumer_offsets可能会接收高并发的请求,kafka默认给其分配50个分区(可以通过offsets.topic.num.partitions设置),这样可以通过加机器的方式抗大并发。
通过如下公式确定需要把offset信息提交到__consumer_offsets哪个分区:
公式:hash(consumerGroupId)%__consumer_offsets主题的分区数。
5.消费者Rebalance机制(目的是明确消费者和消费分区的关系,明确消费者消费哪个分区。rebalance只针对subscribe这种不指定分区消费的情况)
rebalance过程中,消费者无法从kafka消费消息。rebalance只针对subscribe这种不指定分区消费的情况。如果通过assign这种消费方式指定了分区,kafka不会进行rebanlance。
如下情况可能会触发消费者rebalance(consumer/分区/topic变了)
a. 消费组里的consumer增加或减少了
b. 动态给topic增加了分区
c. 消费组订阅了更多的topic
rebalance过程中,消费者无法从kafka消费消息,这对kafka的TPS会有影响,如果kafka集群内节点较多,比如数百个,那重平衡可能会耗时极多,所以应尽量避免在系统高峰期的重平衡发生(即减少在高峰期增加/减少consumer或动态增加分区/topic等)。
6.producer发布消息机制剖析
leader和followers的同步机制需要借助log。和其它中间件一样。
a、写入方式
producer 采用 push 模式将消息发布到 broker,每条消息都被 append 到 patition 中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障 kafka 吞吐率)。
b、消息路由
producer 发送消息到 broker 时,会根据分区算法选择将其存储到哪一个 partition。其路由机制为:
- 指定了 patition,则直接使用;
- 未指定 patition 但指定 key,通过对 key 的 value 进行hash 选出一个 patition(itps use this)
- patition 和 key 都未指定,使用轮询选出一个 patition。
c、写入流程
follower为副本。
- producer 先从 zookeeper 的 “/brokers/…/state” 节点找到该 partition 的 leader
- producer 将消息发送给该 leader
- leader 将消息写入本地 log
- followers 从 leader pull 消息,写入本地 log 后 向leader 发送 ACK
- leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向producer 发送ACK。同时leader 副本会将新的高水位信息广播给所有的 follower 副本,每个 follower 副本在收到来自 leader 副本的高水位更新信息后,会更新自己的高水位。
HW与LEO详解
HW俗称高水位,HighWatermark的缩写,取一个partition对应的ISR中最小的LEO(log-end-offset)作为HW,consumer最多只能消费到HW所在的位置。另外每个replica都有HW,leader和follower各自负责更新自己的HW的状态。对于leader新写入的消息,consumer不能立刻消费,leader会等待该消息被所有ISR中的replicas同步后更新HW,
此时消息才能被consumer消费。这样就保证了如果leader所在的broker失效,该消息仍然可以从新选举的leader中获取。对于来自内部broker的读取请求,没有HW的限制。
所以Kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制。事实上,同步复制要求所有能工作的follower都复制完,这条消息才会被commit,这种复制方式极大的影响了吞吐率。而异步复制方式下,follower异步的从leader复制数据,数据只要被leader写入log就被认为已经commit,这种情况下如果follower都还没有复制完,落后于leader时,突然leader宕机,则会丢失数据。
而Kafka的这种使用ISR的方式则很好的均衡了确保数据不丢失以及吞吐率(即 leader只需要收到所有 ISR 中的 replica 的 ACK,而无需所有follower都得复制完)。
7.日志分段存储(把分区的日志分成多个段,每个段最多存储1g数据)
Kafka 一个分区的消息数据对应存储在一个文件夹下,以topic名称+分区号命名,消息在分区内是分段(segment)存储,每个段的消息都存储在不一样的log文件里,这种特性方便old segment file快速被删除,kafka规定了一个段位的 log 文件最大为 1G,做这个限制目的是为了方便把 log 文件加载到内存去操作。
kafka数据包含的文件类型,如:
1.0000000000.index
.index文件为部分消息的offset索引文件。
kafka每次往分区发4K(可配置)消息就会记录一条当前消息的offset到index文件。查找某一条日志时先从索引中找,比如查找offset为5的,则从索引为4开始找,而不是直接从log顺序遍历。
2.0000000000.log
.log文件为消息存储文件,主要存offset和消息体。
3.0000000000.timeindex
.timeindex文件为消息发送时间的索引文件,记录了日志/消息的时间和索引。。
kafka每次往分区发4K(可配置)消息就会记录一条当前消息的发送时间戳与对应的offset到timeindex文件。
如果需要按照时间来定位消息的offset,会先在这个文件里查找
00000000000009936472.index
这个 9936472 之类的数字,就是代表了这个日志段文件的第一条记录在整个分区的偏移量Offset,也就说明这个分区里至少都写入了接近1000 万条数据了。
Kafka Broker 有一个参数,log.segment.bytes,限定了每个日志段文件的大小,最大就是 1GB。
一个日志段文件满了,就自动开一个新的日志段文件来写入,避免单个文件过大,影响文件的读写性能,这个过程叫做log rolling,正在被写入的那个日志段文件,叫做 active log segment。
三、Kafka线上问题总结
kafka-manager为Kafka可视化管理工具
1、消息丢失情况:
(解决方案:发送端设置合适的acks,消费端设置为手动提交及保证适时提交offset。
如果手动批量提交部分提交失败时如何保证不重复消费?看第2条,需要做幂等方案。
幂等方案看实战项目。)
消息发送端:
(1)acks=0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消息。大数据统计报表场景,对性能要求很高,对数据丢失不敏感的情况可以用这种。
(2)acks=1: 至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入。就可以继续发送下一条消息。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。
(3)acks=-1或all: 这意味着leader需要等待所有备份(min.insync.replicas配置的备份个数,isr)都成功写入日志才算提交成功。这是最强的数据保证。一般除非是金融级别,或跟钱打交道的场景才会使用这种配置。当然如果min.insync.replicas配置的是1则也可能丢消息,跟acks=1情况类似。
min.insync.replicas 是 Kafka 配置参数,用于指定至少需要多少个副本在 ISR 中才能将消息视为已提交(acknowledged),默认值为 1。
如果isr数量<min.insync.replicas 那么是不是无法成功提交消息了?
是的,如果 ISR(In-Sync Replicas)的数量少于 min.insync.replicas 的配置值,那么生产者在尝试提交消息时将无法成功。这是因为 Kafka 生产者需要等待至少 min.insync.replicas 数量的副本在 ISR 中确认接收并复制消息后,才认为消息已成功提交。
消息消费端:
如果消费这边配置的是自动提交,万一消费到数据还没处理完,就自动提交offset了,但是此时你consumer直接宕机了,未处理完的数据丢失了,下次也消费不到了。
2、消息重复消费(一般消费端都是要做消费幂等)
(通过在消息发送前给消息生成分布式唯一id来实现消费幂等)
消息发送端:
发送消息如果配置了重试机制,比如网络抖动时间过长导致发送端发送超时,实际broker可能已经接收到消息,但发送方会重新发送消息
消息消费端:
如果消费这边配置的是自动提交,刚拉取了一批数据处理了一部分,但还没来得及提交,服务挂了,下次重启又会拉取相同的一批数据重复处理。
一般消费端都是要做消费幂等处理的。
什么是幂等性?幂等操作的特点是,任意多次执行所产生的影响,均与一次执行所产生的影响相同。
一般如何实现消息幂等?
可以在消息发送前给消息赋值一个全局分布式唯一ID(参照rocketmq)。
3、消息乱序
(实现同一分区有序可以在消费端使用内存队列,实现不同分区有序同理也可以在消费端处理。)
如果发送端配置了重试机制,kafka不会等之前那条消息完全发送成功才去发送下一条消息,这样可能会出现,发送了1,2,3条消息,第一条超时了,后面两条发送成功,再重试发送第1条消息,这时消息在broker端的顺序就是2,3,1了
所以,是否一定要配置重试要根据业务情况而定。也可以用同步发送的模式去发消息,当然acks不能设置为0,这样也能保证消息从发送端到消费端全链路有序。
kafka保证全链路消息顺序消费,需要从发送端开始,将所有有序消息发送到同一个分区,然后用一个消费者去消费(即一个broker只包含一个分区,一个消费组只包含一个消费者)。
但是这种性能比较低,可以在消费者端接收到消息后将需要保证顺序消费的几条消费发到内存队列(可以搞多个),一个内存队列开启一个线程顺序处理消息。
实现同一分区有序解决方案:
a.不使用重试机制(根据业务情况而定)
b.使用同步发送模式且ack不为0(性能不高)
c.自动提交ack=0,然后在消费端使用内存队列,一个内存队列开启一个线程顺序处理消息(推荐)(此处不考虑消息丢失)。
实现不同分区有序解决方案:
a.可以在消费者先缓存消息不处理,收到消息排好序再处理。
b.可以先将先不同分区发送到同一个分区,再走实现同一分区有序解决方案(a less b me)。
4、消息积压
(a.可以修改程序,将消息转发到其他topic(topic包含多个分区),然后增加消费者
b.对于一直消费不成功的消息可以先将其转发到其他队列,先不处理后面再处理。)
1)线上有时因为发送方发送消息速度过快,或者消费方处理消息过慢,可能会导致broker积压大量未消费消息。
此种情况如果积压了上百万未消费消息需要紧急处理,可以修改消费端程序,让其将收到的消息快速转发到其他topic(可以设置很多分区),然后再启动多个消费者同时消费新主题的不同分区。不能只增加消费者,增加消费者也要增加分区才行。
2)由于消息数据格式变动或消费者程序有bug,导致消费者一直消费不成功,也可能导致broker积压大量未消费消息。
此种情况可以将这些消费不成功的消息先转发到其它队列里去,后面再慢慢分析队列里的消息处理问题,即先不处理,后面再处理。
5、延时队列(需要自己实现,kafka没有延时队列)
延时队列一般用rocketmq。
延时队列存储的对象是延时消息。所谓的“延时消息”是指消息被发送以后,并不想让消费者立刻获取,而是等待特定的时间后,消费者才能获取这个消息进行消费,延时队列的使用场景有很多, 比如 :
1)在订单系统中, 一个用户下单之后通常有 30 分钟的时间进行支付,如果 30 分钟之内没有支付成功,那么这个订单将进行异常处理,
这时就可以使用延时队列来处理这些订单了。
2)订单完成1小时后通知用户进行评价。
实现思路(队列+轮询 具体啥意思还要看下):发送延时消息时先把消息按照不同的延迟时间段发送到指定的队列中(topic_1s,topic_5s,topic_10s,…topic_2h,这个一般不能支持任意时间段的延时),然后通过定时器进行轮训消费这些topic,查看消息是否到期,如果到期就把这个消息发送到具体业务处理的topic中,队列中消息越靠前的到期时间越早,具体来说就是定时器在一次消费过程中,对消息的发送时间做判断,看下是否延迟到对应时间了,如果到了就转发,如果还没到这一次定时任务就可以提前结束了。
6、消息回溯(可以用consumer的offsetsForTimes、seek等方法)
如果某段时间对已消费消息计算的结果觉得有问题,可能是由于程序bug导致的计算错误,当程序bug修复后,这时可能需要对之前已消费的消息重新消费,可以指定从多久之前的消息回溯消费,这种可以用consumer的offsetsForTimes、seek等方法指定从某个offset偏移的消息开始消费。
7、分区数越多吞吐量越高吗(不一定)
不一定(分区越多,线程上下文切换的就越多),一般情况分区数跟集群机器数量相当就差不多了。
可以用kafka压测工具自己测试分区数不同,各种情况下的吞吐量(use method p3)。
注意:如果分区数设置过大,比如设置10000,可能会设置不成功,后台会报错"java.io.IOException : Too many open files"。
8、消息传递保障
producer 的 ack 策略分为三种:
ack=0 producer不等待broker同步完成的确认,继续发送下一条(批)信息
ack=1 producer要等待leader成功收到数据并得到确认,才发送下一条message。
ack=-1 producer得到isr 确认,才发送下一条数据
即ack=1的时候,消费者可能收到0次也可能收到多次。
ack用于确保生产者发送消息到broker的可靠性。
at most once(消费者最多收到一次消息,0-1次):acks = 0 可以实现。
at least once(消费者至少收到一次消息,1-多次):ack = 1 或 ack = all 可以实现。
exactly once(消费者刚好收到一次消息):默认kafka是无法实现的,其解决方案如下:
a.可以用at least once 加上消费者幂等性可以实现(一般用这种方法)。
9、kafka高性能的原因(kafka使用分区是高吞吐,而不是高性能)
a.磁盘顺序读写:kafka消息不能修改以及不会从文件中间删除,保证了磁盘顺序读,kafka的消息写入文件都是追加在文件末尾,
不会写入文件中的某个位置(随机写),保证了磁盘顺序写。
b.数据传输的零拷贝(使用DMA技术)
在 Kafka 中,消息和日志段以文件的形式存储在磁盘上。在进行网络传输时,Kafka利用操作系统提供的 sendfile 系统调用来直接将文件内容发送给网络套接字,而无需在用户空间和内核空间之间进行数据拷贝。
减少了两次内核与用户空间的数据拷贝。
减少了内核与用户空间上下文切换。
c.读写数据的批量batch处理以及压缩传输