首页 > 其他分享 >使用RocketMQ 实现基于标签过滤的消息队列生产和消费

使用RocketMQ 实现基于标签过滤的消息队列生产和消费

时间:2024-07-16 13:59:47浏览次数:9  
标签:消费者 vip1 队列 标签 消息 new consumer RocketMQ

在分布式系统中,消息队列(Message Queue,MQ)是一种常见的通信方式,它能够解耦系统组件,提供异步通信,提升系统的伸缩性和可靠性。Apache RocketMQ 是一款开源的分布式消息中间件,具有高性能、低延迟、高可靠性和高可用性等特点。本文将介绍如何使用 Apache RocketMQ 实现基于标签过滤的消息生产和消费,并通过示例代码展示具体的实现过程。

环境准备

在开始之前,请确保已完成以下准备工作:

  1. 安装并启动 RocketMQ 服务。
  2. 在项目中引入 RocketMQ 的相关依赖。

假设我们已经在 pom.xml 文件中添加了 RocketMQ 的依赖:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.9.3</version>
</dependency>

生产者代码示例

我们首先来看一下如何实现消息的生产者。生产者的主要任务是将消息发送到指定的主题(Topic)中,并为消息指定标签(Tag)。

package com.takumilove.demo;

import com.takumilove.constant.MqConstant;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.junit.Test;

/**
 * 生产者示例
 */
public class GTagTest {
    @Test
    public void tagProducer() throws Exception {
        // 创建生产者实例并指定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("tag_producer_group");
        // 设置 NameServer 地址
        producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
        // 启动生产者
        producer.start();
        // 创建并发送消息
        Message message = new Message("tagTopic", "vip1", "我是vip1的文章".getBytes());
        Message message2 = new Message("tagTopic", "vip2", "我是vip2的文章".getBytes());
        producer.send(message);
        producer.send(message2);
        System.out.println("消息发送成功");
        // 关闭生产者
        producer.shutdown();
    }
}

在上面的代码中,我们创建了一个名为 tagProducer 的测试方法,用于发送两条消息到 tagTopic 主题中。这两条消息分别携带了不同的标签(tag),vip1vip2

消费者代码示例

接下来,我们来看一下如何实现消息的消费者。消费者的任务是从指定的主题中订阅并消费消息,并根据标签进行过滤。

消费者1

@Test
public void tagConsumer1() throws Exception {
    // 创建消费者实例并指定消费者组名
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-a");
    // 设置 NameServer 地址
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    // 订阅指定主题和标签
    consumer.subscribe("tagTopic", "vip1");
    // 注册消息监听器
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
                                                        ConsumeConcurrentlyContext consumeConcurrentlyContext
        ) {
            System.out.println("我是vip1的消费者,我正在消费消息" + new String(list.get(0)
                                                                                      .getBody()));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    // 启动消费者
    consumer.start();
    // 阻塞当前线程,保持消费者运行
    System.in.read();
}

消费者2

@Test
public void tagConsumer2() throws Exception {
    // 创建消费者实例并指定消费者组名
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-b");
    // 设置 NameServer 地址
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    // 订阅指定主题和多个标签
    consumer.subscribe("tagTopic", "vip1||vip2");
    // 注册消息监听器
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
                                                        ConsumeConcurrentlyContext consumeConcurrentlyContext
        ) {
            System.out.println("我是vip2的消费者,我正在消费消息" + new String(list.get(0)
                                                                                      .getBody()));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    // 启动消费者
    consumer.start();
    // 阻塞当前线程,保持消费者运行
    System.in.read();
}

在上述代码中,我们创建了两个消费者,tagConsumer1tagConsumer2。第一个消费者只订阅 vip1 标签的消息,第二个消费者订阅 vip1vip2 标签的消息。

运行示例

  1. 启动 RocketMQ 的 NameServer 和 Broker。
  2. 运行 tagProducer 方法,发送两条消息。
  3. 分别运行 tagConsumer1tagConsumer2 方法,观察控制台输出。

通过上述步骤,我们可以看到 tagConsumer1 只消费了 vip1 标签的消息,而 tagConsumer2 消费了 vip1vip2 标签的消息。

总结

本文介绍了如何使用 Apache RocketMQ 实现基于标签过滤的消息生产和消费,通过示例代码展示了生产者和消费者的具体实现方式。希望本文能够帮助你更好地理解和使用 RocketMQ。

