首页 > 其他分享 >Kafka-生产者性能调优

Kafka-生产者性能调优

时间:2023-10-31 12:44:19浏览次数:39  
标签:producer 生产者 batch Kafka 发送 调优 消息 props put

(一)参数调优

参数调优相关代码

在实际的kafka开发中,我们会发现,无论是生产者还是消费者,都需要构建一个Properties对象,里面设置了很多参数。在这段代码中有很多常用的参数配置,在线上使用时,我们要根据实际的数据量和数据大小来决定这些配置的具体值。

Properties props = new Properties();

//集群地址,多个服务器用","分隔 (必填参数)

props.put("bootstrap.servers", "192.168.72.21:9092,192.168.72.22:9092,192.168.72.23:9092");

//key、value的序列化,此处以字符串为例,使用kafka已有的序列化类 (必填参数)

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

//应答数

props.put("acks", "1");

//重新发送消息次数,到达次数返回错误

props.put("retries", 3);

//在Producer端用来存放尚未发送出去的Message的缓冲区大小

props.put("buffer.memory", 33554432);

//Producer会尝试去把发往同一个Partition的多个Requests进行合并,batch.size指明了一次Batch合并后Requests总大小的上限。如果这个值设置的太小,可能会导致所有的Request都不进行Batch。

props.put("batch.size", 163840);

//Producer默认会把两次发送时间间隔内收集到的所有Requests进行一次聚合然后再发送,以此提高吞吐量,而linger.ms则更进一步,这个参数为每次发送增加一些delay,以此来聚合更多的Message。

props.put("linger.ms", 5);

//请求超时时间

props.put("request.timeout.ms", "60000");

//开启压缩

props.put("compression.type","lz4");

1、acks设置应答数

在消息被认为是“已提交”之前,producer需要leader确认的produce请求的应答数。该参数用于控制消息的持久性,目前提供了3个取值:

acks = 0: 表示produce请求立即返回,不需要等待leader的任何确认。这种方案有最高的吞吐率,但是不保证消息是否真的发送成功。

acks = 1: 表示leader副本必须应答此produce请求并写入消息到本地日志,之后produce请求被认为成功。如果此时leader副本应答请求之后挂掉了,消息会丢失。这个方案,提供了不错的持久性保证和吞吐。

acks = -1(all): 表示分区leader必须等待消息被成功写入到所有的ISR副本(同步副本)中才认为produce请求成功。这种方案提供最高的消息持久性保证,但是理论上吞吐率也是最差的。

 

配置推荐:

如果要较高的持久性要求以及无数据丢失的需求,设置acks = -1。其他情况下设置acks = 1。

 

2、buffer.memory 设置缓存内存大小(吞吐量)

该参数用于指定Producer端用于缓存消息的缓冲区大小,单位为字节,默认值为:33554432合计为32M。kafka采用的是异步发送的消息架构,prducer启动时会首先创建一块内存缓冲区用于保存待发送的消息,然后由一个专属线程负责从缓冲区读取消息进行真正的发送。

 

消息持续发送过程中,当缓冲区被填满后,producer立即进入阻塞状态直到空闲内存被释放出来,这段时间不能超过max.blocks.ms设置的值,一旦超过,producer则会抛出TimeoutException 异常,因为Producer是线程安全的,若一直报TimeoutException,需要考虑调高buffer.memory 了。

用户在使用多个线程共享kafka producer时,很容易把 buffer.memory 打满。

 

3、 compression.type 设置压缩方式

producer压缩器,目前支持none(不压缩),gzip,snappy和lz4。

2016年8月,FaceBook开源了Ztandard。官网测试: Ztandard压缩率为2.8,snappy为2.091,LZ4 为2.101

4、 retries设置重试次数

producer重试的次数设置。重试时producer会重新发送之前由于瞬时原因出现失败的消息。瞬时失败的原因可能包括:元数据信息失效、副本数量不足、超时、位移越界或未知分区等。倘若设置了retries > 0,那么这些情况下producer会尝试重试。

5、 batch.size设置批次消息大小

producer都是按照batch进行发送的,因此batch大小的选择对于producer性能至关重要。producer会把发往同一分区的多条消息封装进一个batch中,当batch满了后,producer才会把消息发送出去。但是也不一定等到满了,这和另外一个参数linger.ms有关。默认值为16K,合计为16384.

6、 linger.ms设置

