首页 > 其他分享 >使用MASA Stack+.Net 从零开始搭建IoT平台 第三章 设备生命周期管理-管理设备的连接状态

使用MASA Stack+.Net 从零开始搭建IoT平台 第三章 设备生命周期管理-管理设备的连接状态

时间:2023-04-28 13:44:33浏览次数:49  
标签:MASA IoT +. MQTT 消息 遗嘱 客户端 连接 设备

@

目录


前言

获取一个设备的在线和离线状态,是一个很关键的功能。我们对设备下发的控制指令,设备处于在线状态才能及时给我们反馈。这里的在线和离线,我们可以简单的理解为设备与MQTT的连接状态。

分析

我们打电话的时候经常能听到:"您拨打的用户已关机“和”用户不在服务区或暂时无法接通“,这两种的区别是什么?


1、当用户开机时,会自动向最近的移动基站注册,基站标记该用户为"attach"(在线)状态。
2、当用户关机时,手机会发起datach流程,告知基站自己关机了,基站标记该用户为"detach"(离线)状态。这样再次拨打就可以节省寻呼资源,直接提示用户关机。
3、当用户忽然进入无网络的环境,或者手机故障,导致来不及发起datach流程,基站还认为用户"在线",当有人拨打用户号码时,基站测会对用户进行寻呼,但是超时得不到回应后,就会提示"不在服务区"或者"暂时无法接通" 的语音。

其实这个方案在IoT上也是可行的,我们可以让设备在线和离线的过程中向特定Topic发送状态消息,但是存在问题,我们需要一个单独的Broker去订阅这个Topic,但是这个单独的Broker很容易成为单点故障点。而且如果设备数量很大,这种意外离线的设备也很难及时发现,需要下发指令后等待设备响应超时才能发现。

方案1:遗嘱消息

MQTT 遗嘱消息可以在客户端意外断线时将“遗嘱”优雅地发送给第三方订阅者,以实现离线通知、设备状态更新等业务。其中意外断线指客户端断开前未向服务器发送 DISCONNECT 消息,比如:

因网络故障或网络波动,设备在保持连接周期内未能通讯,连接被服务端关闭
设备意外掉电
设备尝试进行不被允许的操作而被服务端关闭连接,例如订阅自身权限以外的主题等
遗嘱消息在 MQTT 客户端向服务器端 CONNECT 请求时设置,可选属性包括是否发送遗嘱消息 (Will Message)标志,和遗嘱消息主题 (Topic) 与内容(Payload) 以及 Properties。

值得一提的,遗嘱消息发布的时间可能会有延迟:通常意外断线时,服务器无法立即检测到断线行为,需要通过连接保活心跳机制并经过一定周期后才会触发;MQTT 5.0 提供的遗嘱延迟间隔(Will Delay Interval)属性也会影响发布时间。

演示遗嘱消息的使用

我们使用A、B两台电脑使用MQTT X来演示。
我们在A电脑的 MQTT X 中新建一个名为 Test 的连接,Host 修改为 修改为我们的MQTT地址(192.120.5.204),并输入账号密码,在 Advanced 部分选择 MQTT Version 为 5.0,并且将 Session Expiry Interval 设置为 10,确保会话不会在遗嘱消息发布前过期。

然后在 Lass Will and Testament 部分将 Last-Will Topic 设置为 offline,Last-Will Payload 设置为 I'm offline,Will Delay Interval (s) 设置为 5。


完成以上设置后,我们点击右上角的 Connect 按钮以建立连接。

我们在B电脑的MQTTX中新建一个连接Sub,mqtt地址同样指向我们的mqtt服务器(192.120.5.204)

并订阅offline主题

我们用任务管理器直接结束A电脑的MQTTX进程,这是连接会被直接断开,模拟了设备断电的场景,在5s之后,在B电脑的MQTTX订阅中收到了一条内容为 I‘m offline 的遗嘱消息。

实施流程

1、设备遗嘱消息内容设置为offline,该遗嘱主题与一个普通发送状态的主题设定成同一个 {设备名称}/status。例如 284202304230001/status
2、当设备连接时,向主题 {设备名称}/status 发送内容为 online 的Retained消息,其它客户端订阅主题 {设备名称}/status 的时候,将获取到 Retained 消息为 online。

保留消息(Retain )
MQTT 服务端收到 Retain 标志为 1 的 PUBLISH 报文时,会将该报文视为保留消息,除了被正常转发以外, 保留消息会被存储在服务端,每个主题下只能存在一份保留消息,因此如果已经存在相同主题的保留消息,则该保留消息被替换。
当客户端建立订阅时,如果服务端存在主题匹配的保留消息,则这些保留消息将被立即发送给该客户端。 借助保留消息,新的订阅者能够立即获取最近的状态,而不需要等待无法预期的时间,这在很多场景下是非常重要的。
EMQX 默认开启保留消息的能力和服务,可以在 etc/emqx.conf 中修改 mqtt.retain_available 为 false 来关闭保留消息的能力, 这样客户端将被禁止发送 Retain 标志为 1 的 PUBLISH 报文,否则,客户端将会收到原因码为 0x9A(不支持保留消息)的 DISCONNECT 报文。
保留消息的服务会存储和管理客户端发送的保留消息,并发送给相应的订阅者。

