首页 > 系统相关 >Pulsar客户端如何控制内存使用

Pulsar客户端如何控制内存使用

时间:2025-01-11 18:43:22浏览次数:1  
标签:producer client ProducerImpl 内存 pulsar apache Pulsar 客户端

摘要

本文围绕一个常见的使用场景深入分析在高吞吐场景下,使用Pulsar客户端收发消息可能会遇到的若干问题。并以此为切入点,梳理一下Pulsar客户端在内存控制上所做的优化改进。

使用场景

假设这样一个常见的场景,一个搜索类业务需要记录用户搜索请求,以便后续分析搜索热点,以及有针对性的优化搜索效果等。于是,我们有下面这段逻辑,简化后如下:

PulsarClient pulsarClient = PulsarClient.builder()  
        .serviceUrl("pulsar://localhost:6650")  
        .build();  
Producer<byte[]> producer = pulsarClient.newProducer()  
        .topic("search-activities")  
        .create();
try {  
    MessageId messageId = producer.send(/* message payload here */);  
    log.debug("Search activity messageId={}", messageId);  
} catch (Exception e) {  
    log.error("Failed to record search activity", e);  
}

注意pulsarClientproducer均支持复用,并且推荐这么做,这里只是为了演示写到了一起。

producer.send是阻塞方式发送消息,换句话说就是线程会卡在这里等待发送结果返回。在现实中可以根据消息在实际业务中的需要选择阻塞非阻塞两种方式,例如这里我们的业务是在用户发起一次搜索请求时记录搜索请求和上下文信息,业务上对搜索请求事件并无强依赖,因此这里使用阻塞方式发消息就不太适合了,从性能上考虑会加长整体的搜索延迟,从稳定性上考虑会增加搜索执行过程中的不确定性,总的来说,要区分支线流程和主线流程,不应该把支线流程全部嵌套在主线流程中。

于是,我们可以优化一下,调整为非阻塞方式,将记录搜索事件放到其它线程中完成:

producer.sendAsync(/* message payload here */).whenComplete((msgId, ex) -> {  
    if (ex != null) {  
        log.error("Failed to record search activity", ex);  
    } else {  
        log.debug("Search activity messageId={}", msgId);  
    }
});

在现实中,若用户搜索的TPS较高,例如在单实例上可以超过1000QPS(高和低都是相对而言的,这里只是举个例子)。若恰好记录的搜索事件内容较多(例如包含了搜索请求的完整上下文和搜索结果等),序列化之后大小能达到100KB甚至1MB,那么上面代码在运行时你可以会遇到MemoryBufferIsFullError异常:

org.apache.pulsar.client.api.PulsarClientException$MemoryBufferIsFullError: Client memory buffer is full
	at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:972)
	at org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java:452)
	at org.apache.pulsar.client.impl.ProducerImpl.internalSendAsync(ProducerImpl.java:343)
	at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.sendAsync(TypedMessageBuilderImpl.java:102)
	at org.apache.pulsar.client.impl.ProducerBase.sendAsync(ProducerBase.java:68)

另外若服务本身与Pulsar的broker之间出现了网络波动,或者Pulsar服务内部组件之间出现网络波动,导致整体producer写入延迟升高,亦或是短时间出现大量写入,你还可能会遇到ProducerQueueIsFullError异常:

org.apache.pulsar.client.api.PulsarClientException$ProducerQueueIsFullError: Producer send queue is full
	at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:965)
	at org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java:452)
	at org.apache.pulsar.client.impl.ProducerImpl.internalSendAsync(ProducerImpl.java:343)
	at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.sendAsync(TypedMessageBuilderImpl.java:102)
	at org.apache.pulsar.client.impl.ProducerBase.sendAsync(ProducerBase.java:68)

Producer的内存控制

下面我们对上面两种异常产生的原因作一下分析,我们先来看一下构建Producer时,ProducerBuilder中与内存使用有关的配置项:

