在分布式系统中,消息队列(Message Queue,MQ)是一种常见的通信方式,它能够解耦系统组件,提供异步通信,提升系统的伸缩性和可靠性。Apache RocketMQ 是一款开源的分布式消息中间件,具有高性能、低延迟、高可靠性和高可用性等特点。本文将介绍如何使用 Apache RocketMQ 实现基于标签过滤的消息生产和消费,并通过示例代码展示具体的实现过程。
环境准备
在开始之前,请确保已完成以下准备工作:
- 安装并启动 RocketMQ 服务。
- 在项目中引入 RocketMQ 的相关依赖。
假设我们已经在 pom.xml
文件中添加了 RocketMQ 的依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.3</version>
</dependency>
生产者代码示例
我们首先来看一下如何实现消息的生产者。生产者的主要任务是将消息发送到指定的主题(Topic)中,并为消息指定标签(Tag)。
package com.takumilove.demo;
import com.takumilove.constant.MqConstant;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.junit.Test;
/**
* 生产者示例
*/
public class GTagTest {
@Test
public void tagProducer() throws Exception {
// 创建生产者实例并指定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("tag_producer_group");
// 设置 NameServer 地址
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
// 启动生产者
producer.start();
// 创建并发送消息
Message message = new Message("tagTopic", "vip1", "我是vip1的文章".getBytes());
Message message2 = new Message("tagTopic", "vip2", "我是vip2的文章".getBytes());
producer.send(message);
producer.send(message2);
System.out.println("消息发送成功");
// 关闭生产者
producer.shutdown();
}
}
在上面的代码中,我们创建了一个名为 tagProducer
的测试方法,用于发送两条消息到 tagTopic
主题中。这两条消息分别携带了不同的标签(tag),vip1
和 vip2
。
消费者代码示例
接下来,我们来看一下如何实现消息的消费者。消费者的任务是从指定的主题中订阅并消费消息,并根据标签进行过滤。
消费者1
@Test
public void tagConsumer1() throws Exception {
// 创建消费者实例并指定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-a");
// 设置 NameServer 地址
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
// 订阅指定主题和标签
consumer.subscribe("tagTopic", "vip1");
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
ConsumeConcurrentlyContext consumeConcurrentlyContext
) {
System.out.println("我是vip1的消费者,我正在消费消息" + new String(list.get(0)
.getBody()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
// 阻塞当前线程,保持消费者运行
System.in.read();
}
消费者2
@Test
public void tagConsumer2() throws Exception {
// 创建消费者实例并指定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-b");
// 设置 NameServer 地址
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
// 订阅指定主题和多个标签
consumer.subscribe("tagTopic", "vip1||vip2");
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
ConsumeConcurrentlyContext consumeConcurrentlyContext
) {
System.out.println("我是vip2的消费者,我正在消费消息" + new String(list.get(0)
.getBody()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
// 阻塞当前线程,保持消费者运行
System.in.read();
}
在上述代码中,我们创建了两个消费者,tagConsumer1
和 tagConsumer2
。第一个消费者只订阅 vip1
标签的消息,第二个消费者订阅 vip1
和 vip2
标签的消息。
运行示例
- 启动 RocketMQ 的 NameServer 和 Broker。
- 运行
tagProducer
方法,发送两条消息。 - 分别运行
tagConsumer1
和tagConsumer2
方法,观察控制台输出。
通过上述步骤,我们可以看到 tagConsumer1
只消费了 vip1
标签的消息,而 tagConsumer2
消费了 vip1
和 vip2
标签的消息。
总结
本文介绍了如何使用 Apache RocketMQ 实现基于标签过滤的消息生产和消费,通过示例代码展示了生产者和消费者的具体实现方式。希望本文能够帮助你更好地理解和使用 RocketMQ。
标签:消费者,vip1,队列,标签,消息,new,consumer,RocketMQ From: https://blog.csdn.net/Takumilove/article/details/140460561