using System.Collections.Concurrent; using System.Text; using MQTTnet; using MQTTnet.Client; using Microsoft.Extensions.Options; using Microsoft.Extensions.Logging; using MQTTnet.Server; using Microsoft.Extensions.Hosting; using System.Net.NetworkInformation; using Serilog.Core; using Serilog; using Huahuan.CollectorServiceCore.Methods; using Huahuan.CollectorServiceCore.Models; using Huahuan.CollectorServiceCore.SqlSugar.Methods; using Huahuan.CollectorServiceCore.Protocols; using Huahuan.CollectorServiceCore.SqlSugar.Models; using Huahuan.CollectorServiceCore.Services; using Microsoft.VisualBasic; using System.Text.Json.Nodes; using Huahuan.CommonLib.Methods; namespace Huahuan.CollectorServiceCore { public class MqttHuahuan { private static readonly Logger _logger; //private readonly ConcurrentDictionary<long, string> dicReply = new(); // 消息id,消息 private static ConcurrentQueue<MqttMesssage> queuePubMsg = new(); private static ConcurrentQueue<MqttMesssage> queueSubMsg = new(); private static IMqttClient? mqttClient; private static MqttClientOptions? mqttClientOptions; private static bool isLongConnect = false;// 长连接 private static ConcurrentDictionary<string, UInt32> dicCollectorFilter = new ConcurrentDictionary<string, UInt32>(); // 采集器管理 static MqttHuahuan() { _logger = new LoggerConfiguration() .WriteTo.File($"{AppDomain.CurrentDomain.BaseDirectory}/logs/MqttHuahuan-.txt", rollingInterval: RollingInterval.Day) // 按天写入文件 .CreateLogger(); ConcurrentDictionary<string, UInt32> dic31311 = CollectorTB.GetSnCycleByPort(31311); // 获取采集器 ConcurrentDictionary<string, UInt32> dic55888 = CollectorTB.GetSnCycleByPort(55888); // 获取采集器 dicCollectorFilter = dic31311; foreach (var kv in dic55888) { dicCollectorFilter.TryAdd(kv.Key, kv.Value); } _ = ClientCreate(); } public static async Task ClientCreate() { do { try { var mqttParams = AppConfigHelper.GetMqttOptions(); // 获取配置 var mqttFactory = new MqttFactory(); mqttClient = mqttFactory.CreateMqttClient(); mqttClientOptions = new MqttClientOptionsBuilder() .WithTcpServer(mqttParams.Host, mqttParams.Port) .WithClientId(mqttParams.ClientId) .WithCredentials(mqttParams.User, mqttParams.Password) .WithCleanSession(false) .WithKeepAlivePeriod(TimeSpan.FromSeconds(60)) // 设置 Keep Alive 间隔为 60 秒。 .WithCleanSession(true) .Build(); // 连接断开事件 mqttClient.DisconnectedAsync += async e => { if (isLongConnect) { await Task.Delay(5000); _logger.Information("MqttHuahuan:发起重连"); await Start(); } }; // 订阅事件 mqttClient.ApplicationMessageReceivedAsync += e => { //Log.LogInformation($"Received message: {Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment)}"); MqttMesssage mqttMesssage = new() { Topic = e.ApplicationMessage.Topic, Message = Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment) }; queueSubMsg.Enqueue(mqttMesssage); return Task.CompletedTask; }; // 连接成功事件 mqttClient.ConnectedAsync += async e => { _logger.Information("MqttHuahuan:连接成功"); await mqttClient.UnsubscribeAsync("#"); // 添加订阅逻辑 //var mqttParams = AppConfigHelper.GetMqttOptions(); if (mqttParams.SubTopics != null) { foreach (var topic in mqttParams.SubTopics) { await mqttClient.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic(topic).Build()); } } }; Task taskSub = new(action: async () => await SubscribeMessageProcessTask()); taskSub.Start(); Task taskPub = new(action: async () => await PublishMessagePorcessAsync()); taskPub.Start(); //Task testTask = new Task(action: async () => await PublishMessageTest()); //testTask.Start(); } catch (Exception e) { _logger.Error("MqttHuahuan:创建失败,{e}", e.Message); await Task.Delay(5000); } } while (mqttClient == null); } public static async Task Start() { try { isLongConnect = true; if (mqttClient != null) { var response = await mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None); } else { _logger.Information("mqttClient is null"); } } catch (Exception e) { _logger.Error("MqttHuahuan:连接失败,{e}", e.Message); } } public static async Task Stop() { isLongConnect = false; if (mqttClient != null && mqttClient.IsConnected) { await mqttClient.DisconnectAsync(); } _logger.Information("MqttHuahuan:已退出"); } private static async Task PublishMessageTest() { await Task.Delay(5000); while (isLongConnect) { MqttMesssage msg = new MqttMesssage(); msg.Retain = false; msg.Topic = "utest/123"; msg.Message = "test1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111"; queuePubMsg.Enqueue(msg); await Task.Delay(1); } } /// <summary> /// 发布消息任务 /// </summary> private static async Task PublishMessagePorcessAsync() { await Task.Delay(2000); int dly = 1000; _logger.Information("MqttHuahuan:发布消息任务已启动"); while (isLongConnect) { try { if (mqttClient != null && mqttClient.IsConnected) { if (!queuePubMsg.IsEmpty) { if (queuePubMsg.TryDequeue(out MqttMesssage? pub_msg)) { if (pub_msg == null) continue; if (string.IsNullOrWhiteSpace(pub_msg.Topic)) continue; if (string.IsNullOrWhiteSpace(pub_msg.Message)) continue; MqttApplicationMessage message = new() { PayloadSegment = Encoding.UTF8.GetBytes(pub_msg.Message), Topic = pub_msg.Topic, Retain = pub_msg.Retain, QualityOfServiceLevel = MQTTnet.Protocol.MqttQualityOfServiceLevel.ExactlyOnce }; //_logger.Information("发布消息:topic={topic},payload={payload}",pub_msg.Topic, pub_msg.Message); await mqttClient.PublishAsync(message); } //await Task.Delay(1); } else { await Task.Delay(1000); } } else { _logger.Warning("MqttHuahuan,发布任务,mqttc is null or disconneted!"); await Task.Delay(10000); } dly = 1000; } catch (Exception e) { _logger.Error("MqttHuahuan:发布消息异常:{e}", e.Message); await Task.Delay(dly); if (dly < 60000) // 1分钟 { dly += dly; } } } _logger.Information("MqttHuahuan:发布消息任务已停止"); } /// <summary> /// 订阅消息处理任务 /// </summary> private static async Task SubscribeMessageProcessTask() { await Task.Delay(2000); int dly = 1000; _logger.Information("MqttHuahuan:订阅消息任务已启动"); while (isLongConnect) { try { if (queueSubMsg.TryDequeue(out MqttMesssage? sub_msg)) { if (sub_msg == null) continue; if (string.IsNullOrWhiteSpace(sub_msg.Topic)) continue; if (string.IsNullOrWhiteSpace(sub_msg.Message)) continue; string[] splitTopics = sub_msg.Topic.Split('/'); string payload = sub_msg.Message; if (splitTopics.Length > 4) { string sn = splitTopics[4]; string model = splitTopics[3]; if (splitTopics[1] == "up") { if (splitTopics[2] == "sensors") { Log.Information("传感器数据, topic={0}, payload={1}", sub_msg.Topic, sub_msg.Message); // 过滤器 if (AppConfigHelper.GetOtherOptions().DataFilterUsed) { if (!dicCollectorFilter.ContainsKey(sn)) // 过滤未添加的采集器 { _logger.Information("【拦截】topic={0},payload={1}" , sub_msg.Topic, sub_msg.Message); // 记录被过滤的数据 continue; } } //List<SensorData>? list = CollectorJsonProto.ToSensorData(sn, payload); List<SensorData>? list = CollectorJsonProto.ToSensorDataPluse(sn, payload); if (list!=null) { foreach (SensorData sd in list) DataService.AddSensorData(sd); } else { _logger.Error("传感器数据解析失败!topic={0},payload={1}", sub_msg.Topic, sub_msg.Message); } } else if (splitTopics[2] == "collectors") { //if (strings[3] == "HD_CJY2021_724") // 转发 //{ // sub_msg.Topic.Replace("shhhcl", "yun"); // sub_msg.Topic.Replace("HD_CJY2021_724", "HD-CJY2021"); // AddToQueuePubMsg(sub_msg); //} // 过滤器 if (AppConfigHelper.GetOtherOptions().DataFilterUsed) { if (!dicCollectorFilter.ContainsKey(sn)) // 过滤未添加的采集器 { _logger.Information("【拦截】topic={0},payload={1}", sub_msg.Topic, sub_msg.Message); // 记录被过滤的数据 continue; } } JsonObject? jo = JsonHepler.FromJson<JsonObject>(payload); string? msgtype = (string?)jo?["msgtype"]; if (string.IsNullOrWhiteSpace(msgtype)) continue; if ((jo?["message"] == null)) continue; if ((msgtype == "collector_datas") || (msgtype == "node_datas")) { CollectorData? cd = CollectorJsonProto.ToColletorData(sn, model, jo["message"]?.ToString()); if (cd != null) { DataService.AddCollectorData(cd); } else { _logger.Error("采集仪数据解析失败!topic={0},payload={1}", sub_msg.Topic, sub_msg.Message); } } else if (msgtype == "collector_event") { } } } } else if (splitTopics.Length == 4) { if (splitTopics[1] == "request") { string msgid = splitTopics[3]; // 消息id string method = splitTopics[2]; // 方法 switch (method) { case "add_collector": new ApiMqttProtoService(_logger).AddCollector(msgid, payload); break; case "del_collector": new ApiMqttProtoService(_logger).DelCollector(msgid, payload); break; case "mod_collector": new ApiMqttProtoService(_logger).ModCollector(msgid, payload); break; case "set_sensors": new ApiMqttProtoService(_logger).SetSensorList(msgid, payload); break; case "set_wx_sensors": break; case "restart_collector": new ApiMqttProtoService(_logger).RestartCollector(msgid, payload); break; case "ser_server": new ApiMqttProtoService(_logger).SetServer(msgid, payload); break; default: _logger.Information("不需要处理的方法,{fn}", method); break; } } } else if (splitTopics.Length == 2) // 节点事件 { string[] arr = payload.Split('_'); string sn = ""; bool isConnected = false; if (arr.Length != 1) continue; // 消息错误 sn = arr[arr.Length - 1]; // 编号 if (sn.Substring(0, 2) != "86") { //_logger.Warning("MqttHuahuan:设备编号不匹配, sn={sn}", sn); // 编号不匹配 continue; // 不是设备编号 } if (AppConfigHelper.GetOtherOptions().DataFilterUsed) { if (!dicCollectorFilter.ContainsKey(sn)) // 过滤未添加的采集器 continue; } if (splitTopics[1] == "client_connected") // 上线 { isConnected = true; } else if (splitTopics[1] == "client_disconnected") // 离线 { isConnected = false; } DataService.AddCollectorSataus(new CollectorStatus() { Sn = sn, Status = isConnected, Created = DateTime.Now }); // 修改采集仪连接状态 if (AppConfigHelper.GetOtherOptions().TcpToMqttAllowed) // 转发 { PublishMessage(CollectorJsonProto.StatusConvert(sn, isConnected)); } } await Task.Delay(1); } else { await Task.Delay(1000); } dly = 1000; } catch (Exception e) { _logger.Error("MqttHuahuan:消息处理异常:{e}", e.Message); await Task.Delay(dly); if (dly < 60000) // 1分钟 { dly += dly; } } } _logger.Information("MqttHuahuan:订阅消息任务已停止"); } /// <summary> /// 添加要发布的消息 /// </summary> /// <param name="mqttMesssage"></param> public static void PublishMessage(MqttMesssage mqttMesssage) { queuePubMsg.Enqueue(mqttMesssage); } /// <summary> /// 移除采集器 /// </summary> /// <param name="sn"></param> /// <returns></returns> public static Boolean CollectorRemove(string sn) { if (dicCollectorFilter.TryRemove(sn, out _)) { return true; } return false; } /// <summary> /// 添加采集器 /// </summary> /// <param name="sn"></param> /// <param name="cycle"></param> /// <returns></returns> public static Boolean CollectorAdd(string sn, UInt32 cycle) { if (dicCollectorFilter.TryAdd(sn, cycle)) { return true; } return false; } } }
标签:Task,await,Mqttnet,mqtt,using,msg,new,logger,客户端 From: https://www.cnblogs.com/zengfeng1013/p/18358202