首页 > 其他分享 >kafka发送超大消息

kafka发送超大消息

时间:2023-08-24 10:22:05浏览次数:44  
标签:max 超大 bytes kafka 发送 消息 2147483647 put

kafka发送超大消息设置

 

  最近开发一cdc框架,为了测试极端情况,需要kafka传递100万条数据过去,1个G左右,由于其他环节限制,不便进行拆包(注:测下来,大包走kafka不一定性能更好,甚至可能更低)。

  测试百万以上的变更数据时,报消息超过kafka broker允许的最大值,因此需要修改如下参数,保证包能够正常发送:

  • socket.request.max.bytes=2147483647    # 设置了socket server接收的最大请求大小
  • log.segment.bytes=2147483647              # kafka数据文件的大小,确保这个数值大于一个消息的长度。一般说来使用默认值即可(一般一个消息很难大于1G,因为这是一个消息系统,而不是文件系统)。
  • message.max.bytes=2147483647             # 设置了kafka server接收的最大消息大小,应小于等于socket.request.max.bytes
  • replica.fetch.max.bytes=2147483647         #每个分区试图获取的消息字节数。要大于等于message.max.bytes,否则broker会接收此消息,但无法将此消息复制出去,从而造成数据丢失。
  • fetch.message.max.bytes=2147483647      #每个提取请求中为每个主题分区提取的消息字节数。要大于等于message.max.bytes,否则broker就会因为消费端无法使用这个消息而挂起。

生产者可以如下设定:

kafkaProps.put("max.request.size", 2147483647);    # 要小于 message.max.bytes,也可以设置在producer.properties配置文件中
kafkaProps.put("buffer.memory", 2147483647);
// kafkaProps.put("timeout.ms", 3000000); # 该选项在最新版本中已经不再起作用
kafkaProps.put("request.timeout.ms", 30000000);

消费者设定如下:

props.put("request.timeout.ms", 30000000);
props.put("session.timeout.ms", "3000000");
props.put("fetch.max.wait.ms", "3000000");

  各参数的含义可以参考kafka官方文档https://kafka.apache.org/documentation/#configuration。

  kafka基础知识体系,请参考LZ学习笔记kafka学习指南(总结版)

  注,各参数对内存的影响如下:Brokers会为每个分区分配replica.fetch.max.bytes参数指定的内存空间,假设replica.fetch.max.bytes=1M,且有1000个分区,则需要差不多1G的内存,确保 分区数*最大的消息不会超过服务器的内存,否则会报OOM错误。同样地,消费端的fetch.message.max.bytes指定了最大消息需要的内存空间,同样,分区数*最大需要内存空间 不能超过服务器的内存。所以,如果你有大的消息要传送,则在内存一定的情况下,只能使用较少的分区数或者使用更大内存的服务器。

  虽然上面的方法可以奏效,但是并不推荐。Kafka设计的初衷是迅速处理短小的消息,一般10K大小的消息吞吐性能最好(可参见LinkedIn的kafka性能测试)。但有时候,我们需要处理更大的消息,比如XML文档或JSON内容,一个消息差不多有10-100M,这种情况下,Kakfa应该如何处理?

针对这个问题,有以下几个建议:

  1.   最好的方法是不直接传送这些大的数据。如果有共享存储,如NAS, HDFS, S3等,可以把这些大的文件存放到共享存储,然后使用Kafka来传送文件的位置信息。
  2.   第二个方法是,将大的消息数据切片或切块,在生产端将数据切片为10K大小,使用分区主键确保一个大消息的所有部分会被发送到同一个kafka分区(这样每一部分的拆分顺序得以保留),如此以来,当消费端使用时会将这些部分重新还原为原始的消息。
  3.   第三,Kafka的生产端可以压缩消息,如果原始消息是XML,当通过压缩之后,消息可能会变得不那么大。在生产端的配置参数中使用compression.codec和commpressed.topics可以开启压缩功能,压缩算法可以使用GZip或Snappy。

  上面这些值太大还会造成一个问题,就是消息没有在指定时间内(max.poll.interval.ms(默认300秒))消费完,导致被rebalance,如下:

 

