名言警句
任何先进的技术均与魔法无异
追本溯源
【经历了6个月的失踪,我将带着干货终究归来!【RocketMQ入门到精通】】
什么是消息过滤
Message Filter,可不是web容器的过滤器哦,可以发生在服务端也可以发生在客户端,消息过滤是指消息生产者向Topic中发送消息时,设置消息属性对消息进行分类,消费者订阅Topic时,根据消息属性设置过滤条件对消息进行过滤,只有符合过滤条件的消息才会被投递到消费端进行消费。
RocketMQ消息过滤
RocketMQ分布式消息队列的消息过滤方式有别于其它MQ中间件,是在Consumer端订阅消息时再做消息过滤的。RocketMQ这么做是在于其Producer端写入消息和Consumer端订阅消息采用分离存储的机制来实现的。
- Consumer端订阅消息是需要通过ConsumeQueue这个消息的逻辑队列拿到一个索引,然后再从CommitLog里面读取真正的消息实体内容,所以说到底也是还绕不开其存储结构。
- ConsumeQueue的存储结构如下,可以看到其中有8个字节存储的Message Tag的哈希值,基于Tag的消息过滤正式基于这个字段值的。
主要支持如下2种的过滤方式:简单消息过滤(通过Tag方式实现)和高级过滤消息(通过Filter去实现)
消费者订阅Topic时若未设置过滤条件,无论消息发送时是否有设置过滤属性,Topic中的所有消息都将被投递到消费端进行消费。
RocketMQ支持的消息过滤方式有两种,Tag过滤和SQL92过滤。
【Tag过滤方式】
Consumer端在订阅消息时除了指定Topic还可以指定TAG,如果一个消息有多个TAG,可以用'||'分隔。其中Consumer端会将这个订阅请求构建成一个SubscriptionData,发送一个Pull消息的请求给Broker端。
Broker端从RocketMQ的文件存储层—Store读取数据之前,会用这些数据先构建一个MessageFilter,然后传给Store。Store从ConsumeQueue读取到一条记录后,会用它记录的消息tag hash值去做过滤,由于在服务端只是根据hashcode进行判断,无法精确对tag原始字符串进行过滤,故在消息消费端拉取到消息后,还需要对消息的原始tag字符串进行比对,如果不同,则丢弃该消息,不进行消息消费。
过滤消息样例
在大多数情况下,TAG是一个简单而有用的设计,其可以来选择您想要的消息,例如:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TOPIC_TEST");
consumer.subscribe("TOPIC", "A || B || C");
消费者将接收包含TAGA或TAGB或TAGC的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。
【Filter(SQL92)过滤方式】
大致做法和上面的Tag过滤方式一样,只是在Store层的具体过滤过程不太一样,真正的SQL expression的构建和执行由rocketmq-filter模块负责的。每次过滤都去执行SQL表达式会影响效率,所以RocketMQ使用了BloomFilter避免了每次都去执行。SQL92的表达式上下文为消息的属性。
在这种情况下,可以使用SQL表达式筛选消息。SQL特性可以通过发送消息时的属性来进行计算。在RocketMQ定义的语法下,可以实现一些简单的逻辑。下面是一个例子。
消息可以未被过滤,符合条件
------------
| messageA |
|----------| a > 5 AND b = 'abc'
| a = 10 | --------------------> Gotten
| b = 'abc'|
| c = true |
------------
消息可以会被过滤,不符合条件
------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 1 | --------------------> Missed
| b = 'abc'|
| c = true |
------------
SQL92基本语法
RocketMQ只定义了一些基本语法来支持这个特性。
- 数值比较的运算符:>,>=,<,<=,BETWEEN,=;
- 字符比较的运算符:\=,<>,IN;
- IS NULL或者IS NOT NULL;
- 逻辑符号AND,OR,NOT;
常量支持类型为
- 数值类型:123,3.1415;
- 字符串类型:‘abc’,必须用单引号包裹起来;
- NULL,特殊的常量
- 布尔值,TRUE或FALSE
只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:
public void subscribe(finalString topic, final MessageSelector messageSelector)
过滤消息样例
生产者样例
发送消息时,你能通过putUserProperty
来设置消息的属性
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
Message msg = new Message("TopicTest",tag, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 设置一些属性
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);
producer.shutdown();
消费者样例
用MessageSelector.bySql来使用sql筛选消息。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// 只有订阅的消息有这个属性a, a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
);
consumer.start();
消息过滤实际案例(参考官方案例)
以图电商交易场景为例,从客户下单到收到商品这一过程会生产一系列消息,以如下消息为例:
- 订单消息A
- 支付消息B
- 物流消息C
这些消息会发送到名称为Trade_Topic的Topic中,被各个不同的系统所订阅,以如下系统为例:
- 支付系统:只需订阅支付消息。
- 物流系统:只需订阅物流消息。
- 实时计算系统:需要订阅所有和交易相关的消息。
- 交易成功率分析系统:需订阅订单和支付消息。
过滤示意图如下所示
Tag的案例代码
对于物流系统和支付系统来说,它们都只订阅单个Tag,此时只需要在调用subcribe接口时明确标明Tag即可。
consumer.subscribe("TagFilterTest", "TagA");
对于实时计算系统来说,它订阅交易Topic下所有的消息,Tag用星号(*)表示即可。
consumer.subscribe("TagFilterTest", "*");
对于交易成功率分析系统来说,它订阅了订单和支付两个Tag的消息,在多个Tag之间用两个竖线(||)分隔即可。
consumer.subscribe("TagFilterTest", "TagA || TagB");
这里需要注意的是,如果同一个消费者多次订阅某个Topic下的Tag,以最后一次订阅为准。
//如下错误代码中,Consumer只能订阅到TagFilterTest下TagB的消息,而不能订阅TagA的消息。
consumer.subscribe("TagFilterTest", "TagA");
consumer.subscribe("TagFilterTest", "TagB");
SQL92的案例代码
SQL92过滤是在消息发送时设置消息的Tag或自定义属性,消费者订阅时使用SQL语法设置过滤表达式,根据自定义属性或Tag过滤消息。
Tag属于一种特殊的消息属性,在SQL语法中,Tag的属性值为TAGS。 开启属性过滤首先要在Broker端设置配置enablePropertyFilter=true,该值默认为false。
以下图电商交易场景为例,从客户下单到收到商品这一过程会生产一系列消息,按照类型将消息分为订单消息和物流消息,其中给物流消息定义地域属性,按照地域分为杭州和上海:
- 订单消息
- 物流消息
- 物流消息且地域为杭州
- 物流消息且地域为上海
这些消息会发送到名称为Trade_Topic的Topic中,被各个不同的系统所订阅,以如下系统为例:
- 物流系统1:只需订阅物流消息且消息地域为杭州。
- 物流系统2:只需订阅物流消息且消息地域为杭州或上海。
- 订单跟踪系统:只需订阅订单消息。
地域将作为自定义属性设置在消息中。
消息发送端
设置消息的自定义属性。
Message msg = new Message("topic", "tagA", "Hello MQ".getBytes());
// 设置自定义属性A,属性值为1。
msg.putUserProperties("a", "1");
消息消费端
使用SQL语法设置过滤表达式,并根据自定义属性过滤消息。
consumer.subscribe("SqlFilterTest", MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
"and (a is not null and a between 0 and 3)"));