首页 > 其他分享 >Abp vnext + MQTT

Abp vnext + MQTT

时间:2023-02-10 16:47:23浏览次数:54  
标签:vnext Hub Artizan Iot Mqtt Abp MQTT using MqttServer

目录

Artizan.Iot.Hub.Mqtt.Application.Contracts

新建类库项目 【Artizan.Iot.Hub.Mqtt.Application.Contracts】,添加对包MQTTnet的引用

 <PackageReference Include="MQTTnet" Version="4.1.4.563" />

IMqttServiceBase

在项目【Artizan.Iot.Hub.Mqtt.Application.Contracts】中新建接口 IMqttServiceBase ,用于配置 MqttServer
代码清单:Artizan.Iot.Hub.Mqtt.Application.Contracts/Server/IMqttServiceBase.cs

using MQTTnet.Server;

namespace Artizan.Iot.Hub.Mqtt.Server
{
    public interface IMqttServiceBase
    {
        protected MqttServer MqttServer { get; }
        /// <summary>
        /// 配置MqttServer
        /// </summary>
        /// <param name="mqttServer"></param>
        void ConfigureMqttServer(MqttServer mqttServer);
    }
}

其中,MqttServer 保留对 MqttServer 的,以便在接口方法

void ConfigureMqttServer(MqttServer mqttServer)

中对其进行配置。

IMqttConnectionService :负责 MqttServer 与链接相关初始化

代码清单:Artizan.Iot.Hub.Mqtt.Application.Contracts/Server/IMqttConnectionService.cs

namespace Artizan.Iot.Hub.Mqtt.Server
{
    public interface IMqttConnectionService : IMqttServiceBase
    {
    }
}

IMqttPublishingService:负责 MqttServer 的发布相关初始化

代码清单:Artizan.Iot.Hub.Mqtt.Application.Contracts/Server/IMqttPublishingService .cs

namespace Artizan.Iot.Hub.Mqtt.Server
{
    public interface IMqttPublishingService : IMqttServiceBase
    {
    }
}

IMqttSubscriptionService:负责 MqttServer 的订阅相关初始化

代码清单:Artizan.Iot.Hub.Mqtt.Application.Contracts/Server/IMqttPublishingService .cs

namespace Artizan.Iot.Hub.Mqtt.Server
{
    public interface IMqttSubscriptionService : IMqttServiceBase
    {
    }
}

Artizan.Iot.Hub.Mqtt.Application

新建类库项目 【Artizan.Iot.Hub.Mqtt.Application】,添加对项目 【Artizan.Iot.Hub.Mqtt.Application.Contracts的引用

MqttServiceBase

在项目【Artizan.Iot.Hub.Mqtt.Application】中新建接口 MqttServiceBase
代码清单:Artizan.Iot.Hub.Mqtt.Application/Server/MqttServiceBase.cs

using MQTTnet;
using MQTTnet.Server;

namespace Artizan.Iot.Hub.Mqtt.Server
{
    public class MqttServiceBase : IMqttServiceBase
    {
        public MqttServer MqttServer { get; private set; }

        public MqttServiceBase() { }

        public virtual void ConfigureMqttServer(MqttServer mqttServer)
        {
            MqttServer = mqttServer;
        }

        protected virtual async Task<MqttClientStatus?> GetClientStatusAsync(string clientId)
        {
            var allClientStatuses = await MqttServer.GetClientsAsync();
            return allClientStatuses.FirstOrDefault(cs => cs.Id == clientId);
        }

        protected virtual string GetClientIdFromPayload(MqttApplicationMessage message)
        {
            var payload = System.Text.Encoding.UTF8.GetString(message.Payload);
            // TODO: for JSON type data transfer get clientId from json payload
            return payload;
        }

        protected virtual async Task DisconnectClientAsync(string clientId)
        {
            var clientStatus = await GetClientStatusAsync(clientId);
            if (clientStatus != null)
            {
                await clientStatus.DisconnectAsync();
            }
        }

   
    }
}

MqttConnectionService :负责 MqttServer 与链接相关初始化

代码清单:Artizan.Iot.Hub.Mqtt.Application/Server/MqttConnectionService.cs

