首页 > 其他分享 >Mqttnet实现的mqtt客户端

Mqttnet实现的mqtt客户端

时间:2024-08-14 09:28:35浏览次数:12  
标签:Task await Mqttnet mqtt using msg new logger 客户端

using System.Collections.Concurrent;
using System.Text;
using MQTTnet;
using MQTTnet.Client;
using Microsoft.Extensions.Options;
using Microsoft.Extensions.Logging;
using MQTTnet.Server;
using Microsoft.Extensions.Hosting;
using System.Net.NetworkInformation;
using Serilog.Core;
using Serilog;
using Huahuan.CollectorServiceCore.Methods;
using Huahuan.CollectorServiceCore.Models;
using Huahuan.CollectorServiceCore.SqlSugar.Methods;
using Huahuan.CollectorServiceCore.Protocols;
using Huahuan.CollectorServiceCore.SqlSugar.Models;
using Huahuan.CollectorServiceCore.Services;
using Microsoft.VisualBasic;
using System.Text.Json.Nodes;
using Huahuan.CommonLib.Methods;

namespace Huahuan.CollectorServiceCore
{
    public class MqttHuahuan
    {
        private static readonly Logger _logger;

        //private readonly ConcurrentDictionary<long, string> dicReply = new(); // 消息id,消息
        private static ConcurrentQueue<MqttMesssage> queuePubMsg = new();
        private static ConcurrentQueue<MqttMesssage> queueSubMsg = new();
        private static IMqttClient? mqttClient;
        private static MqttClientOptions? mqttClientOptions;
        private static bool isLongConnect = false;// 长连接

        private static ConcurrentDictionary<string, UInt32> dicCollectorFilter = new ConcurrentDictionary<string, UInt32>(); // 采集器管理