标签:消费者,vip1,队列,标签,消息,new,consumer,RocketMQ
From: https://blog.csdn.net/Takumilove/article/details/140460561

相关文章

  • Day10(栈与队列) | 150. 逆波兰表达式求值 239. 滑动窗口最大值 347.前 K 个高频元
    150.逆波兰表达式求值给你一个字符串数组tokens,表示一个根据逆波兰表示法表示的算术表达式。请你计算该表达式。返回一个表示表达式值的整数。注意:有效的算符为'+'、'-'、'*'和'/'。每个操作数(运算对象)都可以是一个整数或者另一个表达式。两个整数之间的除法总是......
  • 代码随想录算法训练营第六十六天 | Bellman_ford 队列优化算法(SPFA)、Bellman_ford之
    Bellman_ford队列优化算法(SPFA)题目链接:https://kamacoder.com/problempage.php?pid=1152文档讲解:https://programmercarl.com/kamacoder/0094.%E5%9F%8E%E5%B8%82%E9%97%B4%E8%B4%A7%E7%89%A9%E8%BF%90%E8%BE%93I-SPFA.html思路Bellman_ford算法每次松弛都是对所......
  • zookeeper+kafka消息队列群集部署
    目录消息队列1:什么是消息队列2:消息队列的特征3:为什么需要消息队列Kafka基础与入门1:kafka基本概念2:kafka角色术语3:kafka拓扑架构4:Topic和partition5:Producer生产机制6:Consumer消费机制zookeeper概念介绍1:zookeeper应用举例2:zookeeper的工作原理是什么?3:zookeeper......
  • 易优CMS模板标签articlepay文章付费
      [基础用法]  标签:articlepay   描述:文章模型实现文章付费阅读,会员专享,会员付费,在使用之前先在文章模型开启付费阅读  属性:   aid=''文档id   id=''可以任意指定循环里的变量名替代c_field,假设id='c_field',模板调用如:{$c_field.hidden}变成 {$c_fiel......
  • 延迟队列
     绑定队列编写监听器@RabbitListener(bindings=@QueueBinding(value=@Queue(MqConstants.Queue.LEARNING_RECORD_QUEUE),exchange=@Exchange(value=MqConstants.Exchange.LEARNING_DELAY_EXCHANGE,type=ExchangeTypes.TOPIC,delayed="true&quo......
  • 【数据结构】线性结构——数组、链表、栈和队列
    目录前言一、数组(Array)1.1优点1.2缺点1.3适用场景二、链表(LinkedList)2.1优点2.2缺点2.3适用场景三、栈(Stack)3.1优点3.2缺点3.3适用场景四、队列(Queue)4.1优点4.2缺点4.3适用场景......
  • 【JavaScript脚本宇宙】解密六大Node.js消息队列库:选对工具,事半功倍
    从Bull到NSQ:探索Node.js消息队列库的全貌前言在现代软件开发中,消息队列是一种常见的通信模式,用于实现异步任务处理、解耦系统组件、以及实现可靠的事件驱动架构。Node.js作为一个流行的后端开发平台,有许多优秀的消息队列库可以供开发者选择和使用。本文将介绍六个流行的No......
  • YOLOv8中根据标签绘制真实框
    这个在写论文的过程中获取展示图片的时侯可能会需要用的。最近也是自己在弄目标检测方面的东西,然后这也是自己碰到的问题,想着能分享一下,希望对有需要的人有所帮助。也欢迎大家来讨论问题、交流心得。importcv2importos#定义输入文件夹和输出文件夹路径input_img_folder......
  • POSIX消息队列
    一.POSIX消息队列概述什么是POSIX消息队列?POSIX消息队列是POSIX标准(PortableOperatingSystemInterface)的一部分,它提供了一种进程间通信(IPC)机制,允许不同的进程通过队列交换消息。应用场景:需要异步通信或者多个进程需要协调工作中1.1POSIX消息队列特点**独立性:**消息队......
  • 易优CMS模板标签assign定义变量模板文件中定义变量,可在其他标签里使用该变量
    【基础用法】标签:assign描述:模板文件中定义变量,可在其他标签里使用该变量用法:{eyou:assignname='typeid'value='5'/}文件:无涉及表字段:name=''变量名value=''赋给变量名的值底层字段:无 【更多示例】-------------------------------示例1----------------------......