首页 > 其他分享 >kafka的producer配置参数

kafka的producer配置参数

时间:2022-08-24 14:57:15浏览次数:73  
标签:producer broker kafka 发送 参数 消息 batch

主要介绍下kafka的producer配置参数,只取了其中的一部分常用的,后续的有时间,也会补充一些,更多的详细参数,可以参考《kafka官网》,参数的内容,主要是选取《apache kafka实战》书中的一些讲解和官网相互参看

bootstrap.servers

该参数指定了一组host:port 对,用于创建向 Kafka broker 服务器的连接,比如:kl:9092,k2:9092,k3:9092。

如果 Kafka 集群中机器数很多,那么只需要指定部分 broker 即可,不需要列出所有的机器。因为不管指定几台机器,producer 都会通过该参数找到井发现集群中所有的 broker;为该参数指定多台机器只是为了故障转移使用。这样即使某一台 broker 挂掉了,producer 重启后依然可以通过该参数指定的其他 broker 连入 Kafka 集群。

另外,如果 broker 端没有显式配置 listeners 使用 IP 地址,那么最好将该参数也配置成主机名,而不是 IP 地址。因为 Kafka 内部使用的就是FQDN (Fully Qualified Domain Name )。

key.serializer

被发送到 broker 端的任何消息的格式都必须是宇节数组,因此消息的各个组件必须首先做序列化,然后才能发送到 broker。

该参数就是为消息的 key 做序列化之用的。这个参数指定的是实现了 org.apache.kafka.common.serialization.Serializer接口的类的全限定名称。

Kafka 为大部分的初始类型( primitive type )默认提供了现成的序列化器。producer 程序在发送消息时不指定 key,这个参数也是必须要设置的;否则程序会抛出 ConfigException 异常,提示“ key.serializer”参数无默认值,必须要配置。

value.serialize

和 key.serializer 类似,只是它被用来对消息体(即消息 value )部分做序列化,将消息value 部分转换成宇节数组 。

value.serializer 和 key. serializer 可以设置相同的值,也可以不同的值 。只要消费端,消费数据的时候,保持一致就可以了;

一定要注意的是,这两个参数都必须是全限定类名,org.apache.kafka.common.serialization.Serializer

acks

acks 参数用于控制 producer 生产消息的持久性(durability);对于 producer 而言, Kafka在乎的是“己提交”消息的持久性。一旦消息被成功提交,那么只要有任何一个保存了该消息的副本“存活”,这条消息就会被视为“不会丢失的” 。

经常碰到抱怨Kafka的producer会丢消息,其实这里混淆了 一个概念,即那些所谓的“己丢失”的消息其实并没有被成功写入 Kafka 。换句话说,它们井没有被成功提交,因此 Kafka 对这些消息的持久性不做任何保障;

当然, producer API 确实提供了回调机制供用户处理发送失败的情况。具体来说,当 producer 发送一条消息给 Kafka 集群时,这条消息会被发送到指定 topic 分区 leader 所在的 broker 上,producer 等待从该 leader broker 返回消息的写入结果(当然并不是无限等待,是有超时时间的)以确定消息被成功提交。这一切完成后 producer 可以继续发送新的消息 。

Kafka 能够保证的是 consumer 永远不会读取到尚未提交完成的消息;显然, leader broker 何时发送写入结果返还给 producer 就需要仔细考虑的问题了,也会直接影响消息的持久性甚至是 producer 端的吞吐量(producer越快地接收到 leader broker响应,就能发送下一条消息);producer 端的 acks 参数就是用来控制做这件事情的 。

acks 指定了在给 producer 发送响应前, leader broker 必须要确保己成功写入该消息的副本数 。当前 acks 有 3 个取值:0、 1和 all 。

acks = 0:设置成 0 表示 producer 完全不理睬 leader broker 端的处理结果。此时,producer 发送消息后立即开启下一条消息的发送,根本不等待 leader broker 端返回结果 。

由于不接收发送结果,因此在这种情况下 producer.send 的回调也就完全失去了作用,即用户无法通过回调机制感知任何发送过程中的失败,所以 acks=O 时 producer 并不保证消息会被成功发送。

但凡事有利就有弊,由于不需要等待响应结果,通常这种设置下 producer 的吞吐量是最高的 。

acks = all 或者-1:表示当发送消息时, leader broker 不仅会将消息写入本地日志,同时还会等待 ISR 中所有其他副本都成功写入它们各自的本地日志后,才发送响应结果给 producer。

显然当设置 acks=all 时,只要 ISR中 至少有一个副本是处于“存活”状态的 ,那么这条消息就肯定不会丢失,因而可以达到最高的消息持久性,但通常这种设置下producer 的吞吐量也是最低的 。

