首页 > 其他分享 >RocketMQ之通信机制

RocketMQ之通信机制

时间:2023-05-06 13:55:25浏览次数:40  
标签:通信 线程 消息 机制 NameServer RocketMQ 客户端

一、概述

RocketMQ消息队列集群主要包括NameServerBroker(Master/Slave)、ProducerConsumer4个角色,基本通讯流程如下:

  1. Broker启动后需要完成一次将自己注册至NameServer的操作;随后每隔30s时间定时向NameServer上报Topic路由信息。
  2. 消息生产者Producer作为客户端发送消息时候,需要根据消息的Topic从本地缓存的TopicPublishInfoTable获取路由信息。如果没有则更新路由信息会从NameServer上重新拉取,同时Producer会默认每隔30sNameServer拉取一次路由信息。
  3. 消息生产者Producer根据2)中获取的路由信息选择一个队列(MessageQueue)进行消息发送;Broker作为消息的接收者接收消息并落盘存储。
  4. 消息消费者Consumer根据2)中获取的路由信息,并再完成客户端的负载均衡后,选择其中的某一个或者某几个消息队列来拉取消息并进行消费。

从上面1~3中可以看出在消息生产者,BrokerNameServer之间都会发生通信(这里只说了MQ的部分通信),因此如何设计一个良好的网络通信模块在MQ中至关重要,它将决定RocketMQ集群整体的消息传输能力与最终的性能。

rocketmq-remoting模块是RocketMQ消息队列中负责网络通信的模块,它几乎被其他所有需要网络通信的模块(诸如rocketmq-clientrocketmq-brokerrocketmq-namesrv)所依赖和引用。为了实现客户端与服务器之间高效的数据请求与接收,RocketMQ消息队列自定义了通信协议并在Netty的基础之上扩展了通信模块。

二、Remoting通信类结构

rocketmqDesign3.png

  • RemotingService是最上层的接口,定义了三个方法
void start();
void shutdown();
void registerRPCHook(RPCHook rpcHook);
  • RemotingServer:定义了服务端的接口,继承了上层接口RemotingService
  • RemotingClient:定义了客户端的接口,继承了上层RemotingService

RemotingServerRemotingClient定义的方法是类似的,主要包含了同步、异步、oneway方式的通信和注册处理器processor,其余的就是针对服务端和客户端特定的接口方法,比如服务端根据requestCode获取处理器的getProcessorPair()方法,客户端获取NameServer地址列表getNameServerAddressList()方法。

  • NettyRemotingAbstract:Netty通信抽象类,定义并封装了服务端与客户端公共方法。这个也是RocketMQ网络通信的核心类。
  • NettyRemotingServer:服务端的实现类,实现了RemotingServer接口,继承NettyRemotingAbstract抽象类。
  • NettyRemotingClient:客户端的实现类,实现类RemotingClient接口,继承NettyRemotingAbstract抽象类。

三、协议设计与编解码

ClientServer之间完成一次消息发送时,需要对发送的消息进行一个协议约定,因此就有必要自定义RocketMQ的消息协议。同时,为了高效地在网络中传输消息和对收到的消息读取,就需要对消息进行编解码。在RocketMQ中,RemotingCommand这个类在消息传输过程中对所有数据内容的封装,不但包含了所有的数据结构,还包含了编码解码操作。

Header字段 类型 Request说明 Response说明
code int 请求操作码,应答方根据不同的请求码进行
不同的业务处理
应答响应码。
0表示成功,非0则表示各种错误
language LanguageCode 请求方实现的语言 应答方实现的语言
version int 请求方程序的版本 应答方程序的版本
opaque int 相当于requestId,在同一个连接上的不同
请求标识码,与响应消息中的相对应
应答不做修改直接返回
flag int 区分是普通RPC还是onewayRPC的标志 区分是普通RPC还是onewayRPC的标志
remark String 传输自定义文本信息 传输自定义文本信息
extFields HashMap 请求自定义扩展信息 响应自定义扩展信息

rocketmqDesign4.png

可见传输内容主要可以分为以下4部分:

  1. 消息长度:总长度,四个字节存储,占用一个int类型;
  2. 序列化类型&消息头长度:同样占用一个int类型,第一个字节表示序列化类型,后面三个字节表示消息头长度;
  3. 消息头数据:经过序列化后的消息头数据;
  4. 消息主体数据:消息主体的二进制字节数据内容;

四、消息的通信方式和流程

RocketMQ消息队列中支持通信的方式主要有同步(sync)、异步(async)、单向(oneway)三种。其中“单向”通信模式相对简单,一般用在发送心跳包场景下,无需关注其Response。这里,主要介绍RocketMQ的异步通信流程。

rocketmqDesign5.png

五、Reactor多线程设计

RocketMQRPC通信采用Netty组件作为底层通信库,同样也遵循了Reactor多线程模型,同时又在这之上做了一些扩展和优化。

rocketmqDesign6.png

