首页 > 其他分享 >第二篇:深入剖析Kafka生产者的架构和原理

第二篇:深入剖析Kafka生产者的架构和原理

时间:2024-05-26 10:30:54浏览次数:16  
标签:架构 生产者 数据 Kafka 发送 参数 acks 第二篇

大家好!今天我们来深入探讨一下Kafka生产者的架构和原理。Kafka生产者是数据流入Kafka集群的起点,其设计和实现直接影响消息传输的可靠性和性能。本文将通过示例代码和源码剖析,带大家全面了解Kafka生产者的参数、整体架构、元数据更新过程等内容。准备好了吗?让我们开始吧!

文章目录

一、Kafka生产者的参数和作用

1. 生产者示例

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaProducerExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key", "value");
        RecordMetadata metadata = producer.send(record).get();

        System.out.printf("Sent record to topic %s partition %d with offset %d%n",
                metadata.topic(), metadata.partition(), metadata.offset());

        producer.close();
    }
}

2. 重要参数和作用

  • bootstrap.servers: Kafka集群的地址,用于初始化连接。
  • key.serializervalue.serializer: 消息键和值的序列化器。
  • acks: 生产者在收到服务器确认前的消息数量。all表示所有副本都收到确认。
  • retries: 如果发送失败,生产者会尝试重新发送消息的次数。
  • batch.size: 批量发送消息的大小。
  • linger.ms: 消息发送前等待的时间。
  • buffer.memory: 生产者可用于缓冲的内存总量。

我们来深入剖析Kafka生产者的重点参数:

acks参数

在Kafka生产者配置中,acks参数决定了消息发送的确认机制。这个参数直接影响消息的可靠性、延迟和吞吐量。下面详细介绍acks参数的各种值及其代表的含义。

acks 参数的含义

1. acks=0

acks设置为0时,生产者在发送消息后不会等待任何来自服务器的确认。即使消息在网络传输过程中丢失或服务器未能成功接收,生产者也不会得到任何通知。

  • 优点:极低的延迟,生产者无需等待确认即可继续发送下一条消息,适用于对数据丢失不敏感的场景。
  • 缺点:可靠性最差,可能会丢失消息,因为生产者不等待任何确认。
2. acks=1

acks设置为1时,生产者在发送消息后会等待Leader分区确认已收到消息。Leader分区确认后,生产者可以继续发送下一条消息。

  • 优点:延迟较低,Leader确认即可继续,适用于对可靠性有一定要求,但不需要最高可靠性的场景。
  • 缺点:如果Leader在确认后但在Follower同步前宕机,消息可能丢失。
3. acks=allacks=-1

acks设置为all或-1时,生产者在发送消息后会等待所有同步副本(Leader和所有ISR中的Follower)确认已收到消息。只有当所有副本都确认后,生产者才会继续发送下一条消息。

  • 优点:最高的可靠性,确保消息已被所有副本接收,适用于对数据丢失完全不可接受的场景。
  • 缺点:延迟较高,因为需要等待所有副本的确认,吞吐量可能降低。
acks源码解析

Kafka生产者在发送消息时,根据acks参数的值来决定是否等待以及等待哪些副本的确认。以下是相关源码解析:

public class KafkaProducer<K, V> implements Producer<K, V> {
    private final KafkaProducerConfig config;
    private final String acks;

    public KafkaProducer(Map<String, Object> configs) {
        this(new KafkaProducerConfig(configs));
    }

    KafkaProducer(KafkaProducerConfig config) {
        this.config = config;
        this.acks = config.getString(ProducerConfig.ACKS_CONFIG);
        // 初始化其他配置和组件
    }

    public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
        // 构建ProducerRequest并发送
        Node node = cluster.leaderFor(partition);
        if (acks.equals("0")) {
            // 不等待确认,直接返回
        } else if (acks.equals("1")) {
            // 只等待Leader确认
        } else if (acks.equals("all") || acks.equals("-1")) {
            // 等待所有同步副本确认
        }
    }
}
流程图展示
Producer发送消息 acks参数 acks=0 acks=1 acks=all或-1 不等待任何确认 等待Leader确认 等待所有同步副本确认 继续发送下一条消息

这里详细探讨了Kafka生产者中acks参数的意义及其对消息可靠性和延迟的影响。不同的acks值提供了不同级别的确认机制,允许开发者在性能和可靠性之间找到平衡。

retries和retry.backoff.ms参数

在Kafka生产者配置中,retriesretry.backoff.ms是两个重要参数,它们用于控制消息发送失败后的重试机制。这两个参数的配置直接影响消息发送的可靠性和性能。下面详细介绍这两个参数的含义及其作用。

