首页 > 其他分享 >kafka

kafka

时间:2023-06-06 15:58:34浏览次数:22  
标签:消费 消费者 分区 broker kafka 数据 leader

1.消息的模式主要两种,第一种点对点,消费完就删掉。第二种主流的,发布订阅,一对多,消费之后不会删掉。

2.卡夫卡的主要结构:生产者--broker--消费者,broker里有很多partation,实际上一个broker就是一台服务器,partation类似于es的分片,也有主分区从分区之分,只不过读写都是对主分区做的,从分区只在主分区挂了才用,这一点跟es不同,es的从分片,在主分片没有挂掉的时候也会运行查询。哪些broker活着,针对某个分区谁是leader谁是follower存储在zookeeper里。

3.发送流程:
首先发送方会调用send方法,在发送方这边有个分区器,用来确定这条消息发到哪个分区里。然后数据会放到生产者这边的"队列内存(Dqueue)"里,这个队列内存默认是32m,里面有多个队列,每个队列对应着一个broker里的一个分区。队列里有数据块儿,每个数据块儿的默认大小:batch.size是16k,也就是数据攒到16k了才真的发送,不然来一个就发来一个就发效率比较低。但是也不能数据少的时候就不发了,所以还得设置一个最长等待时间,这个时间到了之后即使数据没达到batch.size也要发,这个参数叫linger.ms,默认是0ms,就是有数据就发。batch.size和linger.ms这两个参数是发送端调优的主要参数。
生产者这边实际真正跟broker做交互的是sender线程,他里面也有多个队列,是以broker为key的,可以理解为是Map<brokerName,List<Request>>,List<Request>的size是5,也就是针对一个broker发消息的时候最多能存5个。为什么会存着不是发过去就没了呢?是因为发送的时候需要等待broker应答,返回ack才能去掉一个request.对于这个ack的设置也有3种:
0:发过来的数据不用落盘就返回应答。
1:得主partation落盘了才给应答。
-1:得主partation和副本partation都落盘了才应答。
发送成功之后,sender线程会清理掉这个request,再清理掉内存中被request携带的那个块儿。如果失败了就重试发送,这个重试的次数默认是int最大值
注意这里的Request也是有map的,是以目的broker的nodeId为key,封装了几个batch到list里来减少创建连接次数。

4.发送方式:
首先有同步异步概念,异步就是数据被放到内存队列里了,不管发没发送都返回。同步就是内存队列里的数据必须被sender线程发给broker并成功返回ack了,数据块儿被清掉了才能发下一个块儿。
普通的异步:数据被send方法发给内存里的队列就返回了。
带回调的异步:数据被send方法发给内存里的队列,同时返回一个metedata,告诉你发送的是哪个主题,被放到哪个分区对应的队列里了。
同步发送:同步就是在异步的发送方法后边加个.get(),表示要有返回才行。

5.分区选择策略:如果发送时指定分区了,就用这个。没指定分区但是发送的时候带了key,那用key对分区数取模。如果这俩都没有,那就随机选一个分区,这个分区的batch.size满了或者linger.ms到了才换下一个。

6.发送方提高吞吐量,主要就是提高batch.size和linger.ms,其次就是可以把缓冲区默认的32m(buffer.memory)提高一些,还有就是加个压缩参数让数据变小。
注意虽然batchSize默认16k,但并不是超过16k就不发了,即使一条消息大小超过了16k也是可以发的,只不过开辟内存块儿的时候如果这批消息不到16k就当成他占了16k,取了16k和实际大小的max。这里说的内存块儿指的是sender线程使用的池化内存,申请一块儿内存发给broker发完再把这块儿内存还到内存池里。
sender线程也不是一直在监听,而是内存块儿里根据分区生成的双端队列里有了数据才会把它唤醒。
sender线程一运行起来会先抓取元数据,因为他要知道拿到的batch要发到哪。然后挨个取每个队列头的那个batch,并获取到这个队列的分区在kafka集群里的leader broker在哪,根据leader broker的nodeId为key,list<batch>为value创建连接给他发过去,这样可以减少创建连接次数。

