首页 > 编程语言 >c#学习笔记,实现物联网MQTT通信

c#学习笔记,实现物联网MQTT通信

时间:2023-07-24 14:12:59浏览次数:43  
标签:obj string c# void 笔记 MQTT var new public

一、什么是MQTT 

MQTT协议由于其用极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务,具有开销低、占用带宽低、即时通讯等优点,

使其在物联网、小型设备、移动应用等方面有较广泛的应用,在工业物联网中,MQTT也有广泛的应用。

主要有以下特点:

  • 使用发布/订阅消息模式,提供一对多的消息发布
  • 使用TCP/IP提供网络连接
  • 小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量,传输的内容最大为256MB。
  • 使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制。

二、MQTT服务器设计 

MQTT 服务端主要用于与多个客户端保持连接,并处理客户端的发布和订阅等逻辑,

多数情况下服务端都是转发主题匹配的客户端消息,在系统中起到一个中介的作用。

 

下面就是在项目中引入mqtt服务端和客户端:

引用nugut包

服务代码:

using System;
using System.Collections.Generic;
using System.Text;
using MQTTnet;
using MQTTnet.Client.Receiving;
using MQTTnet.Protocol;
using MQTTnet.Server;

namespace LR.Utils.MQTT
{
    public class MqttServer
    {
        private MQTTnet.Server.MqttServer mqttServer = null;

        public delegate void MqttServerStarted(EventArgs obj) ;
        public  MqttServerStarted MqttServerStartedHandler;

        public delegate void MqttServerStopped(EventArgs obj);
        public MqttServerStopped MqttServerStoppedHandler;

        public delegate void MqttServerClientConnected(MqttServerClientConnectedEventArgs obj);
        public MqttServerClientConnected MqttServerClientConnectedHandler;

        public delegate void MqttServerClientDisConnected(MqttServerClientDisconnectedEventArgs obj);
        public MqttServerClientDisConnected MqttServerClientDisConnectedHandler;


        public async void StartMqttServer(int port, string username, string pwd)
        {
            try
            {
                if (mqttServer == null)
                {
                    var optionsBuilder = new MqttServerOptionsBuilder()
                    .WithDefaultEndpoint().WithDefaultEndpointPort(port).WithConnectionValidator(
                    c =>
                    {
                        if (username == null || pwd == null)
                        {
                            c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
                            return;
                        }

                        if (c.Username != username)
                        {
                            c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
                            return;
                        }

                        if (c.Password != pwd)
                        {
                            c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
                            return;
                        }

                        c.ReasonCode = MqttConnectReasonCode.Success;
                    }).WithSubscriptionInterceptor(
                    c =>
                    {
                        c.AcceptSubscription = true;
                    }).WithApplicationMessageInterceptor(
                    c =>
                    {
                        c.AcceptPublish = true;
                    });

                    mqttServer = new MqttFactory().CreateMqttServer() as MQTTnet.Server.MqttServer;
                    mqttServer.StartedHandler = new MqttServerStartedHandlerDelegate(OnMqttServerStarted);
                    mqttServer.StoppedHandler = new MqttServerStoppedHandlerDelegate(OnMqttServerStopped);

                    mqttServer.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(OnMqttServerClientConnected);
                    mqttServer.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(OnMqttServerClientDisconnected);
                    mqttServer.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(OnMqttServerClientSubscribedTopic);
                    mqttServer.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(OnMqttServerClientUnsubscribedTopic);
                    mqttServer.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(OnMqttServer_ApplicationMessageReceived);
                    await mqttServer.StartAsync(optionsBuilder.Build());

                }
            }
            catch (Exception eX)
            {
                throw eX;
            }
        }



        public async void StopMqttServer()
        {
            if (mqttServer == null) return;
            try
            {
                await mqttServer?.StopAsync();
                mqttServer = null;

            }
            catch (Exception eX)
            {
                throw eX;
            }
        }

        #region 事件
        private void OnMqttServerStarted(EventArgs obj)
        {
            if (MqttServerStartedHandler!=null)
            {
                MqttServerStartedHandler(obj);
            }
        }

        private void OnMqttServerStopped(EventArgs obj)
        {
            if (MqttServerStoppedHandler != null)
            {
                MqttServerStoppedHandler(obj);
            }
        }
     
        private void OnMqttServerClientConnected(MqttServerClientConnectedEventArgs obj)
        {
            if (MqttServerClientConnectedHandler != null)
            {
                MqttServerClientConnectedHandler(obj);
            }
        }

