首页 > 其他分享 >使用MASA Stack+.Net 从零开始搭建IoT平台 第八章 指令下发

使用MASA Stack+.Net 从零开始搭建IoT平台 第八章 指令下发

时间:2023-10-18 17:38:00浏览次数:36  
标签:string MASA IoT request +. RequestId EMQX public 设备

目录


指令下发-RPC式调用

我们需要控制IoT设备,就需要通过MQTT向设备发送指令,这个功能我们可以通过如下方式实现:

通过向MQTT写入数据的方式实现向设备发送指令,设备回复的结果同样写入MQTT,我们可以通过绑定一个RequestID来获取下发对应的回复结果。但是这样做是异步的。

现在各大IoT平台向设备发送指令基本都是类似RPC式调用,向设备发送指令只需要定义好消息体和超时时间,然后就可以通过类似“同步”的方式获取设备返回的结果或者下发超时的结果。

完整流程如下:

  1. 业务系统调用IoT core Api 通过http下发控制指令。
  2. IoT core调用EMQX的发布接口向mqtt写入数据。
  3. 设备通过mqtt的特定Topic订阅到指令数据。
  4. 设备执行对应操作后,将结果通过向mqtt发布消息的方式写入EMQX。
  5. EMQX通过配置规则使用Hook的方式将结果回复给IoT core Api。
  6. 如果没有超时则IoT core Api将结果写入HTTP 的响应体内,完成HTTP的请求响应。

如果设备没有在规定超时时间内回复,IoT core Api会将超时错误写入HTTP响应体,返回给业务系统。
这样业务系统只需要一次HTTP请求就可以获取到设备的执行结果,我们可以使用这种方式来执行一些时效性比较高的操作。

主题规划

我们为RPC式调用指定一个发布主题
rpc/${productKey}/${deviceName}/${requestId}

再指定一个回复主题
rpc_resp/${productKey}/${deviceName}/${requestId}

等待设备回复

业务系统调用接口下发RPC式指令后,我们需要将下发的内容记录下来,然后等待设备回复后将回复内容写入HTTP的响应体内,完成这次HTTP调用。
我们依然采用influxDB来记录下发和上报的日志,方便日后查询。

流程如下:

  1. 调用接口下发指令后,我们将指令内容记录到influxDB。并通过循环查询RequestId的方式等待设备返回,获取到结果之后返回成功,并记录成功结果(篇幅问题本文没有实现记录HTTP调用结果功能)。
  2. 接收到设备对RPC式调用的回复后,将回复内容写入InfluxDB,并关联RequestId。
  3. 超时时间内没有获取到RequestId对应的回复信息,则返回错误,并记录错误结果。

这样我们就能准确记录每次下发的指令和设备回复的信息,以及每次http请求的最终结果。

服务端实现

一、发布指令到MQTT

1、定义请求体

public class RpcMessageRequest
{
    /// <summary>
    /// 设备名称
    /// </summary>
    public string? DeviceName { get; set; }

    /// <summary>
    /// 请求ID
    /// </summary>
    public Guid RequestId { get; set; } 
    /// <summary>
    /// 产品ID
    /// </summary>

    public Guid ProductId { get; set; } = Guid.Parse("c85ef7e5-2e43-4bd2-a939-07fe5ea3f459");

    /// <summary>
    /// 消息类型
    /// </summary>
    public MessageType MessageType { get; set; } = MessageType.Down;

    /// <summary>
    /// 消息ID 来自EMQX
    /// </summary>
    public string MessageId { get; set; }

    /// <summary>
    /// 消息体
    /// </summary>
    public string MessageData { get; set; }

    /// <summary>
    /// 超时时间默认5s
    /// </summary>
    public int Timeout { get; set; } = 5;
}

2、在 MqttHandler 添加 PublishToMqttAsync 方法用于向EMQX发布数据,
发布数据我们使用EMQX提供的publish Api,相关参数与响应内容,请参考官方文档

https://www.emqx.io/docs/zh/v5.0/admin/api-docs.html#tag/Publish