        static MqttHuahuan()
        {
            _logger = new LoggerConfiguration()
                .WriteTo.File($"{AppDomain.CurrentDomain.BaseDirectory}/logs/MqttHuahuan-.txt", rollingInterval: RollingInterval.Day) // 按天写入文件 
                .CreateLogger();

            ConcurrentDictionary<string, UInt32> dic31311 = CollectorTB.GetSnCycleByPort(31311); // 获取采集器
            ConcurrentDictionary<string, UInt32> dic55888 = CollectorTB.GetSnCycleByPort(55888); // 获取采集器
            dicCollectorFilter = dic31311;
            foreach (var kv in dic55888)
            {
                dicCollectorFilter.TryAdd(kv.Key, kv.Value);
            }
            _ = ClientCreate();
        }
        public static async Task ClientCreate()
        {
            do
            {
                try
                {
                    var mqttParams = AppConfigHelper.GetMqttOptions(); // 获取配置
                    var mqttFactory = new MqttFactory();
                    mqttClient = mqttFactory.CreateMqttClient();
                    mqttClientOptions = new MqttClientOptionsBuilder()
                        .WithTcpServer(mqttParams.Host, mqttParams.Port)
                        .WithClientId(mqttParams.ClientId)
                        .WithCredentials(mqttParams.User, mqttParams.Password)
                        .WithCleanSession(false)
                        .WithKeepAlivePeriod(TimeSpan.FromSeconds(60)) // 设置 Keep Alive 间隔为 60 秒。
                        .WithCleanSession(true)
                        .Build();

                    // 连接断开事件
                    mqttClient.DisconnectedAsync += async e =>
                    {
                        if (isLongConnect)
                        {
                            await Task.Delay(5000);
                            _logger.Information("MqttHuahuan:发起重连");
                            await Start();
                        }
                    };
                    // 订阅事件
                    mqttClient.ApplicationMessageReceivedAsync += e =>
                    {
                        //Log.LogInformation($"Received message: {Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment)}");
                        MqttMesssage mqttMesssage = new()
                        {
                            Topic = e.ApplicationMessage.Topic,
                            Message = Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment)
                        };
                        queueSubMsg.Enqueue(mqttMesssage);
                        return Task.CompletedTask;
                    };
                    // 连接成功事件
                    mqttClient.ConnectedAsync += async e =>
                    {
                        _logger.Information("MqttHuahuan:连接成功");
                        await mqttClient.UnsubscribeAsync("#");

                        // 添加订阅逻辑
                        //var mqttParams = AppConfigHelper.GetMqttOptions();
                        if (mqttParams.SubTopics != null)
                        {
                            foreach (var topic in mqttParams.SubTopics)
                            {
                                await mqttClient.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic(topic).Build());
                            }
                        }
                    };

                    Task taskSub = new(action: async () => await SubscribeMessageProcessTask());
                    taskSub.Start();
                    Task taskPub = new(action: async () => await PublishMessagePorcessAsync());
                    taskPub.Start();

                    //Task testTask = new Task(action: async () => await PublishMessageTest());
                    //testTask.Start();
                }
                catch (Exception e)
                {
                    _logger.Error("MqttHuahuan:创建失败,{e}", e.Message);
                    await Task.Delay(5000);
                }
            } while (mqttClient == null);
        }

        public static async Task Start()
        {
            try
            {
                isLongConnect = true;
                if (mqttClient != null)
                {
                    var response = await mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None);
                }
                else
                {
                    _logger.Information("mqttClient is null");
                }
            }
            catch (Exception e)
            {
                _logger.Error("MqttHuahuan:连接失败,{e}", e.Message);
            }
        }
        public static async Task Stop()
        {
            isLongConnect = false;
            if (mqttClient != null && mqttClient.IsConnected)
            {
                await mqttClient.DisconnectAsync();
            }
            _logger.Information("MqttHuahuan:已退出");
        }


        private static async Task PublishMessageTest()
        {
            await Task.Delay(5000);
            while (isLongConnect)
            { 
                MqttMesssage msg = new MqttMesssage();
                msg.Retain = false;
                msg.Topic = "utest/123";
                msg.Message = "test1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111";
                queuePubMsg.Enqueue(msg);
                await Task.Delay(1);
            }
        }

        /// <summary>
        /// 发布消息任务
        /// </summary>
        private static async Task PublishMessagePorcessAsync()
        {
            await Task.Delay(2000);
            int dly = 1000;
            _logger.Information("MqttHuahuan:发布消息任务已启动");
            while (isLongConnect)
            {
                try
                {
                    if (mqttClient != null && mqttClient.IsConnected)
                    {
                        if (!queuePubMsg.IsEmpty)
                        {
                            if (queuePubMsg.TryDequeue(out MqttMesssage? pub_msg))
                            {
                                if (pub_msg == null) continue;
                                if (string.IsNullOrWhiteSpace(pub_msg.Topic)) continue;
                                if (string.IsNullOrWhiteSpace(pub_msg.Message)) continue;
                                MqttApplicationMessage message = new()
                                {
                                    PayloadSegment = Encoding.UTF8.GetBytes(pub_msg.Message),
                                    Topic = pub_msg.Topic,
                                    Retain = pub_msg.Retain,
                                    QualityOfServiceLevel = MQTTnet.Protocol.MqttQualityOfServiceLevel.ExactlyOnce
                                };
                                //_logger.Information("发布消息:topic={topic},payload={payload}",pub_msg.Topic, pub_msg.Message);

                                await mqttClient.PublishAsync(message);
                            }
                            //await Task.Delay(1);
                        }
                        else
                        {
                            await Task.Delay(1000);
                        }
                    }
                    else
                    {
                        _logger.Warning("MqttHuahuan,发布任务,mqttc is null or disconneted!");
                        await Task.Delay(10000);
                    }
                    dly = 1000;
                }
                catch (Exception e)
                {
                    _logger.Error("MqttHuahuan:发布消息异常:{e}", e.Message);
                    await Task.Delay(dly);
                    if (dly < 60000) // 1分钟
                    {
                        dly += dly;
                    }
                }
            }
            _logger.Information("MqttHuahuan:发布消息任务已停止");
        }

        /// <summary>
        /// 订阅消息处理任务
        /// </summary>
        private static async Task SubscribeMessageProcessTask()
        {
            await Task.Delay(2000);
            int dly = 1000;
            _logger.Information("MqttHuahuan:订阅消息任务已启动");
            while (isLongConnect)
            {
                try
                {
                    if (queueSubMsg.TryDequeue(out MqttMesssage? sub_msg))
                    {
                        if (sub_msg == null) continue;
                        if (string.IsNullOrWhiteSpace(sub_msg.Topic)) continue;
                        if (string.IsNullOrWhiteSpace(sub_msg.Message)) continue;
                        string[] splitTopics = sub_msg.Topic.Split('/');
                        string payload = sub_msg.Message;

                        if (splitTopics.Length > 4)
                        {
                            string sn = splitTopics[4];
                            string model = splitTopics[3];
                            if (splitTopics[1] == "up")
                            {
                                if (splitTopics[2] == "sensors")
                                {
                                    Log.Information("传感器数据, topic={0}, payload={1}", sub_msg.Topic, sub_msg.Message);
                                    // 过滤器
                                    if (AppConfigHelper.GetOtherOptions().DataFilterUsed)
                                    {
                                        if (!dicCollectorFilter.ContainsKey(sn)) // 过滤未添加的采集器
                                        {
                                            _logger.Information("【拦截】topic={0},payload={1}" , sub_msg.Topic, sub_msg.Message); // 记录被过滤的数据
                                            continue;
                                        }
                                    }
                                    //List<SensorData>? list = CollectorJsonProto.ToSensorData(sn, payload);
                                    List<SensorData>? list = CollectorJsonProto.ToSensorDataPluse(sn, payload);
                                    if (list!=null)
                                    {
                                        foreach (SensorData sd in list) DataService.AddSensorData(sd);
                                    }
                                    else
                                    {
                                        _logger.Error("传感器数据解析失败!topic={0},payload={1}", sub_msg.Topic, sub_msg.Message);
                                    }
                                }
                                else if (splitTopics[2] == "collectors")
                                {
                                    //if (strings[3] == "HD_CJY2021_724") // 转发
                                    //{
                                    //    sub_msg.Topic.Replace("shhhcl", "yun");
                                    //    sub_msg.Topic.Replace("HD_CJY2021_724", "HD-CJY2021");
                                    //    AddToQueuePubMsg(sub_msg);
                                    //}

                                    // 过滤器
                                    if (AppConfigHelper.GetOtherOptions().DataFilterUsed)
                                    {
                                        if (!dicCollectorFilter.ContainsKey(sn)) // 过滤未添加的采集器
                                        {
                                            _logger.Information("【拦截】topic={0},payload={1}", sub_msg.Topic, sub_msg.Message); // 记录被过滤的数据
                                            continue;
                                        }
                                    }
                                    JsonObject? jo = JsonHepler.FromJson<JsonObject>(payload);
                                    string? msgtype = (string?)jo?["msgtype"];
                                    if (string.IsNullOrWhiteSpace(msgtype)) continue;
                                    if ((jo?["message"] == null)) continue;
                                    if ((msgtype == "collector_datas") || (msgtype == "node_datas"))
                                    {
                                        CollectorData? cd = CollectorJsonProto.ToColletorData(sn, model, jo["message"]?.ToString());
                                        if (cd != null)
                                        {
                                            DataService.AddCollectorData(cd);
                                        }
                                        else
                                        {
                                            _logger.Error("采集仪数据解析失败!topic={0},payload={1}", sub_msg.Topic, sub_msg.Message);
                                        }
                                    }
                                    else if (msgtype == "collector_event")
                                    {

                                    }
                                }
                            }
                        }
                        else if (splitTopics.Length == 4)
                        {
                            if (splitTopics[1] == "request")
                            {
                                string msgid = splitTopics[3]; // 消息id
                                string method = splitTopics[2]; // 方法
                                
                                switch (method)
                                {
                                    case "add_collector":
                                        new ApiMqttProtoService(_logger).AddCollector(msgid, payload);
                                        break;
                                    case "del_collector":
                                        new ApiMqttProtoService(_logger).DelCollector(msgid, payload);
                                        break;
                                    case "mod_collector":
                                        new ApiMqttProtoService(_logger).ModCollector(msgid, payload);
                                        break;
                                    case "set_sensors":
                                        new ApiMqttProtoService(_logger).SetSensorList(msgid, payload);
                                        break;
                                    case "set_wx_sensors":

                                        break;
                                    case "restart_collector":
                                        new ApiMqttProtoService(_logger).RestartCollector(msgid, payload);
                                        break;
                                    case "ser_server":
                                        new ApiMqttProtoService(_logger).SetServer(msgid, payload);
                                        break;
                                    default:
                                        _logger.Information("不需要处理的方法,{fn}", method);
                                        break;
                                }
                            }

                        }
                        else if (splitTopics.Length == 2) // 节点事件
                        {
                            string[] arr = payload.Split('_');
                            string sn = "";
                            bool isConnected = false;
                            if (arr.Length != 1) continue; // 消息错误
                            sn = arr[arr.Length - 1]; // 编号
                            if (sn.Substring(0, 2) != "86")
                            {
                                //_logger.Warning("MqttHuahuan:设备编号不匹配, sn={sn}", sn); // 编号不匹配
                                continue; // 不是设备编号
                            }
                            if (AppConfigHelper.GetOtherOptions().DataFilterUsed)
                            {
                                if (!dicCollectorFilter.ContainsKey(sn)) // 过滤未添加的采集器
                                    continue;
                            }
                            if (splitTopics[1] == "client_connected") // 上线
                            {
                                isConnected = true;
                            }
                            else if (splitTopics[1] == "client_disconnected") // 离线
                            {
                                isConnected = false;
                            }
                            DataService.AddCollectorSataus(new CollectorStatus() { Sn = sn, Status = isConnected, Created = DateTime.Now }); // 修改采集仪连接状态
                            if (AppConfigHelper.GetOtherOptions().TcpToMqttAllowed) // 转发
                            {
                                PublishMessage(CollectorJsonProto.StatusConvert(sn, isConnected));
                            }

                        }
                        await Task.Delay(1);
                    }
                    else
                    {
                        await Task.Delay(1000);
                    }
                    dly = 1000;
                }
                catch (Exception e)
                {
                    _logger.Error("MqttHuahuan:消息处理异常:{e}", e.Message);
                    await Task.Delay(dly);
                    if (dly < 60000) // 1分钟
                    {
                        dly += dly;
                    }
                }
            }
            _logger.Information("MqttHuahuan:订阅消息任务已停止");
        }
        /// <summary>
        /// 添加要发布的消息
        /// </summary>
        /// <param name="mqttMesssage"></param>
        public static void PublishMessage(MqttMesssage mqttMesssage)
        {
            queuePubMsg.Enqueue(mqttMesssage);
        }

        /// <summary>
        /// 移除采集器
        /// </summary>
        /// <param name="sn"></param>
        /// <returns></returns>
        public static Boolean CollectorRemove(string sn)
        {
            if (dicCollectorFilter.TryRemove(sn, out _))
            {
                return true;
            }
            return false;
        }
        /// <summary>
        /// 添加采集器
        /// </summary>
        /// <param name="sn"></param>
        /// <param name="cycle"></param>
        /// <returns></returns>
        public static Boolean CollectorAdd(string sn, UInt32 cycle)
        {
            if (dicCollectorFilter.TryAdd(sn, cycle))
            {
                return true;
            }
            return false;
        }

    }
}

 

