顺序消息
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。
消费者监听消息:会独立的开出一个线程去消费,例如独立开出一个线程去消费张三的队列消息。
创建一个订单类:用于模拟业务
package com.zgf.mq.rocketmq.order;
import java.util.ArrayList;
import java.util.List;
/**
* 订单的步骤
*/
public class OrderStep {
private long orderId;
private String desc;
public long getOrderId() {
return orderId;
}
public void setOrderId(long orderId) {
this.orderId = orderId;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
@Override
public String toString() {
return "OrderStep{" +
"orderId=" + orderId +
", desc='" + desc + '\'' +
'}';
}
/**
* 生成模拟订单数据
*/
private List<OrderStep> buildOrders() {
List<OrderStep> orderList = new ArrayList<OrderStep>();
/**
* 15103111039L: 创建、付款、推送、完成
* 15103111065L:创建、付款、完成
* 15103117235L:创建、付款、完成
*/
OrderStep orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("推送");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
return orderList;
}
}
生产者代码:
/**
* 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)
*/
public class Producer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
// 1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1");
// 2.指定NameServer地址
producer.setNamesrvAddr("192.168.78.129:9876;192.168.78.130:9876");
// 3.启动producer
producer.start();
// 构建消息集合
List<OrderStep> orderSteps = OrderStep.buildOrders();
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateStr = sdf.format(date);
// 发送消息
for (int i = 0;i<orderSteps.size();i++) {
// 需要发送的消息实体
String body = dateStr+" Hello RocketMQ " + orderSteps.get(i);
Message message = new Message("OrderTopic","Order",""+i,body.getBytes());
/**
* 参数1:消息对象
* 参数2:MessageQueueSelector 消息队列的选择器
* 参数3:选择队列的业务标识(订单id)
*/
SendResult sendResult = producer.send(message, new MessageQueueSelector() {
/**
* 来实现消息的自定义路由逻辑,以确保消息被按照特定的规则发送到合适的队列。这对于需要发送顺序消息或者自定义消息路由的场景非常有用。
* @param mqs 一个包含了所有可用消息队列的列表。你可以在这个列表中选择一个队列来发送消息。
* @param msg 要发送的消息对象。
* @param arg 自定义参数,可以用于根据业务逻辑来选择消息队列。通常情况下,你可以使用这个参数来实现选择队列的逻辑。
* @return
*/
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// 这里的arg,就是我们的 orderSteps.get(i).getOrderId()
Long orderId = (Long) arg;
// 根据订单id选择要发送的queue
long index = orderId % mqs.size(); // 例如 1 % 10 =1(可用的队列,即1号队列)
return mqs.get((int) index);
}
}, orderSteps.get(i).getOrderId());// 订单id,没有特殊的情况,都是唯一的
System.out.println("发送结果:"+sendResult);
}
producer.shutdown(); // 关闭
}
}
这里引入了新的内容:MessageQueueSelector
接口,该接口注释也写的很清楚,就不介绍了。
消费者代码:
/**
* 顺序消息的消费
*/
public class Consumer {
public static void main(String[] args) throws MQClientException {
// 1.创建消费者Consumer,指定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
// 2.指定NameServer地址
consumer.setNamesrvAddr("192.168.78.129:9876;192.168.78.130:9876");
// 3.订阅主题Topic和Tag
consumer.subscribe("OrderTopic","*");//消费Tag1中的消息,通过 || xxx 可以消费多个消息,通过*可以消费OrderTopic下的所有消息
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 4.设置回调函数,处理消息,用于监听消息
consumer.registerMessageListener(new MessageListenerOrderly() {
/**
* 如果消耗失败,不建议抛出异常,而不是返回ConsumerOrderlyStatus.SSUSPEND_CURRENT_QUEUE_A_MOMEN
* @param msgs msgs.size() >= 1 <br> DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here
* @param context
* @return 消息状态
*/
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("线程:【"+Thread.currentThread().getName()+"】: 消费消息:"+new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
// 5.启动消费者consumer
consumer.start();
}
}
结果:
从上面的结果,我们可以看出来实现了分区有序,即一个线程只完成唯一标识的订单消息。
顺序消息缺陷
- 消费顺序消息的并行度依赖于队列的数量 ;
- 队列热点问题,个别队列由于哈希不均导致消息过多,消费速度跟不上,产生消息堆积问题;
- 遇到消息失败的消息,无法跳过,当前队列消费暂停;