producer是按照batch进行发送的,但是还要看linger.ms的值,默认是0,表示不做停留。这种情况下,可能有的batch中没有包含足够多的produce请求就被发送出去了,造成了大量的小batch,给网络IO带来的极大的压力。

 

配置推荐:

为了减少了网络IO,提升了整体的TPS。假设设置linger.ms=5,表示producer请求可能会延时5ms才会被发送。

 

(二)代码调优

使用异步发送消息

// 设置生产者的批量发送参数

props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);

props.put(ProducerConfig.LINGER_MS_CONFIG, 5);

props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");

 

// 使用异步发送消息

producer.send(record, new Callback() {

public void onCompletion(RecordMetadata metadata, Exception e) {

if (e != null) {

e.printStackTrace();

} else {

System.out.println("Sent message: " + record.value() + ", offset: " + metadata.offset());

}

}

});

 

标签:producer,生产者,batch,Kafka,发送,调优,消息,props,put
From: https://www.cnblogs.com/yeyuzhuanjia/p/17799978.html

相关文章

  • kafka复习:(11)auto.offset.reset的默认值
    在ConsumerConfig这个类中定义了这个属性的默认值,如下图也就是默认值为latest,它的含义是:如果没有客户端提交过offset的话,当新的客户端消费时,把最新的offset设置为当前消费的offset.默认是自动提交位移的,每5秒进行一次提交。可以通过参数配置手动提交。手动提交offset的示例import......
  • kafka复习:(10)按分区获取ConsumerRecord
    packagecom.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;importorg.apache.kafka.clients.consumer.ConsumerConfig;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka......
  • kafka复习:(8)消费某个主题指定分区的消息
    packagecom.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;importorg.apache.kafka.clients.consumer.ConsumerConfig;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka......
  • Kafka-生产者、broker、消费者的调优参数总结
     生产环境下,为了尽可能提升Kafka的整体吞吐量,可以对Kafka的相关配置参数进行调整,以达到提升整体性能的目的。本文主要从Kafka的不同组件出发,讲解各组件涉及的配置参数和参数含义。一、生产者(producer.properties或者代码中)1、acks:Producer需要Leader确认的Producer请求的应答......
  • 【技术分享】Amazon RDS MySQL参数说明及性能调优
    在亚马逊云科技的RDS中支持几乎主流的数据库,对于亚马逊云中的数据库的参数设置及性能调优对于我们的日常业务的运行会很有帮助。本篇将对于Amazon RDSMySQL的参数说明及性能调优。RDSMySQL的执行基本架构1.我们都知道,基础设施的潜在异常是常见的,也是不可避免的。Flamingo零售......
  • 生产者消费者模式下实现多batch延时推理
    生产者消费者模式下实现多batch延时推理需求分析在实际推理过程中为了实现较高的吞吐量和较高的资源利用率,往往会使用多线程来收集多次请求,并组合形成多batch下的模型推理,一种常见的实现便是生产者和消费者模式,其需求如下:生产者收集提交的请求,消费者对请求进行消费,并将结果返......
  • Kafka基础学习笔记
    一、Kafka:1、简介:Kafka是由Apache开源,具有分布式、分区的、多副本的、多订阅者,基于Zookeeper协调的分布式处理平台,由Scala和Java语言编写。最大的特性就是可以实时并高速的处理大量数据来满足需求,同时对消息数据进行持久化存储。2、优点:Kafka与其他消息队列MQ(如ActiveMQ、Rabb......
  • kafka代码实践
    安装kafka:Windows安装kafka,详情见:https://blog.csdn.net/sinat_32502451/article/details/133067851Linux安装kafka,详情见:https://blog.csdn.net/sinat_32502451/article/details/133080353添加依赖包:<dependency><groupId>org.springframework.k......
  • kafka代码示例
    安装kafka:Windows安装kafka,详情见:https://blog.csdn.net/sinat_32502451/article/details/133067851Linux安装kafka,详情见:https://blog.csdn.net/sinat_32502451/article/details/133080353添加依赖包:<dependency><groupId>org.springframework.k......
  • Java基础 什么是生产者和消费者
    在Java中,"生产者-消费者"(Producer-Consumer)是一种常见的并发编程模型,用于协调多个线程之间的工作,其中一些线程充当生产者,而其他线程充当消费者。这模型通常用于处理共享数据的情况,其中生产者线程生成数据并将其放入共享缓冲区,而消费者线程则从缓冲区中取出数据并进行处理。主要特......