首页 > 其他分享 >RocketMQ-(9-1)-MQTT-EventBridge概述

RocketMQ-(9-1)-MQTT-EventBridge概述

时间:2023-09-13 10:36:21浏览次数:47  
标签:场景 EventBridge IoT MQTT 消息 RocketMQ 客户端


RocketMQ MQTT 概览

传统的消息队列MQ主要应用于服务(端)之间的消息通信,比如电商领域的交易消息、支付消息、物流消息等等。然而在消息这个大类下,还有一个非常重要且常见的消息领域,即IoT类终端设备消息。近些年,我们看到随着智能家居、工业互联而兴起的面向IoT设备类的消息正在呈爆炸式增长,而且已经发展十余年的移动互联网的手机APP端消息仍然是数量级庞大。面向终端设备的消息数量级比传统服务端的消息要大很多量级并仍然在快速增长。

如果可以有一个统一的消息系统(产品)来提供多场景计算(如stream、event)、多场景(IoT、APP)接入,其实是非常有价值的,因为消息也是一种重要数据,数据如果只存在一个系统内,可以最大地降低存储成本,同时可以有效地避免数据因在不同系统间同步带来的一致性难题和挑战。

RocketMQ-(9-1)-MQTT-EventBridge概述_rocketmq

基于此,我们引入了RocketMQ-MQTT这个扩展项目来实现RocketMQ统一接入IoT设备和服务端的消息,提供一体化消息存储和互通能力。

MQTT协议

在IoT终端场景,目前业界广泛使用的是MQTT协议,是起源于物联网IoT场景,OASIS联盟定义的标准的开放式协议。因为IoT设备种类繁多,运行环境各异,一个标准的接入协议尤为关键。

MQTT协议定义的是一个Pub/Sub的通信模型,这个与RocketMQ是类似的,不过其在订阅方式上比较灵活,可以支持多级Topic订阅(如 “/t/t1/t2”),甚至可以支持通配符订阅(如 “/t/t1/+”)。

模型介绍

队列存储模型

RocketMQ-(9-1)-MQTT-EventBridge概述_服务端_02

我们设计了一种多维度分发的Topic队列模型,如上图所示,消息可以来自各个接入场景(如服务端的MQ/AMQP、客户端的MQTT),但只会写一份存到commitlog里面,然后分发出多个需求场景的队列索引(ConsumerQueue),如服务端场景(MQ/AMQP)可以按照一级Topic队列进行传统的服务端消费,客户端MQTT场景可以按照MQTT多级Topic以及通配符订阅进行消费消息。

这样的一个队列模型就可以同时支持服务端和终端场景的接入和消息收发,达到一体化的目标。

推拉模型

RocketMQ-(9-1)-MQTT-EventBridge概述_rocketmq_03

上图展示的是一个推拉模型,图中的P节点是一个协议网关或broker插件,终端设备通过MQTT协议连到这个网关节点。消息可以来自多种场景(MQ/AMQP/MQTT)发送过来,存到Topic队列后会有一个notify逻辑模块来实时感知这个新消息到达,然后会生成消息事件(就是消息的Topic名称),将该事件推送至网关节点,网关节点根据其连上的终端设备订阅情况进行内部匹配,找到哪些终端设备能匹配上,然后会触发pull请求去存储层读取消息再推送至终端设备。

架构概览

RocketMQ-(9-1)-MQTT-EventBridge概述_终端设备_04

 我们的目标是期望基于RocketMQ实现一体化且自闭环,但不希望Broker被侵入更多场景逻辑,我们抽象了一个协议计算层,这个计算层可以是一个网关,也可以是一个broker插件。Broker专注解决Queue的事情以及为了满足上面的计算需求做一些Queue存储的适配或改造。协议计算层负责协议接入,并且要可插拔部署。

RocketMQ MQTT 快速开始

系统要求

  • 64位操作系统,推荐 Linux/Unix/macOS
  • 64位 JDK 1.8+

部署说明

由于RocketMQ-MQTT项目依赖RocketMQ底层的多队列分发,RocketMQ从4.9.3版本开始支持这一特性,因此您需要确认RocketMQ的版本升级到4.9.3或更高版本,并且确保以下配置项已开启:

enableLmq = true 
enableMultiDispatch = true

RocketMQ-MQTT的部署参考项目说明,下载工程release版本或直接从源码构建。

git clone https://github.com/apache/rocketmq-mqtt
cd rocketmq-mqtt 
mvn -Prelease-all -DskipTests clean install -U 
cd distribution/target/ 
cd bin
sh mqtt.sh start

配置说明

username=xxx    // 权限验证账户配置
secretKey=xxx    // 权限验证账户配置
NAMESRV_ADDR=xxx  //namesrv接入点
eventNotifyRetryTopic=xx   //notify重试topic,提前创建
clientRetryTopic=xx  //客户端消息重试topic,提前创建

其他启动配置参考项目README.md

