首页 > 其他分享 >使用MASA Stack+.Net 从零开始搭建IoT平台 第四章 4.1处理设备上行数据-获取并通知业务系统

使用MASA Stack+.Net 从零开始搭建IoT平台 第四章 4.1处理设备上行数据-获取并通知业务系统

时间:2023-05-15 13:44:45浏览次数:57  
标签:订阅 4.1 MASA Hub IoT topic Topic

目录


前言

物联网平台首先需要可以获取并处理设备上报的MQTT中的数据,我们称这部分为上行数据。
本章将分为三小节。
1、通过MQTT获取设备上报数据并通知业务系统
2、业务系统存储和处理上报数据
3、业务系统查询历史数据

分析

1:因为MQTT协议里面没用服务端和客户端的区别,那么我们可以创建一个IoT Hub的server来接收设备端的数据,它需要和其他设备一样以MQTT客户端的身份接入MQTT,并订阅特定Topic来实现获取设备数据。
2:可以使用第三章介绍的Webhook方式,在设备向特定Topic发布数据时,EMQX会收到Publish数据包,我们可以通过Webhook将数据传递出来。
以上两种方案都可以实现,这里我们使用第一种方式,因为它复杂的相对低一些。

方案

完全基于MQTT的方案
我们与设备约定,设备连接MQTT成功后,消息全部通过名称规则为"topic/{设备名称}"的Topic传递
那么我们可以订阅所有规则为"topic/{设备名称}"的Topic来获取所有设备的上行数据。
EMQX中支持通配符,我们可以通过订阅"topic/+",关于通配符的更多内容,请参考官方文档

https://www.emqx.io/docs/zh/v5/deploy/cluster/introduction.html#主题树-主题匹配通配符

这个方案可行,但是存在单点故障,如果我们的IoT Hub出现了故障,或者数据流量很大导致Hub出现瓶颈,致使最新上报的数据无法及时有效地被处理,可能会影响业务
我们想到可以分配多个Hub,我们可以在设备发布数据时随机生成1-10个Topic分组,"topic/1/+"..."topic/10/+" ,这样我们可以启动多个Hub,然后每个Hub分别订阅一个主题,这样压力会平分到10个Hub server中
但是这样做虽然可以减少系统瓶颈的可能,但是如果某一个hub挂了,还是会有一部分设备上报的数据受影响,直到该hub恢复。
如果可以弹性的伸缩Hub订阅,支持负载均衡就好了。

共享订阅

EMQX 支持共享订阅,多个订阅者订阅同一主题,EMQX Broker会按照一定的分发策略讲消息发给订阅者,在这个层面上实现订阅者的负载均衡。
共享订阅只需要在订阅时加上前缀即可(发布的时候无需添加)
在原有主题的基础上,添加 $share 前缀即可为一组订阅端启用共享订阅,而且也是支持通配符 "#" 和 "+"。

示例 前缀 真实主题名
$share/topic/1 $share/ topic/1

带群组的共享订阅

$share/<group-name> 为前缀的共享订阅是带群组的共享订阅:
group-name 可以为任意字符串,属于同一个群组内部的订阅者将以负载均衡接收消息,但 EMQX 会向不同群组广播消息。
例如,假设订阅者 s1,s2,s3 属于群组 g1,订阅者 s4,s5 属于群组 g2。
那么当 EMQX 向这个主题发布消息 msg1 的时候:
EMQX 会向两个群组 g1 和 g2 同时发送 msg1
s1,s2,s3 中只有一个会收到 msg1
s4,s5 中只有一个会收到 msg1

均衡分发策略

EMQX 支持很多不同的平衡策略(MQTT协议中并没有明确的规范平衡策略)
平衡策略可以在全局或每组中指定。可以在 etc/emqx.conf 文件中修改

  1. 全局策略可以在 broker.shared_subscription_strategy 配置中设置。
  2. 配置 broker.shared_subscription_group.$group_name.strategy 为每组策略。
均衡策略 描述
random 在所有订阅者中随机选择
round_robin 按照订阅顺序选择
round_robin_per_group 在每个共享订阅组中按照订阅顺序进行选择
local 随机在本地订阅中进行选择,如无法找到,则在集群范围内随机选择
sticky 选定订阅者后,始终向其进行发送,直到该订阅者断开连接
hash_clientid 通过对发送者的客户端 ID 进行 Hash 处理来选择订阅者
hash_topic 通过对源主题进行 Hash 处理来选择订阅者

