.net6操作mqtt
客户端
using MQTTnet.Client; using MQTTnet.Protocol; using MQTTnet; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace SendMessageMqttClinet { public class MqttService { private IMqttClient _mqttClient = null ; public IMqttClient MqttServiceCreate(string host, int port, string clientId) { if (_mqttClient == null) { MqttClientStart(host, port, clientId); } else { return _mqttClient; } return null; } public async Task<IMqttClient> MqttClientStart(string host,int port,string clientId) { var optionsBuilder = new MqttClientOptionsBuilder() .WithTcpServer(host, port) // 要访问的mqtt服务端的 ip 和 端口号 //.WithCredentials("admin", "123456") // 要访问的mqtt服务端的用户名和密码 .WithClientId(clientId) // 设置客户端id .WithCleanSession() .WithTls(new MqttClientOptionsBuilderTlsParameters { UseTls = false // 是否使用 tls加密 }); var clientOptions = optionsBuilder.Build(); _mqttClient = new MqttFactory().CreateMqttClient(); _mqttClient.ConnectedAsync += _mqttClient_ConnectedAsync; // 客户端连接成功事件 _mqttClient.DisconnectedAsync += _mqttClient_DisconnectedAsync; // 客户端连接关闭事件 _mqttClient.ApplicationMessageReceivedAsync += _mqttClient_ApplicationMessageReceivedAsync; // 收到消息事件 await _mqttClient.ConnectAsync(clientOptions); return _mqttClient; } /// <summary> /// 客户端连接关闭事件 /// </summary> /// <param name="arg"></param> /// <returns></returns> private Task _mqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg) { Console.WriteLine($"客户端已断开与服务端的连接……"); return Task.CompletedTask; } /// <summary> /// 客户端连接成功事件 /// </summary> /// <param name="arg"></param> /// <returns></returns> private Task _mqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg) { Console.WriteLine($"客户端已连接服务端……"); // 订阅消息主题 // MqttQualityOfServiceLevel: (QoS): 0 最多一次,接收者不确认收到消息,并且消息不被发送者存储和重新发送提供与底层 TCP 协议相同的保证。 // 1: 保证一条消息至少有一次会传递给接收方。发送方存储消息,直到它从接收方收到确认收到消息的数据包。一条消息可以多次发送或传递。 // 2: 保证每条消息仅由预期的收件人接收一次。级别2是最安全和最慢的服务质量级别,保证由发送方和接收方之间的至少两个请求/响应(四次握手)。 _mqttClient.SubscribeAsync("topic_02", MqttQualityOfServiceLevel.AtLeastOnce); return Task.CompletedTask; } /// <summary> /// 收到消息事件 /// </summary> /// <param name="arg"></param> /// <returns></returns> private Task _mqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg) { Console.WriteLine($"ApplicationMessageReceivedAsync:客户端ID=【{arg.ClientId}】接收到消息。 Topic主题=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】"); return Task.CompletedTask; } public void Publish(string topic,string data) { var message = new MqttApplicationMessage { Topic = topic, Payload = Encoding.Default.GetBytes(data), QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce, Retain = false // 服务端是否保留消息。true为保留,如果有新的订阅者连接,就会立马收到该消息。 }; _mqttClient.PublishAsync(message); } } }
服务端
public class MqttClientService { public static IMqttClient _mqttClient; public void MqttClientStart() { var optionsBuilder = new MqttClientOptionsBuilder() .WithTcpServer("127.0.0.1", 10086) // 要访问的mqtt服务端的 ip 和 端口号 .WithCredentials("admin", "123456") // 要访问的mqtt服务端的用户名和密码 .WithClientId("testclient02") // 设置客户端id .WithCleanSession() .WithTls(new MqttClientOptionsBuilderTlsParameters { UseTls = false // 是否使用 tls加密 }); var clientOptions = optionsBuilder.Build(); _mqttClient = new MqttFactory().CreateMqttClient(); _mqttClient.ConnectedAsync += _mqttClient_ConnectedAsync; // 客户端连接成功事件 _mqttClient.DisconnectedAsync += _mqttClient_DisconnectedAsync; // 客户端连接关闭事件 _mqttClient.ApplicationMessageReceivedAsync += _mqttClient_ApplicationMessageReceivedAsync; // 收到消息事件 _mqttClient.ConnectAsync(clientOptions); } /// <summary> /// 客户端连接关闭事件 /// </summary> /// <param name="arg"></param> /// <returns></returns> private Task _mqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg) { Console.WriteLine($"客户端已断开与服务端的连接……"); return Task.CompletedTask; } /// <summary> /// 客户端连接成功事件 /// </summary> /// <param name="arg"></param> /// <returns></returns> private Task _mqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg) { Console.WriteLine($"客户端已连接服务端……"); // 订阅消息主题 // MqttQualityOfServiceLevel: (QoS): 0 最多一次,接收者不确认收到消息,并且消息不被发送者存储和重新发送提供与底层 TCP 协议相同的保证。 // 1: 保证一条消息至少有一次会传递给接收方。发送方存储消息,直到它从接收方收到确认收到消息的数据包。一条消息可以多次发送或传递。 // 2: 保证每条消息仅由预期的收件人接收一次。级别2是最安全和最慢的服务质量级别,保证由发送方和接收方之间的至少两个请求/响应(四次握手)。 _mqttClient.SubscribeAsync("topic_02", MqttQualityOfServiceLevel.AtLeastOnce); return Task.CompletedTask; } /// <summary> /// 收到消息事件 /// </summary> /// <param name="arg"></param> /// <returns></returns> private Task _mqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg) { Console.WriteLine($"ApplicationMessageReceivedAsync:客户端ID=【{arg.ClientId}】接收到消息。 Topic主题=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】"); return Task.CompletedTask; } public void Publish(string data) { var message = new MqttApplicationMessage { Topic = "topic_02", Payload = Encoding.Default.GetBytes(data), QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce, Retain = true // 服务端是否保留消息。true为保留,如果有新的订阅者连接,就会立马收到该消息。 }; _mqttClient.PublishAsync(message); } }
参考:https://www.cnblogs.com/weskynet/p/16441219.html
标签:mqttClient,Task,mqtt,消息,arg,net6,操作,服务端,客户端 From: https://www.cnblogs.com/OneSeting/p/17560128.html