7。数据可靠性:
最可靠的肯定是ack=-1,但是ack=-1时,follower是主动拉取leader数据来落盘然后应答才返回的,如果某个follower挂了,那不就没法用了吗?这里有个isr的概念,就是活着的主、副节点(broker)集合,可以设置剔除时间,比如超过30秒这个followe没有给我发请求就在isr里把他去掉。再设置isr最小size为2,这样至少有一个主分区和一个副本分区活着。还可以直接设置最小应答分区数,就是加入有4个副分区,我设置成2,那就只有一个副本分区落完盘就可以返回了,这样兼顾性能和数据可靠性。
ack=0,基本不用。ack=1,通常用来传输日志,丢一点无所谓。跟钱相关的一点错都不允许出,那就得用-1。

8.数据重复:
上面说的ack设置成了-1,有一种极端情况,那就是主分区、副本分区都落盘完了,即将给sender线程返回ack,结果主分区挂了,挂了之后zookeeper选了个新分区做主分区,sender线程因为没收到应答所以又发了一遍,那这个数据就出现了两次造成重复。ack=-1已经保证了数据不丢,那么怎么保证数据不重复呢?这个要靠幂等性来支持。幂等性:即使生产者发送了一条消息多次,broker也只会落盘一次。这个是根据<PID,Partition,SeqNo>作为联合主键来确定的。PID是每次卡夫卡重启都会分配的一个固定值,所以在一次会话内,往一个分区发的每条消息都有它的一个SeqNo,即使上面的问题出现,新的主分区发现这条消息的SeqNo已经有了,就不会再落盘了。不过要注意这个也有一些不足,那就是重启之后换了PID检测不出来重复。为什么不能去掉PID呢?因为SeqNo是每次启动后单调递增的。

为了进一步解决幂等性的局限,还要在幂等性的基础上,开启kafka事务。每台broker上都有自己的事务处理器,那接到一条消息时候使用哪个处理器呢?这里需要指定以一个全局唯一的事务id。有一个特殊的存事务相关信息的主题,它默认有50个分区,分散在broker上,根据这个事务id对分区数取模找到分区,然后这个分区(注意这个分区不是存业务数据的分区)所在的broker上的处理器就是这次请求的处理器。大概是这么个流程:生产者向broker发送完数据之后,会再给事务协调器发一个commit请求,携带着事务id,表示向确定这条写入成功没有。事务协调器先把这个事务id commit(第一次提交)到那个事务消息主题里,然后直接返回给生产者告诉它成功了(这里存疑,为什么不检查完是否真的持久化成功再返回呢?)。然后再访问那个存储业务数据的broker看他到底持久化成功了没,如果成功了就第二次提交那个事务id的消息,这一次是真正的落盘,然后如果下次重启,即使PID变了,但是提交带有相同的事务id的请求,事务处理器发现这个事务id已经存过了就不会再继续持久化。

9.数据乱序
分区间数据没法保证有序,有序指的都是单分区内,单分区内数据是怎么乱序的呢?前面提到了sender线程发数据的时候最多是缓存5个,有可能1发完之后,2发送失败了要重试,结果2还没重试呢,3先发过去了,就乱了。怎么解决呢,最简单粗暴的方法,把max.in.flight.request.per.connection设置成1,每次只发一个,不缓存。这个肯定不好。有更好的方法,就是开启幂等性,因为幂等性会携带SeqNo,单调递增,kafka集群这边根据这个来确定是不是该落盘就好了。卡夫卡这边消费之前也是有个队列的,里面缓存sender线程发的数据,它这个缓存队列长度是5,所以max.in.flight.request.per.connection最大只能设置5.比如1发过来了落盘了,2发失败了,3456发过去了,那kafka集群就会把这4个缓存起来,等2来了再落盘。但是如果max.in.flight.request.per.connection设置成了6,那34567一起发过去把缓存队列占满了,等不到2,就没法落盘了。

