一、概述
RocketMQ
消息队列集群主要包括NameServer
、Broker
(Master
/Slave
)、Producer
、Consumer
4个角色,基本通讯流程如下:
Broker
启动后需要完成一次将自己注册至NameServer
的操作;随后每隔30s
时间定时向NameServer
上报Topic
路由信息。- 消息生产者
Producer
作为客户端发送消息时候,需要根据消息的Topic
从本地缓存的TopicPublishInfoTable
获取路由信息。如果没有则更新路由信息会从NameServer
上重新拉取,同时Producer
会默认每隔30s
向NameServer
拉取一次路由信息。 - 消息生产者
Producer
根据2
)中获取的路由信息选择一个队列(MessageQueue
)进行消息发送;Broker
作为消息的接收者接收消息并落盘存储。 - 消息消费者
Consumer
根据2
)中获取的路由信息,并再完成客户端的负载均衡后,选择其中的某一个或者某几个消息队列来拉取消息并进行消费。
从上面1~3中可以看出在消息生产者,Broker
和NameServer
之间都会发生通信(这里只说了MQ
的部分通信),因此如何设计一个良好的网络通信模块在MQ
中至关重要,它将决定RocketMQ
集群整体的消息传输能力与最终的性能。
rocketmq-remoting
模块是RocketMQ
消息队列中负责网络通信的模块,它几乎被其他所有需要网络通信的模块(诸如rocketmq-client
、rocketmq-broker
、rocketmq-namesrv
)所依赖和引用。为了实现客户端与服务器之间高效的数据请求与接收,RocketMQ
消息队列自定义了通信协议并在Netty
的基础之上扩展了通信模块。
二、Remoting通信类结构
- RemotingService是最上层的接口,定义了三个方法
void start();
void shutdown();
void registerRPCHook(RPCHook rpcHook);
- RemotingServer:定义了服务端的接口,继承了上层接口
RemotingService
- RemotingClient:定义了客户端的接口,继承了上层
RemotingService
RemotingServer
与RemotingClient
定义的方法是类似的,主要包含了同步、异步、oneway
方式的通信和注册处理器processor
,其余的就是针对服务端和客户端特定的接口方法,比如服务端根据requestCode
获取处理器的getProcessorPair()
方法,客户端获取NameServer
地址列表getNameServerAddressList()
方法。
- NettyRemotingAbstract:
Netty
通信抽象类,定义并封装了服务端与客户端公共方法。这个也是RocketMQ
网络通信的核心类。 - NettyRemotingServer:服务端的实现类,实现了
RemotingServer
接口,继承NettyRemotingAbstract
抽象类。 - NettyRemotingClient:客户端的实现类,实现类
RemotingClient
接口,继承NettyRemotingAbstract
抽象类。
三、协议设计与编解码
在Client
和Server
之间完成一次消息发送时,需要对发送的消息进行一个协议约定,因此就有必要自定义RocketMQ
的消息协议。同时,为了高效地在网络中传输消息和对收到的消息读取,就需要对消息进行编解码。在RocketMQ
中,RemotingCommand
这个类在消息传输过程中对所有数据内容的封装,不但包含了所有的数据结构,还包含了编码解码操作。
Header字段 | 类型 | Request说明 | Response说明 |
---|---|---|---|
code |
int |
请求操作码,应答方根据不同的请求码进行 不同的业务处理 |
应答响应码。0 表示成功,非0 则表示各种错误 |
language |
LanguageCode |
请求方实现的语言 | 应答方实现的语言 |
version |
int |
请求方程序的版本 | 应答方程序的版本 |
opaque |
int |
相当于requestId ,在同一个连接上的不同请求标识码,与响应消息中的相对应 |
应答不做修改直接返回 |
flag |
int |
区分是普通RPC 还是onewayRPC 的标志 |
区分是普通RPC 还是onewayRPC 的标志 |
remark |
String |
传输自定义文本信息 | 传输自定义文本信息 |
extFields |
HashMap |
请求自定义扩展信息 | 响应自定义扩展信息 |
可见传输内容主要可以分为以下4部分:
- 消息长度:总长度,四个字节存储,占用一个
int
类型; - 序列化类型&消息头长度:同样占用一个
int
类型,第一个字节表示序列化类型,后面三个字节表示消息头长度; - 消息头数据:经过序列化后的消息头数据;
- 消息主体数据:消息主体的二进制字节数据内容;
四、消息的通信方式和流程
在RocketMQ
消息队列中支持通信的方式主要有同步(sync
)、异步(async
)、单向(oneway
)三种。其中“单向”通信模式相对简单,一般用在发送心跳包场景下,无需关注其Response
。这里,主要介绍RocketMQ
的异步通信流程。
五、Reactor多线程设计
RocketMQ
的RPC
通信采用Netty
组件作为底层通信库,同样也遵循了Reactor
多线程模型,同时又在这之上做了一些扩展和优化。
上面的框图中可以大致了解RocketMQ
中NettyRemotingServer
的Reactor
多线程模型。一个Reactor
主线程(eventLoopGroupBoss
,即为上面的1)负责监听TCP
网络连接请求,建立好连接,创建SocketChannel
,并注册到selector
上。RocketMQ
的源码中会自动根据OS
的类型选择NIO
和Epoll
,也可以通过参数配置),然后监听真正的网络数据。拿到网络数据后,再丢给Worker
线程池(eventLoopGroupSelector
,即为上面的“N”,源码中默认设置为3),在真正执行业务逻辑之前需要进行SSL
验证、编解码、空闲检查、网络连接管理,这些工作交给defaultEventExecutorGroup
(即为上面的“M1”,源码中默认设置为8)去做。而处理业务操作放在业务线程池中执行,根据RomotingCommand
的业务请求码code
去processorTable
这个本地缓存变量中找到对应的processor
,然后封装成task
任务后,提交给对应的业务processor
处理线程池来执行(sendMessageExecutor
,以发送消息为例,即为上面的“M2”)。从入口到业务逻辑的几个步骤中线程池一直再增加,这跟每一步逻辑复杂性相关,越复杂,需要的并发通道越宽。
线程数 | 线程名 | 线程具体说明 |
---|---|---|
1 | NettyBoss_%d | Reactor主线程 |
N | NettyServerEPOLLSelector_%d_%d | Reactor线程池 |
M1 | NettyServerCodecThread_%d | Worker线程池 |
M2 | RemotingExecutorThread_%d | 业务processor处理线程池 |