标签:max,超大,bytes,kafka,发送,消息,2147483647,put
From: https://www.cnblogs.com/muyi-yang/p/17653477.html

相关文章

  • Kafka快速实战以及基本原理详解
     这一部分主要是接触Kafka,并熟悉Kafka的使用方式。快速熟练的搭建kafka服务,对于快速验证一些基于Kafka的解决方案,也是非常有用的。一、Kafka介绍​ChatGPT对于ApacheKafka的介绍:ApacheKafka是一个分布式流处理平台,最初由LinkedIn开发并于2011年开源。它主要用于解决大规模......
  • Kafka入门到精通学习路线图 技术文章
    Kafka入门到精通学习路线图技术文章Kafka是一个分布式流式处理平台,被广泛应用于大规模数据处理和实时数据流分析的场景中。以下是一个从入门到精通的学习路线图,帮助你系统地学习和掌握Kafka的相关技术。1.学习Kafka的概念和基础知识:-了解Kafka的起源和背景,掌握Kafka的基本概......
  • 服务端向客户端发送消息Server-Sent Events
    今天听说了服务端向客户端发消息的一种方式:Server-SentEventsSSE使用的是HTTP协议,本质上是服务端向客户端发送流式数据。HTTP不支持服务端向客户端发送请求,但是如果客户端向服务端发出请求后,服务端向客户端声明,接下来的数据是流信息,则连接不会关闭,服务端可以继续发送数据流。......
  • Linux命令发送http
    curl“百度一下,你就知道”如果这里的URL指向的是一个文件或者一幅图都可以直接下载到本地curl-i“百度一下,你就知道”显示全部信息curl-I“百度一下,你就知道”只显示头部信息curl-v“百度一下,你就知道”显示get请求全过程解析curl命令模拟get请求携带参数(linux):curl-vh......
  • 初识kafka,先了解这些就够了
    一、了解Kafka中的相关概念MQ作为消息中间件,对于我们来说,已经并不陌生了,那么,由于Kafka它在众多的MQ间是非常火热的,那么必然也是我们需要着重关注的中间件之一了,为了更加清晰的了解Kafka,我们先从Kafka的体系结构入手,看看大体上都包含哪些东西。具体请见下图所示:其中有一些我们很......
  • Kafka 基础命令
    Kafka部署路径说明#程序部署路径: /opt/kafka#配置文件路径: /opt/kafka/config#启动脚本目录 /opt/kafka/bin#数据持久化目录 /opt/kafka/kafka-logs Kafka启停命令说明#切换路径至:cd/opt/kafka/bin#前台启动命令shkafka-server-start.shconfig/server.p......
  • 发送到Teams的python程序处理,其中也保括上传到OSS的文件处理
    1importurllib2importtime3importoss24importos5importrandom6frompathlibimportPath7importrequests8importdatetime9fromdecoupleimportconfig1011fromurllib.parseimportunquote121314#......
  • Zookeeper对于Kafka的作用和意义
    Zookeeper在ApacheKafka中扮演着关键的角色,它提供了分布式协调和配置管理服务,对于Kafka集群的正常运行和高可用性至关重要。以下是具体介绍。配置管理Zookeeper负责存储和管理Kafka集群的配置信息,包括主题(topics)和分区(partitions)的分配、副本(replicas)的分布、消费者组(consumergro......
  • 记录--post为什么会发送两次请求?
    这里给大家分享我在网上总结出来的一些知识,希望对大家有所帮助在前段时间的一次面试中,被问到了一个如标题这样的问题。要想好好地去回答这个问题,这里牵扯到的知识点也是比较多的。那么接下来这篇文章我们就一点一点开始引出这个问题。同源策略在浏览器中,内容是很开放的,任何......
  • springboot~kafka中延时消息的实现
    应用场景用户下单5分钟后,给他发短信用户下单30分钟后,如果用户不付款就自动取消订单kafka无死信队列kafka本身没有这种延时队列的机制,像rabbitmq有自己的死信队列,当一些消息在一定时间不消费时会发到死信队列,由死信队列来处理它们,上面的两个需求如果是rabbitmq可以通过死信......