首页 > 其他分享 >MQ系列16:MQ实现消息过滤处理

MQ系列16:MQ实现消息过滤处理

时间:2023-10-19 12:45:49浏览次数:33  
标签:16 标签 过滤 Tag MQ 消息 属性

MQ系列1:消息中间件执行原理
MQ系列2:消息中间件的技术选型
MQ系列3:RocketMQ 架构分析
MQ系列4:NameServer 原理解析
MQ系列5:RocketMQ消息的发送模式
MQ系列6:消息的消费
MQ系列7:消息通信,追求极致性能
MQ系列8:数据存储,消息队列的高可用保障
MQ系列9:高可用架构分析
MQ系列10:如何保证消息幂等性消费
MQ系列11:如何保证消息可靠性传输
MQ系列12:如何保证消息顺序性
MQ系列13:消息大量堆积如何为解决
MQ系列14:MQ如何做到消息延时处理
MQ系列15:MQ实现批量消息处理

1 背景

消息队列作为发布订阅模型的消息中间件广泛应用于上下游业务集成场景。在实际业务场景中,同一个主题下的消息往往会被多个不同的下游业务方处理,各下游的处理逻辑不同,只需要关注自身逻辑需要的消息子集。所以在 消息中心和消费者之间,需要有一种消息过滤功能,可以帮助消费者更高效地过滤自己需要的消息集合,避免大量无效消息投递给消费者,降低下游系统处理压力。
Apache RocketMQ 很好的支持了这一能力,它解决了单个业务域即同一个主题内不同消息子集的过滤问题。

2 关于消息过滤

2.1 概念

在消费者订阅了某个主题后,Apache RocketMQ 会将该主题中的所有消息投递给消费者。若消费者只需要关注部分消息,可通过设置过滤条件在 Apache RocketMQ 服务端进行过滤,只获取到需要关注的消息子集,避免接收到大量无效的消息。所以,过滤的本质就是将符合条件的消息投递给消费者,而不是将匹配到的消息过滤掉。
Apache RocketMQ 的消息过滤功能通过生产者和消费者对消息的属性、标签进行定义,并在 Apache RocketMQ 服务端根据过滤条件进行筛选匹配,将符合条件的消息投递给消费者进行消费。

2.2 消息过滤说明

2.2.1 原理介绍

image

备注:图片直接使用官网提供的

消息过滤主要通过以下几个关键流程实现:

  • 生产者:生产者在初始化消息时预先为消息设置一些属性和标签,用于后续消费时指定过滤目标。
  • 消费者:消费者在初始化及后续消费流程中通过调用订阅关系注册接口,向服务端上报需要订阅指定主题的哪些消息,即过滤条件。
  • 服务端:消费者获取消息时会触发服务端的动态过滤计算,Apache RocketMQ 服务端根据消费者上报的过滤条件的表达式进行匹配,并将符合条件的消息投递给消费者。

2.2.2 消息过滤分类

Apache RocketMQ 支持Tag标签过滤和SQL属性过滤,这两种过滤方式对比如下:

对比项 Tag标签过滤 SQL属性过滤
过滤目标 消息的Tag标签。 消息的属性,包括用户自定义属性以及系统属性(Tag是一种系统属性)。
过滤能力 精准匹配。 SQL语法匹配。
适用场景 简单过滤场景、计算逻辑简单轻量。 复杂过滤场景、计算逻辑较复杂。

2.3 Tag标签过滤

Tag标签过滤方式是 Apache RocketMQ 提供的基础消息过滤能力,基于生产者为消息设置的Tag标签进行匹配。生产者在发送消息时,设置消息的Tag标签,消费者需指定已有的Tag标签来进行匹配订阅。
Tag标签设置规则:

  • Tag由生产者发送消息时设置,每条消息允许设置一个Tag标签。
  • Tag使用可见字符,建议长度不超过128字符。

生产消息:发送的时候,需要设置Tag标签

Message message = messageBuilder.setTopic("topicTest")
//设置消息索引键,可根据关键字精确查找某条消息
.setKeys("msgKey")
//设置消息Tag,这样消费端可以根据Tag过滤消息
//该语句表示消息的Tag设置为"TagTest1"
.setTag("TagTest1")
// 设置消息体
.setBody("hello world!".getBytes())
.build();