retries参数的含义

1. retries参数

retries参数用于指定生产者在发送消息失败时重试的次数。网络故障、Kafka服务器暂时不可用等情况都可能导致消息发送失败,通过配置retries参数,可以提高消息发送的成功率。

  • 默认值:0(不进行重试)
  • 配置方法:通过在生产者的配置属性中设置retries参数值,例如props.put("retries", 3);表示最多重试3次。
2. 源码解析

Kafka生产者在发送消息时,如果发送失败且retries参数大于0,生产者会按照配置的次数进行重试。以下是相关源码解析:

public class KafkaProducer<K, V> implements Producer<K, V> {
    private final KafkaProducerConfig config;
    private final int retries;

    public KafkaProducer(Map<String, Object> configs) {
        this(new KafkaProducerConfig(configs));
    }

    KafkaProducer(KafkaProducerConfig config) {
        this.config = config;
        this.retries = config.getInt(ProducerConfig.RETRIES_CONFIG);
        // 初始化其他配置和组件
    }

    public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
        // 构建ProducerRequest并发送
        int attempts = 0;
        while (attempts <= retries) {
            try {
                // 尝试发送消息
                return doSend(record);
            } catch (Exception e) {
                if (attempts == retries) {
                    throw e;
                }
                attempts++;
                // 等待一段时间后重试
                Thread.sleep(retryBackoffMs);
            }
        }
    }
}

retry.backoff.ms参数的含义

retry.backoff.ms参数用于指定生产者在重试发送失败消息前等待的时间(以毫秒为单位)。配置该参数可以避免重试过于频繁导致的资源浪费。

  • 默认值:100ms
  • 配置方法:通过在生产者的配置属性中设置retry.backoff.ms参数值,例如props.put("retry.backoff.ms", 200);表示在重试前等待200ms。

流程图展示

消息发送重试流程图
发送消息 发送成功? 返回成功响应 是否达到最大重试次数? 返回失败响应 等待retry.backoff.ms 重试发送消息

这里详细探讨了Kafka生产者中retriesretry.backoff.ms参数的意义及其作用。retries参数控制生产者在消息发送失败时的重试次数,而retry.backoff.ms参数则指定了重试前的等待时间。合理配置这两个参数,可以提高消息发送的可靠性,同时避免资源浪费。希望这篇文章能帮助你更好地理解和配置Kafka生产者,以满足不同场景下的需求。

3. 生产者源码剖析

public KafkaProducer(Properties properties) {
    this(new KafkaProducerConfig(properties), null, null, null, null, Time.SYSTEM);
}

KafkaProducer(KafkaProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer, ...
    this.keySerializer = keySerializer == null ? config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class) : keySerializer;
    this.valueSerializer = valueSerializer == null ? config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class) : valueSerializer;
    ...
}

4. 流程图展示

KafkaProducer 配置参数 初始化序列化器 初始化分区器 初始化拦截器 启动生产者

二、Kafka生产者的整体架构

1. 整体架构剖析

Kafka生产者的架构由多个组件构成,包括序列化器、分区器、拦截器、缓冲区和元数据管理等。

2. 源码剖析

public KafkaProducer(Properties properties) {
    // 初始化配置、序列化器、分区器、拦截器
    this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
    this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
    this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
    this.interceptors = config.getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class);
    ...
}

3. 流程图展示

KafkaProducer 配置参数 初始化序列化器 初始化分区器 初始化拦截器 启动生产者

三、Kafka生产者的元数据更新过程

Kafka生产者在发送消息时,需要了解哪些Broker负责存储消息的分区。这些信息被称为元数据(Metadata),包括主题、分区、副本分配等。元数据的准确性和及时更新对生产者的性能和可靠性至关重要。这里将详细讲解Kafka生产者的元数据更新流程,帮助大家更好地理解Kafka内部的工作机制。

元数据更新的原理

元数据更新是指Kafka生产者从Kafka集群中获取最新的主题和分区信息的过程。生产者在以下几种情况下会触发元数据更新:

  1. 初次启动时,需要获取主题和分区信息。
  2. 发送消息时发现元数据过期或无效。
  3. 发送消息时收到Broker的NotLeaderForPartition异常,表示当前分区的Leader发生变化。

元数据更新的详细流程

1. 初始化元数据

Kafka生产者在初始化时,会创建一个Metadata实例,用于管理和缓存元数据。

public class KafkaProducer<K, V> implements Producer<K, V> {
    private final Metadata metadata;

