首页 > 其他分享 >Kafka 的生产者

Kafka 的生产者

时间:2024-10-29 16:42:44浏览次数:1  
标签:topic 生产者 分区 Kafka 发送 消息

Kafka的生产者

1. 生产者的执行流程

生产者客户端由两个线程协调运行,这两个线程分别为主线程和Sender线程(发送线程),其中,

  • 在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称消息收集器)中。
  • Sender线程负责从RecordAccumulator中获取消息并将其发送到Kafka中。

我们从创建一个 KafkaProducer对象开始,它将创建一个ProducerRecord对象。这个对象是Kafka中的一个核心类、它代表生产者发送到Kafka服务器端的一个消息对象,即个Key-Value 的键值对。在ProducerRecord对象中,包含如下信息。

  • Kafka 服务器端的主题名称(Topic Name)
  • Topic 中可选的分区号
  • 时间戳。
  • 其他 Kev-Value 键值对。

其中,十分重要的就是Kafka服务器端的主题名称。

ProducerRecord创建成功后,需要经过拦截器、序列化器将其转换为字节数组,这样它们才能够在网络上传输,然后消息到达分区器。分区器的作用是根据发送过程中指定的有效的分区号,将ProducerRecord发送到该分区;如果没有指定Topic中的分区号,则会根据 Key进行 Hash 运算,将ProducerRecord 映射到一个对应的分区。

ProducerRecord 默认采用当前的时间,用户也在创建ProducerRecord 的时候提供一个时间戳。Kafka 最终使用的时间戳取决于Topic 的配置,而Topic 时间戳的配置主要有以下两种:

  • CreateTime 表示使用生产者产生的时间戳作为Kafka最终的时间戳。
  • LogAppendTime 表示生产者记录中的时间戳将在消息添加到其日志中时,由Kafka Broker重写。

ProducerRecord 在经过主线程后,最终由发送线程发送到Kafka服务器端。Kafka Broker在收到消息时会返回一个响应,如果写入成功,则返回一个RecordMetaData 对象,其中包含主题和分区信息,以及记录在分区里的偏移量,上面两种时间戳类型也会返回给用户。如果写入失败,则返回一个错误。生产者在收到错误之后会尝试重新发送消息,几次后如果还是写入失败,就返回错误消息。

2. 自定义Kafka序列化器

import org.apache.kafka.common.serialization.Serializer;
import java.nio.charset.StandardCharsets;

/**
 * @author strind
 * @date 2024/10/29 15:32
 * @description 自定义kafka序列化器
 */
public class ProducerSerializer implements Serializer<String> {

    @Override
    public byte[] serialize(String topic, String data) {
        return data.getBytes(StandardCharsets.UTF_8);
    }
}

3. 生产者的消息发送模式

Kafka 生产者的消息发送主要有三种模式:发后即忘(fire-and-forget)、同步模式(sync)及异步模式(async)。

发后即忘:只管向Kafka中发送消息而并不用关心消息是否正确到达。在大多数情况下,这种发送方式没有什么问题,不过在某些时候(比如发生不可重试异常时)会造成消息的丢失。这种发送方式的性能最高,可靠性也最差。

同步模式:要实现同步的发送方式,可以利用返回的Future对象的阻塞等待Kafka的响应即可实现,直到消息发送成功。如果发生异常,就需要捕获异常并交由外层逻辑处理。

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * @author strind
 * @date 2024/10/29 15:31
 * @description 生产者发送消息
 */
public class SimpleKafkaProducer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"ip:port,ip:port");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        Producer<String,String> producer = new KafkaProducer<>(properties);
        // 发后即忘
        producer.send(new ProducerRecord<>("test-topic","data"));

        // 同步发送
//        Future<RecordMetadata> future = producer.send(new ProducerRecord<>("test-topic", "data"));
//        RecordMetadata response = future.get();
//        if (response != null){
//            // do something 
//        }

        producer.close();
    }

}

异步模式:为了在异步发送消息的同时能够对异常情况进行处理,生产者提供了回调支持。一般在 send()方法中指定一个 Callback的回调函数,Kafka 在返回响应时调用该函数来实现异步的发送确认。

// 异步发送
producer.send(new ProducerRecord<>("test-topic","data"), (recordMetadata,exception)->{
    // recordMetadata 与 exception是 互斥的,只有一个为null
    if (exception == null){
        // 发送成功
    }else {
        // 发送失败
    }
});

4. 生产者的高级特性

4.1 分区机制

