首页 > 其他分享 >RocketMq发送消息之过滤消息

RocketMq发送消息之过滤消息

时间:2023-09-24 14:34:32浏览次数:35  
标签:String subscribe 过滤 消息 msg consumer RocketMq

过滤消息概述

在大多数情况下,TAG是一个简单而有用的设计,其可以来选择您想要的消息。例如:

// 定义一个group1的消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
// 订阅 TOPIC 主题中的 三个TAG 类型的消息
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

消费者将接收包含TAGA或TAGB或TAGC的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。在这种情况下,可以使用SQL表达式筛选消息。SQL特性可以通过发送消息时的属性来进行计算。在RocketMQ定义的语法下,可以实现一些简单的逻辑。下面是一个例子:
image

基本语法

RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。

  • 数值比较,比如:>,>=,<,<=,BETWEEN,=;
  • 字符比较,比如:=,<>,IN;
  • IS NULL 或者 IS NOT NULL;
  • 逻辑符号 AND,OR,NOT;

常量支持类型为:

  • 数值,比如:123,3.1415;
  • 字符,比如:'abc',必须用单引号包裹起来;
  • NULL,特殊的常量
  • 布尔值,TRUE 或 FALSE

只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:

public void subscribe(finalString topic, final MessageSelector messageSelector)

生产者代码

在生产者中通过putUserProperty设置一些属性

// 设置一些属性
msg.putUserProperty("a", String.valueOf(i));
public class Producer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = ProducerUtils.getProducer("group1");
        producer.start();
        for (int i = 0; i < 3; i++) {
            // 创建消息,并指定Topic(主题),Tag(消息Tag)和消息体(消息内容)
            Message msg = new Message("FilterTagTopic",
                    "Tag1",
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );
            // 设置一些属性,消费者可以通过consumer.subscribe去或者这个消息
            msg.putUserProperty("a", String.valueOf(i));
            // 发送消息到一个Broker
            SendResult sendResult = producer.send(msg);
            TimeUnit.SECONDS.sleep(1);
            // 通过sendResult返回消息是否成功送达
            System.out.printf("%s%n", sendResult);
        }
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}

消费者代码

消费者只有通过consumer.subscribe("TopicTest", MessageSelector.bySql("消息的匹配规则");才能获取到消息,并且要匹配sql的规则
例如:生产者通过msg.putUserProperty("a", String.valueOf(i)); 设置了消息的属性,那么a为标识,value为值,此时要获取,可以这样写MessageSelector.bySql("a between 0 and 3")这样获取的就是 值为 0-3 的消息

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        // 1.创建消费者Consumer,指定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        // 2.指定NameServer地址
        consumer.setNamesrvAddr("192.168.78.129:9876;192.168.78.130:9876");
        // 3. 只有订阅的消息有这个属性a, a >=0 and a <= 3
        MessageSelector ms = MessageSelector.bySql("a between 0 3");
        // 订阅主题
        consumer.subscribe("FilterTagTopic",ms);

        // 开启消费offset
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        // 4.设置回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            // 接收消息内容的方法
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("接收到的消息是:"+new String(msg.getBody()));
                }
                // 标记该消息已经被成功消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 5.启动消费者consumer
        consumer.start();
    }
}

看官网的教程就是这样,但是发现启动后报错,抛出
CODE: 1 DESC: The broker does not support consumer to filter message by SQL92
查看原因是因为要在broker.conf文件中加入:enablePropertyFilter = true,由于我们采用的是2主2从,所以指定的文件在/usr/local/rocketmq/conf/2m-2s-sync/ broker-a.properties 中去修改。注意是修改多个哦。