10.zoo keeper里有啥
(1)哪些broker正常活着
(2)所有的主题,以及这个主题有哪些分区,还有某个分区的主分区在哪个broker里,副本分区在哪些broker里(isr,主副都在里头,主排第一)
(3)辅助选举leader的节点是谁,这个是活着的broker抢着注册的,谁先注册这里就写谁,如果他也挂了再换别人。leader挂了之后,再用这个信息找到辅助选举人,根据这个辅助选举人broker里controller的信息来确定谁是新的broker.

leader选举规则:在broker上线的时候,会往AR里注册自己,isr里是活着的节点。谁抢先注册为controller了,谁就负责选举下一次的leader.这个节点会跟zookeeper通信,如果发现isr里leader挂了,就选活着(在isr里)并且在AR里排位靠前那个做leader。

11.leader和follower的消费进度是不同的,follower挂了没事儿,但如果leader挂了,会从follower中选举新的leader,新的这个leader消费的offset通常会比原来的leader小,而且她会要求其他比他offset长的削掉长的部分来跟他保持一致,所以会丢数据。

12.副本在broker上的分配原则,假如我某个主题有4个分区,每个分区一主二从共三个副本,为了尽量均匀的分配,会这样:
分区一:broker1主,23从
分区二:broker2主,34从
分区三:broker3主,41从
分区四:broker4主,12从
总之跟es差不多,两个原则,第一:4主8从共12个副本,应该均匀的分配到4个broker上,第二每个分区的主和他的从不能放一个broker里。
者其中leader的分配尤为重要,因为只要leader不挂,生产者消费者读写都是对leader做的。

13.文件存储方式
实际分区的存储,也不是直接一整个全寸的,而是把一个分区的数据按1G为单元分开的,每个段叫一个segment,这些段被放在一个文件夹里,文件夹的命名格式为:主题-分区号,比如topic-0。一个段分成3种文件,.log是真实的数据,.index是稀疏索引,存储相对位置,.timeindex存储时间戳,默认每个文件存1星期。这三个文件的名字就是这一个段的数据的开头那个offset,比如可能在topic-0这个文件夹里出现这么6个文件。00000000.log,00000000.index,00000000.timeindex,00001024.log,00001024.index,00001024.timeindex。.index稀疏的规则,就是log大小每增加4kb它生成一条数据,里边有两个列,第一个列是消息数据的offset,第二个列是这条数据在.log里的位置。找某一条消息的话,就先根据这条消息的offset找到它在那个段文件里,然后再去.index里根据稀疏索引找到应该从.log的哪个位置开始查,顺序往下找就行了。

14.文件清理
文件有两种清理策略,一个是删除,一个是压缩。‘
删除可以基于时间,比如默认超过7天就删除,这里开始计时的时间是一个segment里最后一条数据的时间。另一个是基于空间,比如占用空间大于某个阈值,就删除最早的哪个segment.这俩策略跟日志清理策略很相似。
压缩的方式是对于相同的key只保存value最大的,这个应用比较少。

15.总结一下kafka之所以高效读写的原因
首先最外层就使用了集群、分区的概念,使得生产者、消费者可以并行对不同的服务器(broker)操作。其次再每个分区里,又把数据分成了segment,然后在segment里又用了稀疏索引.index文件进行索引。而且对于数据是只追加不修改的方式,对磁盘进行顺序读写,顺序读写比随机读写快几十倍。
在操作系统层面还有个页缓存,数据先从页缓存里查,查不到才查磁盘。还有一个零拷贝,简单说就是数据查到之后直接返回给消费者而不经过应用层,因为broker不提供过滤供能。

16.消息拉取
消息拉取主要两种方式,一种是推,就是broker推给消费端。另一种是拉,就是消费端主动去broker里拉数据。因为不同消费端处理速度不同,所以kafka用的是主动拉的方式。

17.消费者组
消费者组是根据groupId确定的,消费者组内可以有多个消费者,但是逻辑上一个消费者组就是一个消费者,所以对于一个分区来说,一个消费者组内的多个消费者,不能同时消费同一个分区,因为这样就相当于一个消费者对一个分区消费了多次。类比于我们实际的开发,一个集群就相当于一个组,这个集群内有多个分组,每个分组下又有好几台ip,但是一条消息只能被一个ip消费掉。
核心思想:组内竞争、组间共享。