acks = 1 :是 0 和 all 折中的方案,也是默认的参数值。

producer发送消息后 leader broker仅将该消息写入本地日志,然后发送响应结果给 producer,而无须等待 ISR中其他副本写入该消息。那么此时只要该 leader broker 一直存活 , Kafka 就能够保证这条消息不丢失。这实际上是一种折中方案,既可以达到适当的消息持久性,同时也保证了 producer 端的吞吐量。

 

buffer.memory

该参数指定了 producer 端用于缓存消息的缓冲区大小,单位是字节,默认值是 33554432,即 32MB 。

如前所述,由于采用了异步发送消息的设计架构, Java 版本 producer 启动时会首先创建一块内存缓冲区用于保存待发送的消息,然后由另 一个专属线程负责从缓冲区中读取消息执行真正的发送。这部分内存空间的大小即是由 buffer.memory 参数指定的。

若 producer 向缓冲区写消息的速度超过了专属 I/0 线程发送消息的速度,那么必然造成该缓冲区空间的不断增大。此时 producer 会停止手头的工作等待 I/0 线程追上来,若一段时间之后 I/0 线程还是无法追上 producer 的进度, 就会抛出异常;若 producer 程序要给很多分区发送消息,那么就需要仔细地设置这个参数,以防止过小的内存缓冲区降低了producer 程序整体的吞吐量。

compression.type

设置 producer 端是否压缩消息,默认值是 none ,即不压缩消息 。

Kafka 的 producer 端引入压缩后可以显著地降低网络 I/O 传输开销从而提升整体吞吐量,但也会增加 producer 端机器的 CPU 开销。另外,如果 broker 端的压缩参数设置得与 producer 不同, broker 端在写入消息时也会额外使用 CPU 资源对消息进行对应的解压缩-重新压缩操作。

目前 Kafka 支持 3 种压缩算法:GZIP、Snappy 和 LZ4。根据实际使用经验来看 producer 结合 LZ4 的性能是最好;LZ4 > Snappy > GZIP;

retries

Kafka broker 在处理写入请求时可能因为瞬时的故障(比如瞬时的leader选举或者网络抖动)导致消息发送失败。这种故障通常都是可以自行恢复的,如果把这些错误封装进回调函数的异常中返还给 producer,producer程序也并没有太多可以做的,只能简单地在回调函数中重新尝试发送消息。与其这样,还不如 producer 内部自动实现重试。因此 Java 版本 producer 在内部自动实现了重试,当然前提就是要设置retries参数。

该参数表示进行重试的次数,默认值是 0,表示不进行重试。

在实际使用过程中,设置重试可以很好地应对那些瞬时错误,因此推荐用户设置该参数为一个大于 0 的值 。

只不过在考虑retries 的设置时,有两点需要着重注意 。

1、重试可能造成消息的重复发送;

比如由于瞬时的网络抖动使得 broker 端己成功写入消息但没有成功发送响应给 producer,因此 producer 会认为消息发送失败,从而开启重试机制。为了应对这一风险, Kafka 要求用户在 consumer 端必须执行去重处理 。令人欣喜 的是,社区己于 0.11.0.0 版本开始支持“精确一次”处理语义,从设计上避免了类似的问题 。

2、重试可能造成消息的乱序;

当前 producer 会将多个消息发送请求(默认是 5 个) 缓存在内存中;如果由于某种原因发生了消息发送的重试,就可能造成消息流的乱序 。为了避免乱序发生, Java 版本 producer 提供了max.in.flight.requets.per.connection 参数 。一旦用户将此参数设置成 1, producer 将确保某一时刻只能发送一个请求 。

另外, producer 两次重试之间会停顿一段时间,以防止频繁地重试对系统带来冲击 。这段时间是可以配置的,由参数retry.backff.ms 指定,默认是 100 毫秒 。由于 leader “换届选举 ”是最常见的瞬时错误,推荐用户通过测试来计算平均 leader 选举时间并根据该时间来设定retries 和retry.backff.ms 的值 。

batch.size

batch.size 是 producer 最重要的参数之一 !它对于调优 producer 吞吐量和延时性能指标都有着非常重要的作用 。

producer 会将发往同一分区的多条消息封装进一个 batch中,当 batch 满了的时候, producer 会发送 batch 中的所有消息 。不过, producer并不总是等待batch满了才发送消息,很有可能当batch还有很多空闲空间时 producer 就发送该 batch 。显然,batch 的大小就显得非常重要 。