        private void OnMqttServerClientDisconnected(MqttServerClientDisconnectedEventArgs obj)
        {
            if (MqttServerClientDisConnectedHandler != null)
            {
                MqttServerClientDisConnectedHandler(obj);
            }
        }

        public delegate void MqttServerClientUnsubscribedTopic(MqttServerClientUnsubscribedTopicEventArgs obj);
        public MqttServerClientUnsubscribedTopic MqttServerClientUnsubscribedTopicHandler;

        private void OnMqttServerClientUnsubscribedTopic(MqttServerClientUnsubscribedTopicEventArgs obj)
        {
            if (MqttServerClientUnsubscribedTopicHandler != null)
            {
                MqttServerClientUnsubscribedTopicHandler(obj);
            }
        }

        public delegate void MqttServerClientSubscribedTopic(MqttServerClientSubscribedTopicEventArgs obj);
        public MqttServerClientSubscribedTopic MqttServerClientSubscribedTopicHandler;

        private void OnMqttServerClientSubscribedTopic(MqttServerClientSubscribedTopicEventArgs obj)
        {
            if (MqttServerClientSubscribedTopicHandler != null)
            {
                MqttServerClientSubscribedTopicHandler(obj);
            }
        }


        public delegate void MqttServer_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs obj);
        public MqttServer_ApplicationMessageReceived MqttServer_ApplicationMessageReceivedHandler;
        private void OnMqttServer_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs obj)
        {
            if (MqttServer_ApplicationMessageReceivedHandler != null)
            {
                MqttServer_ApplicationMessageReceivedHandler(obj);
            }
        }

        #endregion
    }

}

客户端代码

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using MQTTnet.Protocol;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace LR.Utils.MQTT
{
    public class MqttClient: IDisposable
    {
        private MQTTnet.Client.MqttClient mqttClient = null;

        public async Task ClientStart(string ip,int port,string username,string password)
        {
            try
            {
                var tcpServer = ip.Trim();
                var tcpPort = port;
                var mqttUser = username.Trim();
                var mqttPassword = password.Trim();
                var clientID = Guid.NewGuid().ToString();

                var mqttFactory = new MqttFactory();
                var options = new MqttClientOptions
                {
                    ClientId = Guid.NewGuid().ToString(),
                    ProtocolVersion = MQTTnet.Formatter.MqttProtocolVersion.V311,
                    ChannelOptions = new MqttClientTcpOptions
                    {
                        Server = tcpServer,
                        Port = tcpPort
                    },
                    WillDelayInterval = 10,
                    WillMessage = new MqttApplicationMessage()
                    {
                        Topic = $"LastWill/{clientID}",
                        Payload = Encoding.UTF8.GetBytes("I Lost the connection!"),
                        QualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce
                    }

                };
                if (options.ChannelOptions == null)
                {
                    throw new InvalidOperationException();
                }
                if (!string.IsNullOrEmpty(mqttUser))
                {
                    options.Credentials = new MqttClientCredentials
                    {
                        Username = mqttUser,
                        Password = Encoding.UTF8.GetBytes(mqttPassword)
                    };
                }

                options.CleanSession = true;
                options.KeepAlivePeriod = TimeSpan.FromSeconds(5);

                mqttClient = mqttFactory.CreateMqttClient() as MQTTnet.Client.MqttClient;
                mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(OnMqttClientConnected);
                mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(OnMqttClientDisConnected);
                mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(OnSubscriberMessageReceived);
                await mqttClient.ConnectAsync(options);

            }
            catch (Exception ex)
            {
                throw ex;
            }
        }

        public async Task WSClientStart(string ip, int port, string username, string password)
        {
            try
            {
                var tcpServer = ip.Trim();
                var tcpPort = port;
                var mqttUser = username.Trim();
                var mqttPassword = password.Trim();
                var clientID = Guid.NewGuid().ToString();

                var mqttFactory = new MqttFactory();
                var options = mqttFactory.CreateClientOptionsBuilder()
                    .WithWebSocketServer("ws://" + ip + ":" + port + "/ws")
                    .WithCredentials(username, password)
                    .WithClientId(clientID)
                    .Build();


                mqttClient = mqttFactory.CreateMqttClient() as MQTTnet.Client.MqttClient;

                mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(OnMqttClientConnected);
                mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(OnMqttClientDisConnected);
                mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(OnSubscriberMessageReceived);
                await mqttClient.ConnectAsync(options);

            }
            catch (Exception ex)
            {
                throw ex;
            }
        }
        #region 事件

        public delegate void MqttClientConnected(MqttClientConnectedEventArgs obj);
        public MqttClientConnected MqttClientConnectedHandler;
        public delegate void MqttClientDisConnected(MqttClientDisconnectedEventArgs obj);
        public MqttClientDisConnected MqttClientDisConnectedHandler;
        public delegate void SubscriberMessageReceived(MqttApplicationMessageReceivedEventArgs obj);
        public SubscriberMessageReceived SubscriberMessageReceivedHandler;

        private void OnSubscriberMessageReceived(MqttApplicationMessageReceivedEventArgs obj)
        {
            if (SubscriberMessageReceivedHandler != null)
            {
                SubscriberMessageReceivedHandler(obj);
            }
        }

        private void OnMqttClientDisConnected(MqttClientDisconnectedEventArgs obj)
        {
            if (MqttClientDisConnectedHandler != null)
            {
                MqttClientDisConnectedHandler(obj);
            }
        }

        private void OnMqttClientConnected(MqttClientConnectedEventArgs obj)
        {
            if (MqttClientConnectedHandler != null)
            {
                MqttClientConnectedHandler(obj);
            }
        }
        #endregion


        private async Task ClientStop()
        {
            try
            {
                if (mqttClient == null) return;
                await mqttClient.DisconnectAsync();
                mqttClient = null;
            }
            catch (Exception ex)
            {
                throw ex;
            }
        }

        public async void ClientPublishMqttTopic(string topic, string payload)
        {
            try
            {
                var message = new MqttApplicationMessage()
                {
                    Topic = topic,
                    Payload = Encoding.UTF8.GetBytes(payload),
                    QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce,
                    Retain =true
                };
                await mqttClient.PublishAsync(message);
            }
            catch (Exception ex)
            {
                throw ex;
            }
        }

        public async void ClientSubscribeTopic(string topic)
        {
            await mqttClient.SubscribeAsync(topic);
        }

        public async void ClientUnSubscribeTopic(string topic)
        {
            await mqttClient.UnsubscribeAsync(topic);
        }

        public bool IsConnected()
        {
            if (mqttClient == null) throw new Exception("客户端未创建");
            return mqttClient.IsConnected;
        }

        public void Dispose()
        {
            mqttClient.Dispose();
        }
    }
}