18.怎么确定消费者组内的某个消费者跟集群里的哪个broker进行绑定呢?这个跟前面事务那个就有些类似了。kafka集群里还是每个broker上都有个控制器,这个消费者组根据自己的groupId对50取模,找到这个控制器所在broker位置,然后组内所有消费者注册到这个控制器上面来,控制器从这些消费者里选出一个leader跟他通信,这个leader确定下来一个消费方案,比如1号消费者绑定到分区1,2号消费者绑定到分区2,然后把这个方案发给控制器,控制器再分发给组内其他消费者。消费者们每隔3秒要跟控制器进行心跳上报,如果超过45秒都没上报,就会触发再平衡,重新定制消费方案。同时控制器会记录每个消费者消费到了分区内的哪个offset,通过50个通道记在磁盘里。

19.消费者组消费消息的具体流程
一个组内的某个消费者确定了要对哪个分区进行消费之后,具体流程是这样的,有点像发送时候的反推,也是不是直接从broke拿过来,而是调用fetch方法,创建一个连接,从分区里拉数据,这个拉数据取决于分区里产生了多少数据,默认的是1字节(fetch.min.bytes)。但是如果就不到1字节就一直不拉了吗?也不是,还有一个最长等待时间(fetch.max.wait.ms),即达到了这个最长等待时间即使数据不到一个字节也要拉。这俩参数就跟前面send的时候那个batch.size和linger.ms很相似了。不过还多了一个单批次最大拉取大小参数(fetch.max.bytes)默认50m,这个是一次拉所有分区的上限,对于单个分区的上限是1m,取决于单条消息的最大size配置,所以消费者这边的这个双端队列里面存了多个分区的数据.拉过来之后也不是直接给消费者,而是作为一个块儿也放到一个队列里,然后消费者再从这个队列里拿,默认的是一次最多拿500条。(这里有疑问,拿的时候有没有最小数据和等待时间限制)
消费者这边消费主题,既可以消费某个主题,也可以指定消费某个主题的某个分区。

20.具体消费策略的制定。上面说了消费者中的leader要制定消费方案告诉控制器,那怎么制定的呢?有四种策略,默认的是前两个一起用
(1)range:分区数除以消费者数,除不开前面的多消费点。比如8分区组内3个消费者,那就是消费者1消费123,2消费456,3消费78.这种存在弊端,就是主题如果很多,前面的消费者总是比别人多消费一个分区压力很大。
(2)randrobin:randrobin于range相比,就是把所有的topic的所有分区都拿出来排个序然后再分配,比如3主题各3个分区,那么排完序就是1-9,4个消费者,那么最终分配的就是一号消费者消费159,2消费26,3消费37,4消费48.
(3)sticky跟randrobin相比,就是不按套路了,仍然是所有主题所有分区排序,但是分配的时候是随机分,不按顺序来,所以有可能是3个消费者消费5个分区,221 212 122都有可能,不过因为是一轮一轮来的,也会保证两个消费者差距最大为1。

21.offset的存储位置
offset是在每个broker上都有,存在一个系统主题里的,名字叫_consumer_offset,key是groupId+topic+分区号,value值就是offset,会定期压缩,每个key都保存最新的值。offset是消费者这边每隔5秒自动告知broker的,默认自动提交,也设置成可以手动提交。

22。消费者启动时,可以指定从哪个位置开始消费,默认是latest,就是不消费历史数据,只消费最新数据。还有一个earliest就是从最早开始消费,消费历史数据。也可以手动指定offset位置从这个位置开始消费,或者指定一个时间,从这个时间点往后消费。

23.自动提交offset有可能在5秒间隔内还未提交下一批时消费者挂掉,这种情况就会重复消费。手动提交有可能异步提交完保存了offset但实际数据还没落盘消费者挂掉,这种情况会漏消费,简单理解消费者这边重复消费和漏消费都是消费者异常引起的,要避免这种情况发生就要手动在下游加事务来支持,虽然麻烦但只能这样。