消息过滤不仅仅只有上述方法,我们可以查看subscribe的重载方法:
image

  • subscribe(String topic, String subExpression) 就是直接加标签的这种:
    例如,生产者发送消息时,指明了多个TAG,那么消费者可以通过subExpression去获取
  // 订阅多个Tag
  consumer.subscribe("FilterTagTopic","TAG1 || TAG2 || TAG3");
  • subscribe(String topic, String fullClassName, String filterClassSource)
    这种,是自定义一个过滤器实现类的。
    image
    fullClassName:是指类的全路径 filterClassSource:是指类的java文件路径
    • 1,Broker机器启动多个FilterServer过滤进程
    • 2,Consumer启动后,会想Broker传递一个Java类
    • 3,Consumer从FilterServer拉取消息,FilterServer从Broker拉取消息,按照上传的java类进行过滤,过滤后返回给Consumer

  • 最后一种就是通过sql语句去过滤的啦

标签:String,subscribe,过滤,消息,msg,consumer,RocketMq
From: https://www.cnblogs.com/zgf123/p/17725941.html

相关文章

  • Kibana中的时间过滤器与时间选择器实践案例
    前言Kibana是一个非常强大的数据可视化工具,它可以帮助我们快速地分析和展示数据。在使用Kibana时,时间过滤器和时间选择器是非常重要的功能,它们可以帮助我们更好地理解数据。本文将深入探讨Kibana中的时间过滤器与时间选择器实践案例。时间过滤器时间过滤器是Kibana中的一个非常......
  • Kafka消息生产者拦截器配置最佳实践
    介绍Kafka是一个分布式的消息队列系统,它具有高吞吐量、可扩展性、容错性等优点。在Kafka中,消息生产者可以通过拦截器(interceptor)来对消息进行预处理,例如添加额外的信息、修改消息内容等。本文将深入探讨Kafka消息生产者拦截器配置的最佳实践。拦截器配置在Kafka中,消息生产者可以......
  • RocketMq发送消息之批量消息
    概述批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB发送批量消息如果您每次只发送不超过4MB的消息,则很容易使用批处理,样例如下:Stringtopic="BatchTest";List<M......
  • RocketMq发送消息之延迟消息
    延迟消息比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。使用限制对比于rabbitmq中的延迟消息来说,rockermq并不支持任意时间的延迟,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18级//org/apache/rocket......
  • MQ - 04 基础篇_存储_消息数据和元数据的存储设计
    @[toc]![在这里插入图片描述](https://img-blog.csdnimg.cn/855d3c6d2ef74a1893f352a4545b479c.png)---------------------#导图![在这里插入图片描述](https://img-blog.csdnimg.cn/d0569e3871dd433784f74d76ebee9a9d.png)----------#概述消息数据和元数据的存......
  • RockerMq发送消息之顺序消息
    顺序消息        消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。        顺序消费的原理解析,在默认的情况下消息发送会采取RoundRobin轮询方式把消息发送到不同的queue(分区队列);而消费消......
  • RocketMQ发送消息之同步异步单向
    官网教程:https://rocketmq.apache.org/zh/docs/quickStart/01quickstart基于双主双从异步方式开启的前提下,在maven项目中引入下列依赖<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.1&l......
  • Kafka的消息传递保证和一致性
    前言通过前面的文章,相信大家对Kafka有了一定的了解了,那接下来问题就来了,Kafka既然作为一个分布式的消息队列系统,那它会不会出现消息丢失或者重复消费的情况呢?今天咱们就来一探。实现机制Kafka采用了一系列机制来实现消息传递的保证和一致性,关键点:至少一次的消息传递(AtLeastOnceD......
  • 消息队列中,如何保证消息的顺序性?
    本文选自:advanced-java作者:yanglbme问:如何保证消息的顺序性?面试官心理分析其实这个也是用MQ的时候必问的话题,第一看看你了不了解顺序这个事儿?第二看看你有没有办法保证消息是有顺序的?这是生产系统中常见的问题。面试题剖析我举个例子,我们以前做过一个mysqlbinlog同步的系统,压......
  • 消息队列中,如何保证消息的顺序性?
    本文选自:advanced-java作者:yanglbme问:如何保证消息的顺序性?面试官心理分析其实这个也是用MQ的时候必问的话题,第一看看你了不了解顺序这个事儿?第二看看你有没有办法保证消息是有顺序的?这是生产系统中常见的问题。面试题剖析我举个例子,我们以前做过一个mysqlbinlog同步......