/// <summary>
/// 向EMQX发布消息
/// </summary>
/// <returns></returns>
public async Task<EmqxBaseResponse> PublishToMqttAsync(PublishMessageRequest request)
{
    var url = $"{_appSettings.MqttSetting.Url}/api/v5/publish";
    var response = await url.WithBasicAuth(_appSettings.MqttSetting.ApiKey, _appSettings.MqttSetting.SecretKey).AllowAnyHttpStatu().PostJsonAsync(request);
    if (response.StatusCode is (int)HttpStatusCode.OK //200
        or (int)HttpStatusCode.BadRequest //400
        or (int)HttpStatusCode.ServiceUnavailable //503
        or (int)HttpStatusCode.Accepted) //202
    {
        return await response.GetJsonAsync<EmqxBaseResponse>();
    }
    else
    {
        throw new UserFriendlyException(await response.GetStringAsync());
    }
}

二、将下发日志写入InfluxDB

1、定义InfluxDB 的 Measurement RPCMessage

 [InfluxDB.Client.Core.Measurement("RPCMessage")]
 public class RPCMessage
 {
     /// <summary>
     /// 设备名称
     /// </summary>
     [Column("DeviceName", IsTag = true)] public string? DeviceName { get; set; }

     /// <summary>
     /// 产品ID
     /// </summary>
     [Column("ProductId", IsTag = true)]
     public Guid ProductId { get; set; } = Guid.Parse("c85ef7e5-2e43-4bd2-a939-07fe5ea3f459");

     /// <summary>
     /// 消息类型
     /// </summary>
     [Column("MessageType", IsTag = true)] public MessageType MessageType { get; set; }

     /// <summary>
     /// 请求ID
     /// </summary>
     [Column("RequestId", IsTag = true)] public Guid RequestId { get; set; }

     /// <summary>
     /// 消息ID
     /// </summary>
     [Column("MessageId", IsTag = true)] public string MessageId { get; set; }

     /// <summary>
     /// 消息体
     /// </summary>
     [Column("MessageData")] public string? MessageData { get; set; }

     /// <summary>
     /// 时间戳
     /// </summary>
     [JsonProperty(propertyName: "Ts")]
     [Column(IsTimestamp = true)] public long Timestamp { get; set; }
 }

 public enum MessageType
 {
     Up, //回复
     Down, //下发
     Other //其他
 }

2、在DeviceHandler添加 WriteRpcMessageLog 方法用于向InfluxDb写入日志

        /// <summary>
        /// 写入RPC日志
        /// </summary>
        /// <param name="request"></param>
        /// <returns></returns>
        private bool WriteRpcMessageLog(RpcMessageRequest request)
        {
            var message = new RPCMessage
            {
                DeviceName = request.DeviceName, 
                ProductId = request.ProductId, 
                MessageType = request.MessageType, 
                RequestId = request.RequestId,
                MessageId = request.MessageId,
                MessageData = request.MessageData,
                Timestamp = Convert.ToInt64((DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0, 0)).TotalMilliseconds)
            };

            //记录下发指令
            return _timeSeriesDbClient.WriteMeasurement(message);
        }

成功写入EMQX后会获取到消息的全局唯一id(MessageID)。

三、从InfluxDb获取设备响应消息

1、在TimeSeriesDbClient中添加 GetRpcMessageResultAsync 方法用于从InfluxDb获取设备回复的消息

        /// <summary>
        /// 从influxDb获取设备回复的消息
        /// </summary>
        /// <param name="option"></param>
        /// <returns></returns>
        public async Task<(string messageId, string deviceResonse)> GetRpcMessageResultAsync(GetRpcMessageOption option)
        {
            var query =
                $@"from(bucket: ""{_bucket}"")
                    |> range(start: {option.UTCStartDateTimeStr},stop:{option.UTCStopDateTimeStr})                                                           
                    |> filter(fn: (r) => r._measurement == ""RPCMessage"" 
                    and r.MessageType ==""Up""
                    and r.RequestId == ""{option.RequestId}"")
                    |>last()";

            var tables = await _client.GetQueryApi().QueryAsync(query, _org);

            var fluxRecords = tables.SelectMany(table => table.Records);

            if (fluxRecords.Any())
            {
                return new ValueTuple<string, string>(fluxRecords.First().GetValueByKey("MessageId").ToString(),
                          fluxRecords.First().GetValue().ToString());
            }

            return new ValueTuple<string, string>(string.Empty, string.Empty);
        }