上面的框图中可以大致了解RocketMQNettyRemotingServerReactor多线程模型。一个Reactor主线程(eventLoopGroupBoss,即为上面的1)负责监听TCP网络连接请求,建立好连接,创建SocketChannel,并注册到selector上。RocketMQ的源码中会自动根据OS的类型选择NIOEpoll,也可以通过参数配置),然后监听真正的网络数据。拿到网络数据后,再丢给Worker线程池(eventLoopGroupSelector,即为上面的“N”,源码中默认设置为3),在真正执行业务逻辑之前需要进行SSL验证、编解码、空闲检查、网络连接管理,这些工作交给defaultEventExecutorGroup(即为上面的“M1”,源码中默认设置为8)去做。而处理业务操作放在业务线程池中执行,根据RomotingCommand的业务请求码codeprocessorTable这个本地缓存变量中找到对应的processor,然后封装成task任务后,提交给对应的业务processor处理线程池来执行(sendMessageExecutor,以发送消息为例,即为上面的“M2”)。从入口到业务逻辑的几个步骤中线程池一直再增加,这跟每一步逻辑复杂性相关,越复杂,需要的并发通道越宽。

线程数 线程名 线程具体说明
1 NettyBoss_%d Reactor主线程
N NettyServerEPOLLSelector_%d_%d Reactor线程池
M1 NettyServerCodecThread_%d Worker线程池
M2 RemotingExecutorThread_%d 业务processor处理线程池

标签:通信,线程,消息,机制,NameServer,RocketMQ,客户端
From: https://www.cnblogs.com/ciel717/p/17375131.html

相关文章

  • RocketMQ之重试机制
    一、概述Producer对发送失败的消息进行重新发送的机制,称为消息发送重试机制,也称为消息重投机制。对于消息重投,需要注意以下几点:生产者在发送消息时,若采用同步或异步发送方式,发送失败会重试,但oneway消息发送方式发送失败是没有重试机制的;只有普通消息具有发送重试机制,顺序消......
  • linux局域网通信软件都有哪些?要如何选择?
    出于安全性考虑和上级单位要求,不少原本使用Windows系统电脑的单位都在逐渐把单位内的电脑替代为linux系统电脑,但由于原先使用的局域网通信软件没有做linux适配,无法使用,只能替换为能在linux系统上使用的局域网通信软件。那么linux局域网通信软件如何选择?可以参考以下几点:功能不同的l......
  • IM开发者的零基础通信技术入门(十二):上网卡顿?网络掉线?一文即懂!
    【来源申明】本文引用了微信公众号“鲜枣课堂”的《上网慢?经常掉线?这篇文章告诉你该怎么办!》文章内容。为了更好的内容呈现,即时通讯网在引用和收录时内容有改动,转载时请注明原文来源信息,尊重原作者的劳动。1、本文内容概述对于不太了解网络通信的人来说(包括开发者),可能会经常碰......
  • CINN 中子图编译缓存机制
    采用「问-答」形式记录研读CINN开源框架的笔记Q:CINN中子图编译的入口是在哪里?for(constauto&node_vec:clusters){//<-------逐个遍历每个子图//Classifyvarnodetoinputs,outputs,andinternals.GraphNodeSetcluster_set(node_vec.begin(),n......
  • C++中的多线程编程和同步机制
    C++中的多线程编程和同步机制使得程序员可以利用计算机的多核心来提高程序的运行效率和性能。本文将介绍多线程编程和同步机制的基本概念和使用方法。多线程编程基础在C++中,使用<thread>库来创建和管理线程。线程可以通过函数、成员函数或者Lambda表达式来实现。以下是一个使......
  • rocketmq启动nameserver的坑
    当你启动rocketmq时可以启动但找不到日志时:第一:减小JVM的内存其次:要到bin目录中输入启动命令。楼主就是第二步解决的 ......
  • 类加载机制和Bean的生命周期
    类加载机制和Bean的生命周期是Java中非常重要的两个概念,它们分别对应了Java类的加载和对象的创建、初始化、销毁等过程。类加载机制是指当Java程序需要使用某个类时,JVM会通过类加载器将该类加载到内存中,并对该类进行初始化。类加载器会按照一定的顺序查找类文件,并加载到内存中。......
  • 关于java反射机制基础资料
    Java的反射机制允许在程序运行期间,借助反射API获取类的内部信息,并能直接操作对象的内部属性及方法。Java反射机制提供的功能:在运行时,使用反射分析类的能力,获取有关类的一切信息(类所在的包、类实现的接口、标注的注解、类的数据域、类的构造器、类的方法等)在运行时,使用反......
  • Java的反射机制
    介绍反射机制Java的反射机制允许在程序运行期间,借助反射API获取类的内部信息,并能直接操作对象的内部属性及方法。Java反射机制提供的功能:在运行时,使用反射分析类的能力,获取有关类的一切信息(类所在的包、类实现的接口、标注的注解、类的数据域、类的构造器、类的方法等)在......
  • C# 串口通信
    这里浅说一下蓝牙与串口的区别:        蓝牙:连接以mac地址为主,显示名称可以更改,低功耗蓝牙还需要配置服务与特征(服务有读,写,可读可写区别)特点:不同设备连接同一台蓝牙设备,mac地址与显示名称都是唯一的    串口:连接以端口名称为主,例如com1,com2,连接时需要配置参数......