标签:Task,await,Mqttnet,mqtt,using,msg,new,logger,客户端
From: https://www.cnblogs.com/zengfeng1013/p/18358202

相关文章

  • 在Unity中开发MQTT客户端
    概述:        在Unity环境中使用MQTTnet库(一个流行的.NET库,用于实现MQTT客户端和服务器。它支持.NETCore和.NETFramework,并提供了灵活的API以及高性能的实现)搭建自己的MQTT客户端.我使用的版本:Version=4.3.6.1152        但是在开发客户端之前,你需要......
  • oracle 客户端安装
    环境信息参考服务端的安装文档(安装包,创建用户和组,文件目录)[root@redhat760813]#catclient/response/client_install.rsp|grep-v"#"静默安装修改相应文件下面的几个值即可(目录要存在)oracle.install.responseFileVersion=/oracle/install/rspfmt_clientinstall_response_sche......
  • Java SSE:实现服务器推送数据客户端
            体验过大语言模型的人(chatgpt,文心一言,通义千问...)都知道,大模型的回答是一边思考一边返回数据的,属于流式响应。要达到这种效果就需要实现前后端的即时通讯。SSE        SSE(Server-sentEvents):WebSocket的一种轻量代替方案,使用HTTP协议,用于实现服务......
  • BugKu CTF Misc:被勒索了 & disordered_zip & simple MQTT & 请攻击这个压缩包
    前言BugKu是一个由乌云知识库(wooyun.org)推出的在线漏洞靶场。乌云知识库是一个致力于收集、整理和分享互联网安全漏洞信息的社区平台。BugKu旨在提供一个实践和学习网络安全的平台,供安全爱好者和渗透测试人员进行挑战和练习。它包含了各种不同类型的漏洞场景,如Web漏洞、系统......
  • zabbix服务器和客户端连接配置
    zabbix服务器和客户端连接配置1.安装环境说明本例中安装zabbix开源软件和zabbix运行所需的中间件和数据库apache、php和postgres,软件版本信息如下:软件版本zabbixZabbix6.4.0apachehttpd-2.4.57aprapr-1.7.4apr-util1.6.3php8.2.6sqlite-autoconf......
  • mqtt订阅和发布
    importpaho.mqtt.clientasmqttimporttimeMQTTHOST="192.168.0.4"MQTTPORT=1883mqttClient=mqtt.Client()#连接MQTT服务器defon_mqtt_connect():mqttClient.connect(MQTTHOST,MQTTPORT,60)mqttClient.loop_start()#publish消息defon_publish(t......
  • 离线win/mac下vscode客户端通过ssh连接linux服务器
    1、下载vscode,这个网上教程一大堆就不一一介绍了,自行百度建议安装蓝色的版本,最后图标是蓝色的。下载网址:VisualStudioCode-CodeEditing.Redefined2、win系统或mac系统安装openssh,这个也一大堆教程,可以自行百度3、安装vscode插件下载网址:ExtensionsforVisual......
  • TCP客户端服务器的数据传送
    客户端----发送方先导入WSAStartup函数的一个声明//获取文件大小structstats这个结构体包含了文件的基本信息,例如大小、所有者、权限、最后访问和修改时间等。//发送信息给服务器char*ptemp=szbuf;while(*++ptemp!='\0');while(*--ptemp!='\\'); ++ptemp;ptemp就......
  • SSL 忽略客户端证书是自签名的
    我正在尝试构建一个库来构建一个库来制作Gemini(协议)服务器,到目前为止,除了一件事之外,我一切正常。在gemini规范中,它说客户端能够用tls发送他们的证书以进行识别等,但是当在python中我收到证书时,我收到一个关于它如何自签名的错误ssl.SSLCertVerificationEr......