使用共享订阅的方式,可以根据数据量动态的添加和减少Hub数量,不存在单点故障,扩展性好。

数据格式

上行数据分两部分,Payload和Metadata
Metadata部分,我们在MQTTX中我们可以通过点击Meta来添加

  • 1、Payload指消息所携带的数据本身,多数情况为传感器数据,可以是任何格式,例如二进制或者JSON字符串,格式是业务和嵌入式之间约定的,Hub部分不解析Payload只是将它传递到业务系统
  • 2、Metadata相当于我们http请求中的head数据,可以添加发布者的身份信息或者消息的唯一ID等等。据本人了解,貌似不是所有的MQTT平台都支持设备端自定义Metadata,或者说很多通讯模组已经配置好了Metadata,不支持自定义,所以这里我们暂时不在Metadata中配置任何信息。

实施流程

  1. 我们在MASA.IoT.Hub项目中创建一个服务HostedService
  2. 以mqtt client的形式连接EMQX的mqtt,并订阅所有设备的Topic
  3. 将消息原封不动的封装,并发送给业务系统(MASA.IoT.Core项目中的api接口)

编写代码

我们首选在MASA.IoT.Common项目中安装 MQTTnet nuget包,方便我们连接mqtt
并创建MqttHelper类

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Formatter;

namespace MASA.IoT.Common.Helper
{
    public class MqttHelper
    {
        private MqttFactory _mqttFactory;
        private IMqttClient _mqttClient;
        private MqttClientOptions _mqttClientOptions;
        private MqttClientSubscribeOptions _mqttClientSubscribeOptions;

        public MqttHelper(string mqttUrl, string clientID, string userName, string passWord)
        {
            _mqttFactory = new MqttFactory();
            _mqttClient = _mqttFactory.CreateMqttClient();
            _mqttClientOptions = new MqttClientOptionsBuilder()
                                  .WithTcpServer(mqttUrl)
              .WithCredentials(userName, passWord).WithProtocolVersion(MqttProtocolVersion.V500).Build();

            _mqttClientOptions.ClientId = clientID;
        }

        /// <summary>
        /// 连接并订阅Topic
        /// </summary>
        /// <param name="callback"></param>
        /// <param name="topic"></param>
        /// <returns></returns>
        public async Task ConnectClient(Func<MqttApplicationMessageReceivedEventArgs, Task> callback, string topic) 
        {
            _mqttClientSubscribeOptions = _mqttFactory.CreateSubscribeOptionsBuilder()
                .WithTopicFilter(f => { f.WithTopic(topic); })
                .Build();

            var response = await _mqttClient.ConnectAsync(_mqttClientOptions, CancellationToken.None);
            if (response.ResultCode == MqttClientConnectResultCode.Success)
            {
                Console.WriteLine($"The MQTT client with topic:{topic} is connected.");
                await Task.Delay(500);
                _mqttClient.ApplicationMessageReceivedAsync += callback;
                await _mqttClient.SubscribeAsync(_mqttClientSubscribeOptions, CancellationToken.None);
            }
        }

        /// <summary>
        /// 断开连接
        /// </summary>
        /// <returns></returns>
        public async Task Disconnect_Client() 
        {
            if (_mqttClient.IsConnected)
            {
                await _mqttClient.DisconnectAsync();

                Console.WriteLine("The MQTT client is Disconnected.");
            }
        }
    }
}

在ConnectClient方法中,连接mqtt的同时会订阅特定topic,然后获取到订阅的消息时调用Func<MqttApplicationMessageReceivedEventArgs, Task> 类型的callback,消息的内容(Payload)通过MqttApplicationMessageReceivedEventArgs中的ApplicationMessage.Payload获取。

我们在MASA.IoT.Hub项目中新建MQHostedService后台任务

using Dapr.Client;
using MASA.IoT.Common;
using MASA.IoT.Common.Helper;
using Microsoft.Extensions.Options;
using MQTTnet.Client;

namespace MASA.IoT.Hub;

public class MQHostedService : IHostedService
{
    private readonly HubAppSettings _appSettings;
    private readonly DaprClient _daprClient;
    public MQHostedService(IOptions<HubAppSettings> appSettings)
    {
        _daprClient = new DaprClientBuilder().Build();
        _appSettings = appSettings.Value;
    }

