延迟消息
比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
使用限制
对比于rabbitmq中的延迟消息来说,rockermq并不支持任意时间的延迟,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18级
// org/apache/rocketmq/store/config/MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
生产者代码
通过 msg.setDelayTimeLevel(2);
设置延迟等级,2 表示5s的延迟
public class producer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer,指定一个生产者组
DefaultMQProducer producer = new DefaultMQProducer("group1");
// 设置NameServer的地址
producer.setNamesrvAddr("192.168.78.129:9876;192.168.78.130:9876");
// 启动Producer实例
producer.start();
for (int i = 0; i < 10; i++) {
// 创建消息,并指定Topic(主题),Tag(消息Tag)和消息体(消息内容)
Message msg = new Message("DelayTopic",
"Tag1",
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 设置 延迟时间 , 目前只支持"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";对应18个等级
msg.setDelayTimeLevel(2);// 表示5s
// 发送消息到一个Broker
SendResult sendResult = producer.send(msg);
TimeUnit.SECONDS.sleep(1);
// 通过sendResult返回消息是否成功送达
System.out.printf("%s%n", sendResult);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
消费者代码
public class Consumer {
public static void main(String[] args) throws MQClientException {
// 1.创建消费者Consumer,指定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
// 2.指定NameServer地址
consumer.setNamesrvAddr("192.168.78.129:9876;192.168.78.130:9876");
// consumer.setNamesrvAddr("127.0.0.1:9876");
// 3.订阅主题Topic和Tag
consumer.subscribe("DelayTopic","Tag1");//消费Tag1中的消息,通过 || xxx 可以消费多个消息,通过*可以消费 DelayTopic 下的所有消息
// 4.设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
// 接收消息内容的方法
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("接收到的消息消息ID:"+msg.getMsgId()+"延迟时间:"+(System.currentTimeMillis()-msg.getStoreTimestamp()));
}
// 标记该消息已经被成功消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 5.启动消费者consumer
consumer.start();
}
}