首页 > 其他分享 >Kafka生产者

Kafka生产者

时间:2023-03-24 12:33:36浏览次数:32  
标签:重试 生产者 分区 Kafka 发送 消息

生产者创建消息。在其他基于发布与订阅的消息系统中,生产者可能被称为发布者 或 写入者。

一般情况下,一个消息会被发布到一个特定的主题上。生产者在默认情况下把消息均衡地分布到主题的所有分区上,而并不关心特定消息会被写到哪个分区。不过,在某些情况下,生产者会把消息直接写到指定的分区。这通常是通过消息键和分区器来实现的,分区器为键生成一个散列值,并将其映射到指定的分区上。这样可以保证包含同一个键的消息会被写到同一个分区上。生产者也可以使用自定义的分区器,根据不同的业务规则将消息映射到分区。

生产者发送消息的方式

生产者发送消息主要有 2 种方式:同步发送消息、异步发送消息

同步发送消息

同步发送消息:我们调用 KafkaProducer 的 send() 方法发送消息,send() 方法会返回一个包含 RecordMetadata 的 Future 对象,然后调用 Future 的 get() 方法等待 Kafka 响应,通过 Kafka 的响应,我们就可以知道消息是否发送成功。

  • 如果服务器返回错误,Future 的 get() 方法会抛出异常。
  • 如果没有发生错误,我们会得到一个 RecordMetadata 对象,这个对象包含消息的目标主题、分区信息和消息的偏移量等信息。

我们调用 KafkaProducer 的 send() 方法发送 ProducerRecord 对象,消息先是被放进缓冲区,然后使用单独的线程将消息发送到服务器端。


异常处理

如果在发送数据之前或者在发送过程中发生了任何错误,比如 broker 返回了一个不允许重发消息的异常或者已经超过了重发的次数,那么就会抛出异常。在发送消息之前,生产者也是有可能发生异常的。这些异常有可能是 SerializationException(说明序列化消息失败)、BufferExhaustedException 或 TimeoutException(说明缓冲区已满),又或者是 InterruptException(说明发送线程被中断)。

KafkaProducer 一般会发生两类错误。

  • 其中一类是可重试错误,这类错误可以通过重发消息来解决。比如对于连接错误,可以通过再次建立连接来解决,“无主(no leader)”错误则可以通过重新为分区选举首领来解决。KafkaProducer 可以被配置成自动重试,如果在多次重试后仍无法解决问题,应用程序会收到一个重试异常。
  • 另一类错误无法通过重试解决,比如“消息太大”异常。对于这类错误,KafkaProducer 不会进行任何重试,直接抛出异常。
public void send(String topic, String key, String val) {

    ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, key, val);

    try {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(producerRecord);
        SendResult<String, String> sendResult = future.get();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

异步发送消息

异步发送消息:我们调用 KafkaProducer 的 send() 方法,并指定一个回调方法,在服务器返回响应时调用该方法。

大多数时候,我们并不需要等待响应。不过在遇到消息发送失败时,我们需要抛出异常、记录错误日志,或者把消息写入“错误消息”文件以便日后分析。为了在异步发送消息的同时能够对异常情况进行处理,生产者提供了回调支持。

为了使用回调,需要一个实现了 org.apache.kafka.clients.producer.Callback 接口的类,这个接口只有一个 onCompletion() 方法。如果 Kafka 返回一个错误,onCompletion() 方法会抛出一个非空异常。通过 onCompletion() 方法抛出的异常,我们可以对发送失败的消息进行处理。一般情况下,因为生产者会自动进行重试,所以就没必要在代码逻辑里处理那些可重试的错误。你只需要处理那些不可重试的错误或重试次数超出上限的情况。

private class DemoProducerCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            e.printStackTrace();
        }
    }
}

ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA");

producer.send(record, new DemoProducerCallback());

分区器

介绍分区

ProducerRecord 对象包含目标主题、消息键和值(消息)。

  • 如果消息键为 null,并且使用了默认的 DefaultPartitioner 分区器,那么分区器使用粘性分区策略(UniformSticky),会随机选择一个分区,并尽可能一直使用该分区,等到该分区的 batch 已满或者已完成,Kafka 再随机一个分区进行使用(保证和上一次的分区不同)。
  • 如果消息键不为 null,并且使用了默认的 DefaultPartitioner 分区器,那么分区器会对消息键进行散列(使用 Kafka 自己的散列算法,即使升级 Java 版本,散列值也不会发生变化),然后根据散列值把消息映射到特定的分区上(散列值 与 主题的分区数进行取余得到 partition 值)。