MessageType = UP 代表我们获取设备上报的数据,RequestId为我们下发时的请求ID,我们使用RequestId保证一次完整下发与上报的对应关系。

2、根据RequestId 在Timeout超时时间内循环检查InfluxDB是否存在设备上报数据

       /// <summary>
       /// 获取设备回复
       /// </summary>
       /// <param name="option"></param>
       /// <returns></returns>
       private async Task<RpcMessageResponse> GetRpcMessageResponseAsync(GetRpcMessageOption option)
       {
           (string messageId, string deviceResonse) messageInfo = new();
           for (int i = 0; i < option.Timeout * 10; i++) //100ms查询一次
           {
               messageInfo = await _timeSeriesDbClient.GetRpcMessageResultAsync(option);
               if (!string.IsNullOrEmpty(messageInfo.deviceResonse))
               {
                   break; //查询到设备返回消息就停止
               }
               await Task.Delay(100);
           }

           var result = new RpcMessageResponse()
           {
               RequestId = option.RequestId,
               Success = false,
               ErrorMessage = "Cmd Timeout",
               DeviceResponse = string.Empty,
               MessageId = string.Empty,
           };

           if (!string.IsNullOrEmpty(messageInfo.deviceResonse))
           {
               var rpcMessageResponse = JsonConvert.DeserializeObject<RpcMessageResponse>(messageInfo.deviceResonse);
               result.Success = rpcMessageResponse.Success;
               result.ErrorMessage = rpcMessageResponse.ErrorMessage;
               result.DeviceResponse = messageInfo.deviceResonse;
               result.MessageId = messageInfo.messageId;
               result.RequestId = option.RequestId;
           }

           return result;
       }

每隔100ms检查一次,如果超时没有找到返回:Cmd Timeout

四、整合RPC下发业务

1、在DeviceHandler中添加 PublishAndGetResponseAsync 方法将上述流程整合

        /// <summary>
        /// 发布指令并等待设备返回
        /// </summary>
        /// <param name="request"></param>
        /// <returns></returns>
        /// <exception cref="UserFriendlyException"></exception>
        public async Task<RpcMessageResponse> PublishAndGetResponseAsync(RpcMessageRequest request)
        {
            var emqxBaseResponse = await _mqttHandler.PublishToMqttAsync(new PublishMessageRequest
            {
                Topic = $"rpc/{request.ProductId}/{request.DeviceName}/{request.RequestId}",
                Payload = request.MessageData
            });
            if (string.IsNullOrEmpty(emqxBaseResponse.Id)) //发布失败
            {
                throw new UserFriendlyException($"reason_code:{emqxBaseResponse.reason_code},Code:{emqxBaseResponse.Code},Message:{emqxBaseResponse.Message}");
            }
            request.MessageId = emqxBaseResponse.Id;
            request.MessageType = MessageType.Down;
            //写入influxDb 日志
            var writeSucceeded = WriteRpcMessageLog(request);
            if (writeSucceeded)
            {
                //获取设备返回数据
                return await GetRpcMessageResponseAsync(new GetRpcMessageOption
                {
                    RequestId = request.RequestId,
                    StartDateTime = DateTime.Now.AddMinutes(-5),
                    Timeout = request.Timeout,
                    StopDateTime = DateTime.Now.AddMinutes(+5)
                });
            }

            throw new UserFriendlyException("Write inflxDB error!");
        }

2、添加Web Api用于业务调用
在DeviceMqttController中添加方法SendRpcMessageAsync 用于业务系统调用

        /// <summary>
        /// 发送RPC式调用,并同步等待设备返回
        /// </summary>
        /// <param name="request"></param>
        /// <returns></returns>
        [HttpPost("SendRpcMessage")]
        public async Task<RpcMessageResponse> SendRpcMessageAsync(SendRpcMessageRequest request)
        {
            return await _deviceHandler.PublishAndGetResponseAsync(new RpcMessageRequest
            {
                DeviceName = request.DeviceName,
                RequestId = Guid.NewGuid(),
                ProductId = request.ProductId,
                MessageType = MessageType.Up,
                MessageData = request.MessageData,
                Timeout = request.Timeout
            });
        }

