首页 > 其他分享 >MQTT消息发送和接收的实现

MQTT消息发送和接收的实现

时间:2023-06-12 11:56:41浏览次数:43  
标签:Items void Add private 发送 MQTT new using 接收

我是不会的,全是从网上搜的,最终整理拼合的可以使用了,使用C#和VS2019,MQTT使用3.1.0版本,需要注意的是不同的版本代码是不一样的,对于咱这种不会的,当然是以能用为主了,你要是安装的最新的4.0版本,那还是换换吧

首先需要在nuGet中引用下面的三个,MQTTnet 3.1.0、MQTTnet.Extensions.ManagedClient 3.1.0、Newtonsoft.json 10.0.1,如果不引用的话,代码中会有错误,自己还解决不了,白白浪费你时间

代码是可以完全跑起来的,我在本机上已经运行过了,.net 版本是4.5.2

 

 

一、服务端

服务端主要是设置你要监听哪个端口,是否要验证客户端的用户名和密码,然后开启服务监听就行了,代码里设置的用户名是test,密码是1234,可以根据实际换成数据库中的用户名密码来验证,对于接收到的消息,现在是直接显示,以后可以保存到日志文件中,也可以保存到数据库中。

using MQTTnet.Server;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Forms;
using MQTTnet.Client.Receiving;

namespace MQTT
{
    public partial class Form1 : Form
    {
        public Form1()
        {
            InitializeComponent();
        }
        int Port = 2023;
        IMqttServer server = new MqttFactory().CreateMqttServer();
        private void Form1_Load(object sender, EventArgs e)
        {

        }

        private void btn_start_Click(object sender, EventArgs e)
        {
            StartMQTTAsync();
        }

        //启动服务端
        public async Task StartMQTTAsync()
        {
            MqttServerOptionsBuilder serverOptions = new MqttServerOptionsBuilder();
            serverOptions.WithConnectionValidator(client =>
            {
                string Account = client.Username;
                string PassWord = client.Password;
                string clientid = client.ClientId;
                if (Account == "test" && PassWord == "1234")
                {
                    client.ReasonCode = MqttConnectReasonCode.Success;
                    Console.WriteLine("校验成功");
                }
                else
                {
                    client.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
                    Console.WriteLine("校验失败");
                }
            });
            serverOptions.WithDefaultEndpointPort(Port);
            //服务启动
            server.StartedHandler = new MqttServerStartedHandlerDelegate((Action<EventArgs>)StartedHandler);
            //服务停止
            server.StoppedHandler = new MqttServerStoppedHandlerDelegate((Action<EventArgs>)StoppedHandler);
            //客户端连接事件
            server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate((Action<MqttServerClientConnectedEventArgs>)ClientConnectedHandler);
            //客户端断开连接事件
            server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate((Action<MqttServerClientDisconnectedEventArgs>)ClientDisconnectedHandler);
            //消息监听
            server.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate((Action<MqttApplicationMessageReceivedEventArgs>)MessageReceivedHandler);
            //客户端订阅主题事件
            server.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate((Action<MqttServerClientSubscribedTopicEventArgs>)ClientSubscribedTopicHandler);
            //客户端取消订阅主题事件
            server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate((Action<MqttServerClientUnsubscribedTopicEventArgs>)ClientUnsubscribedTopicHandler);
            await server.StartAsync(serverOptions.Build());
        }
        public void StartedHandler(EventArgs obj)
        {
            L1.Items.Add("MQTT程序已启动,监听端口:"+Port.ToString());
        }
        /// <summary>
        /// MQTT服务器停止事件
        /// </summary>
        /// <param name="obj"></param>
        private void StoppedHandler(EventArgs obj)
        {
            L1.Items.Add("MQTT程序已经关闭!");
        }



        /// <summary>
        /// 客户端连接到服务器事件
        /// </summary>
        /// <param name="obj"></param>
        private void ClientConnectedHandler(MqttServerClientConnectedEventArgs obj)
        {
            L1.Items.Add($"{obj.ClientId}此客户端已经连接到服务器");
        }