这里的关键之处在于,同一个键总是被映射到同一个分区上,所以在进行映射时,我们会使用主题的所有分区,而不仅仅是可用的分区。这也意味着,如果写入数据的分区是不可用的,那么就会发生错误。

只有在不改变主题分区数量的情况下,键与分区之间的映射才能保持不变。一旦主题增加了新的分区,那么键与分区之间的映射关系就改变了。如果要使用键来映射分区,那么最好在创建主题的时候就把分区规划好,而且永远不要增加新分区。

自定义分区策略

生产者可以使用自定义的分区器,根据不同的业务规则将消息映射到分区。

通过分区器实现自定义分区策略的步骤:

  1. 定义一个类,该类实现 Partitioner 接口(分区器)
  2. 配置生产者(KafkaProducer),让生产者发送消息时使用自定义的分区器:properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());
public class MyPartitioner implements Partitioner {

    /**
     * 返回信息对应的分区
     *
     * @param topic      主题
     * @param key        消息的 key
     * @param keyBytes   消息的 key 序列化后的字节数组
     * @param value      消息的 value
     * @param valueBytes 消息的 value 序列化后的字节数组
     * @param cluster    集群元数据可以查看分区信息
     * @return
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);

        if ((keyBytes == null) || (!(key instanceof String))) {
            throw new InvalidRecordException("We expect all messages to have String type as key");
        }

        // 实现自己的分区策略
        // 返回数据写入的分区号
        return 0;
    }

    // 关闭资源
    @Override
    public void close() {
    }

    // 配置方法
    @Override
    public void configure(Map<String, ?> configs) {
    }
}

参考资料

《Kafka 权威指南》第 3 章:Kafka 生产者——向 Kafka 写入数据

标签:重试,生产者,分区,Kafka,发送,消息
From: https://blog.51cto.com/haofeiyu/6147074

相关文章

  • kafka
    kafka使用场景消息传递Messaging消息传递就是发送数据,作为TCPHTTP或者RPC的替代方案,可以实现异步、解耦、削峰(RabbitMQ和RocketMQ能做的事情,它也能做)。因为K......
  • kafka集群原理及部署
    官方地址https://kafka.apache.org/概述Kakfa起初是由LinkedIn公司开发的一个分布式的消息系统,后成为Apache的一部分,它使用Scala编写,以可水平扩展和高吞吐率而被广泛使......
  • Apache Kafka JNDI注入(CVE-2023-25194)漏洞复现浅析
    关于ApacheKafka是一个开源的分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和任务关键型应用程序。影响版本2.4.0<=Apachekafka<=3.2.2环境......
  • springcloud Stream整合rabbitmq消息驱动生产者踩坑
    消息驱动之生产者8801(踩坑记录)1.首先说一下情况,我是跟着尚硅谷周阳老师的springcloud2020教程学习的,前面也踩了不少坑,但是这个坑,是我找的比较久的坑了,所以希望大家能直......
  • 初识Kafka
    介绍KafkaKafka是一款基于发布与订阅的消息系统。用生产者客户端API向Kafka生产消息,用消费者客户端API从Kafka读取这些消息。Kafka使用Zookeeper保存元数......
  • kafka 性能优化与常见问题优化处理方案
    本文为博主原创,未经允许不得转载:1.  JVM参数优化设置kafka是scala语言开发,运行在JVM上,需要对JVM参数合理设置,修改bin/kafka-start-server.sh中的jvm设置exportKAFKA_HEAP......
  • kafka消息堆积,consumer掉线
    注:本文转自:https://www.toutiao.com/article/7160323779812983296/?log_from=5abd712547149_1679497545032线上kafka消息堆积,所有consumer全部掉线,到底怎么回事?最近处理......
  • Linux 部署:kafka(虚拟机集群)
    参考文档:https://blog.csdn.net/wt334502157/article/details/116518259目录1.节点规划2.部署kafka集群3.修改配置4.附录1.节点规划节点ipvm8110.99.0.8......
  • kafka的基本概念
    1BrokerKafka集群包含一个或多个服务器,服务器节点称为broker。如图,我们有2个broker,6个partition,则会均分;如果只有1个partition,那么另一个broker会闲置。理想情况,我们......
  • golang解决kafka消息重复发送和重复消费
    1、解决消息重复发送当使用Kafka生产者发送消息时,可以设置消息的Key,使用Key来保证相同Key的消息不会被重复发送。在发送消息时,可以使用带Key的消息发送方式,如下所示:msg......