消息的顺序消费在很多交易型的业务场景中都会被要求实现,而且,消息队列的顺序消费解决方案在很多互联网公司的面试中经常会被问到。
索尔老师在使用了多个消息队列后发现,虽然每个消息队列都有各自的顺序消费解决方案,但是RocketMQ经过了多年电商的洗礼,其功能性的要求,已经设计的非常全面。这样的全面可以通过RocketMQ消息模型的架构设计得以体现。我们看看RocketMQ是怎么解决消息的顺序消费。
一、RocketMQ的消息模型
1.技术架构
RocketMQ架构上主要分为四部分,如上图所示:
NameServer:NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以动态感知Broker的路由的信息。
Producer:消息的发布者,支持分布式集群方式部署。消息发布者通过消息队列的负载均衡模块选择相应的Broker集群进行消息投递,投递的过程低延迟并且支持快速失败。
Consumer:消息的消费者,支持分布式集群方式部署。RocketMQ同时支持push推和pull拉的两种消息消费模式。消息消费者也支持集群和广播两种消费方式,同时提供了实时消息的订阅机制,满足大多数用户的需求。
BrokerServer:Broker主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker包含了以下几个重要子模块。
- Remoting Module:整个Broker的实体,负责处理来自clients端的请求。
- Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息。
- Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
- HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。
- Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。
2. 部署架构
RocketMQ 网络部署特点
- NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
- Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。注意:当前RocketMQ版本在部署架构上支持一Master多Slave,但只有BrokerId=1的从服务器才会参与消息的读负载。
- Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送信息。Producer完全无状态,可集群部署。
- Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送信息。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,消费者在向Master拉取消息时,Master服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读I/O),以及从服务器是否可读等因素建议下一次是从Master还是Slave拉取。
结合部署架构图,描述集群工作流程:
- 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
- Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
- 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
- Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
- Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。
二、顺序消息的应用场景
顺序消费的应用场景有很多,无论是在面试,或是实际生产环境中要解决顺序消费问题,都需要对消息队列的应用场景有一定的理解。
1.AIOT中消息的顺序消费
比如有这么一个物联网的应用场景,IOT中的设备在初始化时需要按顺序接收这样的消息:
- 操作1:设置设备名称
- 操作2:设置设备的网络
- 操作3:重启设备使配置生效
如果这个顺序颠倒了,可能就没有办法让设备的配置生效,因为只有重启设备才能让配置生效,但重启的消息却在设置设备消息之前被消费。
2.用户服务中的顺序消费
应用为了提高用户粘合度,往往通过积分制度提供用户活跃度。比如接下来有这一系列的场景:
- 操作1:新注册用户,将用户积分设置为10分。
- 操作2:奖励行为,比如用户完善了个人信息,则积分+5分。
- 操作3:惩罚行为,比如用户发表违规言论,则积分-3分。
如果上述的操作是顺序进行的,则用户积分为12分。如果上述操作的步骤顺序发生了变化,比如消费者先消费了操作2的消息,再消费操作1和3的消息,则分数变为7分,此时就出现了因为乱序消费导致的错误结果。
三、如何实现顺序消息
顺序消息要求消费者消费消息的顺序按照发送者发送消息的顺序执行。RocketMQ中实现的消息顺序有两个维度,分别是局部有序和全局有序。
1.局部有序
局部消息指的是消费者消费某个topic的某个队列中的消息是顺序的。在图上的八个队列中,消费者可以随机的消费这8个队列,也就是说消费者不能保证消费队列的顺序。但是消费者在消费某一个队列的时候,一定可以进行顺序消费。也就是说,在消费者两次访问一个队列中的消息时,一定是按照从左到右的顺序依次消费消息。所以针对于某一个队列来讲,消费是顺序的。但如果把问题定位在整个队列中时,不同的消息在不同的队列中,又不能保证消息的有序性,不如消息A到了队列2,消息B到了队列1,消费者先消费了队列1再消费队列2,就不能保证有序性。但如果消息C随着消息B进入队列1,那么消息C一定是在消息B之后被消费。
消费者使用MessageListenerOrderly类做消息监听,实现局部顺序。
package com.qf.producer.order;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
/**
* 顺序消息
* @author Thor
* @公众号 Java架构栈
*/
public class OrderConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("OrderTopicTest", "*");
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for(MessageExt msg:msgs){
System.out.println("消息内容:"+new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
2.全局有序
消费者消费全部消息都是顺序的,只能通过一个某个topic只有一个队列才能实现,这种应用场景较少,且性能较差。
3.乱序消费
消费者消费消息不需要关注消息的顺序。消费者使用MessageListenerConcurrently类做消息监听。
package com.qf.producer.order;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
/**
* 顺序消息
* @author Thor
* @公众号 Java架构栈
*/
public class OrderConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("OrderTopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for(MessageExt msg:msgs){
System.out.println("消息内容:"+new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}