首页 > 其他分享 >Pulsar客户端消费模式揭秘:Go 语言实现 ZeroQueueConsumer

Pulsar客户端消费模式揭秘:Go 语言实现 ZeroQueueConsumer

时间:2024-07-29 11:29:00浏览次数:6  
标签:消费 ReceiverQueueSize go ZeroQueueConsumer client pulsar Go Pulsar 客户端

前段时间在 pulsar-client-go 社区里看到这么一个 issue

import "github.com/apache/pulsar-client-go/pulsar"

client, err := pulsar.NewClient(pulsar.ClientOptions{
    URL: "pulsar://localhost:6650",
})
if err != nil {
    log.Fatal(err)
}
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
    Topic:             "persistent://public/default/mq-topic-1",
    SubscriptionName:  "sub-1",
    Type:              pulsar.Shared,
    ReceiverQueueSize: 0,
})
if err != nil {
    log.Fatal(err)
}


// 小于等于 0 时会设置为 1000
const (  
    defaultReceiverQueueSize = 1000  
)
if options.ReceiverQueueSize <= 0 {  
    options.ReceiverQueueSize = defaultReceiverQueueSize  
}

他发现手动将 pulsar-client-go 客户端的 ReceiverQueueSize 设置为 0 的时候,客户端在初始化时会再将其调整为 1000.

if options.ReceiverQueueSize < 0 {  
    options.ReceiverQueueSize = defaultReceiverQueueSize  
}

而如果手动将源码修改为可以设置为 0 时,却不能正常消费,消费者会一直处于 waiting 状态,获取不到任何数据。

经过我的排查发现是 Pulsar 的 Go 客户端缺少了一个 ZeroQueueConsumerImpl的实现类,这个类主要用于可以精细控制消费逻辑。

If you'd like to have tight control over message dispatching across consumers, set the consumers' receiver queue size very low (potentially even to 0 if necessary). Each consumer has a receiver queue that determines how many messages the consumer attempts to fetch at a time. For example, a receiver queue of 1000 (the default) means that the consumer attempts to process 1000 messages from the topic's backlog upon connection. Setting the receiver queue to 0 essentially means ensuring that each consumer is only doing one thing at a time.

https://pulsar.apache.org/docs/next/cookbooks-message-queue/#client-configuration-changes

正如官方文档里提到的那样,可以将 ReceiverQueueSize 设置为 0;这样消费者就可以一条条的消费数据,而不会将消息堆积在客户端队列里。

客户端消费逻辑

借此机会需要再回顾下 pulsar 客户端的消费逻辑,这样才能理解 ReceiverQueueSize 的作用以及如何在 pulsar-client-go 如何实现这个 ZeroQueueConsumerImpl

Pulsar 客户端的消费模式是基于推拉结合的:


如这张图所描述的流程,消费者在启动的时候会主动向服务端发送一个 Flow 的命令,告诉服务端需要下发多少条消息给客户端。

同时会使用刚才的那个 ReceiverQueueSize参数作为内部队列的大小,将客户端下发的消息存储在内部队列里。

然后在调用 receive 函数的时候会直接从这个队列里获取数据。


每次消费成功后都会将内部的一个 AvailablePermit+1,直到大于 MaxReceiveQueueSize / 2 就会再次向 broker 发送 flow 命令,告诉 broker 再次下发消息。

所以这里有一个很关键的事件:就是向 broker 发送 flow 命令,这样才会有新的消息下发给客户端。

之前经常都会有研发同学让我排查无法消费的问题,最终定位到的原因几乎都是消费缓慢,导致这里的 AvailablePermit 没有增长,从而也就不会触发 broker 给客户端推送新的消息。

看到的现象就是消费非常缓慢。

ZeroQueueConsumerImpl 原理

下面来看看 ZeroQueueConsumerImpl 是如何实现队列大小为 0 依然是可以消费的。


在构建 consumer 的时候,就会根据队列大小从而来创建普通消费者还是 ZeroQueueConsumerImpl 消费者。

@Override  
protected CompletableFuture<Message<T>> internalReceiveAsync() {  
    CompletableFuture<Message<T>> future = super.internalReceiveAsync();  
    if (!future.isDone()) {  
        // We expect the message to be not in the queue yet  
        increaseAvailablePermits(cnx());  
    }  
    return future;  
}

