过滤消息概述
在大多数情况下,TAG是一个简单而有用的设计,其可以来选择您想要的消息。例如:
// 定义一个group1的消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
// 订阅 TOPIC 主题中的 三个TAG 类型的消息
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
消费者将接收包含TAGA或TAGB或TAGC的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。在这种情况下,可以使用SQL表达式筛选消息。SQL特性可以通过发送消息时的属性来进行计算。在RocketMQ定义的语法下,可以实现一些简单的逻辑。下面是一个例子:
基本语法
RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。
- 数值比较,比如:>,>=,<,<=,BETWEEN,=;
- 字符比较,比如:=,<>,IN;
- IS NULL 或者 IS NOT NULL;
- 逻辑符号 AND,OR,NOT;
常量支持类型为:
- 数值,比如:123,3.1415;
- 字符,比如:'abc',必须用单引号包裹起来;
- NULL,特殊的常量
- 布尔值,TRUE 或 FALSE
只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:
public void subscribe(finalString topic, final MessageSelector messageSelector)
生产者代码
在生产者中通过putUserProperty
设置一些属性
// 设置一些属性
msg.putUserProperty("a", String.valueOf(i));
public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = ProducerUtils.getProducer("group1");
producer.start();
for (int i = 0; i < 3; i++) {
// 创建消息,并指定Topic(主题),Tag(消息Tag)和消息体(消息内容)
Message msg = new Message("FilterTagTopic",
"Tag1",
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 设置一些属性,消费者可以通过consumer.subscribe去或者这个消息
msg.putUserProperty("a", String.valueOf(i));
// 发送消息到一个Broker
SendResult sendResult = producer.send(msg);
TimeUnit.SECONDS.sleep(1);
// 通过sendResult返回消息是否成功送达
System.out.printf("%s%n", sendResult);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
消费者代码
消费者只有通过consumer.subscribe("TopicTest", MessageSelector.bySql("消息的匹配规则");
才能获取到消息,并且要匹配sql的规则
例如:生产者通过msg.putUserProperty("a", String.valueOf(i));
设置了消息的属性,那么a为标识,value为值,此时要获取,可以这样写MessageSelector.bySql("a between 0 and 3")
这样获取的就是 值为 0-3 的消息
public class Consumer {
public static void main(String[] args) throws MQClientException {
// 1.创建消费者Consumer,指定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
// 2.指定NameServer地址
consumer.setNamesrvAddr("192.168.78.129:9876;192.168.78.130:9876");
// 3. 只有订阅的消息有这个属性a, a >=0 and a <= 3
MessageSelector ms = MessageSelector.bySql("a between 0 3");
// 订阅主题
consumer.subscribe("FilterTagTopic",ms);
// 开启消费offset
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 4.设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
// 接收消息内容的方法
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("接收到的消息是:"+new String(msg.getBody()));
}
// 标记该消息已经被成功消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 5.启动消费者consumer
consumer.start();
}
}
看官网的教程就是这样,但是发现启动后报错,抛出
CODE: 1 DESC: The broker does not support consumer to filter message by SQL92
查看原因是因为要在broker.conf文件中加入:enablePropertyFilter = true,由于我们采用的是2主2从,所以指定的文件在/usr/local/rocketmq/conf/2m-2s-sync/ broker-a.properties 中去修改。注意是修改多个哦。
消息过滤不仅仅只有上述方法,我们可以查看subscribe的重载方法:
- subscribe(String topic, String subExpression) 就是直接加标签的这种:
例如,生产者发送消息时,指明了多个TAG,那么消费者可以通过subExpression
去获取
// 订阅多个Tag
consumer.subscribe("FilterTagTopic","TAG1 || TAG2 || TAG3");
-
subscribe(String topic, String fullClassName, String filterClassSource)
这种,是自定义一个过滤器实现类的。
fullClassName:是指类的全路径 filterClassSource:是指类的java文件路径
• 1,Broker机器启动多个FilterServer过滤进程
• 2,Consumer启动后,会想Broker传递一个Java类
• 3,Consumer从FilterServer拉取消息,FilterServer从Broker拉取消息,按照上传的java类进行过滤,过滤后返回给Consumer -
最后一种就是通过sql语句去过滤的啦