    public KafkaProducer(Map<String, Object> configs) {
        this.metadata = new Metadata(
            config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
            config.getLong(ProducerConfig.METADATA_REFRESH_BACKOFF_CONFIG)
        );
    }
}

2. 获取元数据的流程

元数据更新流程主要分为以下几个步骤:

  1. 检查元数据有效性:生产者在发送消息前会检查当前缓存的元数据是否过期或无效。
  2. 请求元数据:如果元数据过期或无效,生产者会向Kafka集群发送元数据请求。
  3. 处理元数据响应:生产者接收到Kafka集群返回的元数据响应后,更新本地缓存。

3. 源码剖析

以下是元数据更新流程的关键源码:

public class KafkaProducer<K, V> implements Producer<K, V> {
    private final Metadata metadata;

    public KafkaProducer(Map<String, Object> configs) {
        this.metadata = new Metadata(
            config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
            config.getLong(ProducerConfig.METADATA_REFRESH_BACKOFF_CONFIG)
        );
    }

    public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
        // 检查元数据有效性
        ClusterAndWaitTime clusterAndWaitTime = metadata.fetch();
        Cluster cluster = clusterAndWaitTime.cluster;

        if (cluster.partitionsForTopic(record.topic()) == null) {
            // 元数据无效,触发元数据更新
            metadata.add(record.topic());
            metadata.requestUpdate();
        }

        // 获取分区信息
        int partition = partition(record, cluster);
        // 构建ProducerRequest并发送消息
    }
}

public class Metadata {
    private final long metadataExpireMs;
    private final long refreshBackoffMs;
    private final AtomicReference<Cluster> cluster = new AtomicReference<>(Cluster.empty());
    private final AtomicReference<Long> lastSuccessfulRefreshMs = new AtomicReference<>(0L);

    public Metadata(long metadataExpireMs, long refreshBackoffMs) {
        this.metadataExpireMs = metadataExpireMs;
        this.refreshBackoffMs = refreshBackoffMs;
    }

    public synchronized ClusterAndWaitTime fetch() {
        long now = System.currentTimeMillis();
        long elapsed = now - lastSuccessfulRefreshMs.get();
        if (elapsed > metadataExpireMs) {
            // 元数据过期,触发更新
            requestUpdate();
        }
        return new ClusterAndWaitTime(cluster.get(), Math.max(metadataExpireMs - elapsed, 0));
    }

    public synchronized void requestUpdate() {
        // 发送元数据请求
        // 获取元数据响应并更新本地缓存
    }
}

4. 流程图展示

KafkaProducer发送消息 元数据有效? 触发元数据更新 发送元数据请求 处理元数据响应 更新本地元数据缓存 获取分区信息 发送消息

元数据更新的触发条件

元数据更新的触发条件主要有以下几种:

  1. 元数据过期:超过配置的metadata.max.age.ms时间后,生产者认为元数据过期,需要重新获取。
  2. 主题新增或删除:当生产者发现新的主题或已有主题被删除时,需要更新元数据。
  3. 分区Leader变化:当生产者发送消息时,发现某个分区的Leader发生变化,需要更新元数据。

元数据更新的重要参数

  • metadata.max.age.ms:元数据的最大有效时间,默认值为300000ms(5分钟)。超过此时间后,元数据被认为过期,生产者会触发元数据更新。
  • metadata.refresh.backoff.ms:元数据更新的退避时间,默认值为100ms。当元数据更新失败时,生产者会等待该时间后再次尝试更新。

示例代码

生产者示例

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaProducerExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");
        props.put("retries", 3);
        props.put("retry.backoff.ms", 200);
        props.put("metadata.max.age.ms", 300000); // 设置元数据的最大有效时间为5分钟
        props.put("metadata.refresh.backoff.ms", 100); // 设置元数据更新的退避时间为100ms

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key", "value");
        RecordMetadata metadata = producer.send(record).get();

        System.out.printf("Sent record to topic %s partition %d with offset %d%n",
                metadata.topic(), metadata.partition(), metadata.offset());

        producer.close();
    }
}

这里我们详细探讨了Kafka生产者的元数据更新流程。元数据的及时更新对于确保消息能够正确地发送到目标分区至关重要,还介绍了元数据更新的触发条件和重要参数配置。

四、总结

通过本文的介绍,我们详细探讨了Kafka生产者的架构和原理。我们从生产者的参数和作用入手,介绍了Kafka生产者的整体架构、元数据更新过程,最后列举了生产者的重要参数及其作用。希望这篇文章能为你在使用Kafka生产者时提供有价值的参考。

如果本文对您有所帮助的话,请收藏文章、关注作者、订阅专栏,感激不尽。

