作者为在读研究生,目前研二,计划在公众号记录学习常用中间件笔记,以及明年更新面试经历!
消息追踪
设置消息追踪需要修改 broker 启动的配置文件,添加一行配置:traceTopicEnable=true
即可,操作如下:
# 进入到 rocketmq 的安装目录中
# 先复制一份配置文件
cp broker.conf custom.conf
# 在自定义配置文件中添加一行配置
vi custom.conf
## 添加配置
traceTopicEnable=true
# 杀死原来的 broker 进程,再重新启动即可
# 先查看原来 broker 进程 id
jps
# 杀死 broker
kill -9 [进程id]
# 重新启动 broker,并指定配置文件
nohup sh bin/mqbroker -c conf/custom.conf ‐n localhost:9876 & autoCreateTopicEnable=true
在发送消息的时候,指定消息的 keys
就可以在 DashBoard 中观看到消息的追踪记录了
public class GlobalProducer {
public static void main(String[] args) throws Exception {
// true 即设置允许消息追踪
DefaultMQProducer producer = new DefaultMQProducer(
"producer_group",
true);
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 12; i++) {
Message msg = new Message(
"Global-Orderly-Topic",
"Global_Orderly_Tag",
("( " + i + " )message from GlobalProducer").getBytes());
// 设置消息的 keys
msg.setKeys("Global_Orderly_Tag");
producer.send(msg);
}
System.out.println("Send Finished.");
}
}
之后就可以在 DashBoard 中查看消息的追踪记录了:
点击进去,查看消息追踪详细信息如下:
延时消息实战
上边的案例使用了 SpringCloudStream 的 API 进行消息的收发,这里使用原生 API 进行消息收发实战,通过设置消息的延时时间,可以让消息等待指定时间之后再发送
5.x 之前,只能设置固定时间的延时消息
5.x 之后,可以自定义任意时间的延时消息
由于这里引入的 SpringCloudAlibaba 整合的 RocketMQ 是 4.9.4 版本的,因此只能设置固定时间的延时消息
延时时间有以下几种,通过 Leven 进行定位,如果 delayTimeLevel = 2
,就是第二个延时时间 5s
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
消费者代码如下:
public class Consumer {
public static void main(String[] args) throws Exception {
// 1、创建消费者对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_group");
// 2、为消费者对象设置 NameServer 地址
consumer.setNamesrvAddr("127.0.0.1:9876");
// 3、订阅主题
consumer.subscribe("custom-delay-topic", "*");
// 4、注册监听消息,并打印消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
String printMsg = new String(msg.getBody()) + ", recvTime: "
+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date());
System.out.println(printMsg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 5、把消费者直接启动起来
consumer.start();
}
}
生产者代码如下:
public class Producer {
public static void main(String[] args) throws Exception {
// 1、创建生产者对象
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
// 2、为生产者对象设置 NameServer 地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 3、把我们的生产者直接启动起来
producer.start();
// 4、创建消息、并发送消息
for (int i = 0; i < 3; i++) {
// public Message(String topic, String tags, String keys, byte[] body) {
Message message = new Message(
"custom-delay-topic",
"delayTag",
"CUSTOM_DELAY",
("("+i+")Hello Message From Delay Producer, " +
"date="+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())).getBytes()
);
// 设置定时的逻辑
// "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
message.setDelayTimeLevel(2);
// 利用生产者对象,将消息直接发送出去
producer.send(message);
}
System.out.println("Send Finished.");
}
}