3、当客户端异常断开时,系统自动向主题 {设备名称}/status 发送内容为 offline 的消息,其它订阅了此主题的客户端会马上收到 offline 消息;如果遗嘱消息设置了 Will Retain,那么此时如果有新的订阅 A/status 主题的客户端上线,也将获取到内容为 offline 的遗嘱消息。

方案2:使用Webhook

方案1需要设备主动设置遗嘱消息才能实现,那么有没有更简单的方式,直接通过设备与Mqtt的连接事件来获取连接状态呢。
EMQX 设计了一套WebHook系统,可以通过这个WebHook系统获取内部的事件并进行处理。

开启WebHook

数据集成 -> 数据桥接 中创建一个Webhook

名称设置为ConnectedEvent,URL 中填写我们的Webhook地址,也就是触发事件之后的调用接口地址,这里我们填:

http://192.120.5.204:5000/api/Device/ConnectedEvent

请求方式为Post,其他内容保持默认不变

这里注意URL可以通过${field}的方式拼接,请求体也可以自己指定,如果留空会原样转发消息,我们这里请求体留空
设备在线和离线的事件转发的消息格式如下

{
	"username": "284202304230001",
	"timestamp": 1682652598840,
	"sockname": "172.17.0.5:1883",
	"receive_maximum": 32,
	"proto_ver": 5,
	"proto_name": "MQTT",
	"peername": "172.17.0.1:48524",
	"node": "emqx@172.17.0.5",
	"mountpoint": "undefined",
	"metadata": {
		"rule_id": "rule_3hsx"
	},
	"keepalive": 60,
	"is_bridge": false,
	"expiry_interval": 10,
	"event": "client.connected",
	"connected_at": 1682652598840,
	"conn_props": {
		"User-Property": {},
		"Session-Expiry-Interval": 10
	},
	"clientid": "mqttx_c4491df0",
	"clean_start": false
}

我们点击 创建 ,并继续点击 创建规则

我们在创建规则中指定新的规则名称 rule_client_connected,并在SQL编辑器复制以下内容

SELECT
  *
FROM
  "$events/client_connected",
  "$events/client_disconnected"

在右侧的事件中,我们可以看到所有可用的事件,我们选择了连接和断开两个事件,在这两个事件触发时会通过Webhook调用我们配置的接口,这样我们就能获取到设备的在线、离线状态了。

我们点击 创建按钮 完成规则的创建
我们可以看见我们创建好的规则

点击规则ID,还可以看到统计数据

在FLows中还可以看到整个工作流程

演示Webhook

我们使用MQTTX模拟一次设备连接和断开动作,可以在规则统计界面看到我们的操作已经被记录。

编写代码

我们这里采用方案2。
我们需要实现之前配置的ConnectedEvent接口

    /// <summary>
    /// 连接事件请求
    /// </summary>
    public class ConnectedEventRequest
    {
        /// <summary>
        /// 设备名称
        /// </summary>
        public string Username { get; set; }
        /// <summary>
        /// 时间戳
        /// </summary>
        public long Timestamp { get; set; }

        /// <summary>
        /// 事件(连接/断开)
        /// </summary>
        public string Event { get; set; }
        /// <summary>
        /// 连接时间(断开事件中为0)
        /// </summary>
        public long Connected_at { get; set; }

        /// <summary>
        /// Client ID
        /// </summary>
        public string Clientid { get; set; }
    }
        /// <summary>
        /// 更新设备在线状态
        /// </summary>
        /// <param name="deviceName"></param>
        /// <param name="onlineStatus"></param>
        /// <returns></returns>
        public async Task UpdateDeviceOnlineStatusAsync(string deviceName, OnLineStates onlineStatus)
        {
            var device = await _ioTDbContext.IoTDeviceInfo.AsNoTracking()
                .FirstOrDefaultAsync(o => o.DeviceName == deviceName);
            if (device == null)
            {
                return;
            }
            if (device.OnLineStates != (int)onlineStatus)
            {
                device.OnLineStates = (int)onlineStatus;

                _ioTDbContext.Attach(device);
                //防止更新其他字段
                _ioTDbContext.Entry(device).State = EntityState.Unchanged;
                _ioTDbContext.Entry(device).Property(o => o.OnLineStates).IsModified = true;
                await _ioTDbContext.SaveChangesAsync();
            }
        }