using Artizan.Iot.Hub.Mqtt.Topics;
using Artizan.Iot.Hub.Mqtt.Server.Etos;
using MQTTnet;
using MQTTnet.Server;
using System;
using System.Collections.Generic;
using System.Diagnostics.Tracing;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus.Distributed;
using MQTTnet.Protocol;
using Microsoft.Extensions.Logging;
using Artizan.Iot.Hub.Mqtt.Server.Extensions;

namespace Artizan.Iot.Hub.Mqtt.Server
{
    public class MqttConnectionService : MqttServiceBase, IMqttConnectionService, ITransientDependency
    {
        private readonly ILogger<MqttConnectionService> _logger;
        private readonly IDistributedEventBus _distributedEventBus;

        public MqttConnectionService(
            ILogger<MqttConnectionService> logger,
            IDistributedEventBus distributedEventBus)
            : base()
        {
            _logger = logger;
            _distributedEventBus = distributedEventBus;
        }

        public override void ConfigureMqttServer(MqttServer mqttServer)
        {
            base.ConfigureMqttServer(mqttServer);

            MqttServer.ValidatingConnectionAsync += ValidatingConnectionHandlerAsync; ;
            MqttServer.ClientConnectedAsync += ClientConnectedHandlerAsync;
            MqttServer.ClientDisconnectedAsync += ClientDisconnectedHandlerAsync;
        }

        private Task ValidatingConnectionHandlerAsync(ValidatingConnectionEventArgs eventArgs)
        {
            // TODO:检查设备
            // 无效客户端:if(context.ClientId != deviceId) context.ReasonCode = MQTTnet.Protocol.MqttConnectReasonCode.ClientIdentifierNotValid; // id不对
            // 用户名和密码不对:MQTTnet.Protocol.MqttConnectReasonCode.BadUserNameOrPassword;
            // 没有权限:        MQTTnet.Protocol.MqttConnectReasonCode.NotAuthorized;
            // ???:          MqttConnectReasonCode.UnspecifiedError;

            if (eventArgs.ClientId == "SpecialClient")
            {
                if (eventArgs.UserName != "USER" || eventArgs.Password != "PASS")
                {
                    eventArgs.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
                }
            }

            return Task.CompletedTask;
        }

        private async Task ClientConnectedHandlerAsync(ClientConnectedEventArgs eventArgs)
        {
            MqttServer.PublishByHub(BrokerTopics.Event.NewClientConnected, eventArgs.ClientId);

            await _distributedEventBus.PublishAsync(
               new ClientConnectedEto
               {
                   ClientId = eventArgs.ClientId
               }
            );
        }

        private async Task ClientDisconnectedHandlerAsync(ClientDisconnectedEventArgs eventArgs)
        {
            MqttServer.PublishByHub(BrokerTopics.Event.NewClientDisconnected, eventArgs.ClientId);

            await _distributedEventBus.PublishAsync(
               new ClientDisconnectedEto
               {
                   ClientId = eventArgs.ClientId
               }
            );
        }
    }
}

其中,

  • MQTTnet的MqttServer对象的初始化,
            MqttServer.ValidatingConnectionAsync += ValidatingConnectionHandlerAsync; // 处理连接验证
            MqttServer.ClientConnectedAsync += ClientConnectedHandlerAsync;  // 处理连接
            MqttServer.ClientDisconnectedAsync += ClientDisconnectedHandlerAsync; // 处理客户端断开
  • IotHub 发布内部主题
 MqttServer.PublishByHub(BrokerTopics.Event.NewClientConnected, eventArgs.ClientId);

其中使用的是自定义扩展方法:
代码清单:Artizan.Iot.Hub.Mqtt.Application/Server/Extensions/MqttServerExtensions

