首页 > 其他分享 >Kafka—生产者和消费者的内部结构

Kafka—生产者和消费者的内部结构

时间:2023-07-02 21:12:57浏览次数:38  
标签:记录 生产者 分区 写入 代理 Kafka 内部结构

 

生产者

将数据发布到 Kafka 主题的应用程序称为生产者。应用程序集成了一个Kafka 客户端库来写入 Kafka。编写过程从创建 ProducerRecird开始。

 

Kafka Producers 中的组件/流程

  • 拦截器——可以在发送之前改变记录的拦截器,例如Claim-check-interceptor。
  • 生产者元数据——管理生产者所需的元数据:集群中的主题和分区、充当分区领导者的代理节点等。
  • 序列化器——将对象转换为字节数组的键/值序列化器。
  • Partitioner — 计算给定记录的分区。如果 ProducerRecord 中指定了分区,则分区器将返回相同的分区,否则,它将根据分区策略(轮询、哈希键或自定义分区)为消息键选择分区。org.apache.kafka.clients.producer.internals.DefaultPartitioner, org.apache.kafka.clients.producer.RoundRobinPartitioner, org.apache.kafka.clients.producer.UniformStickyPartitioner, org.apache.kafka.clients.producer.Partitioner (Inteface)
  • Record Accumulator - 累积记录并按主题分区将它们分组为批次。一批未发送的记录保存在缓冲存储器中。一个单独的 I/O 线程负责将这些批次的记录作为请求发送到 Kafka 代理。
  • 事务管理器——管理事务并维护必要的状态以确保幂等生产。
  • 通道选择器——创建一个网络客户端来与代理建立通信。

生产者确认设置

Kafka 生产者将数据写入分区的当前领导代理。如果我们希望消息在被视为成功写入之前必须写入最少数量的副本,我们需要设置在被视为成功写入之前acks需要确认收到消息的代理数量。

 

注意:当acks=all使用 a时replication.factor=N,min.insync.replicas=M我们可以容忍N-M代理出于主题可用性的目的而关闭。

生产者重试

 

重试
delivery.timeout.ms
retry.backoff.ms

max.in.flight.requests.per.connection

幂等生产者

重试发送失败消息的重复风险很小。如果数据被复制到 ISR 但确认未到达生产者并因此重试,则可能会发生这种情况。为了避免这种情况,Kafka 使用了不断增加的 PID 序列。Kafka 总是采用成功写入的最大 PID-Sequence Number 组合。当接收到较低的序列号时,将其丢弃。

enable.idempotence=true
acks=all

Kafka 消息压缩

compression.type
none、gzip、lz4、snappy 和 zstd

如果我们使用生产者级压缩,那么我们应该将代理级设置设置为compression.type=producer. 如果生产者级别压缩和代理级别压缩不匹配,代理将解压并再次压缩。

Kafka 生产者批处理

linger.ms — 发送批次前的等待时间
batch.size — 批次中包含的最大字节数

Broker

 

  • 生产者记录落在套接字接收缓冲区上。网络线程之一拾取消息并将其传递到共享请求队列。
  • 记录由 I/O 线程拾取。它验证数据的CRC。然后将记录写入提交日志。
  • I/O 线程将响应逻辑交给 Purgatory map(管理延迟操作的代理)。此映射等待其他代理确认写入 (ISR)。这个映射是用 ConcurrentHashMap 和 ConcurrentLinkedQueue 实现的。
  • 复制消息后,响应将被放入响应队列。
  • 网络线程从队列中拉取响应并将其放入套接字发送缓冲区。

消费者

从 Kafka 主题读取数据的应用程序称为消费者。应用程序集成了一个 Kafka 客户端库来读取 Apache Kafka。消费者从一个或多个分区中读取,并在每个分区内维护排序。Kafka 消费者实现了“拉模型”。这意味着消费者向代理发送获取请求以获取数据。

 

从上图中,我们可以看到 Kafka 消费者中的以下组件。

  • Coordinator - 管理组成员,偏移量
  • 元数据——管理消费者所需的元数据:集群中的主题和分区、充当分区领导者的代理节点等。
  • 网络客户端——处理对代理的连接/请求
  • Fetcher — 从经纪人那里获取成批的记录。
  • 反序列化器——将字节数组转换为对象的键/值反序列化器。
  • 拦截器——可能改变记录的拦截器

消费者的交付语义