Kafka 的主题 Topic 是由分区组成的,而将Topic进行分区的主要目的就是提供负载均衡和容错的能力,以及实现系统的高伸缩性和高可用性。Kafka的消息组织方式实际上是三层结构:主题一分区一消息。主题下的每条消息只会保存在某一个分区中,而不会在多个分区中保存多份。在创建Topic 的时候可以指定每个分区的副本数,用于支持分区中消息的容错。

不同的分区能够放置在Kafka集群中不同的节点上,而生产者和消费者在产生消息和消费消息的时候,也都是针对分区进行的,这样每个节点的机器都能独立执行各自分区的读写请求处理,并且还可以通过添加新的Kafka节点来增加整体系统的吞吐量。

Kafka 提供了如下的几种分区策略(分区策略是决定生产者将消息发送到哪个分区的算法):

  • 默认分区策略:

    1. 如果记录中指定了分区,则可以直接使用。
    2. 如果记录中未指定分区,但指定了key值,则根据key 的 hash 值选择一个分区。
    3. 如果记录中未指定分区,也未指定key值,则以黏性分区策略选择一个分区。
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        if (keyBytes == null) {
            return this.stickyPartitionCache.partition(topic, cluster);
        } else {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int numPartitions = partitions.size();
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }
    
  • 轮询分区策略:
    如果 key 值为 null,并且使用了默认的分区器,Kafka会根据轮询策略将消息均匀地分布到各个分区上。

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        int nextValue = this.nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (!availablePartitions.isEmpty()) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return ((PartitionInfo)availablePartitions.get(part)).partition();
        } else {
            return Utils.toPositive(nextValue) % numPartitions;
        }
    }
    private int nextValue(String topic) {
        AtomicInteger counter = (AtomicInteger)this.topicCounterMap.computeIfAbsent(topic, (k) -> {
            return new AtomicInteger(0);
        });
        return counter.getAndIncrement();
    }
    
  • 黏性分区策略:
    黏性分区策略就像黏住这个分区一样,只要这个分区没有被填满,就会尽可能地坚持使用该分区。这种策略首先会选择单个分区发送所有无key的消息,一旦这个分区已填满,黏性分区策略就会随机选择另一个分区。

    public int partition(String topic, Cluster cluster) {
        Integer part = (Integer)this.indexCache.get(topic);
        return part == null ? this.nextPartition(topic, cluster, -1) : part;
    }
    
    public int nextPartition(String topic, Cluster cluster, int prevPartition) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        Integer oldPart = (Integer)this.indexCache.get(topic);
        Integer newPart = oldPart;
        if (oldPart != null && oldPart != prevPartition) {
            return (Integer)this.indexCache.get(topic);
        } else {
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            Integer random;
            if (availablePartitions.size() < 1) {
                random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                newPart = random % partitions.size();
            } else if (availablePartitions.size() == 1) {
                newPart = ((PartitionInfo)availablePartitions.get(0)).partition();
            } else {
                while(newPart == null || newPart.equals(oldPart)) {
                    random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                    newPart = ((PartitionInfo)availablePartitions.get(random % availablePartitions.size())).partition();
                }
            }
    
            if (oldPart == null) {
                this.indexCache.putIfAbsent(topic, newPart);
            } else {
                this.indexCache.replace(topic, prevPartition, newPart);
            }
    
            return (Integer)this.indexCache.get(topic);
        }
    }
    
  • 散列分区策略:

    如果键值不为 null,并且使用了默认的分区器,Kafka 会对键进行散列,然后根据散列值把消息映射到对应的分区上。

  • 自定义分区策略:

    只需实现接口org.apache.kafka.clients.producer.Partitioner,用户可以根据需要对数据使用不一样的分区策略,在自定义分区策略后,只需要在生产者的Properties中指定 ProducerConfig.PARTITIONER_CLASS_CONFIG 参数即可。

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * @author strind
 * @date 2024/10/29 16:01
 * @description 分区策略
 */
public class MyPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
        Integer num = (Integer) key;
        if (num % 2 == 0){
            return 0;
        }
        return 1;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}
properties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());

4.2 压缩机制

Kafka发送消息的时候,可以在生产者端和Broker端进行消息的压缩。在一般情况建议采用的压缩机制是:生产者端负责压缩;Broker端负责保持;消费者端负责解压。Kafka采用这样的压缩机制,主要是使用CPU的时间去换磁盘存储的空间,以及网络IO的传输量。这样的做法可以以较小的CPU开销带来更少的磁盘占用或更少的网络IO 传输。

在目前的Kafka版本中,支持GZIP、Snappy和 LZ4 三种压缩方式。

properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

4.3 拦截器