这是 ZeroQueueConsumerImpl 重写的一个消费函数,其中关键的就是 increaseAvailablePermits(cnx());.

    void increaseAvailablePermits(ClientCnx currentCnx) {
        increaseAvailablePermits(currentCnx, 1);
    }

    protected void increaseAvailablePermits(ClientCnx currentCnx, int delta) {
        int available = AVAILABLE_PERMITS_UPDATER.addAndGet(this, delta);
        while (available >= getCurrentReceiverQueueSize() / 2 && !paused) {
            if (AVAILABLE_PERMITS_UPDATER.compareAndSet(this, available, 0)) {
                sendFlowPermitsToBroker(currentCnx, available);
                break;
            } else {
                available = AVAILABLE_PERMITS_UPDATER.get(this);
            }
        }
    }

从源码里可以得知这里的逻辑就是将 AvailablePermit 自增,达到阈值后请求 broker 下发消息。

因为在 ZeroQueueConsumerImpl 中队列大小为 0,所以 available >= getCurrentReceiverQueueSize() / 2永远都会为 true。

也就是说每消费一条消息都会请求 broker 让它再下发一条消息,这样就达到了每一条消息都精确控制的效果。

pulsar-client-go 中的实现

为了在 pulsar-client-go 实现这个需求,我提交了一个 PR 来解决这个问题。

其实从上面的分析已经得知为啥手动将 ReceiverQueueSize 设置为 0 无法消费消息了。

根本原因还是在初始化的时候优于队列为 0,导致不会给 broker 发送 flow 命令,这样就不会有消息推送到客户端,也就无法消费到数据了。

所以我们依然得参考 Java 的 ZeroQueueConsumerImpl 在每次消费的时候都手动增加 availablePermits

为此我也新增了一个消费者 zeroQueueConsumer

// EnableZeroQueueConsumer, if enabled, the ReceiverQueueSize will be 0.  
// Notice: only non-partitioned topic is supported.  
// Default is false.  
EnableZeroQueueConsumer bool

consumer, err := client.Subscribe(ConsumerOptions{  
    Topic:                   topicName,  
    SubscriptionName:        "sub-1",  
    Type:                    Shared,  
    NackRedeliveryDelay:     1 * time.Second,  
    EnableZeroQueueConsumer: true,  
})

if options.EnableZeroQueueConsumer {  
    options.ReceiverQueueSize = 0  
}

在创建消费者的时候需要指定是否开启 ZeroQueueConsumer,当开启后会手动将 ReceiverQueueSize 设置为 0.

// 可以设置默认值。
private int receiverQueueSize = 1000;

在 Go 中无法像 Java 那样在结构体初始化化的时候就指定默认值,再加上 Go 的 int 类型具备零值(也就是0),所以无法区分出 ReceiverQueueSize=0 是用户主动设置的,还是没有传入这个参数使用的零值。

所以才需要新增一个参数来手动区分是否使用 ZeroQueueConsumer


之后在创建 consumer 的时候进行判断,只有使用的是单分区的 topic 并且开启了 EnableZeroQueueConsumer 才能创建 zeroQueueConsumer


使用 PARTITIONED_METADATA 命令可以让 broker 返回分区数量。


func (z *zeroQueueConsumer) Receive(ctx context.Context) (Message, error) {
	if state := z.pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
		z.log.WithField("state", state).Error("Failed to ack by closing or closed consumer")
		return nil, errors.New("consumer state is closed")
	}
	z.Lock()
	defer z.Unlock()
	z.pc.availablePermits.inc()
	for {
		select {
		case <-z.closeCh:
			return nil, newError(ConsumerClosed, "consumer closed")
		case cm, ok := <-z.messageCh:
			if !ok {
				return nil, newError(ConsumerClosed, "consumer closed")
			}
			return cm.Message, nil
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}

}

其中的关键代码:z.pc.availablePermits.inc()

消费时的逻辑其实和 Java 的 ZeroQueueConsumerImpl 逻辑保持了一致,也是每消费一条数据之前就增加一次 availablePermits

pulsar-client-go 的运行原理与 Java 客户端的类似,也是将消息存放在了一个内部队列里,所以每次消费消息只需要从这个队列 messageCh 里获取即可。

值得注意的是, pulsar-client-go 版本的 zeroQueueConsumer 就不支持直接读取内部的队列了。

func (z *zeroQueueConsumer) Chan() <-chan ConsumerMessage {  
    panic("zeroQueueConsumer cannot support Chan method")  
}