标签:架构,生产者,数据,Kafka,发送,参数,acks,第二篇
From: https://blog.csdn.net/wjm1991/article/details/139208575

相关文章

  • TypeScript 学习笔记(十一):TypeScript 与微服务架构的结合应用
    TypeScript学习笔记(十一):TypeScript与微服务架构的结合应用1.引言在前几篇学习笔记中,我们探讨了TypeScript的基础知识、前后端框架的结合应用、测试与调试技巧、数据库以及GraphQL的结合应用。本篇将重点介绍TypeScript与微服务架构的结合应用,包括如何使用TypeSc......
  • 动态地控制kafka的消费速度,从而满足业务要求
    kafka是一个分布式流媒体平台,它可以处理大规模的数据流,并允许实时消费该数据流。在实际应用中,我们需要动态控制kafka消费速度,以便处理数据流的速率能够满足系统和业务的需求。本文将介绍如何在kafka中实现动态控制消费速度的方法。1.消费者配置在Kafka中,消费者可以使用以下参......
  • 第二周 架构
    第二周总结各系统版本的zabbix安装。常用安装:包安装、二进制安装、源码编译安装、docker容器安装如果需要中文建议现在linux安装中文包:#CentOS安装中文包,再修改语言,否则无法选择[root@zabbix-server~]#yum-yinstalllangpacks-zh_CN#Ubuntu安装下面中文包[root@zabb......
  • 第三周 架构
    第三周1、在两台服务器上安装kvm虚拟化,把其中一台的虚拟机迁移到另一台。冷迁移虚拟机:将一个宿主机的处于关机状态的虚拟机迁移到另一台宿主机,注意:不支持Ubuntu和Rocky8宿主机之间迁移#在一台目标宿主机安装相关虚拟化软件[root@ubuntu2004~]#aptupdate[root@ubuntu2004......
  • 工业组态软件Intouch(单机版)入门{第二篇}
    Intouch激活授权步骤(以2014r2sp1版本(硬件狗+lic文件)和2017u3版本(XML文件)为例)Intouch授权一共分为两种。1、老版本配合硬件加密狗的授权方式。2、2017版本开始的XML文件授权方式。1、2017版之前硬件加密狗授权的方式。*第1步:*安装好软件之后,找到一个叫ArchestrALicenseM......
  • 大模型最新黑书:大模型应用解决方案: 基于GPT-3、ChatGPT、GPT-4等Transformer架构的自
    今天给大家推荐一本丹尼斯·罗斯曼(DenisRothman)编写的关于大语言模型(LLM)权威教程<<大模型应用解决方案>基于GPT-3、ChatGPT、GPT-4等Transformer架构的自然语言处理>!Google工程总监AntonioGulli作序,这含金量不用多说,在这里给大家强烈推荐一下这本黑书,下面直接开始介绍!......
  • 基于附带Attention机制的seq2seq模型架构实现英译法的案例
    模型架构先上图我们这里选用GRU来实现该任务,因此上图的十个方框框都是GRU块,如第二张图,放第一张图主要是强调编码器的输出是作用在解码器每一次输入的观点,具体的详细流程图将在代码实现部分给出。编码阶段1.准备工作要用到的数据集点此下载,备用地址,点击下载导入相关的......
  • 企业生产环境中的麒麟V10(ARM架构)操作系统部署jdk和redis三主三从交叉版集群
    前言:麒麟ARM操作系统是国企和政务机关推行信创化选择率比较高的一款操作系统,然而ARM操作系统非主流的X86系统,除了命令一样,在架构方面差别极大,初次接触多多少少会踩坑,下面我将在公司中部署的实例列举出来,供大家参考,ip和设计机密信息不方便展示,统用虚拟信息代替。经过多次验证,用了......
  • SpringCloud + Python 混合微服务架构,打造AI分布式业务应用的技术底层
    文章很长,且持续更新,建议收藏起来,慢慢读!疯狂创客圈总目录博客园版为您奉上珍贵的学习资源:免费赠送:《尼恩Java面试宝典》持续更新+史上最全+面试必备2000页+面试必备+大厂必备+涨薪必备免费赠送:《尼恩技术圣经+高并发系列PDF》,帮你实现技术自由,完成职业升级,薪......
  • 基于Android Room的三层架构设计与实现
    摘要本文探讨了在Android应用中实现三层架构的设计思路与具体实现,采用Kotlin语言及AndroidStudio开发工具。通过对Room数据库的集成,展示了数据层、业务层和表现层的详细代码和实现原理。本文旨在为开发者提供一个清晰的参考示例,以便在实际开发中高效地构建可维护、扩展性强的应......