我们根据Event中的内容来判断是 连接(client.connected)/断开(client.disconnected) 的事件

        /// <summary>
        /// 连接、断开事件
        /// </summary>
        /// <param name="request"></param>
        /// <returns></returns>
        [HttpPost]
        public async Task ConnectedEventAsync([FromBody] ConnectedEventRequest request)
        {
            var onlineStatus = request.Event switch
            {
                "client.connected" => OnLineStates.OnLine,
                _ => OnLineStates.OffLine
            };

            await _deviceHandler.UpdateDeviceOnlineStatusAsync(request.Username, onlineStatus);
        }

总结

以上就是本文要讲的内容,可以通过MQTTX来测试我们的代码有效性。

完整代码在这里:https://github.com/sunday866/MASA.IoT-Training-Demos

标签:MASA,IoT,+.,MQTT,消息,遗嘱,客户端,连接,设备
From: https://www.cnblogs.com/sunday866/p/17350038.html

相关文章

  • AIRIOT助力城市管廊工程,智慧物联守护城市生命线
    ​随着科技的不断革新,人工智能、大数据、物联网等新一代技术驱动的智慧城市快速发展,众多领域和行业的参随着科技的不断革新,人工智能、大数据、物联网等新一代技术驱动的智慧城市快速发展,众多领域和行业的参与者开始深入智慧城市建设,以自身优势开始推动智慧城市从“建设......
  • Android音频开发之AudioTrack
    原文地址www.jianshu.com在前两节中分享了Android音频开发之音频基本概念和Android音频开发之音频采集,本文分享的是如何使用AudioTrack来播放使用AudioRecord采集后的PCM数据。构造AudioTrack实例publicAudioTrack(intstreamType,intsampleRateInHz,intcha......
  • 学习MASA第二天:框架分析
    学习MASA第二天:框架分析今天主要是看了下MASATeam的github,目的是对未来设计开源框架选型。对于目前比较火的国内masaframework。当然是首当其冲是要用一用的。masaframework地址:MASA.Framework构思开源框架可行性方案目前看,用masa.stack有点大,对于我们这种个人项目不......
  • DTIOT物联网中台
    将可视化和IOT(物联网)智能感知技术与一站式运维管理理念相结合,以可视化平台为核心,将企业空间管理、设备设施和管线全生命周期管理、环境监测管理、安防/消防、等通过“可视化、集成化、智能化”的技术手段建立一站式智慧运维平台,实现以“安全、高效、节能、便利”为目标。平台架......
  • Spatiotemporal Remote Sensing Image Fusion Using Multiscale Two-Stream Convoluti
    SpatiotemporalRemoteSensingImageFusionUsingMultiscaleTwo-StreamConvolutionalNeuralNetworksabstract地表反射率图像的渐变和突变是现有STF方法的主要挑战。(Gradualandabruptchangesinlandsurfacereflectanceimagesarethemainchallengesinexisting......
  • GraalVM(云原生时代的Java)和IoT在边缘侧落地与实践
    环顾四周,皆是对手!云时代的掉队者,由于Java启动的高延时、对资源的高占用、导致在Serverless及FaaS架构下力不从心,在越来越流行的边缘计算、IoT方向上也是难觅踪影;Java语言在业务服务开发中孤独求败,但在系统级应用领域几乎是C、C++、搅局者Go、黑天鹅Rust的天下;移动应用、敏捷......
  • 学习MASA第一天:MASA Blazor TEST项目创建
    个人博客地址:https://note.raokun.top拥抱ChatGPT,国内访问网站:https://www.playchat.top学习MASA第一天:MASABlazorTEST项目创建从今天开始,学习MASA框架,目标是基于MASA做一套开源项目。第一天,从下载源码开始![443684122256924]我们今天先把框架源码下载下来,以便后面每天......
  • 让智慧物联赋能高效生产, AIRIOT助力数字化油田转型升级
    ​近年来,中国石油行业为了推进工业化和信息化深度融合,充分结合勘探开发、生产科研和经营管理的实际需求,积极通过信息化建设促进油田业务转型升级。在勘探开发与管理的领域中,油气生产物联网系统是一个极其重要的信息系统建设项目。利用该系统不但可以对生产过程进行可视......
  • c++.12
    复习:  1、输出缓冲区    满足哪些条件会刷新输出缓冲区:    1、遇到'\n'    2、遇到输入语句    3、缓冲区满4k    4、程序正常结束    5、fflush(stdout)  2、输入缓冲区    1、当想要输入的是整型、......
  • c++.13
    预处理指令:  #define  常见笔试面试题:  1、简述#define与typedef的区别:    如果是普通类型,它们在功能上无任何区别,但本质不同,一个是代码替换,一个是类型重定义    #defineINTPint*      INTPp1,p2,p3; //p1是指针p2p3是int......