订阅消息:匹配单个或者多个Tag标签。

String topic = "topicTest";

//1、第一种情况,只订阅消息标签为"TagTest1"的消息。
FilterExpression filterExpression = new FilterExpression("TagTest1", FilterExpressionType.TAG);
//2、第二种情况,订阅消息标签为"TagTest1"、"TagTest2"或"TagTest3"的消息。
FilterExpression filterExpression = new FilterExpression("TagTest1||TagTest2||TagTest3", FilterExpressionType.TAG);

pushConsumer.subscribe(topic, filterExpression);

如上,消费者可以限制接收包含 TagTest1 或 TagTest2 或 TagTest3 的消息,但是限制是一个消息只能有一个标签,这无法应对互联网复杂的应用场景。在这种情况下,可以在消息中设置一些属性,再使用SQL表达式通过筛选属性来筛选消息。下面我们看看怎么实现。

2.4 SQL属性过滤

SQL属性过滤是 RocketMQ 提供的高级消息过滤方式,通过生产者为消息设置的属性(Key)及属性值(Value)进行匹配。生产者在发送消息时可设置多个属性,消费者订阅时可设置SQL语法的过滤表达式过滤多个属性。

2.4.1 基本语法

RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。

  • 数值比较,比如:>,>=,<,<=,BETWEEN,=;
  • 字符比较,比如:=,<>,IN;
  • IS NULL 或者 IS NOT NULL;
  • 逻辑符号 AND,OR,NOT;

常量支持类型为:

  • 数值,比如:123,3.1415;
  • 字符,比如:‘abc’,必须用单引号包裹起来;
  • NULL,特殊的常量
  • 布尔值,TRUE 或 FALSE

只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:

public void subscribe(finalString topic, final MessageSelector messageSelector)

2.4.2 消息生产者

发送消息时,设置了消息Tag标签并定义了属性,用于做消息过滤

Message message = messageBuilder.setTopic("topicTest")
//设置消息索引键,可根据关键字精确查找某条消息
.setKeys("msgKey")
//设置消息Tag,这样消费端可以根据Tag过滤消息
//该语句表示消息的Tag设置为"TagTest1"
.setTag("TagTest1")
//消息也可以设置自定义的分类属性,比如下面这句话表示为消息自定义一个属性,该属性为性别,属性值为1(男)或者0(女)。
.addProperty("Sex", 1)
// 设置消息体
.setBody("hello world!".getBytes())
.build();

2.4.3 消息消费者

使用过滤表达式进行消息筛选,如下:

String topic = "topic";

//只订阅性别为1(男)的消息。
FilterExpression filterExpression = new FilterExpression("Sex IS NOT NULL AND Sex=1", FilterExpressionType.SQL92);
simpleConsumer.subscribe(topic, filterExpression);

//只订阅性别为1(男)且年龄大于18的消息。
FilterExpression filterExpression = new FilterExpression("Sex IS NOT NULL AND Age IS NOT NULL AND Sex = 1 AND Age > 18", FilterExpressionType.SQL92);
simpleConsumer.subscribe(topic, filterExpression);

//订阅所有消息
FilterExpression filterExpression = new FilterExpression("True", FilterExpressionType.SQL92);
simpleConsumer.subscribe(topic, filterExpression);

3 总结

Rocket MQ的消息过滤功能是在服务端进行的,可以根据消息的标签、属性等进行过滤。具体来说,Rocket MQ主要支持以下两种过滤方式:
TAG过滤:TAG是消息的业务标识,可以通过设置Tag表达式,判断消息是否包含指定的Tag,从而进行过滤。这种过滤方式简单直观,适用于基于Tag进行消息分类的场景。
SQL92过滤:可以使用SQL92表达式来灵活地过滤消息的Tag和属性。这种方式提供了更强大的过滤能力,可以根据复杂的条件进行消息筛选。
需要注意的是,Rocket MQ的消息过滤功能虽然强大,但是也会增加服务端的处理负担。因此,在使用时需要根据实际情况进行权衡,避免过度依赖消息过滤功能导致系统性能下降。