        /// <summary>
        /// 客户端断开连接事件
        /// </summary>
        /// <param name="obj"></param>
        private void ClientDisconnectedHandler(MqttServerClientDisconnectedEventArgs obj)
        {
            L1.Items.Add($"断开连接的客户端:{obj.ClientId}");
            L1.Items.Add($"断开连接类型:{obj.DisconnectType.ToString()}");
        }

        /// <summary>
        /// 收到各个客户端发送的消息
        /// </summary>
        /// <param name="obj"></param>
        private void MessageReceivedHandler(MqttApplicationMessageReceivedEventArgs obj)
        {
            L1.Items.Add("===================================================");
            L1.Items.Add("收到消息:");
            L1.Items.Add("消息时间:" + DateTime.Now.ToString()); ;
            L1.Items.Add($"客户端:{obj.ClientId}");
            L1.Items.Add($"主题:{obj.ApplicationMessage.Topic}");
            L1.Items.Add($"消息:{Encoding.UTF8.GetString(obj.ApplicationMessage.Payload)}");
            L1.Items.Add("+++++++++++++++++++++++++++++++++++++++++++++++++++");
            L1.Items.Add("");
        }

        /// <summary>
        /// 客户端订阅的主题
        /// </summary>
        /// <param name="obj"></param>
        private void ClientSubscribedTopicHandler(MqttServerClientSubscribedTopicEventArgs obj)
        {
            L1.Items.Add($"客户端:{obj.ClientId}");
            L1.Items.Add($"订阅主题:{obj.TopicFilter.Topic}");
        }

        /// <summary>
        /// 客户端取消订阅主题
        /// </summary>
        /// <param name="obj"></param>
        private void ClientUnsubscribedTopicHandler(MqttServerClientUnsubscribedTopicEventArgs obj)
        {
            L1.Items.Add($"客户端:{obj.ClientId}");
            L1.Items.Add($"取消订阅主题:{obj.TopicFilter}");
        }



        /// <summary>
        /// 关闭服务
        /// </summary>
        /// <returns></returns>
        public async Task StopAsync()
        {
            if (server != null)
            {
                if (server.IsStarted)
                {
                    await server.StopAsync();
                    server.Dispose();
                }
            }
        }

        private void btn_stop_Click(object sender, EventArgs e)
        {
            StopAsync();
        }
    }
}

二、客户端

客户端就是设置服务端的地址和端口,用户名和密码,然后订阅主题,然后发送消息,咱们不会,就用各种框框来直观的操作了,可根据需要设计成简洁界面的,更方便使用

不知道是客户端ID不能重复还是不能在一台电脑上开两个客户端,反正开两个客户端后会反复的连接和断开,留给你们试吧

 

using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Forms;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Formatter;
using Newtonsoft.Json;

namespace MQTT
{
    public partial class Form2 : Form
    {
        public Form2()
        {
            InitializeComponent();
        }
        private MqttClientOptions options;
        private IManagedMqttClient mqttClient;

        private void Form2_Load(object sender, EventArgs e)
        {
            CheckForIllegalCrossThreadCalls = false;
        }

        private async void OnSubscriberConnected(MqttClientConnectedEventArgs x)
        {
            L_info.Items.Add("已连接到MQTT服务器!");
        }


        private async void OnSubscriberDisconnected(MqttClientDisconnectedEventArgs x)
        {
            L_info.Items.Add("已断开MQTT服务器连接!");
        }

        private void OnSubscriberMessageReceived(MqttApplicationMessageReceivedEventArgs x)
        {
            var payloadString = x.ApplicationMessage.ConvertPayloadToString();

            payloadString = ConvertJsonString(payloadString);

            var item = $"{Environment.NewLine}Topic: {x.ApplicationMessage.Topic}{Environment.NewLine}Payload: {payloadString} {Environment.NewLine}QoS: {x.ApplicationMessage.QualityOfServiceLevel}";
            this.BeginInvoke((MethodInvoker)delegate
            {
                AppendReceiveMsg(item);
            });
        }