RequestId 是业务端请求时产生的,RequestId 会被带入rpc Topic中,设备需要回复以该RequestId 结尾的rpc_resp Topic完成一次完整的请求和回复过程。MessageID是来自于EMQX Publish接口发布成功后的接口返回。

五、接收设备回复消息

接收设备回复的消息是通过在EMQX配置规则,通过Hook的方式获取的,所以我们需要添加一个方法让EMQX 的Hook调用。

1、添加请求体

    public class RespondRpcMessageRequest
    {
        /// <summary>
        /// Topic
        /// </summary>
        public string Topic { get; set; }

        /// <summary>
        /// 消息体
        /// </summary>
        public string Payload { get; set; }

        /// <summary>
        /// 消息Id(来自EMQX)
        /// </summary>
        public string MessageId { get; set; }
    }

2、添加RespondToRpc接口用于Hook

        /// <summary>
        /// 设备响应RPC请求
        /// </summary>
        /// <param name="request"></param>
        /// <returns></returns>
        [HttpPost("RespondToRpc")]
        public async Task<bool> RespondToRpcAsync(RespondRpcMessageRequest request)
        {
            var infoArr = request.Topic.Split("/");
            var result = _deviceHandler.RespondToRpc(new RpcMessageRequest
            {
                DeviceName = infoArr[2],
                RequestId = Guid.Parse(infoArr[3]),
                ProductId = Guid.Parse(infoArr[1]),
                MessageType = MessageType.Up,
                MessageId = request.MessageId,
                MessageData = request.Payload,
            });
            return await Task.FromResult(result);
        }

DeviceName 、RequestId 、ProductId 我们都从设备回复的Topic中获取。获取之后我们写入InfluxDB,使上面的GetRpcMessageResultAsync方法可以及时获取。

配置EMQX规则

通过配置EMQX规则,实现设备通过回复rpc_resp时,调用上面的RespondToRpcAsync接口
我们在EMQX管理界面->集成->规则中新建规则RespondToRpcMessage

SELECT
  *,json_encode(payload) as payloadStr
FROM
  "rpc_resp/#"

这里我们使用json_encode函数将payload转换成字符串payloadStr,因为设备上报的是Json,RespondToRpcAsync方法接收消息体的是string类型。

然后在右侧"添加动作",选择HTTP服务,动作选择"使用数据桥接转发",配置我们的接口和请求体

注意:这里${payloadStr}不需要加双引号。

测试

我们在Swagger中调用SendRpcMessage接口,为了方便演示,这里超时时间我设置为30s,方便在MQTTX模拟器中手动操作回复动作。

我们连接上EMQX后在订阅中添加对rpc Topic的订阅

Topic:"rpc/c85ef7e5-2e43-4bd2-a939-07fe5ea3f459/284202304230001/#" 其中c85ef7e5-2e43-4bd2-a939-07fe5ea3f459为产品id,284202304230001为设备名称,# 用于匹配RequestID

我们调用接口下发后可以看到MQTTX订阅到了RPC式调用的指令

我们快速修改MQTTX模拟设备上报的Topic为 rpc_resp/c85ef7e5-2e43-4bd2-a939-07fe5ea3f459/284202304230001/3c0131a8-6eff-48c4-b703-1f681b9ec7ff,其中末尾的3c0131a8-6eff-48c4-b703-1f681b9ec7ff为RequestID

我们回复JSON格式消息"{"Success": true,"DeviceResponse":"{"DevicePower":"ON"}"}",可以看到,接口"同步"获取到了设备的响应

总结

至此,我们完成了一次完整对设备的RPC式调用,可以通过该方式控制设备。
这里只实现了最基础的功能,生产环境中还需要做很多工作,
例如下发指令需要先鉴权,需要检查设备是否存在、提供设备不存在或者不在线的错误提示等,而且应该将每次调用 SendRpcMessageAsync 接口返回的内容也记录到InfluxDB中,通过RequestId,与这一次完整请求关联,方便业务查询。
介于篇幅问题,我也没有将功能整合到UI中,我会在之后的内容中将这部分内容整合,并作为单独的功能展示出来。