标签:16,标签,过滤,Tag,MQ,消息,属性
From: https://www.cnblogs.com/wzh2010/p/17206047.html

相关文章

  • RabbitMQ的使用
    一、消息中间件RabbitMQ---概述和概念【一】1、概述1、大多应用中,可通过消息服务中间件来提升系统异步通信、扩展解耦能力2、消息服务中两个重要概念:消息代理(messagebroker)和目的地(destination)当消息发送者发送消息以后,将由消息代理接管,消息代理保证消息传递到指定目的地。3、......
  • ARC166B题解
    发现还没有和我一样的做法。觉得B比A好想的多。令\(A_i\)为\(a_i\)变成\(A\)的倍数最少次数,\(B_i,C_i,AB_i,AC_i,BC_i,ABC_i\)同理。那么我们就有\(A_i=(A-A\bmod{a_i})\bmodA\),其他同理。这一大坨东西显然都能在\(O(n)\)的时间复杂度内算出来。剩下的就很好......
  • 初学Bokeh:使用自适应绘图大小【16】跬步
    学习Bokeh:使用自适应绘图大小【16】跬步在绘图的过程中,如果要使绘图自动适应浏览器或屏幕大小,可以使用属性:sizing_modefrombokeh.plottingimportfigure,show#preparesomedata#定义显示数据x=[1,2,3,4,5]y=[4,5,5,7,2]#createanewplotwithrespo......
  • 题解 CF1651F【Tower Defense】
    题解CF1651F【TowerDefense】problem一个塔防游戏。一共有\(n\)个塔按\(1\simn\)的顺序排成一列,每座塔都有魔力容量\(c_i\)和魔力恢复速率\(r_i\)。对于一座塔\(i\),每过一秒它的魔力\(m_i\)会变为\(\min(m_i+r_i,c_i)\)。每座塔初始时满魔力。一共有\(q\)个......
  • 204 K8S API资源对象介绍03 (Job CronJob Endpoint ConfigMap Secret) 2.12-2.16
    一、API资源对象Job一次性运行后就退出的Pod1.1使用kubect生成YAML文件#kubectlcreatejobjob01--image=busybox--dry-run=client-oyaml>job01.yaml#vimjob01.yaml#catjob01.yamlapiVersion:batch/v1kind:Jobmetadata:creationTimestamp:nullnam......
  • 10.16
    今日代码:500行今日时间:4小时学习内容:今天大数据学习了MapReduce知识,人机交互学习了css的选择器知识。写了一下大数据作业词频统计任务编程实践,任务要求:在Linux系统本地创建两个文件,即文件wordfile1.txt和wordfile2.txt,文件wordfile1.txt的内容格式如下,需要将zhangsan换成自己名字......
  • Spring Boot中的过滤器、拦截器、监听器技巧汇总:让你快速成为大神
    ......
  • 169. 多数元素
    给定一个大小为n的数组nums,返回其中的多数元素。多数元素是指在数组中出现次数大于⌊n/2⌋的元素。你可以假设数组是非空的,并且给定的数组总是存在多数元素。示例1:输入:nums=[3,2,3]输出:3思路本题常见的三种解法:哈希表统计法:遍历数组nums,用HashMap......
  • ActiveMQ升级版本操作
    ActiveMQ升级版本操作ActiveMQ依赖JDK版本MQ版本号Build-Jdkapache-activemq-5.10.01.7apache-activemq-5.11.01.7apache-activemq-5.12.01.7apache-activemq-5.13.01.7apache-activemq-5.14.01.7apache-activemq-5.15.01.8apache-activemq-......
  • AtCoder Regular Contest 167
    Preface补一下上周日的ARC,因为当天白天和队友一起VP了一场所以就没有精力再打一场了这场经典C计数不会D这种贪心乱搞反而是一眼秒了,后面的EF过的太少就没看A-ToastsforBreakfastParty用一个类似于蛇形的放法就好了,比如对于\(n=9,m=5\),放法为:567894321#includ......