示例说明

RocketMQ-MQTT项目工程代码里面提供了基本的example代码,详见代码示例。

MqttConsumer.java  // MQTT客户端启动订阅消息
MqttProducer.java   // MQTT客户端启动发布消息
RocketMQConsumer.java //RocketMQ客户端启动订阅消息
RocketMQProducer.java  // RocketMQ客户端启动发布消息

标签:场景,EventBridge,IoT,MQTT,消息,RocketMQ,客户端
From: https://blog.51cto.com/ratelcloud/7452266

相关文章

  • RocketMQ教程-(6-5)-运维部署-Promethus Exporter
    介绍Rocketmq-exporter 是用于监控RocketMQbroker端和客户端所有相关指标的系统,通过 mqAdmin 从broker端获取指标值后封装成87个cache。警告过去版本曾是87个concurrentHashMap,由于Map不会删除过期指标,所以一旦有label变动就会生成一个新的指标,旧的无用指标无法......
  • RocketMQ教程-(5)-功能特性-事务消息
    事务消息为ApacheRocketMQ中的高级特性消息,本文为您介绍事务消息的应用场景、功能原理、使用限制、使用方法和使用建议。事务消息为ApacheRocketMQ中的高级特性消息,本文为您介绍事务消息的应用场景、功能原理、使用限制、使用方法和使用建议。以电商交易场景为例,用户支付订单......
  • RocketMQ教程-(5)-功能特性-顺序消息
    顺序消息为ApacheRocketMQ中的高级特性消息,本文为您介绍顺序消息的应用场景、功能原理、使用限制、使用方法和使用建议。应用场景在有序事件处理、撮合交易、数据实时增量同步等场景下,异构系统间需要维持强一致的状态同步,上游的事件变更需要按照顺序传递到下游进行处理。在这类......
  • RocketMQ教程-(4)-领域模型-消费者(Consumer)
    本文介绍ApacheRocketMQ中消费者(Consumer)的定义、模型关系、内部属性、行为约束、版本兼容性及使用建议。定义消费者是ApacheRocketMQ中用来接收并处理消息的运行实体。消费者通常被集成在业务系统中,从ApacheRocketMQ服务端获取消息,并将消息转化成业务可理解的信息,供业务......
  • RocketMQ教程-(4)-领域模型概述
    ApacheRocketMQ是一款典型的分布式架构下的中间件产品,使用异步通信方式和发布订阅的消息传输模型。通信方式和传输模型的具体说明,请参见下文通信方式介绍和消息传输模型介绍。ApacheRocketMQ产品具备异步通信的优势,系统拓扑简单、上下游耦合较弱,主要应用于异步解耦,流量削峰填......
  • RocketMQ教程-安装和配置
    Linux系统安装配置64位操作系统,推荐Linux/Unix/macOS64位JDK1.8+Maven3.0yum安装jdk8yum安装maven1.下载安装ApacheRocketMQRocketMQ的安装包分为两种,二进制包和源码包。点击这里 下载ApacheRocketMQ5.1.3的源码包。你也可以从这里 下载到二进制包。二进制包是已......
  • RocketMQ教程-(5)-功能特性-消息发送重试和流控机制
    本文为您介绍ApacheRocketMQ的消息发送重试机制和消息流控机制。背景信息消息发送重试ApacheRocketMQ的消息发送重试机制主要为您解答如下问题:部分节点异常是否影响消息发送?请求重试是否会阻塞业务调用?请求重试会带来什么不足?消息流控ApacheRocketMQ的流控机制主要为您解答......
  • RocketMQ教程-(4)-领域模型-消费者分组ConsumerGroup
    定义消费者分组是ApacheRocketMQ系统中承载多个消费行为一致的消费者的负载均衡分组。和消费者不同,消费者分组并不是运行实体,而是一个逻辑资源。在ApacheRocketMQ中,通过消费者分组内初始化多个消费者实现消费性能的水平扩展以及高可用容灾。在消费者分组中,统一定义以下消费行......
  • RabbitMQ、RocketMQ和Kafka的不同之处
    RabbitMQ、RocketMQ和Kafka是三种常见的消息队列系统,它们在设计和使用方面有一些不同之处:架构设计:RabbitMQ:RabbitMQ是一个基于AMQP(高级消息队列协议)的开源消息队列系统,采用的是传统的Broker架构模式,其中包括生产者、消费者和中间件(Broker)。RocketMQ:RocketMQ是一个基于分布式......
  • 教你2种方法,将iOS设备通过MQTT协议连接到华为云物联网平台
    本文分享自华为云社区《如何将iOS设备通过MQTT协议连接到华为云物联网平台:Flutter和Swift两种方法》,作者:张俭。前言当今时代,物联网技术正逐步改变我们的生活和工作方式。华为云IoTDA服务,为开发者提供了一个开放、稳定、可靠的基础设施,以便实现设备与云端的无缝连接和双向通......