    /// <summary>
    /// 开始                
    /// </summary>
    /// <param name="cancellationToken"></param>
    /// <returns></returns>
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        var mqttHelper = new MqttHelper(_appSettings.MqttSetting.MqttUrl, "IoTHub", _appSettings.MqttSetting.UserName, _appSettings.MqttSetting.Password);
 
        await mqttHelper.ConnectClient(CallbackAsync, _appSettings.MqttSetting.Topic);

    }
    private async Task CallbackAsync(MqttApplicationMessageReceivedEventArgs e)
    {
        var deviceDataPointStr = System.Text.Encoding.Default.GetString(e.ApplicationMessage.PayloadSegment);
        
        Console.WriteLine(deviceDataPointStr);
        var pubSubOptions = new PubSubOptions
        {
            DeviceName = e.ApplicationMessage.Topic[6..],
            Msg = deviceDataPointStr,
            PubTime = new DateTimeOffset(DateTime.Now).ToUnixTimeMilliseconds(),
            TrackId = Guid.NewGuid()
        };                            
        try
        {
            await _daprClient.PublishEventAsync("pubsub", "DeviceMessage", pubSubOptions);
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }

    }
    /// <summary>
    /// 结束
    /// </summary>
    /// <param name="cancellationToken"></param>
    /// <returns></returns>
    public Task StopAsync(CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }
}

appsettings.json内容
{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft.AspNetCore": "Warning"
    }
  },
  "AllowedHosts": "*",
  "MqttSetting": {
    "MqttUrl": "192.120.5.204",
    "UserName": "IoTHub",
    "Password": "123456",
    "Topic": "$share/IotHub/topic/+"
  },
  "DaprOptions": {
    "AppId": "masa-iot-service-hub",
    "AppPort": 18001,
    "AppIdSuffix": "",
    "DaprGrpcPort": 20241,
    "DaprHttpPort": 20242
  }
}

后台任务以ClientID="IoTHub"的mqtt 客户端形式接入,用户名和密码是在Emqx配置好的一个普通设备的权限。Topic为"$share/IotHub/topic/+",通过通配符+订阅所有设备上行数据的Topic。
数据以PubSubOptions形式包装,并通过_daprClient.PublishEventAsync通过dapr的方式发送到业务的接口上,

最后不要忘记在Program.cs中注入后台服务

builder.Services.AddHostedService<MQHostedService>();

这部分不了解的可以参考第一章的dapr搭建教程

我们继续在MASA.IoT.Core项目中添加控制器DeviceMqttController.cs

using Dapr;
using MASA.IoT.Common;
using Microsoft.AspNetCore.Mvc;

namespace MASA.IoT.Core.Controllers
{

    [Route("api/[controller]")]
    [ApiController]
    public class DeviceMqttController : ControllerBase
    {
        [Topic("pubsub", "DeviceMessage")]
        [HttpPost("DeviceMessage")]
        public void DeviceMessageAsync([FromBody] PubSubOptions pubSubOptions)
        {
            Console.WriteLine($"Subscriber received, DeviceName:{pubSubOptions.DeviceName},Msg:{pubSubOptions.Msg}");
        }
    }
}

这样Hub获取到订阅数据后就可以调用该接口将数据通知到业务系统。
这里演示只是简单的打印出消息数据。

验证效果

我们同时启动MASA.IoT.Hub和MASA.IoT.Core项目,并启动MQTTX模拟设备连接mqtt,我们可以在EMQX后台看到连接的设备和订阅的情况


我们尝试发布一条数据,可以看到MASA.IoT.Core项目的接口成功的获取了数据

总结

本文以基于共享订阅的方式来实现获取设备上行数据,
真实的场景我们还需要处理重复数据,关于消息的去重需要先了解 MQTT QoS 0, 1, 2
篇幅问题本文不再进行扩展,可以参考下面的文章

https://www.emqx.com/zh/blog/introduction-to-mqtt-qos

完整代码在这里:https://github.com/sunday866/MASA.IoT-Training-Demos

标签:订阅,4.1,MASA,Hub,IoT,topic,Topic
From: https://www.cnblogs.com/sunday866/p/17388812.html