/*
 * Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.
 */
ProducerBuilder<T> maxPendingMessages(int maxPendingMessages);

/*
 * Set the number of max pending messages across all partitions.
 */
ProducerBuilder<T> maxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions);

maxPendingMessages用来控制producer内部队列中正在发送还没有接收到broker确认的消息数量,若队列大小超出了这个限制,默认的行为就是抛出ProducerQueueIsFullError异常,你可以通过修改另外一个配置blockIfQueueFull=true调整为阻塞等待队列中空出新的空间,这里还有另外需要注意的地方在下面会细说。

maxPendingMessages这个配置实际上是直接传递给底层各个分区的内部producer的,对于多分区的topic,实际处于pending状态的最大消息数量是maxPendingMessages乘以topic分区数量。由于maxPendingMessages结合可变的topic分区数量使得最终的pending消息数量变得不可控,因此还有另外一个优先级更高的配置maxPendingMessagesAcrossPartitions用来控制整个topic所有分区的总的一个pending消息数量,最终到各个分区内部producer取maxPendingMessagesmaxPendingMessagesAcrossPartitions / partitions的较小值。

然而,在现实应用场景中,不同业务的消息大小差异很大,单纯基于消息数量控制内存使用,开发者需要预估平均消息大小,这几乎不可能做到,因为消息的实际大小很可能会随着业务的变化而发生变化,因此在PIP-74中,在构建PulsarClient时,ClientBuilder提供了一个面向整个client实例统一的内存限制配置:

/*
 * Configure a limit on the amount of memory that will be allocated by this client instance.
 *
 * Setting this to 0 will disable the limit.
 */
ClientBuilder memoryLimit(long memoryLimit, SizeUnit unit);

当客户端所有producer中所有pending的消息大小总和超过这个限制时,默认则会抛出MemoryBufferIsFullError异常,若同时配置了blockIfQueueFull=true,则当前线程会阻塞等待前面pending的消息发送完成。

前面提到关于blockIfQueueFull配置的使用有一个细节需要注意,这个配置是为了限制客户端producer内存使用的同时,让开发者简化处理队列或者内存buffer满了的情况可以继续发送消息,例如在一个后台定时任务的场景中批量发送消息。然而这里需要强调的是blockIfQueueFull一旦配置为true,不论是应用发送消息调用的是阻塞的Producer.send方法还是非阻塞的Producer.sendAsync方法都会出现阻塞等待,”卡“住当前线程,那么对于我们上面的业务来说这是不可接受的,若由于支线流程(特殊情况容忍丢失的用户搜索事件)异常抖动,阻塞了主线流程(搜索主线程)就得不偿失了。

// 注意:若producer发送队列满或者内存buffer满,当前线程将卡在sendAsync方法调用
producer.sendAsync(/* message payload here */).whenComplete((msgId, ex) -> {  
    if (ex != null) {  
        log.error("Failed to record search activity", ex);  
    } else {  
        log.debug("Search activity messageId={}", msgId);  
    }
});

PIP-1202.10.0以及之后版本的客户端中,默认启用了memoryLimit配置,其默认值为64MB,同时默认禁用了maxPendingMessagesmaxPendingMessagesAcrossPartitions配置(默认值修改为0),另外将maxPendingMessagesAcrossPartitions配置标记为了Deprecated,因为使用这个配置最终目的就是控制客户端producer的内存使用,现在已经有memoryLimit这个更加直接的配置可以替代了。

Consumer的内存控制

上面说的全部都是围绕着Producer侧的内存使用来讲的,其实在PIP-74中也提到了Pulsar客户端consumer侧的内存使用,只不过在实现中是分阶段进行的。

我们先来看一下Pulsar客户端的API早期在构造一个Consumer时,ConsumerBuilder提供的与内存使用有关的选项:

/*
 * Sets the size of the consumer receive queue.
 *
 * (default: 1000)
 */