        private async Task SubscriberStart()
        {

            var tcpServer = txt_ip.Text.Trim().ToString();
            var tcpPort = int.Parse(txt_port.Text.Trim());
            var mqttUser = txt_yhm.Text.Trim();
            var mqttPassword = txt_pwd.Text.Trim();
            var mqttFactory = new MqttFactory();



            this.options = new MqttClientOptions
            {
                ClientId = "Client-1",
                ProtocolVersion = MqttProtocolVersion.V311,
                ChannelOptions = new MqttClientTcpOptions
                {
                    Server = tcpServer,
                    Port = tcpPort
                }
            };
            if (options.ChannelOptions == null)
            {
                throw new InvalidOperationException();
            }

            if (!string.IsNullOrEmpty(mqttUser))
            {
                options.Credentials = new MqttClientCredentials
                {
                    Username = mqttUser,
                    Password = Encoding.UTF8.GetBytes(mqttPassword)
                };
            }

            options.CleanSession = true;
            options.KeepAlivePeriod = TimeSpan.FromSeconds(5);

            this.mqttClient = mqttFactory.CreateManagedMqttClient();
            this.mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate((Action<MqttClientConnectedEventArgs>)OnSubscriberConnected);
            this.mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate((Action<MqttClientDisconnectedEventArgs>)OnSubscriberDisconnected);
            this.mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate((Action<MqttApplicationMessageReceivedEventArgs>)OnSubscriberMessageReceived);
            await this.mqttClient.StartAsync(
                new ManagedMqttClientOptions
                {
                    ClientOptions = options
                });
        }
        /// <summary>
        /// 发送MQTT消息
        /// </summary>
        /// <param name="dyh">订阅号</param>
        /// <param name="msg">具体发送的消息</param>
        private async void sengMsg(string dyh,string msg) 
        {
            var publish_topic = dyh;
            var publish_msg = msg;
            var message = new MqttApplicationMessageBuilder()
            .WithTopic(publish_topic)
            .WithPayload(publish_msg)
            .WithExactlyOnceQoS()
            .Build();

            if (this.mqttClient != null)
            {
                await this.mqttClient.PublishAsync(message);
            }
        }
        private void AppendReceiveMsg(string msg)
        {
            Invoke((new Action(() =>
            {
                L_info.Items.Add(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss: ") + msg + Environment.NewLine + Environment.NewLine);
            })));
        }
        private void AppendSendMsg(string msg)
        {
            Invoke((new Action(() =>
            {
                L_info.Items.Add(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss: ") + msg + Environment.NewLine);
            })));
        }

        private void AppendLogMsg(string msg)
        {
            Invoke((new Action(() =>
            {
                L_info.Items.Add(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss: ") + msg + Environment.NewLine);
            })));
        }

        private string ConvertJsonString(string str)
        {
            try
            {
                //格式化json字符串
                JsonSerializer serializer = new JsonSerializer();
                TextReader tr = new StringReader(str);
                JsonTextReader jtr = new JsonTextReader(tr);
                object obj = serializer.Deserialize(jtr);
                if (obj != null)
                {
                    StringWriter textWriter = new StringWriter();
                    JsonTextWriter jsonWriter = new JsonTextWriter(textWriter)
                    {
                        Formatting = Formatting.Indented,
                        Indentation = 4,
                        IndentChar = ' '
                    };
                    serializer.Serialize(jsonWriter, obj);
                    return textWriter.ToString();
                }

                return str;
            }
            catch (Exception ex)
            {
                return str;
            }
        }

        private async void btn_open_Click(object sender, EventArgs e)
        {
            //打开MQTT连接
            if (this.mqttClient == null)
            {
                await SubscriberStart();
            }
        }

        private async void btn_close_Click(object sender, EventArgs e)
        {
            if (this.mqttClient == null)
            {
                return;
            }
            await this.mqttClient.StopAsync();
            this.mqttClient = null;

        }

        private async void btn_dingyue_Click(object sender, EventArgs e)
        {
            var topicFilter = new MqttTopicFilter { Topic = this.txt_dyh.Text.Trim() };
            await this.mqttClient.SubscribeAsync(topicFilter);
            L_info.Items.Add("已订阅消息!");
        }

        private void btn_quxiao_Click(object sender, EventArgs e)
        {

        }

        private void btn_send_Click(object sender, EventArgs e)
        {
            string dyh = txt_dyh.Text.Trim();
            string msg = txt_msg.Text.Trim();
            sengMsg(dyh, msg);
        }

        private void groupBox4_Enter(object sender, EventArgs e)
        {

        }
    }
}

 

最后,程序和你肯定有一个能跑起来的,祝你好运~

 

标签:Items,void,Add,private,发送,MQTT,new,using,接收
From: https://www.cnblogs.com/wjbych/p/17474650.html

相关文章

  • 修改eyou里留言发送邮件时的发件人信息问题
    如题,默认情况下,如果发件件箱是12345@qq.com,那么收到的发件人信息是12345。想改成其它内容的话,就要改这个文件application/common/logic/EmailLogic.php。找到 privatefunctionsend_phpmailer  里面的内容$mail->setFrom(发邮件地,发送者昵称),不填第二个参数的话,如上述地址的话......
  • 如何用get方式、post方式向http接口发送数据
    1.项目环境如下:myeclipse6.5、tomcat5.0、system:xp、JDK:开发1.5,编译1.4为了方便,在原来的web项目UpDown中新建了一个httpcall包,用来保存http接口和调用的客户端。2.准备需要的jar包*commons-httpclient-3.0.jar*commons-logging.jar*commons-codec-1.3.jar......
  • web接收websocket
    data(){return{websock:null,wsuri:"ws://192.168.2.22:8025/test/fff",//WebSocket的后台地址actiones:{ssid:"fff"},//传入后台的数据};},created(){this.initWebSocket();//开启WebSocket},destroyed(......
  • jquery 实现 点击按钮后倒计时效果,多用于实现发送手机验证码、邮箱验证码
    <!DOCTYPEhtmlPUBLIC"-//W3C//DTDXHTML1.0Transitional//EN""http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd"><htmlxmlns="http://www.w3.org/1999/xhtml"><head><title></title>......
  • 如何自动转发接收的请求报头?
    了解OpenTelemetry的朋友应该知道,为了将率属于同一个请求的多个操作(Span)串起来,上游应用会生成一个唯一的TraceId。在进行跨应用的Web调用时,这个TraceId和代表跟踪操作标识的SpanID一并发给目标应用,W3C还专门指定了一份名为TraceContext的标准,该标准确定了一个名为trace-parent的......
  • [转]前端-WebAPI接口-FormData对象的使用(模拟表单用于发送数据及上传文件)
    一、概述FormData对象的使用:用一些键值对来模拟一系列表单控件:即把form中所有的元素的name与value组成一个queryString。异步上传二进制文件。二、使用创建一个空对象实例。 javascript复制代码varmyform=newFormData();使用已有的表单来初始化 ht......
  • Qt之MQTT编译(一)
    一、MQTT简介MQTT(MessageQueuingTelemetryTransport)是一种轻量级的、发布-订阅模式的消息传输协议。它最初是为低带宽和不稳定网络环境设计的,以支持物联网(IoT)设备之间的高效通信。MQTT的工作方式基于发布-订阅模型,其中包含两个角色:发布者(Publisher)和订阅者(Subscriber)。发......
  • 轻松实现物联网通信的利器:MQTT网关神器——FluxMQ
    FluxMQ—引领物联网新时代的高性能MQTT网关随着物联网技术的快速发展,人们越来越意识到实时、可靠、安全的数据传输对于智能化的生产与生活的重要性。因此,市场对于高性能的物联网数据传输解决方案有着强烈的需求。FluxMQ正是为满足这一需求而诞生的一款高性能、可靠且易于使用的MQ......
  • MQTT协议及其在Golang中的实现
    引言:在物联网(IoT)领域中,设备之间的通信是至关重要的。为了实现设备之间的高效、轻量级通信,MQTT(MessageQueuingTelemetryTransport)协议被广泛采用。MQTT是一种基于发布/订阅模式的消息传输协议,被设计为简单、轻量级且易于实现。本文将介绍MQTT协议的核心概念,并演示如何使用Gola......
  • SpringMVC参数接收与数据响应
    SpringMVC如何接受请求参数(普通类型参数/对象类型参数/数组/json数据等)1、普通类型参数(1)在可以在方法参数上使用@RequestParam注解来绑定请求参数,此注解允许指定请求参数的名称,以及是否是必须传的参数。@RequestMapping("/example")publicStringexampleMethod(@RequestPara......