enable.auto.commit=true
auto.commit.interval.ms

  • 最多一次:收到消息后立即提交偏移量。如果出现错误,消息可能会丢失。
  • 至少一次:在处理完消息后提交偏移量。可能导致多次读取。确保消息处理是幂等的。
  • 恰好一次:仅在 Kafka → Kafka 与事务一起流动。

其他配置:

fetch.min.bytes
fetch.max.wait.ms

增加 fetch.min.bytes 和时间将导致吞吐量增加,而减少它将导致更好的延迟。

标签:记录,生产者,分区,写入,代理,Kafka,内部结构
From: https://www.cnblogs.com/nifrecxgh/p/17521392.html

相关文章

  • Kafka-核心设计和实现原理,生产者和消费者详述
    1.体系架构Producer:生产者Consumber:消费者Broker:服务代理节点(kafka实例)2.消息存储主题(Topic):kafka消息以topic为单位进行归类,逻辑概念分区(Partition):Topic-Partition为一对多分区在存储层面可看做是一个可追加的日志文件消息在追加到分区时会分配一个特定的偏移量(offset)作为在此分区......
  • Kafka-核心设计和实现原理,生产者和消费者详述
    1.体系架构 Producer:生产者Consumber:消费者Broker:服务代理节点(kafka实例) 2.消息存储主题(Topic):kafka消息以topic为单位进行归类,逻辑概念分区(Partition):Topic-Partition为一对多分区在存储层面可看做是一个可追加的日志文件消息在追加到分区时会分配一个特定的......
  • kafka入门必备知识
    1.Kafka是一个分布式流处理平台:可以让你发布和订阅流式的记录。这一方面与消息队列或者企业消息系统类似。可以储存流式的记录,并且有较好的容错性。可以在流式记录产生时就进行处理。2.消息系统:定义将数据从一个应用程序传递到另一个应用程序,通过提供消息传递和消......
  • (一)kafka从入门到精通之初识kafka
    一、发布订阅系统在学习kafka之前,我们先来看看什么是发布订阅系统。概念数据的发送者不会直接把消息发送给接收者,这是发布与订阅消息系统的一个特点。发布者以某种方式对消息进行分类,接受者订阅它们,以便接受特定类型的消息。发布与订阅系统一般会有一个broker,也就是发布消息的......
  • (二)kafka从入门到精通之kafka的优势
    学习传送门(一)kafka从入门到精通之初识kafka一、常用消息队列比较基于发布与订阅的消息系统那么多,为什么Kafka会是一个更好的选择呢?咱们先来简单的看看mq的一个对比图吧。特性ActiveMQRabbitMQRocketMQKafka生产者消费者模式支持支持支持支持发布订阅......
  • Kafka中的消费者Offset
    消费者位移每个consumer实例都会为它消费的分区维护属于自己的位置信息来记录当前消费了多少条消息。这在Kafka中有一个特有的术语:位移(offset)。相比较将offset保存在服务器端(broker),这样虽然简单,但是有如下的问题:broker变成了有状态的,增加了同步成本,影响伸缩性。需......
  • SpringBoot整合Kafka
    1、安装kafka这里我是用的是docker-compose方式安装(1)安装docker和docker-composesudoyuminstall-yyum-utilssudoyum-config-manager\--add-repo\https://download.docker.com/linux/centos/docker-ce.reposudoyuminstalldocker-cedocker-ce-clico......
  • 18、【SparkStreaming】object not serializable (class: org.apache.kafka.clients.c
    背景:当SparkStream连接kafka,消费数据时,报错:objectnotserializable(class:org.apache.kafka.clients.consumer.ConsumerRecord,value:ConsumerRecord分析:消费者的消费记录序列化出现了问题,需要正确的进行序列化。措施:在设置sparkconf的时候,指定序列化方式就可以解......
  • 白话Kafka
    一、Kafka基础消息系统的作用应该大部份小伙伴都清楚,用机油装箱举个例子所以消息系统就是如上图我们所说的仓库,能在中间过程作为缓存,并且实现解耦合的作用。引入一个场景,我们知道中国移动,中国联通,中国电信的日志处理,是交给外包去做大数据分析的,假设现在它们的日志都交给了你做的系......
  • 白话Kafka
     一、Kafka基础消息系统的作用应该大部份小伙伴都清楚,用机油装箱举个例子 所以消息系统就是如上图我们所说的仓库,能在中间过程作为缓存,并且实现解耦合的作用。引入一个场景,我们知道中国移动,中国联通,中国电信的日志处理,是交给外包去做大数据分析的,假设现在它们的日志都交......