首页 > 其他分享 >分享两种Pulsar消息积压topic级别策略老化办法

分享两种Pulsar消息积压topic级别策略老化办法

时间:2023-12-26 11:02:00浏览次数:30  
标签:配置 积压 策略 ZooKeeper topic 老化 Pulsar

本文分享自华为云社区《Pulsar消息积压topic级别策略老化的两种方案》,作者: 张俭。

Pulsar像大多数消息中间件一样,支持按时间和大小对消息积压进行老化。但是默认的策略只能在namespace级别配置。本文将介绍如何在topic级别实现老化策略的两种方案。

方案一:开启 TopicLevelPolicy 来实现

默认的策略配置通过在Zookeeper上配置对应的策略,可以通过./pulsar zookeeper-shell命令来登录zookeeper集群查询。但是如果将这一实现方式扩展到topic级别,将会产生大量的(百万、千万级别)的ZooKeeper节点,这对于ZooKeeper集群来说几乎是不可接受的。因此,Pulsar提供了一种新的实现方式,即通过Topic来存储策略配置,而不是通过ZooKeeper来存储。

Pulsar,从2.7.0版本开始,引入了SystemTopic,用于存储Topic的元数据信息,包括Topic的策略配置。主题级策略使用户可以更灵活地管理主题,并不会给 ZooKeeper 带来额外负担。

您可以通过如下配置来开启TopicLevelPolicy

systemTopicEnabled=true
topicLevelPoliciesEnabled=true

然后通过set-backlog-quota命令来设置您想要的老化时间和老化大小

PS: 完整的一些功能,如命令行set-backlog-quota,在3.0.0版本中支持

方案二:通过自定义代码来实现

PulsarTopicLevelPolicy实现需要通过topic存储策略配置,而不是通过ZooKeeper来存储。在实际的极端场景下,Topic中存储的内容可能会丢失(因为未开启Bookkeeper立即落盘或磁盘文件损坏等原因),这将导致策略配置丢失,从而导致策略失效。因此,我们可以通过自定义代码来实现topic级别的策略配置,这样可以避免策略配置丢失的问题。

举个例子,业务可以将策略存放在Mysql中,然后通过PulsarAdmin API来让策略生效。

自定义代码实现Backlog时间策略

分享两种Pulsar消息积压topic级别策略老化办法_消息中间件

自定义代码实现Backlog大小策略

分享两种Pulsar消息积压topic级别策略老化办法_消息中间件_02

点击关注,第一时间了解华为云新鲜技术~

标签:配置,积压,策略,ZooKeeper,topic,老化,Pulsar
From: https://blog.51cto.com/u_15214399/8980125

相关文章

  • Pulsar3.0 升级指北
    Pulsar3.0介绍Pulsar3.0是Pulsar社区推出的第一个LTS长期支持版本。如图所示,LTS版本会最长支持到36个月,而Feature版本最多只有六个月;类似于我们使用的JDK11,17,21都是可以长期使用的;所以也推荐大家都升级到LTS版本。作为首个LTS版本,3.0自然也是自带了许多......
  • RabbitMQ Topic交换机
     代码示例:1.新建两个队列 2.创建交换机,名字叫hmall.topic,类型选择topic 3.hmall.topic交换机绑定第一步的两个队列,绑定过程中填写RoutingKey  4.编写消费者代码监听这两个队列@RabbitListener(queues="topic.queue1")publicvoidlistenQueue05(Str......
  • 【Spring】SpringBoot+RabbitMQ(direct/fanout/topic)の構築方法
     ■POM.xmlの中で、下記の内容を追加<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency>......
  • kafka中如何创建topic?
    通过如下的命令创建topickafka-topics.sh\--bootstrap-server<Kafka集群地址>\--create\--topic<Topic名称>\--partitions<分区数>\--replication-factor<副本因子>  示例bin/kafka-topics.sh\--bootstrap-server122.12......
  • 什么是kafka中topic、分区、副本、偏移量、消息保留策略
    Kafka是一个开源的分布式流式数据平台,也成为分布式消息队列。用于高吞吐量、低延迟的数据发布和订阅。 1、什么是topic(主题)? 在Kafka中,Topic(主题)是数据发布和订阅的基本单位,它代表了相同类型的消息流。 一个Topic可以看作是一个逻辑上的数据流管道,消息的生产者(Prod......
  • 升级到 Pulsar3.0 后深入了解 JWT 鉴权
    背景最近在测试将Pulsar2.11.2升级到3.0.1的过程中碰到一个鉴权问题,正好借着这个问题充分了解下Pulsar的鉴权机制是如何运转的。Pulsar支持Namespace/Topic级别的鉴权,在生产环境中往往会使用topic级别的鉴权,从而防止消息泄露或者其他因为权限管控不严格而导致的问题......
  • java——kafka随笔——broker&主题-topic&分区-partition理解
                  首先,让我们来看一下基础的消息(Message)相关术语:名称解释Broker消息中间件处理节点,⼀个Kafka节点就是⼀个broker,⼀个或者多个Broker可以组成⼀个Kafka集群TopicKafka根据topic对消息进⾏归类,发布到Kafka集群的每条消息都......
  • Rabbitmq消息队列:Topic话题模式简单应用
    一、生产者声明topic话题模式的交换机,分别发送几条消息到不同的路由key。packagetest.topic;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importutils.RabbitmqConUtil;publicclassGive{//定义交换机privatefinals......
  • 【Azure Logic App】消费型逻辑应用在消费Service Bus时遇见消息并发速度慢,消息积压
    问题描述消费型逻辑应用(ConsumptionLogicApp)使用触发器模式消费AzureServiceBus的消息,当ServiceBus中存在大量消息等待消费时,LogicApp消费速度太慢,并发数无法满足需求。造成消息积压,有什么办法可以优化吗? 问题解答在LogicApp的配置中,可以修改“更改触发器并发”来......
  • MQTT 主题通配符和过滤器Topic Wildcards & Topic Filters
    主题名称中引入了级别分隔符/,用于分割主题级别,如果存在,它将主题名称划分为多个“主题级别”。订阅的主题过滤器可以包含特殊的通配符,可以一次订阅多个主题。特殊字符的通配符可以用在订阅过滤器中,但是不能用于主题名称1.主题级别"/"用于分割主题级别,并为主题名称提供......