在KafkaProducer的主线程中可以创建一个或多个Producerlnterceptors(拦截器)。拦截器在生产者端和消费者端均可设置,拦截器主要用于实现生产者端和消费者端的定制化控制逻辑。

对生产者而言,拦截器需要实现 org.apache.kafka.clients.producer.ProducerInterceptor接口。

下面对接口中的三个方法做出必要的解释:

  1. onSend( )。
    该方法将在KafkaProducer.send方法的主线程中执行。KafkaProducer 确保在消息被序列化以前,调用ProducerInterceptor.onSend方法。用户可以在该方法中对消息进行操作,但最好不要修改消息所属的Topic和分区,否则会影响目标分区的计算。

  2. onAcknowledgement( )。

    该方法会在消息被确认应答之前或消息发送失败时调用。如果生产者采用的是异步发送机制,该方法通常是在生产者回调逻辑被触发之前被调用的。需要注意的是,该方法运行在生产者的 IO 线程中,如果在该方法中放入很重的逻辑,会影响生产者的消息发送性能

  3. close( )
    关闭拦截器之前,可以将一些资源清理工作放在close 方法中。
    需要注意的是,如果指定了多个连接器,生产者将按照指定顺序调用它们。如果拦截器中出现了异常,生产者会将异常的错误信息记录到错误日志中,而不是向上传递。

5. 生产者参数配置

Kafka 生产者端的配置参数,除了之前使用过的bootstrap.servers、key.serializer 和value.serializer 三个必须参数,还有很多可选的参数。

  1. acks
    这个参数用于指定 分区中必须有多少个副本成功接收到消息之后,才能向生产者返回这条消息写入是成功的。acks 参数的本质其实就是一个字符串。有三种类型的值。

    • acks=1。
      这是 acks 参数的默认值。Kafka的生产者将消息发送到Kafka的Broker 服务器端。只要 Topic 分区中的Leader 成功写入消息,就算该消息成功发送。
      如果在生产者写入消息的过程中,Leader分区所在的Broker出现了宕机,将会造成
      消息无法正常写入。在重新选举Leader 的过程中,生产者Producer 会受到一个服务器端返回的错误信息。生产者为了支持容错,避免消息的丢失,会尝试重新发送该消息。直至消息成功写入 Leader 分区。

      这里需要注意的是,如果消息已经成功写入Leader所在的分区,但还未同步至其他Follower 分区的时候,如果Leader分区所在的Broker 出现了宕机,这时候会造成写入Leader分区的消息丢失。所以在这种参数值的设置下,消息是可能丢失的

    • acks=0。
      在这种参数设置下,Kafka的生产者不需要等待任何服务器端的响应,所以这时Kafka 集群可以达到最大的吞吐量。如果消息从生产者发送到写入Kafka消息系统的过程中出现异常,比如Broker宕机,生产者将不会得到任何反馈信息,也不会重发消息,会导致消息丢失。

    • acks=-1 或acks=all。
      在这种参数设置下,Kafka集群将达到最高的可靠性。生产者发送完消息后,需要等待Leader 分区和所有Follower分区都成功写入消息后,才返回给生产者一个成功写入的应答响应。

  2. buffer.memory
    Kafka 生产者的 Sender 线程在将消息发送到Kafka 服务器端之前,会把消息缓存到内存中,这个参数就决定了消息缓存的内存大小,其默认值是32MB。如果生产者产生消息的速度大于将消息发送到服务器端的速度,那么生产者将会被阻塞,并最终导致生产者抛出一个RecordTooLargeException的异常。

