首页 > 其他分享 >Kafka-核心设计和实现原理,生产者和消费者详述

Kafka-核心设计和实现原理,生产者和消费者详述

时间:2023-07-02 20:57:24浏览次数:47  
标签:详述 副本 生产者 分区 默认 kafka 消息 Kafka leader

1.体系架构

 

  • Producer:生产者
  • Consumber:消费者
  • Broker:服务代理节点(kafka实例)

 

2.消息存储

  • 主题(Topic):kafka消息以topic为单位进行归类,逻辑概念
  • 分区(Partition):
    • Topic-Partition为一对多
    • 分区在存储层面可看做是一个可追加的日志文件
    • 消息在追加到分区时会分配一个特定的偏移量(offset)作为在此分区的唯一标示
    • kafka通过offset保证消息在分区内的顺序性,但只保证分区有序而不保证主题有序
    • 每条消息发送到broker前,会根据分区规则分配到具体的哪个分区

 

3.容灾设计

  • 多副本机制(Replica):
    • 一个分区会在多个副本中保存相同的消息
    • 副本之间是一主多从关系
    • leader副本负责读写操作,follower副本只负责同步消息(主动拉取)
    • leader副本故障时,从follower副本重新选举新leader

 

同步状态

  • 分区中所有副本统称为 AR(Assigned Replicas)
  • 所有与leader副本保持一定程度同步的副本(包括leader)组成 ISR(In-Sync Replicas)
  • 同步之后过多的副本组成 OSR(Out-of-Sync Replicas)

 

特殊偏移量

 

  • LEO(Log End Offset):标识当前分区下一条代写入消息的offset
  • HW(High Watermark):高水位,标识了一个特定的offset,消费者只能拉渠道这个offset之前的消息(不含HW)
  • 所有副本都同步了的消息才能被消费,HW的位置取决于所有follower中同步最慢的分区的offset

 

二、生产者

 

1.客户端开发

  • 消息发送步骤
    • 配置生产者客户端参数及创建相应的生产者实例
    • Properties
    • KafkaProducer
    • 构建待发送的消息:ProducerRecord
    • 发送消息:send( ),flush( )
    • 关闭生产者实例:close( )
  • 必要参数配置:
    • bootstrap.servers:设置kafka集群地址,并非需要所有broker地址,因为生产者会从给定的broker中获取其他broker信息
    • key.serializer、value.serializer:转换字节数组到所需对象的序列化器,填写全限类名
  • 发送模式
    • 发后即忘(fire-and-forget):只管往kafka发送而不关心消息是否正确到达,不对发送结果进行判断处理;
    • 同步(sync):KafkaProducer.send()返回的是一个Future对象,使用Future.get()来阻塞获取任务发送的结果,来对发送结果进行相应的处理;
    • 异步(async):向send()返回的Future对象注册一个Callback回调函数,来实现异步的发送确认逻辑。
  • 拦截器
    • 实现ProducerInterceptor接口,在消息发送的不同阶段调用
    • configure():完成生产者配置时
    • onSend():调用send()后,消息序列化和计算分区之前
    • onAcknowledgement():消息被应答之前或消息发送失败时
    • close():生产者关闭时
    • 通过 interceptor.classes 配置指定
  • 序列化
    • 自定义序列化器:实现Serializer接口
  • 分区器
    • 在消息发送到kafka前,需要先计算出分区号,默认使用DefaultPartitioner(采用MurmurHash2算法)
    • 自定义分区器:实现Partitioner接口
    • 通过partitioner.class配置指定

2.原理分析

整体架构

 

  • 主线程KafkaProducer创建消息,通过可能的拦截器、序列化器和分区器之后缓存到消息累加器(RecordAccumulatro)
  • 消息在RecordAccumulator被包装成ProducerBatch,以便Sender线程可以批量发送,缓存的消息发送过慢时,send()方法会被阻塞或抛异常
  • 缓存的大小通过buffer.memory配置,阻塞时间通过max.block.ms配置
  • Kafka生产者客户端中,通过ByteBuffer实现消息内存的创建和释放,而RecordAccumulator内部有一个BufferPool用来实现ByteBuffer的复用
  • Sender从RecordAccumulator中获取缓存的消息后,将ProducerBatch按Node分组,Node代表broker节点。也就是说sender只向具体broker节点发送消息,而不关注属于哪个分区,这里是应用逻辑层面到网络层面的转换
  • Sender发往Kafka前,还会保存到InFlightRequests中,其主要作用是缓存已经发出去但还没收到相应的请求,也是以Node分组。
  • 每个连接最大缓存未响应的请求数通过max.in.flight.requests.per.connection配置(默认5)