ConsumerBuilder<T> receiverQueueSize(int receiverQueueSize);

/*
 * Sets the max total receiver queue size across partitions.
 *
 * (default: 50000)
 */
ConsumerBuilder<T> maxTotalReceiverQueueSizeAcrossPartitions(int maxTotalReceiverQueueSizeAcrossPartitions);

Pulsar客户端通过预接收队列临时存放broker推送过来的消息,以便应用程序调用Consumer#receive或者Consumer#receiveAsync方法时直接从内存中返回消息,这是出于消费吞吐的考虑,本质上是一种以空间换取时间的策略。上面两个选项是给这个”空间“设置一个数量上的上限,注意这里仅是数量上的上限,实际的内存空间使用还要取决于平均消息大小。receiverQueueSize控制每个分区consumer的接收队列大小,maxTotalReceiverQueueSizeAcrossPartitions来控制所有分区consumer和parent consumer的接收队列总大小。

前面提到receiverQueueSizemaxTotalReceiverQueueSizeAcrossPartitions参数是以数量的形式间接的控制Consumer预接收队列的内存使用,在[PIP-74][pip-74]中提出了整个client级别的memoryLimit,同时提出了一个新的控制Consumer内存使用的方案,就是autoScaledReceiverQueueSizeEnabled:

/*
 * If this is enabled, the consumer receiver queue size is initialized as a very small value, 1 by default,  
 * and will double itself until it reaches either the value set by {@link #receiverQueueSize(int)} or the client  
 * memory limit set by {@link ClientBuilder#memoryLimit(long, SizeUnit)}.
 */
ConsumerBuilder<T> autoScaledReceiverQueueSizeEnabled(boolean enabled);

当启用了这个特性之后,receiverQueueSize会从1开始呈2的指数倍增长,直至达到receiverQueueSize的限制或达到client的memoryLimit限制,其目标是在有限制的内存使用下,达到最大的吞吐效率。

番外

除了上面说的Producer和Consumer在生产和消费过程中的内存使用之外,还有一个容易被忽视的点是创建Consumer时ackTimeoutackTimeoutTickTime的配置如果不匹配,会消耗较多堆内内存。

/*
 * Sets the timeout for unacknowledged messages, truncated to the nearest millisecond. The timeout must be greater than 1 second.
 */
ConsumerBuilder<T> ackTimeout(long ackTimeout, TimeUnit timeUnit);

/**
 * Define the granularity of the ack-timeout redelivery.
 *
 * <p>By default, the tick time is set to 1 second. Using a higher tick time  
 * reduces the memory overhead to track messages when the ack-timeout is set to  
 * bigger values (e.g., 1 hour).
 */
ConsumerBuilder<T> ackTimeoutTickTime(long tickTime, TimeUnit timeUnit);

若Consumer配置了ackTimeout并且配置了较大的时间窗口(例如1小时或者更长)时,应适当的调大ackTimeoutTickTime,这是因为Consumer内部使用了一个简单时间轮的算法对消息的处理时间计时,若ackTimeout时间窗口很大,ackTimeoutTickTime仍然使用其默认值1s,时间轮本身将会占用大量堆内存空间。具体细节可参考客户端源码UnAckedMessageTracker.java

总结

  1. 使用sendAsync非阻塞方法要注意其不能保证消息一定发送成功,特别是开启了blockIfQueueFull之后,它会在特定情况下演变成阻塞方法。
  2. 对于同时使用到了Producer和Consumer的应用,推荐创建两个client,分别用来创建Producer和Consumer,做读写分离,避免由于共用memoryLimit导致相互影响。

本文最先发表于: https://aops.io/article/pulsar-client-memory-control.html

作者 萧易客 一线深耕消息中间件,RPC框架多年,欢迎评论区或通过邮件交流。

微信公众号: 萧易客

github id: shawyeok

