目录
前言
物联网平台首先需要可以获取并处理设备上报的MQTT中的数据,我们称这部分为上行数据。
本章将分为三小节。
1、通过MQTT获取设备上报数据并通知业务系统
2、业务系统存储和处理上报数据
3、业务系统查询历史数据
分析
1:因为MQTT协议里面没用服务端和客户端的区别,那么我们可以创建一个IoT Hub的server来接收设备端的数据,它需要和其他设备一样以MQTT客户端的身份接入MQTT,并订阅特定Topic来实现获取设备数据。
2:可以使用第三章介绍的Webhook方式,在设备向特定Topic发布数据时,EMQX会收到Publish数据包,我们可以通过Webhook将数据传递出来。
以上两种方案都可以实现,这里我们使用第一种方式,因为它复杂的相对低一些。
方案
完全基于MQTT的方案
我们与设备约定,设备连接MQTT成功后,消息全部通过名称规则为"topic/{设备名称}"的Topic传递
那么我们可以订阅所有规则为"topic/{设备名称}"的Topic来获取所有设备的上行数据。
EMQX中支持通配符,我们可以通过订阅"topic/+",关于通配符的更多内容,请参考官方文档
https://www.emqx.io/docs/zh/v5/deploy/cluster/introduction.html#主题树-主题匹配通配符
这个方案可行,但是存在单点故障,如果我们的IoT Hub出现了故障,或者数据流量很大导致Hub出现瓶颈,致使最新上报的数据无法及时有效地被处理,可能会影响业务
我们想到可以分配多个Hub,我们可以在设备发布数据时随机生成1-10个Topic分组,"topic/1/+"..."topic/10/+" ,这样我们可以启动多个Hub,然后每个Hub分别订阅一个主题,这样压力会平分到10个Hub server中
但是这样做虽然可以减少系统瓶颈的可能,但是如果某一个hub挂了,还是会有一部分设备上报的数据受影响,直到该hub恢复。
如果可以弹性的伸缩Hub订阅,支持负载均衡就好了。
共享订阅
EMQX 支持共享订阅,多个订阅者订阅同一主题,EMQX Broker会按照一定的分发策略讲消息发给订阅者,在这个层面上实现订阅者的负载均衡。
共享订阅只需要在订阅时加上前缀即可(发布的时候无需添加)
在原有主题的基础上,添加 $share 前缀即可为一组订阅端启用共享订阅,而且也是支持通配符 "#" 和 "+"。
示例 | 前缀 | 真实主题名 |
---|---|---|
$share/topic/1 | $share/ | topic/1 |
带群组的共享订阅
以 $share/<group-name> 为前缀的共享订阅是带群组的共享订阅:
group-name 可以为任意字符串,属于同一个群组内部的订阅者将以负载均衡接收消息,但 EMQX 会向不同群组广播消息。
例如,假设订阅者 s1,s2,s3 属于群组 g1,订阅者 s4,s5 属于群组 g2。
那么当 EMQX 向这个主题发布消息 msg1 的时候:
EMQX 会向两个群组 g1 和 g2 同时发送 msg1
s1,s2,s3 中只有一个会收到 msg1
s4,s5 中只有一个会收到 msg1
均衡分发策略
EMQX 支持很多不同的平衡策略(MQTT协议中并没有明确的规范平衡策略)
平衡策略可以在全局或每组中指定。可以在 etc/emqx.conf 文件中修改
- 全局策略可以在 broker.shared_subscription_strategy 配置中设置。
- 配置 broker.shared_subscription_group.$group_name.strategy 为每组策略。
均衡策略 | 描述 |
---|---|
random | 在所有订阅者中随机选择 |
round_robin | 按照订阅顺序选择 |
round_robin_per_group | 在每个共享订阅组中按照订阅顺序进行选择 |
local | 随机在本地订阅中进行选择,如无法找到,则在集群范围内随机选择 |
sticky | 选定订阅者后,始终向其进行发送,直到该订阅者断开连接 |
hash_clientid | 通过对发送者的客户端 ID 进行 Hash 处理来选择订阅者 |
hash_topic | 通过对源主题进行 Hash 处理来选择订阅者 |
使用共享订阅的方式,可以根据数据量动态的添加和减少Hub数量,不存在单点故障,扩展性好。
数据格式
上行数据分两部分,Payload和Metadata
Metadata部分,我们在MQTTX中我们可以通过点击Meta来添加
- 1、Payload指消息所携带的数据本身,多数情况为传感器数据,可以是任何格式,例如二进制或者JSON字符串,格式是业务和嵌入式之间约定的,Hub部分不解析Payload只是将它传递到业务系统
- 2、Metadata相当于我们http请求中的head数据,可以添加发布者的身份信息或者消息的唯一ID等等。据本人了解,貌似不是所有的MQTT平台都支持设备端自定义Metadata,或者说很多通讯模组已经配置好了Metadata,不支持自定义,所以这里我们暂时不在Metadata中配置任何信息。
实施流程
- 我们在MASA.IoT.Hub项目中创建一个服务HostedService
- 以mqtt client的形式连接EMQX的mqtt,并订阅所有设备的Topic
- 将消息原封不动的封装,并发送给业务系统(MASA.IoT.Core项目中的api接口)
编写代码
我们首选在MASA.IoT.Common项目中安装 MQTTnet nuget包,方便我们连接mqtt
并创建MqttHelper类
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Formatter;
namespace MASA.IoT.Common.Helper
{
public class MqttHelper
{
private MqttFactory _mqttFactory;
private IMqttClient _mqttClient;
private MqttClientOptions _mqttClientOptions;
private MqttClientSubscribeOptions _mqttClientSubscribeOptions;
public MqttHelper(string mqttUrl, string clientID, string userName, string passWord)
{
_mqttFactory = new MqttFactory();
_mqttClient = _mqttFactory.CreateMqttClient();
_mqttClientOptions = new MqttClientOptionsBuilder()
.WithTcpServer(mqttUrl)
.WithCredentials(userName, passWord).WithProtocolVersion(MqttProtocolVersion.V500).Build();
_mqttClientOptions.ClientId = clientID;
}
/// <summary>
/// 连接并订阅Topic
/// </summary>
/// <param name="callback"></param>
/// <param name="topic"></param>
/// <returns></returns>
public async Task ConnectClient(Func<MqttApplicationMessageReceivedEventArgs, Task> callback, string topic)
{
_mqttClientSubscribeOptions = _mqttFactory.CreateSubscribeOptionsBuilder()
.WithTopicFilter(f => { f.WithTopic(topic); })
.Build();
var response = await _mqttClient.ConnectAsync(_mqttClientOptions, CancellationToken.None);
if (response.ResultCode == MqttClientConnectResultCode.Success)
{
Console.WriteLine($"The MQTT client with topic:{topic} is connected.");
await Task.Delay(500);
_mqttClient.ApplicationMessageReceivedAsync += callback;
await _mqttClient.SubscribeAsync(_mqttClientSubscribeOptions, CancellationToken.None);
}
}
/// <summary>
/// 断开连接
/// </summary>
/// <returns></returns>
public async Task Disconnect_Client()
{
if (_mqttClient.IsConnected)
{
await _mqttClient.DisconnectAsync();
Console.WriteLine("The MQTT client is Disconnected.");
}
}
}
}
在ConnectClient方法中,连接mqtt的同时会订阅特定topic,然后获取到订阅的消息时调用Func<MqttApplicationMessageReceivedEventArgs, Task> 类型的callback,消息的内容(Payload)通过MqttApplicationMessageReceivedEventArgs中的ApplicationMessage.Payload获取。
我们在MASA.IoT.Hub项目中新建MQHostedService后台任务
using Dapr.Client;
using MASA.IoT.Common;
using MASA.IoT.Common.Helper;
using Microsoft.Extensions.Options;
using MQTTnet.Client;
namespace MASA.IoT.Hub;
public class MQHostedService : IHostedService
{
private readonly HubAppSettings _appSettings;
private readonly DaprClient _daprClient;
public MQHostedService(IOptions<HubAppSettings> appSettings)
{
_daprClient = new DaprClientBuilder().Build();
_appSettings = appSettings.Value;
}
/// <summary>
/// 开始
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task StartAsync(CancellationToken cancellationToken)
{
var mqttHelper = new MqttHelper(_appSettings.MqttSetting.MqttUrl, "IoTHub", _appSettings.MqttSetting.UserName, _appSettings.MqttSetting.Password);
await mqttHelper.ConnectClient(CallbackAsync, _appSettings.MqttSetting.Topic);
}
private async Task CallbackAsync(MqttApplicationMessageReceivedEventArgs e)
{
var deviceDataPointStr = System.Text.Encoding.Default.GetString(e.ApplicationMessage.PayloadSegment);
Console.WriteLine(deviceDataPointStr);
var pubSubOptions = new PubSubOptions
{
DeviceName = e.ApplicationMessage.Topic[6..],
Msg = deviceDataPointStr,
PubTime = new DateTimeOffset(DateTime.Now).ToUnixTimeMilliseconds(),
TrackId = Guid.NewGuid()
};
try
{
await _daprClient.PublishEventAsync("pubsub", "DeviceMessage", pubSubOptions);
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
/// <summary>
/// 结束
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
appsettings.json内容
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
}
},
"AllowedHosts": "*",
"MqttSetting": {
"MqttUrl": "192.120.5.204",
"UserName": "IoTHub",
"Password": "123456",
"Topic": "$share/IotHub/topic/+"
},
"DaprOptions": {
"AppId": "masa-iot-service-hub",
"AppPort": 18001,
"AppIdSuffix": "",
"DaprGrpcPort": 20241,
"DaprHttpPort": 20242
}
}
后台任务以ClientID="IoTHub"的mqtt 客户端形式接入,用户名和密码是在Emqx配置好的一个普通设备的权限。Topic为"$share/IotHub/topic/+",通过通配符+订阅所有设备上行数据的Topic。
数据以PubSubOptions形式包装,并通过_daprClient.PublishEventAsync通过dapr的方式发送到业务的接口上,
最后不要忘记在Program.cs中注入后台服务
builder.Services.AddHostedService<MQHostedService>();
这部分不了解的可以参考第一章的dapr搭建教程
我们继续在MASA.IoT.Core项目中添加控制器DeviceMqttController.cs
using Dapr;
using MASA.IoT.Common;
using Microsoft.AspNetCore.Mvc;
namespace MASA.IoT.Core.Controllers
{
[Route("api/[controller]")]
[ApiController]
public class DeviceMqttController : ControllerBase
{
[Topic("pubsub", "DeviceMessage")]
[HttpPost("DeviceMessage")]
public void DeviceMessageAsync([FromBody] PubSubOptions pubSubOptions)
{
Console.WriteLine($"Subscriber received, DeviceName:{pubSubOptions.DeviceName},Msg:{pubSubOptions.Msg}");
}
}
}
这样Hub获取到订阅数据后就可以调用该接口将数据通知到业务系统。
这里演示只是简单的打印出消息数据。
验证效果
我们同时启动MASA.IoT.Hub和MASA.IoT.Core项目,并启动MQTTX模拟设备连接mqtt,我们可以在EMQX后台看到连接的设备和订阅的情况
我们尝试发布一条数据,可以看到MASA.IoT.Core项目的接口成功的获取了数据
总结
本文以基于共享订阅的方式来实现获取设备上行数据,
真实的场景我们还需要处理重复数据,关于消息的去重需要先了解 MQTT QoS 0, 1, 2
篇幅问题本文不再进行扩展,可以参考下面的文章
完整代码在这里:https://github.com/sunday866/MASA.IoT-Training-Demos
标签:订阅,4.1,MASA,Hub,IoT,topic,Topic From: https://www.cnblogs.com/sunday866/p/17388812.html