客户端使用示例:

      private Dictionary<string,List<MqttSensor>> GetMqttSensorData(string ip,int port,string username,string pwd,List<string> topics)
        {
            //声明一个MqttClient客户端对象
            MqttClient client = new MqttClient();
            var allDicts = new Dictionary<string, List<MqttSensor>>();
            try
            {
                //链接成功事件
                client.MqttClientConnectedHandler += delegate
                {
                    //Console.WriteLine("连接成功!");
                    foreach (var topic in topics)
                        client.ClientSubscribeTopic(topic);
                };
                //接收消息事件
                client.SubscriberMessageReceivedHandler += delegate (MqttApplicationMessageReceivedEventArgs args)
                {
                    string ctopic = args.ApplicationMessage.Topic;
                    string msg = Encoding.UTF8.GetString(args.ApplicationMessage.Payload);
                    var dicts = JsonConvert.DeserializeObject<Dictionary<string, MqttSensor>>(msg);
                    var result = new List<MqttSensor>();
                    foreach (var key in dicts.Keys)
                    {
                        if (result.Where(x => x.ID == key).Count() == 0)
                        {
                            var d = dicts[key] as MqttSensor;
                            d.ID = key;
                            result.Add(d);
                        }
                    }
                    if (!allDicts.ContainsKey(ctopic) && topics.Contains(ctopic))
                    {
                        allDicts.Add(ctopic, result);
                        //Console.WriteLine("-------------------获取数据:" + ctopic + "-----------------");
                        //Console.WriteLine(JsonConvert.SerializeObject(result));
                        client.ClientUnSubscribeTopic(ctopic);
                        topics.Remove(ctopic);
                    }
                };
                //异步开启客户端
                client.WSClientStart(ip, port, username, pwd).GetAwaiter().GetResult();
                while (true)
                {
                    Thread.Sleep(100);
                    if (topics.Count() == 0)
                    {
                        break;
                    }
                }
            }
            catch (Exception)
            {

                throw;
            }
            finally
            {
                client.Dispose();
            }
            return allDicts;
        }

        private string GetValue(string value, string type)
        {
            string result = "";
            if (type == "运行状态")
            {
                result = (value.ToLower() == "true" ? "运行正常" : "停止运行");
            }
            else
            {
                decimal v = 0;
                if (decimal.TryParse(value, out v))
                {
                    result = v + "";
                }
                else
                {
                    result = "-";
                }
            }
            return result;
        }

参考文献:

https://zhuanlan.zhihu.com/p/419561816