相关文章

  • 485转web api服务器框架源码。 集成iot,web api服务,这套带码是通过C#
    485转webapi服务器框架源码。集成iot,webapi服务,这套带码是通过C#编写集成IOCP高性能高并发优势服务器服务源码。带手机app测试demo源码具体具备功能如下:1、具备EF6+mssql数据库功能,可更改为MYSQL或SQLITe.2、自带WEBAPI服务,抛弃IIS支持。用户可以通过WEB前端直接读取远程设备数......
  • OPC转web API服务器框架源码。 集成iot,web api服务,这套带码是通
    OPC转webAPI服务器框架源码。集成iot,webapi服务,这套带码是通过C#编写集成IOCP高性能高并发优势服务器服务源码。带手机app测试demo源码具体具备功能如下:1、具备EF6+mssql数据库功能,可更改为MYSQL或SQLITe.2、自带WEBAPI服务,抛弃IIS支持。用户可以通过WEB前端直接读取远程设备数......
  • 文心一言 VS chatgpt (16)-- 算法导论4.1 1~2题
    一、当A的所有元素均为负数时,FIND-MAXIMUM-SUBARRAY返回什么?文心一言:当A的所有元素均为负数时,FIND-MAXIMUM-SUBARRAY返回-1。这是因为在二进制中,当所有元素均为负数时,A的每个元素都对应一个负数,而-1的二进制表示是11111111,与A的每个元素的值的每一位的负号是相对应的,所以,如果F......
  • 4.11到4.13
    4.11通过角色来实现将一组权限授予一个用户。1,首先创建一个角色R12,然后使用GRANT语句,使角色R1拥有Student表的SELECT、UPDATE、INSERT权限。3,将这个角色授予王平、张明、赵玲,使他们具有角色R1所包含的全部权限。4,当然,也可以一次性地通过R1来收回王平的这三个权限。......
  • 定了!AIRIOT新品发布会,6月6日北京见。
    随着物联网、大数据、AI技术的成熟和演进,智能物联网技术正在加速、深入渗透至各行业应用。 AIRIOT物联网平台作为赋能数字经济发展和产业转型的数字基座,由航天科技控股集团股份有限公司(股票代码:000901)历时数年倾力打造,致力于以低代码搭建IOT平台,以企业可靠、产品可靠、服务可......
  • 使用MASA全家桶从零开始搭建IoT平台(三)管理设备的连接状态
    目录前言分析方案1:遗嘱消息演示遗嘱消息的使用实施流程方案2:使用WebHook开启WebHook演示Webhook编写代码总结前言获取一个设备的在线和离线状态,是一个很关键的功能。我们对设备下发的控制指令,设备处于在线状态才能及时给我们反馈。这里的在线和离线,我们可以简单的理解为设备......
  • 【备忘】IOT的七大通信协议(IOT协议)
    在物联网协议中,一般分为两大类,一类是传输协议,一类是通信协议。传输协议一般负责子网内设备间的组网及通信;通信协议则主要是运行在传统互联网TCP/IP协议之上的设备通讯协议,负责设备通过互联网进行数据交换及通信。那么物联网都有哪些通信协议呢?深度分析IOT的七大通信协议(IOT协......
  • UIOTOS:一款无门槛的前端0代码搭建工具
    什么是UIOTOS?UIOTOS中文名称前端大师,是一款基于图形技术的前端0代码工具,支持通过连线和嵌套无门槛来搭建各类复杂的的交互界面,包括后台管理系统、组态数据大屏等,实现跟代码开发媲美的效果。为什么要做?前端技术更新快,开发和学习成本高在软件开发领域,UI界面开发技术更新迭代最......
  • 使用MASA全家桶从零开始搭建IoT平台(二)设备注册
    前言我们不希望任何设备都可以接入我们的IoT平台,所以一个设备正常的接入流程是这样的,1、上位机软件通过串口或其他方式读取设备的唯一标识码UUID。2、上位机调用IoT后台接口,发送UUID和ProductID。3、后台接口判断设备是否注册过,如果没有注册过,就根据ProductID并按照一定规律生......
  • MASA MinimalAPI源码解析:为什么我们只写了一个app.MapGet,却生成了三个接口
    源码解析:为什么我们只写了一个app.MapGet,却生成了三个接口1.ServiceBase1.AutoMapRoute源码如下:AutoMapRoute自动创建map路由,MinimalAPI会根据service中的方法,创建对应的api接口。比如上文的一个方法:publicasyncTask<WeatherForecast[]>PostWeather(){re......