初探RocketMQ
1、引言
Message Queue(消息队列),从字面上理解:首先它是一个队列。FIFO先进先出的数据结构-队列。消息队列就是所谓的存放消息的队列。
消息队列解决的不是消息的队列的目的,解决的是通信问题。
比如以电商订单系统为例,如果各服务之间使用同步通信,不仅耗时较久,且过程中受到网络波动的影响,不能保证高成功率。因此,使用异步的通信方式对架构进行改造。
使用异步的通信方式对模块间的调用进行解耦,可以快速的提升系统的吞吐量。上游执行完消息的发送业务后立即获得结果,下游多个服务订阅到消息后各自消费。通过消息队列,屏蔽底层的通信协议,使得解耦和并行消费得以实现。
2、RocketMQ介绍
随着使⽤中队列和虚拟主题的增加,阿⾥巴巴团队使⽤的ActiveMQ IO 模块达到了瓶颈。为了尽⼒通过节流、断路器或降级来解决这个问题,但效果不佳。所以开始关注当时流⾏的消息传递解决⽅案Kafka。不幸的是, Kafk⽆法满⾜要求,尤其是在低延迟和⾼可靠性⽅⾯。在这种情况下,决定发明⼀种新的消息传递引擎来处理更⼴泛的⽤例,从传统的发布/订阅场景到⼤容量实时零丢失交易系统。⽬前RocketMQ已经开源给Apache基⾦会。如今,已有 100 多家公司在其业务中使⽤开源版本的 RocketMQ。
消息产品 | 客户端SDK | 协议和规范 | 订购 信息 | 预定消息 | 批量消息 | 广播消息 | 消息过滤器 | 服务器触发的重新交付 | 消息存储 | 消息追溯 | 消息优先级 | 高可用性和故障转移 | 消息跟踪 | 配置 | 管理和运 |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
ActiveMQ | Java、.Net、C++等 | 推送模型,支持OpenWire、STOMP、AMQP、MQTT、JMS | Exclusive Consumer或Exclusive Queues可以保证排序 | 支持 | 不支持 | 支持 | 支持 | 不支持 | 使用 JDBC可高性能日志支持非常快速的持久化,例如levelDB、kahaDB | 支持 | 支持 | 支持,取决于存储,如果使用levelDB则需要ZooKeeper服务器 | 不支持 | 默认配置为低级,用户需优化配置参数 | 支持 |
kafka | Java、Scala等 | 拉取模型,支持TCP | 确保分区内消息的排序 | 不支持 | 支持,带有异步生产者 | 不支持 | 支持,可以使用kafka Streams过滤消息 | 不支持 | 高性能文件存储 | 支持,偏移量指示 | 不支持 | 支持,需要ZooKeeper服务器 | 不支持 | kafka使用键值对格式进行配置。这些值可以从文件或以编程方式提供。 | 支持,使用终端命令公开核心指标 |
RocketMQ | Java、C++、Go | 拉取模型,支持TCP、JMS、OpenMessaging | 确保消息的严格排序,并且可以优雅地横向扩展 | 支持 | 支持,具有同步模式以避免消息丢失 | 支持 | 支持基于SQL92的属性过滤器表达式 | 支持 | 高性能和低延迟的文件存储 | 支持,时间戳和偏移量两种表示 | 不支持 | 支持的主从模型,无需其他套件 | 支持 | 开箱即用,用户只需注意一些配置 | 支持,丰富的Web和终端命令以公开核心指标 |
3、RocketMQ的基本概念
3.1 技术架构
RocketMQ架构上主要分为四部分,如上图所示:
-
Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
-
Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。
-
NameServer:NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的ZooKeeper,支持Broker的动态注册于发现。主要包括两个功能:Broker管理,NameServer接收Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Consumer通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息。Producer,Consumer仍然可以动态感知Broker的路由信息。
-
BrokerServer:Broker主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker包含了以下几个重要子模块:
- Remoting Module:整个Broker的实体,负责处理来自clients端的请求。
- Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息。
- Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
- HA Service:高可用服务,提供Master Broker和Slave Broker之间的数据同步功能。
- Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。
3.2 部署架构
RocketMQ网络部署特点:
- NameServer是⼀个⼏乎⽆状态节点,可集群部署,节点之间⽆任何信息同步。
- Broker部署相对复杂, Broker分为Master与Slave,⼀个Master可以对应多个Slave,但是⼀个Slave只能对应⼀个Master, Master与Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义, BrokerId为0表示Master,⾮0表示Slave。 Master也可以部署多个。每个Broker与NameServer集群中的所有节点建⽴⻓连接,定时注册Topic信息到所有NameServer。 注意:当前RocketMQ版本在部署架构上⽀持⼀Master多Slave,但只有BrokerId=1的从服务器才会参与消息的读负载。
- Producer与NameServer集群中的其中⼀个节点(随机选择)建⽴⻓连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建⽴⻓连接,且定时向Master发送⼼跳。 Producer完全⽆状态,可集群部署。
- Consumer与NameServer集群中的其中⼀个节点(随机选择)建⽴⻓连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、 Slave建⽴⻓连接,且定时向Master、 Slave发送⼼跳。 Consumer既可以从Master订阅消息,也可以从Slave订阅消息,消费者在向Master拉取消息时, Master服务器会根据拉取偏移量与最⼤偏移量的距离(判断是否读⽼消息,产⽣读I/O),以及从服务器是否可读等因素建议下⼀次是从Master还是Slave拉取。
结合部署架构图,描述集群工作流程:
- 启动NameServer, NameServer起来后监听端⼝,等待Broker、 Producer、 Consumer连上来,相当于⼀个路由控制中⼼。
- Broker启动,跟所有的NameServer保持⻓连接,定时发送⼼跳包。⼼跳包中包含当前Broker信息(IP+端⼝等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
- 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时⾃动创建Topic。
- Producer发送消息,启动时先跟NameServer集群中的其中⼀台建⽴⻓连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择⼀个队列,然后与队列所在的Broker建⽴⻓连接从⽽向Broker发消息。
- Consumer跟Producer类似,跟其中⼀台NameServer建⽴⻓连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建⽴连接通道,开始消费消息。
4、快速开始
4.1 下载RocketMQ
这里使用的是RocketMQ4.7.1版本,去官方下载即可:
4.2 安装RocketMQ
- 准备一台装有Linux系统的虚拟机,这里我用的是Centos7版本
- 安装JDK,并配置环境变量
- 安装RocketMQ,上传RocketMQ安装包并使用
unzip
命令解压缩再/usr/local/rocketmq
目录下。 - 配置JDK和RocketMQ的环境变量。
注意,RocketMQ的环境变量用来加载
ROCKETMQ_HOME/conf
下的配置文件,如果不配置则无法启动NameServer和Broker。
完成后执行命令,让环境变量生效:
source /etc/profile
修改bin/runserver.sh
文件,由于RocketMQ默认设置的JVM内存为4G,但虚拟机一般没有这么大内存,因此调整为512MB。
vim runserver.sh
4.3 启动NameServer
启动RocketMQ服务需要先启动NameServer。
在bin目录内使用静默方式启动:
nohup ./mqnamesrv -n 192.168.159.33:9876 &
4.4 启动Broker
- 修改broker的JVM参数配置,将默认8G内存修改为512MB。
vim runbroker.sh
- 在
conf/broker.conf
文件中加入如下配置,开启自动创建Topic功能。
autoCreateTopicEnable=true
-
以静默方法启动Broker
nohup ./mqbroker -n 192.168.159.33:9876 &
4.5 使用发送和接收消息验证MQ
- 配置NameServer的环境变量
在发送/接收消息之前,需要告诉客户端NameServer的位置。配置环境变量NAMESRV_ADDR
:
export NAMESRV_ADDR=192.168.159.33:9876
- 使用
bin/tools.sh
工具验证消息的发送,默认会发1000条消息
./tools.sh org.apache.rocketmq.example.quickstart.Producer
执行上述命令后,发送消息的日志如下:
- 使用
bin/tools.sh
工具验证消息的接收
./tools.sh org.apache.rocketmq.example.quickstart.Consumer
执行上述命令后,可以看到接收的消息
4.6 关闭服务器
-
关闭Broker:
./mqshutdown broker
-
关闭NameServer:
./mqshutdown namesrv