using Artizan.Iot.Hub.Mqtt.Topics;
using MQTTnet;
using MQTTnet.Server;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Artizan.Iot.Hub.Mqtt.Server.Extensions
{
    public static class MqttServerExtensions
    {
        /// <summary>
        ////  -----------------------------------------------------------------  
        ///  MQTTnet v3.x :
        ///     MQTTnet v3.x 中的MqttServer 类中的方法PublishAsync() 已被删除,故以下代码不能再使用:
        ///
        ///     await MqttServer.PublishAsync(BrokerEventTopics.NewClientConnected, arg.ClientId);
        ///     ,其源码: https://github.com/dotnet/MQTTnet/blob/release/3.1.x/Source/MQTTnet/Server/MqttServer.cs
        ///     
        ///  ----------------------------------------------------------------- 
        ///   MQTTnet v4.1 :
        ///     该版本中可以调用:MqttServer.InjectApplicationMessage() 方法注入消息
        ///    
        /// </summary>
        /// <param name="mqttServer"></param>
        /// <param name="mqttApplicationMessage"></param>
        /// <exception cref="ArgumentNullException"></exception>

        public static void PublishByHub(this MqttServer mqttServer, string topic, string payload)
        {
            if (topic == null) throw new ArgumentNullException(nameof(topic));

            mqttServer.PublishByHub(new MqttApplicationMessageBuilder()
                .WithTopic(topic)
                .WithPayload(payload)
                .Build());
        }

        public static void PublishByHub(this MqttServer mqttServer, MqttApplicationMessage mqttApplicationMessage)
        {
            if (mqttServer == null) throw new ArgumentNullException(nameof(mqttServer));
            if (mqttApplicationMessage == null) throw new ArgumentNullException(nameof(mqttApplicationMessage));

            mqttServer.InjectApplicationMessage(new InjectedMqttApplicationMessage(mqttApplicationMessage)
            {
                SenderClientId = MqttServiceConsts.IotHubMqttClientId
            });

        }
    }
}

  • 通过分布式事件总线,IDistributedEventBus _distributedEventBus 先其它模块或者微服务发送事件总线,降低耦合度
    事件Eto:

namespace Artizan.Iot.Hub.Mqtt.Server.Etos
{
///


/// EventName 属性是可选的,但是建议使用。
/// 如果没有为事件类型(ETO 类)声明它,
/// 则事件名称将是事件类的全名,
/// 参见:Abp:Distributed Event Bus: https://docs.abp.io/en/abp/latest/Distributed-Event-Bus
///
/// 客户端上线
///
///

[EventName(MqttServerEventConsts.ClientConnected)]
public class ClientConnectedEto
{
public string ClientId { get; set; }
}

/// <summary>
/// Eto: Client下线
/// </summary>
[EventName(MqttServerEventConsts.ClientDisconnected)]
public class ClientDisconnectedEto
{
    public string ClientId { get; set; }
}

}