24.消费者调优:
消费者消费的慢了,怎么提高速度呢?首先可以增加分区数和消费者数,这是最直接的。其次可以提高单批次fetch最大拉取数据大小和从队列里拉取时的最大拉取数据条数,提高单批次吞吐量。

25. 2.8.0版本之后可以不用zookeeper,而使用kraft模式,可以给broker配置角色,可以配置为broker,或者controller,或者既是broker又是controller,谁是controller谁负责。

标签:消费,消费者,分区,broker,kafka,数据,leader
From: https://www.cnblogs.com/hit-cw/p/17460770.html

相关文章

  • KafKa消费开发
     KafKa消费开发配置以下代码需要写完整,不完整会出现中断,假死现象,长时间不处理问题。(实际项目代码)///<summary>///-offsets是自动提交的。///-consumer.Poll/OnMessage是用于消息消费的。///-......
  • Spark消费Kafka
    0.前言之前先写了处理数据的spark,用文件读写测了一批数据,能跑出结果;今天调通了Kafka,拼在一起,没有半点输出,查了半天,发现是之前的处理部分出了问题,把一个不等号打成了等号,把数据全filter没了。很恐怖,我保证这段时间我没动过这段代码,但上次真的跑出东西了啊(尖叫1.配置流程主节点......
  • 面试官问:kafka为什么如此之快?
    前言天下武功,唯快不破。同样的,kafka在消息队列领域,也是非常快的,这里的块指的是kafka在单位时间搬运的数据量大小,也就是吞吐量,下图是搬运网上的一个性能测试结果,在同步发送场景下,单机Kafka的吞吐量高达17.3w/s,不愧是高吞吐量消息中间件的行业老大。那究竟是什么原因让kafka如此......
  • 单节点kafka部署笔记
    1背景因为工作中需要对接kafka,准备在测试环境中自己部署一套,考虑方便决定部署一台单点。2部署2.1scala2.1.1java环境openjdk即可,我使用的是openjdk1.82.1.2下载软件下载scala-2.12.17.tgz并解压,例如解压到/home/scala/scala-2.12.172.1.3环境变量exportSCALA_HOME......
  • apache kafka系列之迁移与扩容工具用法
    kafka迁移与扩容工具使用参考官网site:https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-6.ReassignPartitionsTool说明:当我们对kafka集群扩容时,需要满足2点要求: 将指定topic迁移到集群内新增的node上。将topic的指定partition迁移到新增......
  • kafka跨集群发送消息
    1.场景集群B有一个应用要向集群A的kafka集群发送消息,但是集群A和集群B不是直接互通的,需要经过一层转发。 ......
  • Using Spring for Apache Kafka
    UsingSpringforApacheKafkaSendingMessagesKafkaTemplateThe KafkaTemplate wrapsaproducerandprovidesconveniencemethodstosenddatatokafkatopics.Bothasynchronousandsynchronousmethodsareprovided,withtheasyncmethodsreturninga ......
  • 【kafka】浅谈kafka常考特性
    Kafka前几天聊完绩效的时候问了下今年还有没有涨薪,组长的原话是"很难。。。我尽量帮大家争取。。。",我刚听完脑海的第一念头:"此处涨薪难,自有不难处!"。冷静分析一波,今年整体大环境不行,还是苟着拿波年终吧,先不准备跳了,跟大家浅浅分享一下之前准备的kafka相关知识点,等看机会的时候可......
  • 当Elasticsearch遇见Kafka
    Elasticsearch作为当前主流的全文检索引擎,除了强大的全文检索能力和高扩展性之外,对多种数据源的兼容能力也是其成功的秘诀之一。而Elasticsearch强大的数据源兼容能力,主要来源于其核心组件之一的Logstash,Logstash通过插件的形式实现了对多种数据源的输入和输出。Kafka是一种高吞......
  • kafka动态生产者
    packagecom.sunclouder.das.data.kafka.forward;importcn.hutool.core.util.StrUtil;importcn.hutool.json.JSONObject;importcn.hutool.json.JSONUtil;importcom.sunclouder.das.data.kafka.entity.ConfigEntity;importcom.sunclouder.das.data.kafka.entity.DasConfig......