会直接 panic,因为直接消费 channel 在客户端层面就没法帮用户主动发送 flow 命令了,所以这个功能就只能屏蔽掉了,只可以主动的 receive 消息。

许久之前我也画过一个关于 pulsar client 的消费流程图,后续考虑会再写一篇关于 pulsar client 的原理分析文章。

参考链接:

标签:消费,ReceiverQueueSize,go,ZeroQueueConsumer,client,pulsar,Go,Pulsar,客户端
From: https://www.cnblogs.com/crossoverJie/p/18329716

相关文章

  • Gin框架深度解析:构建高性能Go Web应用的基石
    Gin框架深度解析:构建高性能GoWeb应用的基石在当今的Web开发领域,选择一个合适的框架对于项目的成功至关重要。Gin,作为一款用Go(Golang)语言编写的Web框架,凭借其高性能、简洁的API设计以及丰富的特性,迅速在开发者社区中崭露头角。本文将深入解析Gin框架,从其核心特性、工作原理......
  • 找不到 Django 模块
    我是一个试图掌握Django的新手。我刚刚开始尝试使用应用程序配置URL。但是,每当我尝试运行服务器时,它都会告诉我找不到urls模块,即使我使用绝对路径也是如此。我在下面包含了一些代码。fromdjango.urlsimportinclude,pathurlpatterns=[path('custom_regions/',......
  • Argon 主题美化
    页脚在Argon主题选项的页脚内容中添加如下内容<divclass="github-badge-big"> <spanclass="badge-subject"><iclass="fafa-id-card"></i>备案号</span> <spanclass="badge-valuebg-orange"> <!--备案链......
  • Go: Gin框架中的binding验证器使用指南
    Go:Gin框架中的binding验证器使用指南原创 王义杰 AI学者王义杰  2024年05月30日22:33 广东 听全文在Gin框架中,数据绑定和验证是开发API时不可或缺的部分。Gin提供了强大的binding功能,允许我们将请求的数据绑定到结构体,并通过标签进行数据验证。本文将详细讲解如......
  • Vue3 - 最新详细实现安装使用 Google 谷歌地图教程,提供搜索城市名称及地点(搜索关键字
    前言如果您需要Vue2版本,请访问这篇文章。在vue3|nuxt3网站开发中,详解实现接入谷歌google地图申请密钥及相关配置完整流程,附带使用谷歌地图相关功能示例代码,支持地图渲染展示、在地图上标点、全球地图搜索及搜索框相关联想关键词、地图导航、用户当前位置经纬度......
  • Django REST Framework(十四)路由Routes
    如何在DjangoRESTframework中利用SimpleRouter和DefaultRouter来高效生成视图集的路由信息,并详细解释如何使用action装饰器为视图集中的自定义方法生成路由1.1使用Routers创建router对象并注册视图集在创建router对象并注册视图集时,我们会定义一个视图集并注册到ro......
  • train_test_split 导致 xgboost 忽略“enable_categorical”
    我正在使用xgboost版本2.1.0当使用xgboost.DMatrix()和'enable_categorical'=True将包含类别列的pandas数据帧转换为DMatrix时,所有行为均按预期运行,除非数据帧是sklearntrain_test_split()返回的数据帧,尽管所有列的数据类型仍属于类别。以下代码产生预期的......
  • 如何优化 Django 自动重载/启动过程?
    我目前正在开发一个非常大的Django项目,其中包含许多文件,更重要的是,还有大量依赖项,包括Torch和Transformers等包。自从安装Torch以来,我注意到自动重新加载功能和整个启动过程使用开发服务器时的过程变得非常慢。现在我需要10-15秒才能测试我的代码,这在开发过程中非......
  • Django Admin TabularInline:如何通过模型隐藏 M2M 的对象名称?
    如何在管理显示中隐藏Unit_attributeobject(3)?admin.py:fromdjango.contribimportadminfromcore.modelsimportAttribute,UnitclassUnitAttributeInline(admin.TabularInline):[email protected]......
  • 使用celery进行异步处理和定时任务(django)
    一、celery的作用    celery是一个简单、灵活且可靠的分布式系统,用于处理大量消息,同时为操作提供一致的接口。它专注于实时操作,但支持任务调度。Celery主要用于异步任务处理,特别是在Web应用环境中,用于执行后台任务,如发送电子邮件、处理图片、视频转码、运行复杂的......