## IMqttPublishingService:负责 `MqttServer `的发布相关初始化
代码清单:Artizan.Iot.Hub.Mqtt.Application/Server/MqttPublishingService.cs
```C#
using Artizan.Iot.Hub.Mqtt.Server.Etos;
using MQTTnet.Server;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus.Distributed;
using Artizan.Iot.Hub.Mqtt.Topics;
using MQTTnet;
using Microsoft.Extensions.Logging;

namespace Artizan.Iot.Hub.Mqtt.Server
{
  public class MqttPublishingService : MqttServiceBase, IMqttPublishingService, ITransientDependency
  {
      private readonly ILogger<MqttPublishingService> _logger;
      private readonly IDistributedEventBus _distributedEventBus;

      public MqttPublishingService(
          ILogger<MqttPublishingService> logger,
          IDistributedEventBus distributedEventBus)
         : base()
      {
          _logger = logger;
          _distributedEventBus = distributedEventBus;
      }

      public override void ConfigureMqttServer(MqttServer mqttServer)
      {
          base.ConfigureMqttServer(mqttServer);

          MqttServer.InterceptingPublishAsync += InterceptingPublishHandlerAsync;
      }

      private async Task InterceptingPublishHandlerAsync(InterceptingPublishEventArgs eventArgs)
      {
          var topic = eventArgs.ApplicationMessage.Topic;

          _logger.LogDebug($"'{eventArgs.ClientId}' published '{eventArgs.ApplicationMessage.Topic}' > '{Encoding.UTF8.GetString(eventArgs.ApplicationMessage.Payload ?? new byte[0])}'");

          #region TODO:根据ClientId,用户等判断是能发布这种主题 等具体的业务逻辑,慢慢规划

          /// TDOD
          if (topic == "not_allowed_topic") // TODO:
          {
              eventArgs.ProcessPublish = false;
              eventArgs.CloseConnection = true;
          }

          if (MqttTopicFilterComparer.Compare(eventArgs.ApplicationMessage.Topic, "/myTopic/WithTimestamp/#") == MqttTopicFilterCompareResult.IsMatch)
          {
              // Replace the payload with the timestamp. But also extending a JSON 
              // based payload with the timestamp is a suitable use case.
              eventArgs.ApplicationMessage.Payload = Encoding.UTF8.GetBytes(DateTime.Now.ToString("O"));
          } 
          #endregion

          // TODO:根据ClientId, 判断是否有权限去发布主题
          if (MqttTopicHelper.IsSystemTopic(topic))
          {
              if (!MqttTopicHelper.IsBrokerItself(eventArgs.ClientId))
              {
                  // 客户端要发布系统主题,不接受,而是由系统执行系统主题的发布
                  // await _mqttInternalService.ExecuteSystemCommandAsync(context);
                  eventArgs.ProcessPublish = false;
                  return;
              }
          }
          else
          {
              if (!eventArgs.SessionItems.Contains(topic))
              {
                  var payloadString = Encoding.UTF8.GetString(eventArgs.ApplicationMessage.Payload);
                  eventArgs.SessionItems.Add(eventArgs.ApplicationMessage.Topic, eventArgs.ApplicationMessage.Payload);
              }
              else
              {
                  var retainPayload = (byte[])eventArgs.SessionItems[eventArgs.ApplicationMessage.Topic];
                  if (!retainPayload.SequenceEqual(eventArgs.ApplicationMessage.Payload))
                  {
                  }
              }
          }

          if (eventArgs.ProcessPublish)
          {
              await _distributedEventBus.PublishAsync(
                 new ClientPublishTopicEto
                 {
                     ClientId = eventArgs.ClientId,
                     Topic = eventArgs.ApplicationMessage.Topic,
                     Payload = eventArgs.ApplicationMessage.Payload
                 }
              );
          }
      }
  }
}

IMqttSubscriptionService:负责 MqttServer 的订阅相关初始化

代码清单:Artizan.Iot.Hub.Mqtt.Application/Server/MqttPublishingService.cs

using Artizan.Iot.Hub.Mqtt.Topics;
using Microsoft.Extensions.Logging;
using MQTTnet.Protocol;
using MQTTnet.Server;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus.Distributed;

namespace Artizan.Iot.Hub.Mqtt.Server
{
    /// <summary>
    /// TODO: 加入权限系统
    /// </summary>
    public class MqttSubscriptionService : MqttServiceBase, IMqttSubscriptionService, ITransientDependency
    {
        private readonly ILogger<MqttPublishingService> _logger;
        private readonly IDistributedEventBus _distributedEventBus;
        public MqttSubscriptionService(
            ILogger<MqttPublishingService> logger,
            IDistributedEventBus distributedEventBus)
           : base()
        {
            _logger= logger;
            _distributedEventBus = distributedEventBus;
        }
        public override void ConfigureMqttServer(MqttServer mqttServer)
        {
            base.ConfigureMqttServer(mqttServer);

            MqttServer.ClientSubscribedTopicAsync += ClientSubscribedTopicHandlerAsync;
            MqttServer.InterceptingSubscriptionAsync += InterceptingSubscriptionHandlerAsync;
            MqttServer.ClientUnsubscribedTopicAsync += ClientUnsubscribedTopicHandlerAsync;
            MqttServer.InterceptingUnsubscriptionAsync += InterceptingUnsubscriptionHandlerAsync;
        }

        /// <summary>
        /// 处理客户端订阅
        /// </summary>
        /// <param name="eventArgs"></param>
        /// <returns></returns>
        /// <exception cref="NotImplementedException"></exception>
        private Task ClientSubscribedTopicHandlerAsync(ClientSubscribedTopicEventArgs eventArgs)
        {
            _logger.LogDebug($"'{eventArgs.ClientId}' subscribed '{eventArgs.TopicFilter.Topic}'");
            return Task.CompletedTask;
        }

        /// <summary>
        /// 拦截客户端订阅
        /// </summary>
        /// <param name="eventArgs"></param>
        /// <returns></returns>
        /// <exception cref="NotImplementedException"></exception>
        private Task InterceptingSubscriptionHandlerAsync(InterceptingSubscriptionEventArgs eventArgs)
        {
            /// TODO: 使用数据库+缓存 Client 的订阅权限

            if (MqttTopicHelper.IsSystemTopic(eventArgs.TopicFilter.Topic) && eventArgs.ClientId != "Administrator")  // TODO:后续支持可配置是否可订阅指定的主题
            {
                eventArgs.Response.ReasonCode = MqttSubscribeReasonCode.ImplementationSpecificError;
            }

            if (eventArgs.TopicFilter.Topic.StartsWith(BrokerTopics.Secret.Base) && eventArgs.ClientId != "Imperator")
            {
                eventArgs.Response.ReasonCode = MqttSubscribeReasonCode.ImplementationSpecificError;
                eventArgs.CloseConnection = true;
            }

            return Task.CompletedTask;
        }

        /// <summary>
        /// 处理客户端取消订阅
        /// </summary>
        /// <param name="eventArgs "></param>
        /// <returns></returns>
        /// <exception cref="NotImplementedException"></exception>
        private Task ClientUnsubscribedTopicHandlerAsync(ClientUnsubscribedTopicEventArgs eventArgs )
        {
            //_logger.LogDebug($"'{eventArgs.ClientId}' unsubscribed topic '{eventArgs.xxx}'");
            return Task.CompletedTask;
        }

        /// <summary>
        /// 拦截取消订阅
        /// </summary>
        /// <param name="eventArgs"></param>
        /// <returns></returns>
        /// <exception cref="NotImplementedException"></exception>
        private Task InterceptingUnsubscriptionHandlerAsync(InterceptingUnsubscriptionEventArgs eventArgs)
        {
            _logger.LogDebug($"'{eventArgs.ClientId}' unsubscribed topic '{eventArgs.Topic}'");
            return Task.CompletedTask;
        }

    }
}

MqttServer 初始化汇总

在项目【Artizan.Iot.Hub.Mqtt.Application.Contracts】中新建接口 IMqttServerService
代码清单:Artizan.Iot.Hub.Mqtt.Application.Contracts/Server/IMqttServerService.cs

using MQTTnet.Server;

namespace Artizan.Iot.Hub.Mqtt.Server
{
    public interface IMqttServerService
    {
        MqttServer MqttServer { get; }
          
        /// <summary>
        /// 配置MqttServer
        /// </summary>
        /// <param name="mqttServer"></param>
        void ConfigureMqttService(MqttServer mqttServer);
    }
}

在项目【Artizan.Iot.Hub.Mqtt.Application】中实现接口 IMqttServerService
代码清单:Artizan.Iot.Hub.Mqtt.ApplicationServer/MqttServerService.cs

using MQTTnet.Server;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Volo.Abp.Application.Services;
using Volo.Abp.DependencyInjection;

namespace Artizan.Iot.Hub.Mqtt.Server
{
    public class MqttServerService: IMqttServerService, ITransientDependency
    { 
        private readonly IMqttConnectionService _mqttConnectionService;
        private readonly IMqttPublishingService _mqttPublishingService;
        private readonly IMqttSubscriptionService _mqttSubscriptionService;
        private readonly IMqttInternalService _mqttInternalService;
        public MqttServer MqttServer { get; private set; }

        public MqttServerService(
            IMqttConnectionService mqttConnectionService, 
            IMqttPublishingService mqttPublishingService, 
            IMqttSubscriptionService mqttSubscriptionService, 
            IMqttInternalService mqttInternalService)
        {
            _mqttConnectionService = mqttConnectionService;
            _mqttPublishingService = mqttPublishingService;
            _mqttSubscriptionService = mqttSubscriptionService;
            _mqttInternalService = mqttInternalService;
        }


        public void ConfigureMqttService(MqttServer mqttServer)
        {
            MqttServer =  mqttServer;

            _mqttConnectionService.ConfigureMqttServer(mqttServer);
            _mqttPublishingService.ConfigureMqttServer(mqttServer);
            _mqttSubscriptionService.ConfigureMqttServer(mqttServer);
            _mqttInternalService.ConfigureMqttServer(mqttServer);
        }

    
    }
}

MQTT集成 AspNet Core

MQTT集成 AspNet Core需要做一些额外的代码编写,参见:
https://github.com/dotnet/MQTTnet/wiki/Server#validating-mqtt-clients
下面编写一个类库【Artizan.Iot.Hub.Mqtt.AspNetCore】,封装这些初始化工作,
新建类库项目【Artizan.Iot.Hub.Mqtt.AspNetCore】,添加如下包引用:

  <ItemGroup>
    <PackageReference Include="MQTTnet.AspNetCore" Version="4.1.4.563" />
    <PackageReference Include="Volo.Abp.AspNetCore.Mvc" Version="7.0.1" />
  </ItemGroup>

新建接口IIotHubMqttServer,注意这里继承接口:ISingletonDependency,使其实现类称为单例
代码清单:Artizan.Iot.Hub.Mqtt.AspNetCore/Server/IIotHubMqttServer

using MQTTnet.Server;

namespace Artizan.Iot.Hub.Mqtt.AspNetCore.Servser
{
    public interface IIotHubMqttServer : ISingletonDependency
    {
        MqttServer MqttServer { get; }
        void ConfigureMqttServer(MqttServer mqttServer);
    }
}

其实现类:IotHubMqttServer,
代码清单:Artizan.Iot.Hub.Mqtt.AspNetCore/Server/IotHubMqttServer

using Artizan.Iot.Hub.Mqtt.Server;
using MQTTnet.Server;
using Volo.Abp.DependencyInjection;

namespace Artizan.Iot.Hub.Mqtt.AspNetCore.Servser
{

    /// <summary>
    /// MQTTnet 集成 AspNetCore:
    /// https://github.com/dotnet/MQTTnet/wiki/Server#validating-mqtt-clients
    /// 
    /// 特别注意:这里使用的是单例接口 ISingletonDependency
    /// 
    /// ----------
    /// 使用MQTTnet部署MQTT服务:
    /// https://www.cnblogs.com/wx881208/p/14325011.html--+
    /// C# MQTTnet 3.1升级到MQTTnet 4.0 Client编程变化:
    /// https://blog.csdn.net/yuming/article/details/125834921?spm=1001.2101.3001.6650.7&utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7ERate-7-125834921-blog-127175694.pc_relevant_3mothn_strategy_recovery&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7ERate-7-125834921-blog-127175694.pc_relevant_3mothn_strategy_recovery&utm_relevant_index=12
    /// </summary>
    public class IotHubMqttServer : IIotHubMqttServer, ISingletonDependency
    {
        private readonly IMqttServerService _mqttServerService;

        /// <summary>

        /// </summary>
        public MqttServer MqttServer { get; private set; }

        public IotHubMqttServer(IMqttServerService mqttServerService)
        {
            _mqttServerService = mqttServerService;
        }
        public void ConfigureMqttServer(MqttServer mqttServer)
        {
            MqttServer = mqttServer;
            _mqttServerService.ConfigureMqttService(mqttServer);
        }
    }
}

MQTTnet 与 AspNetCore 集成

新建 【AspNet Core WebApi】项目,名为【Artizan.Iot.Hub.Mqtt.HttpApi.Host】,添加如下包的引用

  <ItemGroup>
    <PackageReference Include="Serilog.AspNetCore" Version="6.1.0" />
    <PackageReference Include="Serilog.Sinks.Async" Version="1.5.0" />
    <PackageReference Include="Volo.Abp.AspNetCore.Serilog" Version="7.0.1" />
    <PackageReference Include="Volo.Abp.Autofac" Version="7.0.1" />
    <PackageReference Include="Volo.Abp.Swashbuckle" Version="7.0.1" />
  </ItemGroup>

  <ItemGroup>
    <ProjectReference Include="..\Artizan.Iot.Hub.Mqtt.Application\Artizan.Iot.Hub.Mqtt.Application.csproj" />
    <ProjectReference Include="..\Artizan.Iot.Hub.Mqtt.AspNetCore\Artizan.Iot.Hub.Mqtt.AspNetCore.csproj" />
    <ProjectReference Include="..\Artizan.Iot.Hub.Mqtt.HttpApi\Artizan.Iot.Hub.Mqtt.HttpApi.csproj" />
  </ItemGroup>

配置 MqttServer

代码清单:Artizan.Iot.Hub.Mqtt.HttpApi.Host/ArtizanIotHubMqttHttpApiHostModule.cs

namespace Artizan.Iot.Hub.Mqtt.HttpApi.Host
{
    [DependsOn(
        typeof(AbpAutofacModule),
        typeof(AbpAspNetCoreSerilogModule),
        typeof(AbpSwashbuckleModule),
        typeof(ArtizanIotHubMqttApplicationModule),
        typeof(ArtizanIotHubMqttHttpApiModule),
        typeof(ArtizanIotHubMqttAspNetCoreModule)
    )]
    public class ArtizanIotHubMqttHttpApiHostModule : AbpModule
    {
          ...
        public override void ConfigureServices(ServiceConfigurationContext context)
        {
          ...
          ConfigureIotHubMqttServer(context);
        }

        private void ConfigureIotHubMqttServer(ServiceConfigurationContext context)
        {
            context.Services.AddIotHubMqttServer(builder =>
            {
                builder.WithDefaultEndpoint();
            });
        }

        public override void OnApplicationInitialization(ApplicationInitializationContext context)
        {
            app.UseRouting();
            ...
            app.UseIotHubMqttServer();
            ...
        }
   }

其中,

AddIotHubMqttServer UseIotHubMqttServer

AddIotHubMqttServer()是自定义扩展方法:
代码清单:Artizan.Iot.Hub.Mqtt.HttpApi.Host/Extensions/IotHubMqttServerExtensions.cs

using Artizan.Iot.Hub.Mqtt.AspNetCore.Servser;
using MQTTnet.AspNetCore;


namespace Artizan.Iot.Hub.Mqtt.HttpApi.Host.Extensions
{
    public static class IotHubMqttServerExtensions
    {
        /// <summary>
        /// MQTTnet 集成 AspNetCore:
        ///    https://github.com/dotnet/MQTTnet/wiki/Server#validating-mqtt-clients
        /// </summary>
        public static void AddIotHubMqttServer(this IServiceCollection services,
            Action<AspNetMqttServerOptionsBuilder> configure)
        {
            services.AddHostedMqttServerWithServices(builder =>
            {
                configure(builder);
            });
            services.AddMqttConnectionHandler();
            services.AddConnections();
        }

        public static void UseIotHubMqttServer(this IApplicationBuilder app)
        {
            app.UseMqttServer(mqttServer =>
            {
                app.ApplicationServices.GetRequiredService<IIotHubMqttServer>()
                  .ConfigureMqttServer(mqttServer);
            });

            app.UseEndpoints(endpoint =>
            {
                // endpoint.MapMqtt("/mqtt");
                endpoint.MapConnectionHandler<MqttConnectionHandler>(
                    "/mqtt", // 设置MQTT的访问地址: localhost:端口/mqtt
                    httpConnectionDispatcherOptions => httpConnectionDispatcherOptions.WebSockets.SubProtocolSelector =
                        protocolList => protocolList.FirstOrDefault() ?? string.Empty); // MQTT 支持 HTTP WebSockets
            });
        }
    }
}

扩展方法:UseIotHubMqttServer()

代码清单:Program.cs

using Artizan.Iot.Hub.Mqtt.HttpApi.Host;
using MQTTnet.AspNetCore;
using Serilog;
using Serilog.Events;

namespace AbpManual.BookStore;

public class Program
{
    public async static Task<int> Main(string[] args)
    {
        Log.Logger = new LoggerConfiguration()
#if DEBUG
            .MinimumLevel.Debug()
#else
            .MinimumLevel.Information()
#endif
            .MinimumLevel.Override("Microsoft", LogEventLevel.Information)
            .MinimumLevel.Override("Microsoft.EntityFrameworkCore", LogEventLevel.Warning)
            .Enrich.FromLogContext()
            .WriteTo.Async(c => c.File("Logs/logs.txt"))
            .WriteTo.Async(c => c.Console())
            .CreateLogger();

        try
        {
            Log.Information("Starting Artizan-IotHub HttpApi Host.");
            var builder = WebApplication.CreateBuilder(args);
            builder.Host.AddAppSettingsSecretsJson()
                .UseAutofac()
                .UseSerilog();

            builder.WebHost.ConfigureKestrel(serverOptions =>
            {
                // This will allow MQTT connections based on TCP port 1883.
                serverOptions.ListenAnyIP(2883, opts => opts.UseMqtt());

                // This will allow MQTT connections based on HTTP WebSockets with URI "localhost:5883/mqtt"
                // See code below for URI configuration.
                serverOptions.ListenAnyIP(5883); // Default HTTP pipeline
            });

            await builder.AddApplicationAsync<ArtizanIotHubMqttHttpApiHostModule>();
            var app = builder.Build();
            await app.InitializeApplicationAsync();
            await app.RunAsync();
            return 0;
        }
        catch (Exception ex)
        {
            Log.Fatal(ex, "Host terminated unexpectedly!");
            return 1;
        }
        finally
        {
            Log.CloseAndFlush();
        }
    }
}

其中,设置 MQTT的监听端口

          builder.WebHost.ConfigureKestrel(serverOptions =>
            {
                // This will allow MQTT connections based on TCP port 2883.
                serverOptions.ListenAnyIP(2883, opts => opts.UseMqtt());

                // This will allow MQTT connections based on HTTP WebSockets with URI "localhost:5883/mqtt"
                // See code below for URI configuration.
                serverOptions.ListenAnyIP(5883); // Default HTTP pipeline
            });

普通的TCP端口使用:2883,访问URL:localhost:2883/mqtt,
HTTP WebSockets端口:localhost:5883/mqtt

标签:vnext,Hub,Artizan,Iot,Mqtt,Abp,MQTT,using,MqttServer
From: https://www.cnblogs.com/easy5weikai/p/17109217.html

相关文章

  • python mqtt服务器搭建
    一.在Linux中搭建mqtt服务环境:Linux版本Ubuntu 18.04.1 LTS1.进入https://www.emqx.com/zh/try?product=broker下载开源版本 EMQX 此处选择zip格式2.下载后将e......
  • paho-mqtt
    paho-mqtt介绍及安装我们主要从pypi.org网站上进行学习,网址为https://pypi.org/project/paho-mqtt/。先把paho-mqtt安装起来。pip3installpaho-mqtt-ihttps://pypi.......
  • 使用MQTT消息通信-Mosquitto搭建
    MQTT协议广泛用于物联网设备的消息传输,关于MQTT和MQTT的使用,我们准备多讲几次课,主要包括MQTT的概念和原理,MQTT的服务软件Mosquitto的搭建和使用,MQTT的python开发软件包paho......
  • Home Assistant MQTT发现服务的实现
    HomeAssistantMQTT发现服务的实现MQTT收发基于mosquito以下内容摘自官方文档,可查看文档原文MQTTDiscoveryThediscoveryofMQTTdeviceswillenableonetouse......
  • 整合MQTT
    1、步骤(1)dependencecom.google.code.gsongsonorg.springframework.integrationspring-integration-streamorg.springframework.integrationspring-integration......
  • MQTT协议详解
    MQTT协议详解 MQTT是基于Publish/Subscribe(发布订阅)模式的物联网通信协议特点:简单易实现支持Qos(服务质量)报文小MQTT协议构建于TCP/IP协议之上发布订阅模式:......
  • Abp vnext 6.0手机号验证码登录
    abpvnext6.0之后官方替换了原来的ids4,采用了openIddict的oauth认证框架。最近有一个需求是要做手机号+短信验证码登录,故需要对openiddict的授权流程进行扩展,下面记录流程......
  • PLC利用函数块连接MQTT订阅消息(一)
    在亿佰特介绍了西门子PLC如何通过函数块连接MQTT服务器和发布消息,本文为大家介绍如何通过函数与函数块实现MQTT云消息的订阅,直接切入重点。一、飞燕物联网平台配置这里......
  • PLC利用函数块连接MQTT订阅消息(一)
    在亿佰特介绍了西门子PLC如何通过函数块连接MQTT服务器和发布消息,本文为大家介绍如何通过函数与函数块实现MQTT云消息的订阅,直接切入重点。一、飞燕物联网平台配置这里的配......
  • abp vnext自定义claim
    创建UserClaimsPrincipalFactory工厂在Project.Domain中创建ProjectUserClaimsPrincipalFactoryusingSystem;usingSystem.Collections.Generic;usingSystem.Linq;......