通常来说,一个小的 batch 中包含的消息数很少,因而一次发送请求能够写入的消息数也很少,所以 producer 的吞吐量会很低;一个 batch 非常之巨大,那么会给内存使用带来极大的压力,因为不管是否能够填满,producer 都会为该batch 分配固定大小的内存。

因此batch.size 参数的设置其实是一种时间与空间权衡的体现 。batch.size 参数默认值是 16384 ,即 16KB 。这其实是一个非常保守的数字。在实际使用过程中合理地增加该参数值,通常都会发现 producer 的吞吐量得到了相应的增加 。

linger.ms

参数控制消息发送延时行为的 。该参数默认值是 0,表示消息需要被立即发送,无须关心 batch 是否己被填满;

大多数情况下这是合理的;毕竟我们总是希望消息被尽可能快地发送;不过这样做会拉低 produc 吞吐量,毕竟 produce 发送的每次请求中包含的消息数越多,produce就越能将发送请求的开销摊薄到更多的消息上 从而提升吞吐量;如果要设置,跟上述参数batch.size 配合使用;针对消息的发送的一种权衡考虑;

max.request.size

官网中给出的解释是,该参数用于控制 producer 发送请求的大小 。实际上该参数控制的是producer 端能够发送的最大消息大小 。

由于请求有一些头部数据结构,因此包含一条消息的请求的大小要比消息本身大 。不过姑且把它当作请求的最大尺寸是安全的 。如果 producer 要发送尺寸很大的消息 , 那么这个参数就是要被设置的 。默认的 1048576 字节(1MB)

request.timeout.ms

当 producer 发送请求给broker后,broker 需要在规定的时间范围内将处理结果返还给producer 。默认是 30 秒 。

如果 broker 在 30 秒内都没有给 producer 发送响应,就会认为该请求超时了,回调函数中显式地抛出TimeoutException异常 。默认的 30 秒对于一般的情况是足够的,但如果 producer发送的负载很大 ,超时的情况就很容易碰到,此时就应该适当调整该参数值。

标签:producer,broker,kafka,发送,参数,消息,batch
From: https://www.cnblogs.com/dreamzy996/p/16619905.html

相关文章

  • @PathVariable和@RequestParam的区别 获取请求参数的四种方式
    @PathVariable和@RequestParam的区别获取请求参数的四种方式请求路径上有个id的变量值,可以通过@PathVariable来获取 @RequestMapping(value="/page/{id}",method......
  • 服务器性能参数学习与总结
    服务器性能参数学习与总结总体说明在不考虑奸商和回扣的的情况下:同时间段购买的机器,价钱越高,配置越高,机器的性能越好.其实服务器与PC机器一样,高性能往往意味着......
  • C++ 默认参数
    1.C++支持函数的默认参数,C语言不支持;2.默认参数只能放在最后面。#include<iostream>#include<windows.h>#include<string>usingnamespacestd;voidscorePri......
  • 过滤器(过滤器只能最多有两个参数)
    过滤器就类似于是模版语法内置的内置方法django内置有60多个过滤器我们不需要学这么多了解10个左右就差不多了后面碰到了再去记忆基本语法{{数据|过滤器:参数}}转......
  • Kafka FAQ
    1.首先在docker上安装kafka和zookeeper,加起来要1G的imagesize,先安装1台,测试完后再加集群对比测试性能用spring-kafka的包写个测试例子privateKafkaTemplate<Str......
  • 04-React路由5版本(高亮, 嵌套, 参数传递... )
    React-Router-Dom(路由版本[5])简介React的一个插件库用于实现SPA应用基于React的项目基本都用API<BrowserRouter><HashRouter><Route><Redirect><Link><Na......
  • sping boot使用LocalDate和LocalDateTime当入参时,报缺少必要参数
    时间字符串作为普通请求参数传入时,转换用的是Converter增加一个时间转换的配置类importcom.sjaco.lccloud.common.pay.kit.DateKit;importcom.sjaco.lccloud.common.......
  • uniApp 返回上一页携带参数
    clickCity(name){console.log(name);letpages=getCurrentPages();//获取pageletprevPage=pages[pages.length-2];//......
  • 函数的参数
    定义函数时()里的参数叫做形参(形式参数),它只是一个变量名,接受调用时传递的实参,仅供函数体中的代码调用。函数调用时,传入()里的参数叫实参(实际参数),它是实际的数据,会传递......
  • C++ 数组作为函数的参数
    1.一个指针在32位操作系统上占4个字节,一个指针在64位操作系统上占8个字节,但是,编译器为了兼容32位操作系统和64位操作系统,所以指针都是4个字节长度。下面程序中的形参本质......