https://www.jianshu.com/p/73d9c6668dfc

https://www.cnblogs.com/MyWPF/p/15129549.html

标签:obj,string,c#,void,笔记,MQTT,var,new,public
From: https://www.cnblogs.com/misakayoucn/p/17577059.html

相关文章

  • ValueNotifier<T> ValueListenableBuilder<T> Stack() positioned.fill()
     1、在Column下面增加可以滚动的Row2、在widget外部控件其内部的变量ValueNotifier<T> ValueListenableBuilder<T>(valueListenable:...,builder:()=>)  import'package:flutter/material.dart';classSettingsPageextendsStatelessWidget{finalint_cou......
  • Python【18】 pytorch中的one_hot() (独热编码函数)
    参考:https://pytorch.org/docs/stable/generated/torch.nn.functional.one_hot.html......
  • AcrelEMS企业微电网数字化系统在电力系统中的应用
    安科瑞虞佳豪壹捌柒陆壹伍玖玖零玖叁1.入夏以来,用电负荷持续攀升。截至7月22日,南方电网电力负荷已4次创新高,最高达2.31亿千瓦。当前正值迎峰度夏的关键时期,记者在广东、浙江等地调查发现,虚拟电厂正在走进现实。聚合充电桩和储能等资源,为电力保供服务。虚拟电厂就是凭借能源互联......
  • C语言模拟银行排队叫号系统(链队)
    一.队列队列是一种具有先进先出(FIFO)特性的线性数据结构,它只允许在队列的两端进行插入和删除操作。队列的一端称为队尾(rear),另一端称为队头(front)。新元素总是插入在队列的队尾,而从队列中删除元素时则总是删除队头元素。由于队列具有FIFO特性,因此队列通常用于需要按照顺序处理数据的场......
  • 2.dockerfile指令及数据卷
    dockerfile指令总结FROM:指定基础镜像LABEL:指定镜像元数据,即标签RUN:指定shell命令CMD:容器启动命令EXPORT:暴露端口ENV:设置环境变量ADD:复制和解包文件,增强版的copyCOPY:复制本地宿主机的文本到容器VLOUME:匿名卷WORKDIR:指定工作目录ARG:构建参数USER:指定当前用户ENTRYPOINT:......
  • vue项目使用vue-virtual-scroll-list虚拟滚动超多千万条数据 cv可用案例
    当我们有大量数据需要显示在列表中时,传统的滚动列表会将所有数据渲染到DOM中,导致性能下降和内存占用增加。虚拟滚动列表通过仅渲染当前视窗内可见的一小部分数据,动态地根据滚动位置更新列表内容,从而实现更高效的列表渲染。vue-virtual-scroll-list是一个用于Vue.js的虚拟滚动......
  • 3.docker网络管理
    #MySQL容器默认使用了匿名数据卷[root@ubuntu1804~]#dockerrun-d--namemysql-p3306:3306-eMYSQL_ROOT_PASSWORD=123456mysql:5.7.30#备份数据库[root@ubuntu1804~]#dockerrun-it--rm--volumes-frommysql-v$(pwd):/backupcentostarxvf/backup/mysql.tar......
  • m基于DVB-T的COFDM+16QAM+LDPC码通信链路matlab性能仿真,包括载波同步,定时同步,信道
    1.算法仿真效果matlab2022a仿真结果如下:包括小数倍及整数倍载波同步,粗及细定时同步2.算法涉及理论知识概要基于DVB-T的COFDM+16QAM+LDPC码通信链路是一种常用的数字视频广播系统,用于实现高效的传输和接收。该系统结合了正交频分复用(COFDM)、16QAM调制和低密度奇偶校验(LDPC)编码......
  • 使用mysqldump备份数据库时报错表不存在,提示信息Table 'mysql.engine_cost' doesn't e
    问题描述:使用mysqldump备份数据库时报错表不存在,提示信息Table'mysql.engine_cost'doesn'texist,如下所示:数据库:mysql5.7.211、异常重现[mysql@hisdb1~]$mysqldump-uroot-S/mysql/data/mysql.sock-P3306--max_allowed_packet=1G--master-data=2--single-transaction......
  • 4.镜像制作方式和dockerfile(Ubuntu、nginx)
    制作镜像方式dockercommit通过修改现有的容器,将之手动构建为镜像dockerbuild通过dockerfile文件,批量构建为镜像用镜像做成容器,在容器的基础上定制一个镜像手动制作镜像:commit基于busybox容器创建busybox:v1.0镜像[root@ubuntu2004~]#dockercommit-ali-m"initbusybo......