元数据的更新

  • InFlightRequests可以获得leastLoadedNode,即所有Node中负载最小的。leastLoadedNode一般用于元数据请求、消费者组播协议等交互。
  • 当客户端中没有需要使用的元数据信息或唱过metadata.max.age.ms没有更新元数据时,就会引起元数据更新操作。

 

3.重要的生产者参数

  • acks:用来指定分区中有多少个副本收到这条消息,生产者才认为写入成功(默认”1")
  • “1":leader写入即成功、“0”:不需要等待服务端相应、”-1”/“all":ISR所有副本都写入才收到响应
  • max.request.size:限制生产者客户端能发送的消息的最大值(默认1048576,即1m)
  • retries、retry.backoff.ms:生产者重试次数(默认0)和两次重试之间的间隔(默认100)
  • compression.type:消息压缩方式,可配置为”gzip”、”snappy”、”lz4”(默认”none”)
  • connections.max.idle.ms:多久后关闭闲置的连接(默认540000,9分钟)
  • linger.ms:生产者发送ProducerBatch等待更多消息加入的时间(默认为0)
  • receive.buffer.bytes:Socket接收消息缓冲区的大小(默认32768,32k)
  • send.buffer.bytes:Socket发送消息缓冲区的大小(默认131072,128k)
  • request.timeout.ms:Producer等待请求响应的最长时间(默认30000ms),这个值需要比broker参数replica.lag.time.max.ms大

三、消费者

 

1.消费者与消费组

  • 每个分区只能被一个消费组的一个消费者消费
  • 消费者数大于分区数时,会有消费者分配不到分区而无法消费任何消息
  • 消费者并非逻辑上的概念,它是实际的应用实例,它可以是一个钱程,也可以是一个进程。

 

2.客户端开发

  • 消费步骤
    • 配置消费者客户端参数及创建KafkaConsumer实例
    • 订阅主题
    • 拉取消息并消费
    • 提交消费位移
    • 关闭实例
  • 必要的参数配置
    • bootstrap.servers:集群broker地址清单
    • group.id:消费组名称
    • key.deserializer、value.deserializer`:反序列化器
  • 订阅主题和分区
    • subscribe():订阅主题
    • assign():订阅指定主题分区
    • 通过partitionFor()方法先获取分区列表
    • unsubscribe():取消订阅
  • 消息消费
    • poll():返回的是所订阅的主题(分区)上的一组消息,可设定timeout参数来控制阻塞时间
  • 位移提交
    • 提交的offset为 lastConsumedOffset + 1
    • lastConsumedOffset:上一次poll拉取到的最后一条消息的offset
  • 控制或关闭消费
    • pause()、resume():暂停和恢复某分区的消费
  • 指定位移消费
    • seek():指定offset消费
    • beginingOffsets(),endOffsetes(),offstesForTimes():获取开头、末尾或指定时间的offset
    • seekToBeginning、seekToEnd():从开头、末尾开始消费
  • 再均衡
    • 在subcribe()时,可以注册一个实现ConsumerRebalanceListener接口的监听器
    • onPartionsRevoked():消费者停止读取消息之后,再均衡开始之前
    • onPartitionsAssigned():重新分配分区后,开始读取消费前
  • 拦截器
    • 实现ConsumerInterceptor接口
    • poll()返回之前,会调用onConsume()方法,提交完offset后会调用onCommit()方法
  • 多线程实现
    • KafkaProducer是线程安全的,但KafkaConsumer是非线程安全的,acquire()方法可检测当前是否只有一个线程在操作,否则抛出异常
    • 推荐使用单线程消费,而消息处理用多线程

 

3.重要的消费者参数

  • fetch.min.bytes:一次请求能拉取的最小数据量(默认1b)
  • fetch.max.bytes:一次请求能拉取的最大数据量(默认52428800b,50m)
  • fetch.max.wait.ms:与min.bytes有关,指定kafka拉取时的等待时间(默认500ms)
  • max.partition.fetch.bytes:从每个分区里返回Consumer的最大数据量(默认1048576b,1m)
  • max.poll.records:一次请求拉取的最大消息数(默认500)
  • connections.max.idle.ms:多久后关闭闲置连接,默认(540000,9分钟)
  • receive.buffer.bytes:Socket接收消息缓冲区的大小(默认65536,64k)
  • send.buffer.bytes:Socket发送消息缓冲区的大小(默认131072,128k)
  • request.timeout.ms:Consumer等待请求响应的最长时间(默认30000ms)
  • metadata.max.age.ms:元数据过期时间(默认30000,5分钟)
  • reconnect.backoff.ms:尝试重新连接指定主机前的等待时间(默认50ms)
  • retry.backoff.ms:尝试重新发送失败请求到指定主题分区的等待时间(默认100ms)
  • isolation.level:消费者的事务隔离级别(具体查看进阶篇:事务)

四、主题与分区

 

1.主题的管理

  • 创建
    • broker设置auto.create.topics.enable=true时,生产者发送消息时会自动创建分区数为num.partitions(默认1),副本因子为default.replication.factor(默认1)的主题
    • 通过kafka-topics.sh创建:create指令
      • kafka-topics.sh --zookeeper <zkpath> --create --topic <topic> --partitions <N> --replication-factor <N>
      • 手动分配副本:--replica-assignment
        • --replica-assignment 2:0:1,1:2:0,0:1:2
        • partion1 AR:2,0,1
        • partion2 AR:1:2:0
        • partion3 AR:0:1:2
    • 设定参数:--config <key=value>
  • 分区副本的分配
    • 使用kafka-topics.sh创建主题内部分配逻辑按机架信息划分两种策略:
      • 未指定机架信息分配策略:assignReplicasToBrokersRackUnaware()方法
      • 指定机架分配策略:assignReplicasToBrokersRackAware()方法
    • 当创建一个主题时,不管用什么方式,实质上是在zk的/broker/topics节点下创建与该主题对应的子节点并写入分区副本分配方案,并且在/config/topics节点下创建与该主题相关的子节点并写入主题配置信息
  • 查看:kafka-topics.sh脚本的 list、describe指令
  • 修改:kafka-topics.sh脚本的 alter指令
  • 配置管理:kafka-configs.sh脚本
  • 删除:kafka-topics.sh脚本的 delete指令

 

2.初识KafkaAdminClient

  • KafkaAdminClient可实现以调用API的方式对Kafka进行管理
  • 主题合法性
    • 通过KafkaAdminClient创建主题可能不符合规范,可以在broker端设置create.topic.policy.class.name来指定一个类验证主题创建时的合法性,这个类需要实现CreateTopicPolicy接口,放入Kafka源码,并重新编译

3.分区的管理

  • 优先副本(preferred replica/preferred leader)
    • 优先副本即 AR 集合中的第一个副本,当分区leader出现故障时,会直接使用优先副本作为新的leader
    • kafka-perferred-replica-election.sh可进行优先副本选举操作
  • 分区重分配
    • 解决问题:
    • 将某节点上的分区副本迁移至其他节点:宕机迁移失效副本、有计划下线节点迁移副本
    • 注意,下线前最好先关闭或重启此broker,保证不是leader节点,减少了节点间流量复制
    • 向新增节点分配原有主题分区副本
    • 集群中新增节点时,只有新创建的主题分区才有可能分配到新节点上,需要把老主体的分区分配到新节点上
    • 可使用kafka-reassign-partitions.sh脚本
  • 复制限流
    • 数据复制会占用额外的资源,如果重分配的量太大必然会严重影响整体的性能。可以通过对副本间的复制流量加以限制来保证重分配期间整体服务不会受太大的影响,可分别限制follower副本复制速度和leader副本传输速度
    • 通过kafka-config.sh或 kafka-reassign-partitions.sh配置
    • broker级别:follower/leader.replication.throttled.rate=N
    • topic级别:follower/leader.replication.throttled.replicas=N
    • 分区重分配过程中的临时限流策略
    • 原AR会应用leader限流配置
    • 分区移动的目的地会应用follower限流配置
    • 重分配所需的数据复制完成后,临时限流策略会被移除
  • 修改副本因子
    • 通过kafka-reassign-partitions.sh配置
    • 如何选择合适的分区数
  • 性能测试工具
    • 生产者性能测试:kafka-producer-perf-test.sh脚本
    • 消费者性能测试:kafka-consumer-perf-test.sh脚本
  • 分区数和吞吐量的关系
    • 在一定限度内,吞吐量随分区数增加而上升,但由于磁盘、文件系统、I/O调度策略等影响,到一定程度时吞吐量会存在瓶颈或有所下降
  • 考量因素
    • 如果分区数过多,当集群中某个broker宕机,就会有大量分区需要进行leader角色切换,这个过程会耗费一定的时间,并且在此期间这些分区不可用。分区数越多,kafka的正常启动和关闭耗时也会越长,同时也会增加日志清理的耗时
    • 建议将分区数设定为broker的倍数

标签:详述,副本,生产者,分区,默认,kafka,消息,Kafka,leader
From: https://www.cnblogs.com/fcjedorfjoeij/p/17521366.html

相关文章

  • kafka入门必备知识
    1.Kafka是一个分布式流处理平台:可以让你发布和订阅流式的记录。这一方面与消息队列或者企业消息系统类似。可以储存流式的记录,并且有较好的容错性。可以在流式记录产生时就进行处理。2.消息系统:定义将数据从一个应用程序传递到另一个应用程序,通过提供消息传递和消......
  • (一)kafka从入门到精通之初识kafka
    一、发布订阅系统在学习kafka之前,我们先来看看什么是发布订阅系统。概念数据的发送者不会直接把消息发送给接收者,这是发布与订阅消息系统的一个特点。发布者以某种方式对消息进行分类,接受者订阅它们,以便接受特定类型的消息。发布与订阅系统一般会有一个broker,也就是发布消息的......
  • (二)kafka从入门到精通之kafka的优势
    学习传送门(一)kafka从入门到精通之初识kafka一、常用消息队列比较基于发布与订阅的消息系统那么多,为什么Kafka会是一个更好的选择呢?咱们先来简单的看看mq的一个对比图吧。特性ActiveMQRabbitMQRocketMQKafka生产者消费者模式支持支持支持支持发布订阅......
  • Kafka中的消费者Offset
    消费者位移每个consumer实例都会为它消费的分区维护属于自己的位置信息来记录当前消费了多少条消息。这在Kafka中有一个特有的术语:位移(offset)。相比较将offset保存在服务器端(broker),这样虽然简单,但是有如下的问题:broker变成了有状态的,增加了同步成本,影响伸缩性。需......
  • SpringBoot整合Kafka
    1、安装kafka这里我是用的是docker-compose方式安装(1)安装docker和docker-composesudoyuminstall-yyum-utilssudoyum-config-manager\--add-repo\https://download.docker.com/linux/centos/docker-ce.reposudoyuminstalldocker-cedocker-ce-clico......
  • 18、【SparkStreaming】object not serializable (class: org.apache.kafka.clients.c
    背景:当SparkStream连接kafka,消费数据时,报错:objectnotserializable(class:org.apache.kafka.clients.consumer.ConsumerRecord,value:ConsumerRecord分析:消费者的消费记录序列化出现了问题,需要正确的进行序列化。措施:在设置sparkconf的时候,指定序列化方式就可以解......
  • 白话Kafka
    一、Kafka基础消息系统的作用应该大部份小伙伴都清楚,用机油装箱举个例子所以消息系统就是如上图我们所说的仓库,能在中间过程作为缓存,并且实现解耦合的作用。引入一个场景,我们知道中国移动,中国联通,中国电信的日志处理,是交给外包去做大数据分析的,假设现在它们的日志都交给了你做的系......
  • 白话Kafka
     一、Kafka基础消息系统的作用应该大部份小伙伴都清楚,用机油装箱举个例子 所以消息系统就是如上图我们所说的仓库,能在中间过程作为缓存,并且实现解耦合的作用。引入一个场景,我们知道中国移动,中国联通,中国电信的日志处理,是交给外包去做大数据分析的,假设现在它们的日志都交......
  • docker swarm 集群部署Kafka3.5,彻底告别zookeeper
    介绍本次部署kafka3.5版本,彻底告别zookeeper时代,部署更加轻量,运维更加简单同时使用比较好用的kafka控制台redpandadatadockerswam集群搭建详见我的另一篇博客DockerSwarm集群搭建,不再这里赘述。docker-compose文件准备docker-compose-kafka3-cluster.ymlversi......
  • 都 2023 年了,你还在用 Kafka?快试试这个全新平台吧
    最近这个ApachePulsar消息中间件非常的火,号称云原生时代的下一代消息中件,今天,就一起来看看它到底有多牛逼?ApachePulsar简介ApachePulsar是一个企业级的分布式消息系统,最初由Yahoo开发并在2016年开源,目前正在Apache基金会下孵化。Plusar已经在Yahoo的生产环境使用了三年多,主要......