标签:producer,client,ProducerImpl,内存,pulsar,apache,Pulsar,客户端
From: https://www.cnblogs.com/shawyeok/p/18666091

相关文章

  • 请说说WASM如何进行内存管理?
    WASM(WebAssembly)的内存管理主要涉及到内存的分配、使用和释放,这对于确保Web应用的性能和稳定性至关重要。以下是关于WASM如何进行内存管理的详细解释:内存分配:WASM使用线性内存模型,其中所有的内存都被视为一个连续的字节数组。这种模型简化了内存的分配和管理。在WASM中,内存......
  • 如何控制docker中占用系统资源(CPU,内存)的份额?
    在Docker中,你可以通过多种方式来控制容器对系统资源(如CPU和内存)的使用份额,以确保容器不会过度消耗宿主机的资源,并与其他容器公平地共享资源。以下是一些常用的方法:一.控制CPU资源CPU份额(CPUShares):Docker使用CPU份额来分配CPU时间。默认情况下,所有容器具有相同的CPU份额,这意......
  • 《操作系统真相还原》实验记录2.4——内存管理系统
    一、位图bitmap及其函数的实现1.1位图简介位图,也就是bitmap,广泛用于资源管理,是一种管理资源的方式、手段。“资源”包括很多,比如内存或硬盘,对于此类大容量资源的管理一般都会采用位图的方式。位是指bit,即字节中的位,1字节中有8个位;图是指map,地图本质上就是映射的意思,映......
  • [.NET] 使用客户端缓存提高API性能
    使用客户端缓存提高API性能摘要在现代应用程序中,性能始终是一个关键的考虑因素。无论是提高响应速度,降低延迟,还是减轻服务器负载,开发者都在寻找各种方法来优化他们的API。在Web开发中,利用客户端缓存是一种有效的方法,可以显著提高API的性能。本文将结合Replicant和Delta,深入探讨......
  • Redis连接失败:客户端IP不在白名单中的分析与解决(ERR client ip is not in whitelist)
    个人名片......
  • 一个共享内存管理的类
    一个用于内存管理分块的代码/*mIndexes位置说明:3是取结果的位置,2写的结果的位置,1是取图片的位置,0是放图片位置*//*mIndexes其他位置*/#defineWIDTH_INDEX10//图片宽度//#defineHEIGHT_INDEX(WIDTH_INDEX+1)//图片高度......
  • 从 bcp 客户端收到一个对 colid x 无效的列长度。
    出现场景:批量插入数据的时候出现这个问题。原因分析:某个数据的长度应该是大于这个数据对应的列的定义长度。所以一一检查到底是那个列的长度超出了。第一种方法:从bcp客户端收到一个对colidx无效的列长度。colidx是多少说明是第x+1列出了问题,比如colid1说明是第二列。......
  • ABP项目添加第三方API客户端代理
    第三方API客户端代理启动模板中包含HttpApi.Client​项目,这个项目是应用程序自己的客户端代理,用于提供给其他应用访问。例如BlazorWebAssembly使用HttpApi.Client​项目生成的API客户端代理访问应用程序的服务。本文介绍在应用程序中如何访问其他应用的API。‍远程应用服务接......
  • C++之内存分区模型
    C++程序在执行时将内存大方向划分为4个区域代码区:存放函数体的二进制代码,由操作系统进行管理的全局区:存放全局变量和静态变量以及常量栈区:由编译器自动分配释放,存放函数的参数值,局部变量等堆区:由程序员分配和释放,若程序员不释放,程序结束时由操作系统回收内存四区的意义:不同......
  • linux 手动释放内存
    在Linux系统中,内存管理通常由系统自动处理,但在某些情况下,手动释放内存可能是必要的。例如,当业务应用比较繁忙时会频繁存取文件,物理内存会被缓存大量占用,有时会出现内存不足的情况发生,甚至会导致系统性能下降。此时可主动在业务闲时手动释放内存。一、首先查看当前内存使用情况......