首页 > 其他分享 >【RocketMQ入门到精通】— RocketMQ初级特性能力 | Message Filter消息过滤,可不是web容器的过滤器哦

【RocketMQ入门到精通】— RocketMQ初级特性能力 | Message Filter消息过滤,可不是web容器的过滤器哦

时间:2022-10-27 17:01:36浏览次数:112  
标签:web Tag 订阅 Filter 过滤 消息 consumer RocketMQ

名言警句


任何先进的技术均与魔法无异


追本溯源

【​​经历了6个月的失踪,我将带着干货终究归来!【RocketMQ入门到精通】​​】

什么是消息过滤

Message Filter,可不是web容器的过滤器哦,可以发生在服务端也可以发生在客户端,消息过滤是指消息生产者向Topic中发送消息时,设置消息属性对消息进行分类,消费者订阅Topic时,根据消息属性设置过滤条件对消息进行过滤,只有符合过滤条件的消息才会被投递到消费端进行消费。


RocketMQ消息过滤

  RocketMQ分布式消息队列的消息过滤方式有别于其它MQ中间件,是在Consumer端订阅消息时再做消息过滤的。RocketMQ这么做是在于其Producer端写入消息和Consumer端订阅消息采用分离存储的机制来实现的。

  • Consumer端订阅消息是需要通过ConsumeQueue这个消息的逻辑队列拿到一个索引,然后再从CommitLog里面读取真正的消息实体内容,所以说到底也是还绕不开其存储结构。
  • ConsumeQueue的存储结构如下,可以看到其中有8个字节存储的Message Tag哈希值,基于Tag消息过滤正式基于这个字段值的。

【RocketMQ入门到精通】— RocketMQ初级特性能力 | Message Filter消息过滤,可不是web容器的过滤器哦_sql

主要支持如下2种的过滤方式:简单消息过滤(通过Tag方式实现)和高级过滤消息(通过Filter去实现)

【RocketMQ入门到精通】— RocketMQ初级特性能力 | Message Filter消息过滤,可不是web容器的过滤器哦_sql_02

消费者订阅Topic时若未设置过滤条件,无论消息发送时是否有设置过滤属性,Topic中的所有消息都将被投递到消费端进行消费。

RocketMQ支持的消息过滤方式有两种,Tag过滤和SQL92过滤。

【Tag过滤方式】

Consumer端在订阅消息时除了指定Topic还可以指定TAG,如果一个消息有多个TAG,可以用'||'分隔。其中Consumer端会将这个订阅请求构建成一个SubscriptionData,发送一个Pull消息的请求给Broker端。

Broker端从RocketMQ的文件存储层—Store读取数据之前,会用这些数据先构建一个MessageFilter,然后传给Store。Store从ConsumeQueue读取到一条记录后,会用它记录的消息tag hash值去做过滤,由于在服务端只是根据hashcode进行判断,无法精确对tag原始字符串进行过滤,故在消息消费端拉取到消息后,还需要对消息的原始tag字符串进行比对如果不同,则丢弃该消息,不进行消息消费。

过滤消息样例

在大多数情况下,TAG是一个简单而有用的设计,其可以来选择您想要的消息,例如:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TOPIC_TEST");
consumer.subscribe("TOPIC", "A || B || C");

消费者将接收包含TAGA或TAGB或TAGC的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。

【Filter(SQL92)过滤方式】

大致做法和上面的Tag过滤方式一样,只是在Store层的具体过滤过程不太一样,真正的SQL expression的构建和执行由rocketmq-filter模块负责的。每次过滤都去执行SQL表达式会影响效率,所以RocketMQ使用了BloomFilter避免了每次都去执行。SQL92的表达式上下文为消息的属性。

在这种情况下,可以使用SQL表达式筛选消息。SQL特性可以通过发送消息时的属性来进行计算。在RocketMQ定义的语法下,可以实现一些简单的逻辑。下面是一个例子。

消息可以未被过滤,符合条件
------------
| messageA |
|----------| a > 5 AND b = 'abc'
| a = 10 | --------------------> Gotten
| b = 'abc'|
| c = true |
------------
消息可以会被过滤,不符合条件
------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 1 | --------------------> Missed
| b = 'abc'|
| c = true |
------------
SQL92基本语法

RocketMQ只定义了一些基本语法来支持这个特性。

  • 数值比较的运算符:>,>=,<,<=,BETWEEN,=;
  • 字符比较的运算符:\=,<>,IN;
  • IS NULL或者IS NOT NULL;
  • 逻辑符号AND,OR,NOT;
常量支持类型为
  • 数值类型:123,3.1415;
  • 字符串类型:‘abc’,必须用单引号包裹起来;
  • NULL,特殊的常量
  • 布尔值,TRUEFALSE

只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:

public void subscribe(finalString topic, final MessageSelector messageSelector)

过滤消息样例
生产者样例

发送消息时,你能通过​​putUserProperty​​来设置消息的属性

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
Message msg = new Message("TopicTest",tag, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 设置一些属性
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);
producer.shutdown();
消费者样例

用MessageSelector.bySql来使用sql筛选消息。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// 只有订阅的消息有这个属性a, a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
);
consumer.start();

消息过滤实际案例(参考官方案例)

以图电商交易场景为例,从客户下单到收到商品这一过程会生产一系列消息,以如下消息为例:

  • 订单消息A
  • 支付消息B
  • 物流消息C

这些消息会发送到名称为Trade_Topic的Topic中,被各个不同的系统所订阅,以如下系统为例:

  • 支付系统:只需订阅支付消息。
  • 物流系统:只需订阅物流消息。
  • 实时计算系统:需要订阅所有和交易相关的消息。
  • 交易成功率分析系统:需订阅订单和支付消息。

过滤示意图如下所示