完整代码在这里:https://github.com/sunday866/MASA.IoT-Training-Demos

标签:string,MASA,IoT,request,+.,RequestId,EMQX,public,设备
From: https://www.cnblogs.com/sunday866/p/17768836.html

相关文章

  • IOT 围炉札记
    文章目录一、蓝牙二、PAN1080三、IOTOS四、通讯  物联网(英文:InternetofThings,缩写:IoT)起源于传媒领域,是信息科技产业的第三次革命。物联网是指通过信息传感设备,按约定的协议,将任何物体与网络相连接,物体通过信息传播媒介进行信息交换和通信,以实现智能化识别、定位、跟踪、监......
  • MASA MAUI 预览Office文件
    目录背景介绍1、新建MAUIBlazor项目2、创建OfficeViewer.razor组件3、使用安卓模拟器运行4、兼容iOS总结背景接到一个在Maui中预览Office文件的需求,包含excel、word、PDF三种常见的文件,经过技术选型,最后选择了微软原生支持的office在线预览Api,原因是此技术方案简单、跨平台。在......
  • C# +.Net +Oracle的医院化验室LIS系统源码
    LIS系统源码技术细节:Ø体系结构:Client/Server架构SaaS模式Ø客户端:WPF+WindowsFormsØ服务端:C#+.NetØ数据库:OracleØ接口技术:RESTfulAPI+Http+WCFLIS检验系统一种专门用于医院化验室的计算机系统,它致力于提高医院化验室的工作效率和检测准确率。LIS系统由多个子系统组......
  • 求1!+2!+3!+......+10!
    #include<stdio.h>intmain()                                                   {intn=1;inti=1;intt=0;for(n=1;n<=10;n++)          ......
  • iot逆向之与设备之间建立调试
    引脚图JTAG与SWD引脚定义SWD:SerialWireDebug串行调试。由ARM公司开发出来的,目的是减少调试接口的引脚数。SWD与JTAG接口都是在使用仿真器时需要用到的调试接口。仿真器的作用是替代单片机、ARM对程序的运行进行控制,实现硬件的仿真。SWD引脚:●GND:公共地信号(地线)●SWD......
  • 节能减排 | AIRIOT智慧工厂节能管理解决方案
    工厂作为高能耗的生产型企业,降低能耗和提升资源利用率方面就显得很重要,对实施国家倡导的节能降耗、绿色发展有着很大程度上的必要性。然而,工厂能源管理从传统手段向智能化升级转型的过程中,企业也不可避免的面临一些痛点和挑战:节能目标完成难度大:随着产量上升,企业能源综合消耗量增加......
  • 中国电信 NB-IoT 翻频方案及操作指导
    中国电信800MNB-IoT翻频方案说明:翻频前频点分布如下图所示,对应中心频点为879.4/.6/.8MHz或879.5/.7/.9MHz翻频前频点信息:BAND52504,2506,2508  翻频后频点下移分布如下图所示,对应中心频点为869.1/869.3/869.5MHz位置,翻频后频点信息:BAND52401,2403,2405 ......
  • LIS实验室(检验科)信息系统源码 C# +.Net+Oracle
    LIS实验室(检验科)信息系统,一体化设计,与其他系统无缝连接,全程化条码管理。集申请、采样、核收、计费、检验、审核、发布、质控、查询、耗材控制等检验科工作为一体的网络管理系统。技术细节:体系结构:Client/Server架构客户端:WPF+WindowsForms服务端:C#+.Net数据库:Oracle接口技术:RESTf......
  • 天翼物联网平台(AIot) 架构
    平台架构-中国电信天翼物联网CTWing门户网站https://www.ctwing.cn/wlwkfpt/13#see   翻译搜索复制......
  • Apache IoTDB开发系统之C++原生接口
    安装相关依赖MAC安装Bison:Mac环境下预安装了Bison2.3版本,但该版本过低不能够用来编译Thrift。使用Bison2.3版本会报以下错误: invaliddirective:'%code'使用下面brew命令更新bison版本:brewinstallbisonbrewlinkbison--force添加环境变量:echo'exportPATH="/......