在实际的生产环境下,应该根据实际情况进行测试最终决定 buffer.memory 参数值的大小。

  1. batch.size 。
    当 Kafka 客户端将多个消息发送到同一个分区的时候,生产者为了减少客户端与服务器端的请求交互,会尝试将消息批量打包在一起,进行统一发送,这样有助于提升客户端和服务器端的性能。该配置的默认批次大小(以字节为单位)是16384 字节。如果消息的内存大小大于该参数的配置,将不会进行批量打包的过程。
    通过提升 batch.size 的大小,可以允许更多数据缓冲在分区中,那么一次请求服务器端所发送出去的数据量就更多了,这样吞吐量可能会有所提升。但是这样会造成大量内存的浪费。反过来如果减小 batch.size 的大小,则会系统地降低吞吐量。如果将 batch.size 设置为0,则批处理机制被禁用。所以需要在这里按照生产环境的发消息速率,调节不同的batch.size的大小。

  2. compression.type。

    该参数指定给到 Topic 中数据的压缩类型,其有效值的设置可以是标准的压缩方式,例如,'gzip'、'snappy’、"z4'、'zstd,同时该参数也可以是'uncompressed',在这种设置下消息数将不会被压缩。

  3. client.id。
    当生产者向服务器端发送请求时,传递给服务器端ID字符串。通过这个ID字符串Kafka服务器端就可以追踪请求的资源,其本质就是将生产者及其请求的资源进行逻辑上的隔离。

  4. connections.max.idle.ms.
    当生产者不再往服务器端发送消息时,这个参数用来决定关闭生产者连接的时间阈值,其默认值是 9min。

  5. linger.ms.
    该参数决定消息在由生产者发送到服务器端之前,在客户端延长发送的时间。通过这样的延时发送机制,可以将多个消息组合成一个批处理进行统一发送。从本质上讲,该参数与上面提到的 batch.size 参数类似。合理设置 batch.size 参数和 linger.ms 参数,将很好地利用 到Kafka 批处理机制。把linger.ms设置得太小了,比如默认就是0ms,或者设置为5ms,那可能导致 Batch虽然设置了 32KB,但是经常是还没凑够 32KB的数据,5ms之后就直接强制 Batch将数据发送出去,这会导致你的 Batch 形同虚设,一直凑不满数据。

  6. max.block.ms.
    该配置控制KafkaProducer.send()和KafkaProducer.partitionsFor()将消息阻塞多长时间,此外也可能是因为缓冲区已满或元数据不可用,导致这些方法被阻止。在用户提供的序化程序或分区器中的锁定不会计入此超时,其默认值为60000ms。

  7. max.request.size .
    Kafka生产者能发送消息的最大值,默认值为1MB。此设置将限制生产者的单个请求中发送的消息批次数,以避免发送过大的请求。这个参数涉及其他一些相关参数,比如务器 Broker 端的 message.max.bytes 参数,如果 message.max.bytes 参数设置为 10,而 max.request.size 设置为20,这时候就可以造成生产者报错。

  8. retries 和 retry.backoff.ms .
    如果生产者出现了异常,或者消息没有成功写入Kafka的服务器端,生产者可以配置重试的参数值,通过生产者端的内部重试机制来执行恢复,并不是直接将异常抛出。如果重试达到设定次数,生产者才会放弃重试并抛出异常。retries参数的默认值是0。同时生产者的重试还与retry,backof.ms参数有关,该参数用来设定两次重试之间的时间间隔其默认值是 100ms,从而避免无效的频繁重试。在配置retries 和 retry.backoft.ms之前,可以设定总重试时间要大于异常恢复时间,最好先估算一下异常恢复时间,避免过早放弃重试。

  9. receive.buffer.bytes 。
    这个参数用来设置socket接收消息缓冲区的大小,该缓存区大小的默认值32KB。如果将其值设置为-1,则使用操作系统的默认值。、

  10. send.buffer.bytes.
    这个参数用来设置socket发送消息缓冲区的大小,默认值为128KB。与receive.buffer.bytes 参数一样,如果将其值设置为-1,则使用操作系统的默认值。

  11. request.timeout.ms.
    消息由生产者发出后,该参数用于决定生产者等待请求响应的最长时间,其默认值为40s。如果响应的时间超过了该参数的设置,客户端将按照重试策略进行重试。注意,这个参数值需要比 Broker 端的参数replica.lag.time.max.ms值要大,这样可以减少因客户端重试引起的消息重复的概率。

  12. reconnect.backoff.max.ms.
    该参数表示Kaka客户端重连的最大时间。每次连接失败,重连时间都会成指数级增加,每次增加的时间会存在20%的随机浮动,以避免连接风暴。

  13. reconnect.backoff.ms.
    该参数表示Kafka客户端每次重连时候的间隔时间。

  14. delivery.timeout.ms.
    当生产者调用send方法后,该参数用于指定客户端等待发送成功或失败报告时,客户端等待时间的上限。这个时间上限包含以下几部分。

    1. 一条消息在发送前的延时时间。
    2. 生产者等待服务器端Broker确认信息的等待时间,
    3. 失败时的重试时间
  15. transaction.timeout.ms.
    该参数表示生产者主动终止当前正在进行的操作之前,Kaka等待操作状态更新的最大时间,其默认值是1min。如果该值大于Broker 中max.transaction.timeout.ms 的设置,则请求失败,并报"InvalidTransactionTimeout"错误。

  16. transactional.id.
    在事务传递过程中该参数用于表示某个事务的ID。这样可以保证跨多个生产者会话时语义的可靠性。因为它允许客户端保证在开始任何新事务之前使用相同的Transactional ID的事务来完成。

  17. max.in.flight.requests.per.connection .
    该参数表示在消息被阻塞前,每个客户端上发送的未应答请求的最大数量,其默认值是5。注意,如果该参数值设置大于1,并且消息发送失败,则由于客户端的重试会增加消息重新排序的风险。

  18. metadata.max.age.ms .
    该参数表示当超过这个时间间隔时,系统就会更新元信息,其默认值5min。Kafka的元数据信息由ZooKeeper 维护,包含Topic 信息、副本信息、分区信息、Broker 信息。

  19. metadata.max.idle.ms.
    当Topic 处于空闲状态时,该参数用于控制生产者抓取 Topic 元信息的时间。

