using MQTT;
using MQTTnet.Client;
using MQTTnet.Protocol;
using MQTTnet;
using MQTTnet.Server;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Client.Receiving;

namespace MQTT
{
    /// <summary>
    /// Console entry point: starts the MQTT server and keeps the process alive
    /// until a line is read from standard input.
    /// </summary>
    internal class Program
    {
        static void Main(string[] args)
        {
            MqttServerClass serverClass = new MqttServerClass();

            // GetAwaiter().GetResult() blocks like Wait() but surfaces the original
            // exception instead of wrapping it in an AggregateException.
            serverClass.StartMqttServer().GetAwaiter().GetResult();

            Console.ReadLine();
        }
    }

    /// <summary>
    /// Static settings read by <see cref="MqttServerClass"/>: the listening port and
    /// the single username/password pair accepted by the connection validator.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the credentials are hard-coded defaults in source; consider
    /// loading them from configuration or a secret store before real deployment.
    /// </remarks>
    public static class Config
    {
        public static int Port { get; set; } = 1883;

        public static string UserName { get; set; } = "cyw";

        public static string Password { get; set; } = "123456";
    }

    /// <summary>
    /// Plain data holder for a client's id and credentials. It is not referenced by
    /// any active code in this file (only by code that was previously commented out).
    /// </summary>
    public class UserInstance
    {
        public string ClientId { get; set; }

        public string UserName { get; set; }

        public string Password { get; set; }
    }

    /// <summary>
    /// Wraps an MQTTnet server: configures the endpoint, credential validation and
    /// interceptors, wires console-logging handlers, and starts the server.
    /// </summary>
    public class MqttServerClass
    {
        private IMqttServer mqttServer;

        // Every received application message is appended here.
        // NOTE(review): nothing in this file reads or trims this list, so it grows
        // for the lifetime of the process - confirm whether it is still needed.
        private List<MqttApplicationMessage> messages = new List<MqttApplicationMessage>();

        /// <summary>
        /// Creates, configures and starts the MQTT server if none has been created yet.
        /// Any exception is caught and reported on the console; in that case the server
        /// reference is cleared so that a later call can try again.
        /// </summary>
        /// <returns>A task that completes when start-up has finished or failed.</returns>
        public async Task StartMqttServer()
        {
            try
            {
                if (mqttServer == null)
                {
                    var optionsBuilder = new MqttServerOptionsBuilder()
                        .WithDefaultEndpoint()
                        .WithDefaultEndpointPort(Config.Port)
                        .WithConnectionValidator(
                            c =>
                            {
                                var flag = c.Username == Config.UserName && c.Password == Config.Password;
                                if (!flag)
                                {
                                    c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
                                    return;
                                }

                                // Credentials matched: mark the connection as accepted.
                                c.ReasonCode = MqttConnectReasonCode.Success;
                            })
                        // Subscription interceptor: accept every subscription.
                        .WithSubscriptionInterceptor(
                            c =>
                            {
                                if (c == null) return;
                                c.AcceptSubscription = true;
                            })
                        // Application message interceptor: accept every publish.
                        .WithApplicationMessageInterceptor(
                            c =>
                            {
                                if (c == null) return;
                                c.AcceptPublish = true;
                            })
                        // Controls whether the clean-session setting takes effect.
                        .WithPersistentSessions();

                    mqttServer = new MqttFactory().CreateMqttServer();

                    // Server started.
                    mqttServer.StartedHandler = new MqttServerStartedHandlerDelegate(OnMqttServerStarted);
                    // Server stopped.
                    mqttServer.StoppedHandler = new MqttServerStoppedHandlerDelegate(OnMqttServerStopped);
                    // Client connected.
                    mqttServer.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(OnMqttServerClientConnected);
                    // Client disconnected (this assignment replaces any previously set disconnect handler).
                    mqttServer.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(OnMqttServerClientDisconnected);
                    // Client subscribed to a topic.
                    mqttServer.ClientSubscribedTopicHandler = new MqttServerClientSubscribedTopicHandlerDelegate(OnMqttServerClientSubscribedTopic);
                    // Client unsubscribed from a topic.
                    mqttServer.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(OnMqttServerClientUnsubscribedTopic);
                    // Server received an application message.
                    mqttServer.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(OnMqttServerApplicationMessageReceived);

                    await mqttServer.StartAsync(optionsBuilder.Build());
                }
            }
            catch (Exception ex)
            {
                // Exceptions can only originate inside the creation branch above, so the
                // instance (if any) never finished starting. Clear it; otherwise the
                // null-check guard would turn every later call into a silent no-op.
                mqttServer = null;
                Console.WriteLine($"MQTT Server start fail.>{ex.Message}");
            }
        }

        /// <summary>Logs start-up completion when the server reports it is started.</summary>
        private void OnMqttServerStarted(EventArgs e)
        {
            // Snapshot the field: it is cleared if start-up fails.
            var server = mqttServer;
            if (server != null && server.IsStarted)
            {
                Console.WriteLine("MQTT服务启动完成!");
            }
        }

        /// <summary>Logs shutdown completion when the server reports it is no longer started.</summary>
        private void OnMqttServerStopped(EventArgs e)
        {
            // Snapshot the field: it is cleared if start-up fails.
            var server = mqttServer;
            if (server != null && !server.IsStarted)
            {
                Console.WriteLine("MQTT服务停止完成!");
            }
        }

        /// <summary>Logs the id of a client that has connected.</summary>
        private void OnMqttServerClientConnected(MqttServerClientConnectedEventArgs e)
        {
            Console.WriteLine($"客户端[{e.ClientId}]已连接");
        }

        /// <summary>Logs the id of a client that has disconnected.</summary>
        private void OnMqttServerClientDisconnected(MqttServerClientDisconnectedEventArgs e)
        {
            Console.WriteLine($"客户端[{e.ClientId}]已断开连接!");
        }

        /// <summary>Logs a client's subscription together with its topic filter.</summary>
        private void OnMqttServerClientSubscribedTopic(MqttServerClientSubscribedTopicEventArgs e)
        {
            Console.WriteLine($"客户端[{e.ClientId}]已成功订阅主题[{e.TopicFilter}]!");
        }

        /// <summary>Logs a client's unsubscription together with its topic filter.</summary>
        private void OnMqttServerClientUnsubscribedTopic(MqttServerClientUnsubscribedTopicEventArgs e)
        {
            Console.WriteLine($"客户端[{e.ClientId}]已成功取消订阅主题[{e.TopicFilter}]!");
        }

        /// <summary>
        /// Stores the received message and prints a timestamp followed by the sender id,
        /// topic, UTF-8 decoded payload, QoS level and retain flag.
        /// </summary>
        private void OnMqttServerApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
        {
            messages.Add(e.ApplicationMessage);
            Console.WriteLine(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff"));

            // A missing payload is printed as an empty string.
            Console.WriteLine($"客户端[{e.ClientId}]>> Topic[{e.ApplicationMessage.Topic}] Payload[{Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? new byte[] { })}] Qos[{e.ApplicationMessage.QualityOfServiceLevel}] Retain[{e.ApplicationMessage.Retain}]");
        }
    }
}
// Tags: Console, public, using, new, MQTTnet3.1, mqttServer, server, client. From: https://www.cnblogs.com/5ishare/p/16740199.html