生产者
将数据发布到 Kafka 主题的应用程序称为生产者。应用程序集成了一个Kafka 客户端库来写入 Kafka。编写过程从创建 ProducerRecird开始。
Kafka Producers 中的组件/流程
- 拦截器——可以在发送之前改变记录的拦截器,例如Claim-check-interceptor。
- 生产者元数据——管理生产者所需的元数据:集群中的主题和分区、充当分区领导者的代理节点等。
- 序列化器——将对象转换为字节数组的键/值序列化器。
- Partitioner — 计算给定记录的分区。如果 ProducerRecord 中指定了分区,则分区器将返回相同的分区,否则,它将根据分区策略(轮询、哈希键或自定义分区)为消息键选择分区。org.apache.kafka.clients.producer.internals.DefaultPartitioner, org.apache.kafka.clients.producer.RoundRobinPartitioner, org.apache.kafka.clients.producer.UniformStickyPartitioner, org.apache.kafka.clients.producer.Partitioner (Inteface)
- Record Accumulator - 累积记录并按主题分区将它们分组为批次。一批未发送的记录保存在缓冲存储器中。一个单独的 I/O 线程负责将这些批次的记录作为请求发送到 Kafka 代理。
- 事务管理器——管理事务并维护必要的状态以确保幂等生产。
- 通道选择器——创建一个网络客户端来与代理建立通信。
生产者确认设置
Kafka 生产者将数据写入分区的当前领导代理。如果我们希望消息在被视为成功写入之前必须写入最少数量的副本,我们需要设置在被视为成功写入之前acks需要确认收到消息的代理数量。
注意:当acks=all使用 a时replication.factor=N,min.insync.replicas=M我们可以容忍N-M代理出于主题可用性的目的而关闭。
生产者重试
重试
delivery.timeout.ms
retry.backoff.ms
max.in.flight.requests.per.connection
幂等生产者
重试发送失败消息的重复风险很小。如果数据被复制到 ISR 但确认未到达生产者并因此重试,则可能会发生这种情况。为了避免这种情况,Kafka 使用了不断增加的 PID 序列。Kafka 总是采用成功写入的最大 PID-Sequence Number 组合。当接收到较低的序列号时,将其丢弃。
enable.idempotence=true
acks=all
Kafka 消息压缩
compression.type
none、gzip、lz4、snappy 和 zstd
如果我们使用生产者级压缩,那么我们应该将代理级设置设置为compression.type=producer. 如果生产者级别压缩和代理级别压缩不匹配,代理将解压并再次压缩。
Kafka 生产者批处理
linger.ms — 发送批次前的等待时间
batch.size — 批次中包含的最大字节数
Broker
- 生产者记录落在套接字接收缓冲区上。网络线程之一拾取消息并将其传递到共享请求队列。
- 记录由 I/O 线程拾取。它验证数据的CRC。然后将记录写入提交日志。
- I/O 线程将响应逻辑交给 Purgatory map(管理延迟操作的代理)。此映射等待其他代理确认写入 (ISR)。这个映射是用 ConcurrentHashMap 和 ConcurrentLinkedQueue 实现的。
- 复制消息后,响应将被放入响应队列。
- 网络线程从队列中拉取响应并将其放入套接字发送缓冲区。
消费者
从 Kafka 主题读取数据的应用程序称为消费者。应用程序集成了一个 Kafka 客户端库来读取 Apache Kafka。消费者从一个或多个分区中读取,并在每个分区内维护排序。Kafka 消费者实现了“拉模型”。这意味着消费者向代理发送获取请求以获取数据。
从上图中,我们可以看到 Kafka 消费者中的以下组件。
- Coordinator - 管理组成员,偏移量
- 元数据——管理消费者所需的元数据:集群中的主题和分区、充当分区领导者的代理节点等。
- 网络客户端——处理对代理的连接/请求
- Fetcher — 从经纪人那里获取成批的记录。
- 反序列化器——将字节数组转换为对象的键/值反序列化器。
- 拦截器——可能改变记录的拦截器
消费者的交付语义
enable.auto.commit=true
auto.commit.interval.ms
- 最多一次:收到消息后立即提交偏移量。如果出现错误,消息可能会丢失。
- 至少一次:在处理完消息后提交偏移量。可能导致多次读取。确保消息处理是幂等的。
- 恰好一次:仅在 Kafka → Kafka 与事务一起流动。
其他配置:
fetch.min.bytes
fetch.max.wait.ms
增加 fetch.min.bytes 和时间将导致吞吐量增加,而减少它将导致更好的延迟。
标签:记录,生产者,分区,写入,代理,Kafka,内部结构 From: https://blog.51cto.com/u_16173760/6607112