【设备层】边缘网关
using HslCommunication; using HslCommunication.Core; using HslCommunication.Core.Device; using HslCommunication.ModBus; using IotDataFlow.Section.gate.model; using IotDataFlow.Section.iot.model; using IotDataFlow.Util; using Microsoft.ClearScript.V8; using MQTTnet.Client; using System.Text; using System.Text.Json; namespace IotDataFlow.Section.gate { /// <summary> /// 【设备层】边缘网关 /// </summary> public class Gate { static GateModel GateModel_Time = new GateModel(); static GateModel GateModel_Change = new GateModel(); static Dictionary<string, EquipModel> DicTag = new Dictionary<string, EquipModel>(); static MQClient MqttGate = new MQClient(); static string ScriptPathGate = AppDomain.CurrentDomain.BaseDirectory + $"Script{Path.DirectorySeparatorChar}gate{Path.DirectorySeparatorChar}script1.js"; static V8ScriptEngine Engine = new V8ScriptEngine(); static string PubTopic; /// <summary> /// 配置基础信息:网关信息 /// </summary> public static void ConfigBaseInfo() { try { GateModel_Time.GateCode = "gw1"; GateModel_Time.Equips = new List<EquipModel>() { new EquipModel() { EquipCode = "JY355", HostAddress = "127.0.0.1", PortNumber = 502, Tags = new List<TagModel>() { new TagModel() { TagCode = "40105", TagDT = (typeof(ushort)).Name.ToLower(), TagValue = 0 }, new TagModel() { TagCode = "40106", TagDT = (typeof(ushort)).Name.ToLower(), TagValue = 0 }, new TagModel() { TagCode = "00105", TagDT = (typeof(bool)).Name.ToLower(), TagValue = false }, new TagModel() { TagCode = "00106", TagDT = (typeof(bool)).Name.ToLower(), TagValue = false } } } }; // 添加地址与数据类型对应关系,用于写入时查询 foreach (var equip in GateModel_Time.Equips) { foreach (var tag in equip.Tags) { if (!DicTag.ContainsKey(tag.TagCode)) { DicTag.Add(tag.TagCode, equip); } } } // 变化上传Model GateModel_Change.GateCode = GateModel_Time.GateCode; GateModel_Change.Equips = new List<EquipModel>() { new EquipModel() { Tags = new List<TagModel>() { new TagModel() } } }; } catch (Exception ex) { Logger.Error($"Gate,ConfigBaseInfo,errmsg:{ex.Message}\r\nstacktrace:{ex.StackTrace}"); } } /// <summary> /// 运行服务(定时轮询&定时上报) /// </summary> public static async Task Run(MqttModel mqModel, string subTopic, string pubTopic) { try { PubTopic = pubTopic; var scriptContent = File.ReadAllText(ScriptPathGate); Engine.Execute(scriptContent); await MqttGate.InitConnect("127.0.0.1", mqModel.Port, mqModel.UserName, mqModel.Password, "mqgate", subTopic, SubCallBack); Task.Run(() => TskPoll()); Task.Run(() => TskUpload()); } catch (Exception ex) { Logger.Error($"Gate,Run,errmsg:{ex.Message}\r\nstacktrace:{ex.StackTrace}"); } } /// <summary> /// 定时轮询 /// </summary> static async void TskPoll() { while (true) { try { foreach (var equip in GateModel_Time.Equips) { var hsl = GetHslBase(equip); foreach (var tag in equip.Tags) { bool status = false; object result = null; string message = null; var startaddress = tag.TagCode; ConvertStandardModbusAddress2HSLAddress(ref startaddress); switch (tag.TagDT) { case "uint16": { OperateResult<ushort> response = await hsl.ReadUInt16Async(startaddress); if (null != response) { status = response.IsSuccess; result = response.Content; message = response.ToMessageShowString(); } } break; case "boolean": { OperateResult<bool> response = await hsl.ReadBoolAsync(startaddress); if (null != response) { status = response.IsSuccess; result = response.Content; message = response.ToMessageShowString(); } } break; } if (!status) { Logger.Error($"网关读取Modbus数据异常,address:{tag.TagCode},errmsg:{message}"); } else { // 变化上报 if (!tag.TagValue.ToString().Equals(result.ToString())) { tag.TagValue = result; // GateModel_Change.Equips[0].EquipCode = equip.EquipCode; GateModel_Change.Equips[0].Tags[0] = tag; UploadData("变化上报", GateModel_Change);// {"ts":"2024-09-04T03:13:41.874Z","d":[{"tag":"40105","value":0}]} } } await Task.Delay(100); } } } catch (Exception ex) { Logger.Error($"Gate,TskPoll,errmsg={ex.Message}\r\nstacktrace:{ex.StackTrace}"); } await Task.Delay(500); } } /// <summary> /// 定时上报 /// </summary> static async void TskUpload() { while (true) { await Task.Delay(60000); try { await UploadData("定时上报", GateModel_Time);// {"ts":"2024-09-03T07:57:58.471Z","d":[{"tag":"40105","value":"5"},{"tag":"40106","value":"6"}]} } catch (Exception ex) { Logger.Error($"Gate,TskUpload,errmsg={ex.Message}\r\nstacktrace:{ex.StackTrace}"); } } } /// <summary> /// 上报数据 /// </summary> static async Task UploadData(string note, GateModel gateModel) { string dataJson = JsonSerializer.Serialize(gateModel); var encodeJson = Engine.Invoke("encodeMqttPayload", dataJson); Logger.Info($"网关自定义编码 >> 备注=【{note}】 Topic主题=【{PubTopic}】 消息=【{encodeJson}】"); await MqttGate.Publish(PubTopic, encodeJson.ToString()); } /// <summary> /// 收到消息事件 /// </summary> /// <param name="arg"></param> /// <returns></returns> static async Task SubCallBack(MqttApplicationMessageReceivedEventArgs arg) { try { string topic = arg.ApplicationMessage.Topic; string mqttPayload = Encoding.UTF8.GetString(arg.ApplicationMessage.PayloadSegment); Logger.Info($"网关接收报文 >> 客户端ID=【{arg.ClientId}】 Topic主题=【{topic}】 消息=【{mqttPayload}】"); // 写入 {"ts":"2024-09-04T03:41:34.747Z","w":[{"tag":"10105"}]} var decodeJson = Engine.Invoke("decodeMqttPayload", mqttPayload); Logger.Info($"网关自定义解码 >> {decodeJson}"); var model = JsonSerializer.Deserialize<GateModel>(decodeJson.ToString()); foreach (var equip in model.Equips) { foreach (var tag in equip.Tags) { var tagCode = tag.TagCode; var address = tagCode; ConvertStandardModbusAddress2HSLAddress(ref address); var content = tag.TagValue.ToString(); OperateResult? operateResult = null; if (DicTag.ContainsKey(tagCode)) { var tempDevice = DicTag[tagCode]; var hsl = GetHslBase(tempDevice); var tempTag = tempDevice.Tags.Where(x => x.TagCode == tagCode).First(); if (null != tempTag) { switch (tempTag.TagDT) { case "uint16": { operateResult = await hsl.WriteAsync(address, Convert.ToUInt16(content)); } break; case "boolean": { operateResult = await hsl.WriteAsync(address, Convert.ToBoolean(content)); } break; } if (null != operateResult && operateResult.IsSuccess) { if (operateResult.IsSuccess) { Logger.Info($"网关写入数据 >> 状态=【成功】 地址=【{tagCode}】 值=【{content}】"); } else { Logger.Error($"网关写入数据 >> 状态=【失败】 地址=【{tagCode}】 值=【{content}】错误信息=【{operateResult.ToMessageShowString()}】"); } } else { Logger.Error($"网关写入数据 >> 状态=【失败】 地址=【{tagCode}】 值=【{content}】 错误信息=【operateResult==null】"); } } else { Logger.Error($"网关写入数据 >> 状态=【失败】 地址=【{tagCode}】 值=【{content}】 错误信息=【tempTag==null】"); } } else { Logger.Error($"网关写入数据 >> 状态=【失败】 地址=【{tagCode}】 值=【{content}】错误信息=【DicTag字典查无数据】"); } } } } catch (Exception ex) { Logger.Error($"Gate,SubCallBack,errmsg:{ex.Message}\r\nstacktrace:{ex.StackTrace}"); } } /// <summary> /// 获取Hsl进行设备交互的对象 /// </summary> static DeviceCommunication GetHslBase(EquipModel equip) { var hsl = (DeviceCommunication)UidMgr.GetClient(equip.EquipCode); if (null == hsl) { var temp = new ModbusTcpNet(equip.HostAddress, equip.PortNumber); temp.Station = 1; temp.AddressStartWithZero = false; temp.DataFormat = DataFormat.CDAB; temp.ReceiveTimeOut = 5000; hsl = temp; UidMgr.AddClient(equip.EquipCode, hsl); Logger.Info($"网关初始化设备交互 >> 设备编码=【{equip.EquipCode}】"); } return hsl; } /// <summary> /// 地址转换 /// </summary> /// <param name="val"></param> static void ConvertStandardModbusAddress2HSLAddress(ref string val) { if (!val.Contains("x=")) { int code = 1; ushort address = Convert.ToUInt16(val); if (address >= 00001 && address <= 09999)// 00001 ~ 09999 { code = 1;// 读线圈状态 } else if (address >= 10001 && address <= 19999)// 10001 ~ 19999 { code = 2;// 读离散输入状态(只读) } else if (address >= 30001 && address <= 39999)// 30001 ~ 39999 04指令 { code = 4;// 读输入寄存器(只读) } else if (address >= 40001 && address <= 49999)// 40001 ~ 49999 03指令 { code = 3;// 读保存寄存器 } var temp = Convert.ToUInt16(val.Substring(1)); val = $"x={code};{temp}"; } } } }
【应用层】管理后台
using IotDataFlow.Section.iot.model; using IotDataFlow.Util; using System.Text.Encodings.Web; using System.Text.Json; namespace IotDataFlow.Section.iot { /// <summary> /// 【应用层】管理后台 /// </summary> public class Web { /// <summary> /// 配置基础数据:设备信息(物模型) /// </summary> public static void ConfigBaseInfo(string deviceId, string subTopic, string pubTopic) { // 设备注册 var insert = false; var deviceModel = LitedbHelper.GetDeviceModel(deviceId); if (null == deviceModel) { deviceModel = new DeviceModel(); insert = true; } deviceModel.SubTopic = subTopic; deviceModel.PubTopic = pubTopic; deviceModel.DeviceId = deviceId; deviceModel.DeviceName = "温湿度计"; deviceModel.Propertys = new Dictionary<string, PropertyModel>() { ["temperature"] = new PropertyModel() { DataType = (typeof(double)).Name.ToLower(), DataValue = 0 }, ["humidity"]= new PropertyModel() { DataType = (typeof(double)).Name.ToLower(), DataValue = 0 }, ["status"] = new PropertyModel() { DataType = (typeof(string)).Name.ToLower(), DataValue = "offline" }, ["switchstate1"] = new PropertyModel() { DataType = (typeof(bool)).Name.ToLower(), DataValue = false }, ["switchstate2"] = new PropertyModel() { DataType = (typeof(bool)).Name.ToLower(), DataValue = false } }; deviceModel.Triggers = new List<TriggerModel>() { new TriggerModel() { TriggerId = 1, Description = "温度超过阈值警报", Condition = "temperature > 30", Action = new ActionModel(){ Type = ActionType.WriteDevice, Arg = new Dictionary<string, object>() { ["switchstate1"] = true } } }, new TriggerModel() { TriggerId = 2, Description = "温度正常", Condition = "temperature <= 30", Action = new ActionModel(){ Type = ActionType.WriteDevice, Arg = new Dictionary<string, object>() { ["switchstate1"] = false } } }, new TriggerModel() { TriggerId = 3, Description = "湿度低于阈值警报", Condition = "humidity > 30", Action = new ActionModel(){ Type = ActionType.WriteDevice, Arg = new Dictionary<string, object>() { ["switchstate2"] = true } } }, new TriggerModel() { TriggerId = 4, Description = "湿度正常", Condition = "humidity <= 30", Action = new ActionModel(){ Type = ActionType.WriteDevice, Arg = new Dictionary<string, object>() { ["switchstate2"] = false } } }, new TriggerModel() { TriggerId = 5, Description = "温度和湿度都正常", Condition = "temperature <= 30 and humidity >= 30", Action = new ActionModel(){ Type = ActionType.WxNotice, Arg = new Dictionary<string, object>() { ["openid"] = "112233" } } }, new TriggerModel() { TriggerId = 6, Description = "继电器1开", Condition = "switchstate1 == true", Action = null }, new TriggerModel() { TriggerId = 7, Description = "继电器2开", Condition = "switchstate2 == true", Action = null } }; if (insert) { LitedbHelper.InsertDeviceModel(deviceModel); } else { LitedbHelper.UpdateDeviceModel(deviceModel); } Logger.Info($"Web基础信息:{ JsonSerializer.Serialize(deviceModel, new JsonSerializerOptions { Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping })}"); } } }
【服务层】通信服务
using DynamicExpresso; using IotDataFlow.Section.iot.model; using IotDataFlow.Util; using Microsoft.ClearScript.V8; using MQTTnet.Client; using System.Text; using System.Text.Json; namespace IotDataFlow.Section.iot { /// <summary> /// 【服务层】通信服务 /// </summary> public class Service { static MQClient mqIot = new MQClient(); static string scriptPathIot = AppDomain.CurrentDomain.BaseDirectory + $"Script{Path.DirectorySeparatorChar}iot{Path.DirectorySeparatorChar}script1.js"; static V8ScriptEngine engine = new V8ScriptEngine(); /// <summary> /// 运行服务:MQ服务 /// </summary> public static async Task Run(MqttModel mqModel, string subTopic) { string scriptContent = File.ReadAllText(scriptPathIot); engine = new V8ScriptEngine(); engine.Execute(scriptContent); // 启动mqttserver await MQServer.Start(mqModel.Port, mqModel.UserName, mqModel.Password); // 监听订阅信息 await mqIot.InitConnect("127.0.0.1", mqModel.Port, mqModel.UserName, mqModel.Password, "mqiot", subTopic, SubCallBack); } /// <summary> /// IOT收到消息事件 /// </summary> /// <param name="arg"></param> /// <returns></returns> static async Task SubCallBack(MqttApplicationMessageReceivedEventArgs arg) { try { // mq接收 string topic = arg.ApplicationMessage.Topic; string mqttPayload = Encoding.UTF8.GetString(arg.ApplicationMessage.PayloadSegment); Logger.Info($"IoT接收报文 >> 客户端ID=【{arg.ClientId}】 Topic主题=【{topic}】 消息=【{mqttPayload}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】"); var encodeJson = engine.Invoke("encodeMqttPayload", mqttPayload);// {"temperature":5,"humidity":6} Logger.Info($"IoT自定义解码 >> {encodeJson}"); // 规则引擎 var deviceId = topic.Split("/").Last(); var deviceModel = LitedbHelper.GetDeviceModel(deviceId); var propertys = deviceModel.Propertys; var rules = deviceModel.Triggers; if (null == deviceModel || null == propertys || null == rules) return; var datas = JsonSerializer.Deserialize<Dictionary<string, JsonElement>>(encodeJson.ToString()); var lstParam = new List<Parameter>(); var lstJosnProperty = new List<string>(); foreach (var data in datas) { var propertyKey = data.Key.ToLower(); if (propertys.ContainsKey(propertyKey)) { // 收集json报文中含有的属性 lstJosnProperty.Add(propertyKey); // 填充表达式的参数 var propertyModel = propertys[propertyKey]; switch (propertyModel.DataType) { case "double": lstParam.Add(new Parameter(data.Key, data.Value.GetDouble())); break; case "string": lstParam.Add(new Parameter(data.Key, data.Value.GetString())); break; case "boolean": lstParam.Add(new Parameter(data.Key, data.Value.GetBoolean())); break; } } } // 评估规则 var interpreter = new Interpreter(); var filteredRules = rules.Where(rule => lstJosnProperty.Any(property => rule.Condition.Contains(property))).ToList(); foreach (var rule in filteredRules) { try { var condition = rule.Condition.Replace(" and ", " && ").Replace(" or ", " || "); bool result = (bool)interpreter.Eval(condition, lstParam.ToArray()); if (result) { // 添加记录 LitedbHelper.InsertAlarm(new AlarmModel() { Description= rule.Description }); if (null != rule.Action && null != rule.Action.Arg) { string argJson = JsonSerializer.Serialize(rule.Action.Arg); switch (rule.Action.Type) { case ActionType.WxNotice: { Logger.Info($"IoT触发规则 >> {rule.Description} 联动场景=【微信通知】 执行参数=【{argJson}】"); } break; case ActionType.WriteDevice: { var decodeJson = engine.Invoke("decodeMqttPayload", argJson);// {"temperature":5,"humidity":6} Logger.Info($"IoT触发规则 >> {rule.Description} 联动场景=【写入设备】 执行参数=【{argJson}】 自定义解码=【{decodeJson}】"); mqIot.Publish(deviceModel.PubTopic, decodeJson.ToString()); } break; } } else { Logger.Info($"IoT触发规则 >> {rule.Description} 联动场景=【无】"); } } } catch (Exception ex) { //Logger.Error($"Service,Condition,errmsg:{ex.Message}\r\nstacktrace:{ex.StackTrace}"); } } } catch (Exception ex) { Logger.Error($"Service,SubCallBack,errmsg:{ex.Message}\r\nstacktrace:{ex.StackTrace}"); } } } }
编解码文件
// ============================================================================================================================= // ** 脚本名称:script2.js // ** 输入Json:{"GateCode":"gw1","Equips":[{"EquipCode":"JY355","Tags":[{"TagCode":"40105","TagValue":"5"},{"TagCode":"40106","TagValue":"6"}]},{"DeviceCode":"JY356","Tags":[{"TagCode":"40107","TagValue":"7"},{"TagCode":"40108","TagValue":"8"}]}]} // ** 输出Json:{"clientid":"gw1","time":"2024-08-30 11:34:35","JY355":[{"tag":"40105","value":"5"},{"tag":"40106","value":"6"}],"JY356":[{"tag":"40107","value":"7"},{"tag":"40108","value":"8"}]} // ============================================================================================================================= function createMqttPayload(dataJson) { let gate = JSON.parse(dataJson); let clientid = gate.GateCode; let device = gate.Devices; let result = { clientid: gate.GateCode, time: new Date(), }; device.forEach(function (d) { let equipCode = d.EquipCode; let tag = d.Tags; if (!result[equipCode]) { result[equipCode] = []; } tag.forEach(function (t) { result[equipCode].push({ tag: t.TagCode, value: t.TagValue }); }); }); return JSON.stringify(result); }
【设备模拟器】- Modbus TCP
场景测试(变化上报):
1. 轮询Modbus设备点位 -> 检测到数据变化 -> 发布mq报文(自定义编码)
2. IoT订阅接收报文 -> 自定义解码 -> 规则引擎校验 -> 发布mq报文(自定义编码)
3. IoT订阅接收报文 -> 自定义解码 -> 写入点位值
场景测试(定时上报):
标签:string,IoT,骨架,tag,using,var,new,Logger,搭建 From: https://www.cnblogs.com/chen1880/p/18397083