参考官方原图:​​https://rocketmq.apache.org/zh/assets/images/Tag%E8%BF%87%E6%BB%A4-844cfe6dd033746c7134bde843021ad6.png​

【RocketMQ入门到精通】— RocketMQ初级特性能力 | Message Filter消息过滤,可不是web容器的过滤器哦_字符串_03

Tag的案例代码

对于物流系统和支付系统来说,它们都只订阅单个Tag,此时只需要在调用subcribe接口时明确标明Tag即可。

consumer.subscribe("TagFilterTest", "TagA");

对于实时计算系统来说,它订阅交易Topic下所有的消息,Tag用星号(*)表示即可。

consumer.subscribe("TagFilterTest", "*");

对于交易成功率分析系统来说,它订阅了订单和支付两个Tag的消息,在多个Tag之间用两个竖线(||)分隔即可。

consumer.subscribe("TagFilterTest", "TagA || TagB");

这里需要注意的是,如果同一个消费者多次订阅某个Topic下的Tag,以最后一次订阅为准。

//如下错误代码中,Consumer只能订阅到TagFilterTest下TagB的消息,而不能订阅TagA的消息。
consumer.subscribe("TagFilterTest", "TagA");
consumer.subscribe("TagFilterTest", "TagB");
SQL92的案例代码

SQL92过滤是在消息发送时设置消息的Tag或自定义属性,消费者订阅时使用SQL语法设置过滤表达式,根据自定义属性或Tag过滤消息。

Tag属于一种特殊的消息属性,在SQL语法中,Tag的属性值为TAGS。 开启属性过滤首先要在Broker端设置配置enablePropertyFilter=true,该值默认为false。

以下图电商交易场景为例,从客户下单到收到商品这一过程会生产一系列消息,按照类型将消息分为订单消息和物流消息,其中给物流消息定义地域属性,按照地域分为杭州和上海:

  • 订单消息
  • 物流消息
  • 物流消息且地域为杭州
  • 物流消息且地域为上海

这些消息会发送到名称为Trade_Topic的Topic中,被各个不同的系统所订阅,以如下系统为例:

  • 物流系统1:只需订阅物流消息且消息地域为杭州。
  • 物流系统2:只需订阅物流消息且消息地域为杭州或上海。
  • 订单跟踪系统:只需订阅订单消息。


参考官方案例:​​https://rocketmq.apache.org/zh/assets/images/SQL92%E8%BF%87%E6%BB%A4-716732acb1aad27fc8e7a9e218ebaa65.png​

【RocketMQ入门到精通】— RocketMQ初级特性能力 | Message Filter消息过滤,可不是web容器的过滤器哦_字符串_04

地域将作为自定义属性设置在消息中。

消息发送端

设置消息的自定义属性。

Message msg = new Message("topic", "tagA", "Hello MQ".getBytes());
// 设置自定义属性A,属性值为1。
msg.putUserProperties("a", "1");
消息消费端

使用SQL语法设置过滤表达式,并根据自定义属性过滤消息。

consumer.subscribe("SqlFilterTest", MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
"and (a is not null and a between 0 and 3)"));


标签:web,Tag,订阅,Filter,过滤,消息,consumer,RocketMQ
From: https://blog.51cto.com/alex4dream/5800932

相关文章

  • Go 云原生实战:如何增加 Web 应用配置模块
    1介绍当我们为自己编写程序时,通常会将一些重要的配置项直接写在源代码里,比如:服务器监听的端口、数据库使用的名称和端口号、HTTP请求超时的持续时间...但是,如果我们尝试将......
  • Istio envoy filter
    EnvoyFilterCREnvoyFilter EnvoyFilterCR提供了自定义SidecarEnvoy配置的接口,其支持的配置功能包括修改指定字段的值、添加特定的过滤器甚至是新增Listener和Clus......
  • .NET Core WebApi 快速切换开发/生产环境
    项目结构中有三个配置文件  appsettings.json:通用配置文件appsettings.Development.json:开发环境配置文件appsettings.Production.json:生产环境配置文......
  • Webpack构建速度优化
    前言当我们的项目越来越大,webpack的配置项越来越多时,构建速度会越来越慢,所以我们需要通过一些配置来提高webpack的构建速度。目录缩小范围noParseIgnorePlugin优化......
  • 基于Java websocket的公共聊天程序
    实验中使用的是tomcat的websocket,由于程序部署到apache-tomcat-8.5.24上,所以只需额外添加消息Json解析包:json-org。实际使用中注意修改目标地址:ws://localhost:8080/GameDem......
  • csharp-webuploader+csharp如何实现分片+断点续传
    ​文件夹数据库处理逻辑public class DbFolder{    JSONObjectroot;       public DbFolder()    {        this.root= new JSONOb......
  • c#-webuploader+c#如何实现分片+断点续传
    ​以ASP.NETCoreWebAPI 作后端 API ,用 Vue 构建前端页面,用 Axios 从前端访问后端 API,包括文件的上传和下载。 准备文件上传的API #region 文件上传 ......
  • 连接WebSocket失败
    ​ 浏览器控制台报错信息: WebSocketconnectionto'ws://127.0.0.1:9100/'failed:Errorinconnectionestablishment:net::ERR_CONNECTION_REFUSED  ​编......
  • .Net Core WebApi AutoFac用法
    1.安装Autofac.Extensions.DependencyInjection管理包UI层安装 2.在Program里面配置服务提供工厂  3.在Startup里面添加一个配置容器的方法使用基于扫描程序集......
  • Node原生开发Web服务器介绍
     1.  使用Node创建一个HTTP的服务器,并能够接收到客服端发来的请求获取到客服端具体的请求数据,并根据不同的请求数据进行处理将处理之后的结果,响应回客户端,并断......