标签:topic,生产者,分区,Kafka,发送,消息
From: https://www.cnblogs.com/strind/p/18513822

相关文章

  • 运维监控丨16条常用的Kafka看板监控配置与告警规则
    本期我们针对企业运维监控的场景,介绍一些监控配置和告警规则。可以根据Kafka集群和业务的具体要求,灵活调整和扩展这些监控配置及告警规则。在实际应用场景中,需要综合运用多种监控工具(例如Prometheus、Grafana、Zabbix等)和告警机制,以保障Kafka集群的稳定性和可靠性。此外,定期审核并......
  • kafka服务挂掉排查
    kafka服务挂掉排查kafka运行一段时间后,某天突然挂掉了。通过排查日志得知:cd/data/kafka/kafka_2.13-3.3.1/logsls-alt排序下日志,找到挂掉那天的:viserver.log.2024-10-28-17定位到日志最后:[2024-10-2817:04:15,463]WARNStoppingservinglogsindir/tmp/kafka-l......
  • Flink + Kafka 实现通用流式数据处理详解
    Flink+Kafka实现通用流式数据处理详解在大数据时代,实时数据处理和分析成为企业快速响应市场变化、提高业务效率和优化决策的关键技术。ApacheFlink和ApacheKafka作为两个重要的开源项目,在数据流处理领域具有广泛的应用。本文将深入探讨Flink和Kafka的关系、它们在数据......
  • Kafka
    Kafka基础Kafka是一款流行分布式消息分布订阅系统,除Kafka之外还有MQ、Redis等。把消息队列视为一个管道,管道的两端分别是消息生产者(producer)和消息消费者(consumer),消息生产者产生日志等消息后可以发送到管道中,这时消息队列可以驻留在内存或者磁盘上,直到消费者来把它读走为......
  • Kafka基本概念
    消息队列常见场景系统解耦:重要操作完成后,发送消息到Kafka中,由别的服务系统来消费消息完成其他操作(将非核心业务拆分出去缩短核心业务的处理流程和时间)流量削峰:一般用于秒杀或抢购活动中,缓冲系统短时间内高流量带来的压力(防止瞬间流量打崩系统)异步处理:通过异步处理机......
  • Kafka 解决消息丢失、乱序与重复消费
    一、引言在分布式系统中,ApacheKafka作为一种高吞吐量的分布式发布订阅消息系统,被广泛应用于日志收集、流式处理、消息队列等场景。然而,在实际使用过程中,可能会遇到消息丢失、乱序、重复消费等问题,这些问题可能会影响系统的稳定性和可靠性。本文将深入探讨Kafka中这些问题......
  • Kafka学习笔记(已完结)
    Kafka消息中间件官网:https://kafka.apache.org/docker安装kafka教程:https://bugstack.cn/md/road-map/kafka.htmlKafka的几个概念生产者Producer消费者Consumer主题Topic分区Partition一个topic下可以有多个分区。当创建topic时,如果补置顶该topic的partition数量,那么默认......
  • Kafka
    Kafka是由Linkedin公司开发的,它是一个分布式的,支持多分区、多副本,基于Zookeeper的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系统。一消息队列介绍1.Kafka的基本术语消息:Kafka中的数据单元被称为消息,也被称为记录,可以把它看作数据库表中某一行......
  • 解决kafka3.0.0在windows下不能启动的问题
    看到一个问题,说在用java代码发送kafka消息的时候能指定一个partition参数:importorg.apache.kafka.clients.producer.ProducerRecord;publicclassKafkaProducerExample{publicstaticvoidmain(String[]args){Stringtopic="test";intparti......
  • 生产者消费者模型
     线程同步互斥锁(互斥量)条件变量生产/消费者模型一、互斥锁C++11提供了四种互斥锁:mutex:互斥锁。timed_mutex:带超时机制的互斥锁。recursive_mutex:递归互斥锁。recursive_timed_mutex:带超时机制的递归互斥锁。包含头文件:#include<mutex>1、mutex类1)加锁lock()互斥锁......