目录
- Artizan.Iot.Hub.Mqtt.Application.Contracts
- Artizan.Iot.Hub.Mqtt.Application
- MqttServiceBase
- MqttServer 初始化汇总
- MQTT集成 AspNet Core
- MQTTnet 与 AspNetCore 集成
Artizan.Iot.Hub.Mqtt.Application.Contracts
新建类库项目 【Artizan.Iot.Hub.Mqtt.Application.Contracts】,添加对包MQTTnet
的引用
<PackageReference Include="MQTTnet" Version="4.1.4.563" />
IMqttServiceBase
在项目【Artizan.Iot.Hub.Mqtt.Application.Contracts】中新建接口 IMqttServiceBase
,用于配置 MqttServer
代码清单:Artizan.Iot.Hub.Mqtt.Application.Contracts/Server/IMqttServiceBase.cs
using MQTTnet.Server;
namespace Artizan.Iot.Hub.Mqtt.Server
{
public interface IMqttServiceBase
{
protected MqttServer MqttServer { get; }
/// <summary>
/// 配置MqttServer
/// </summary>
/// <param name="mqttServer"></param>
void ConfigureMqttServer(MqttServer mqttServer);
}
}
其中,MqttServer
保留对 MqttServer 的,以便在接口方法
void ConfigureMqttServer(MqttServer mqttServer)
中对其进行配置。
IMqttConnectionService :负责 MqttServer
与链接相关初始化
代码清单:Artizan.Iot.Hub.Mqtt.Application.Contracts/Server/IMqttConnectionService.cs
namespace Artizan.Iot.Hub.Mqtt.Server
{
public interface IMqttConnectionService : IMqttServiceBase
{
}
}
IMqttPublishingService:负责 MqttServer
的发布相关初始化
代码清单:Artizan.Iot.Hub.Mqtt.Application.Contracts/Server/IMqttPublishingService .cs
namespace Artizan.Iot.Hub.Mqtt.Server
{
public interface IMqttPublishingService : IMqttServiceBase
{
}
}
IMqttSubscriptionService:负责 MqttServer
的订阅相关初始化
代码清单:Artizan.Iot.Hub.Mqtt.Application.Contracts/Server/IMqttPublishingService .cs
namespace Artizan.Iot.Hub.Mqtt.Server
{
public interface IMqttSubscriptionService : IMqttServiceBase
{
}
}
Artizan.Iot.Hub.Mqtt.Application
新建类库项目 【Artizan.Iot.Hub.Mqtt.Application】,添加对项目 【Artizan.Iot.Hub.Mqtt.Application.Contracts的引用
MqttServiceBase
在项目【Artizan.Iot.Hub.Mqtt.Application】中新建接口 MqttServiceBase
代码清单:Artizan.Iot.Hub.Mqtt.Application/Server/MqttServiceBase.cs
using MQTTnet;
using MQTTnet.Server;
namespace Artizan.Iot.Hub.Mqtt.Server
{
public class MqttServiceBase : IMqttServiceBase
{
public MqttServer MqttServer { get; private set; }
public MqttServiceBase() { }
public virtual void ConfigureMqttServer(MqttServer mqttServer)
{
MqttServer = mqttServer;
}
protected virtual async Task<MqttClientStatus?> GetClientStatusAsync(string clientId)
{
var allClientStatuses = await MqttServer.GetClientsAsync();
return allClientStatuses.FirstOrDefault(cs => cs.Id == clientId);
}
protected virtual string GetClientIdFromPayload(MqttApplicationMessage message)
{
var payload = System.Text.Encoding.UTF8.GetString(message.Payload);
// TODO: for JSON type data transfer get clientId from json payload
return payload;
}
protected virtual async Task DisconnectClientAsync(string clientId)
{
var clientStatus = await GetClientStatusAsync(clientId);
if (clientStatus != null)
{
await clientStatus.DisconnectAsync();
}
}
}
}
MqttConnectionService :负责 MqttServer
与链接相关初始化
代码清单:Artizan.Iot.Hub.Mqtt.Application/Server/MqttConnectionService.cs
using Artizan.Iot.Hub.Mqtt.Topics;
using Artizan.Iot.Hub.Mqtt.Server.Etos;
using MQTTnet;
using MQTTnet.Server;
using System;
using System.Collections.Generic;
using System.Diagnostics.Tracing;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus.Distributed;
using MQTTnet.Protocol;
using Microsoft.Extensions.Logging;
using Artizan.Iot.Hub.Mqtt.Server.Extensions;
namespace Artizan.Iot.Hub.Mqtt.Server
{
public class MqttConnectionService : MqttServiceBase, IMqttConnectionService, ITransientDependency
{
private readonly ILogger<MqttConnectionService> _logger;
private readonly IDistributedEventBus _distributedEventBus;
public MqttConnectionService(
ILogger<MqttConnectionService> logger,
IDistributedEventBus distributedEventBus)
: base()
{
_logger = logger;
_distributedEventBus = distributedEventBus;
}
public override void ConfigureMqttServer(MqttServer mqttServer)
{
base.ConfigureMqttServer(mqttServer);
MqttServer.ValidatingConnectionAsync += ValidatingConnectionHandlerAsync; ;
MqttServer.ClientConnectedAsync += ClientConnectedHandlerAsync;
MqttServer.ClientDisconnectedAsync += ClientDisconnectedHandlerAsync;
}
private Task ValidatingConnectionHandlerAsync(ValidatingConnectionEventArgs eventArgs)
{
// TODO:检查设备
// 无效客户端:if(context.ClientId != deviceId) context.ReasonCode = MQTTnet.Protocol.MqttConnectReasonCode.ClientIdentifierNotValid; // id不对
// 用户名和密码不对:MQTTnet.Protocol.MqttConnectReasonCode.BadUserNameOrPassword;
// 没有权限: MQTTnet.Protocol.MqttConnectReasonCode.NotAuthorized;
// ???: MqttConnectReasonCode.UnspecifiedError;
if (eventArgs.ClientId == "SpecialClient")
{
if (eventArgs.UserName != "USER" || eventArgs.Password != "PASS")
{
eventArgs.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
}
}
return Task.CompletedTask;
}
private async Task ClientConnectedHandlerAsync(ClientConnectedEventArgs eventArgs)
{
MqttServer.PublishByHub(BrokerTopics.Event.NewClientConnected, eventArgs.ClientId);
await _distributedEventBus.PublishAsync(
new ClientConnectedEto
{
ClientId = eventArgs.ClientId
}
);
}
private async Task ClientDisconnectedHandlerAsync(ClientDisconnectedEventArgs eventArgs)
{
MqttServer.PublishByHub(BrokerTopics.Event.NewClientDisconnected, eventArgs.ClientId);
await _distributedEventBus.PublishAsync(
new ClientDisconnectedEto
{
ClientId = eventArgs.ClientId
}
);
}
}
}
其中,
- MQTTnet的MqttServer对象的初始化,
MqttServer.ValidatingConnectionAsync += ValidatingConnectionHandlerAsync; // 处理连接验证
MqttServer.ClientConnectedAsync += ClientConnectedHandlerAsync; // 处理连接
MqttServer.ClientDisconnectedAsync += ClientDisconnectedHandlerAsync; // 处理客户端断开
- IotHub 发布内部主题
MqttServer.PublishByHub(BrokerTopics.Event.NewClientConnected, eventArgs.ClientId);
其中使用的是自定义扩展方法:
代码清单:Artizan.Iot.Hub.Mqtt.Application/Server/Extensions/MqttServerExtensions
using Artizan.Iot.Hub.Mqtt.Topics;
using MQTTnet;
using MQTTnet.Server;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Artizan.Iot.Hub.Mqtt.Server.Extensions
{
public static class MqttServerExtensions
{
/// <summary>
//// -----------------------------------------------------------------
/// MQTTnet v3.x :
/// MQTTnet v3.x 中的MqttServer 类中的方法PublishAsync() 已被删除,故以下代码不能再使用:
///
/// await MqttServer.PublishAsync(BrokerEventTopics.NewClientConnected, arg.ClientId);
/// ,其源码: https://github.com/dotnet/MQTTnet/blob/release/3.1.x/Source/MQTTnet/Server/MqttServer.cs
///
/// -----------------------------------------------------------------
/// MQTTnet v4.1 :
/// 该版本中可以调用:MqttServer.InjectApplicationMessage() 方法注入消息
///
/// </summary>
/// <param name="mqttServer"></param>
/// <param name="mqttApplicationMessage"></param>
/// <exception cref="ArgumentNullException"></exception>
public static void PublishByHub(this MqttServer mqttServer, string topic, string payload)
{
if (topic == null) throw new ArgumentNullException(nameof(topic));
mqttServer.PublishByHub(new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(payload)
.Build());
}
public static void PublishByHub(this MqttServer mqttServer, MqttApplicationMessage mqttApplicationMessage)
{
if (mqttServer == null) throw new ArgumentNullException(nameof(mqttServer));
if (mqttApplicationMessage == null) throw new ArgumentNullException(nameof(mqttApplicationMessage));
mqttServer.InjectApplicationMessage(new InjectedMqttApplicationMessage(mqttApplicationMessage)
{
SenderClientId = MqttServiceConsts.IotHubMqttClientId
});
}
}
}
- 通过分布式事件总线,
IDistributedEventBus _distributedEventBus
先其它模块或者微服务发送事件总线,降低耦合度
事件Eto:
namespace Artizan.Iot.Hub.Mqtt.Server.Etos
{
///
/// EventName 属性是可选的,但是建议使用。
/// 如果没有为事件类型(ETO 类)声明它,
/// 则事件名称将是事件类的全名,
/// 参见:Abp:Distributed Event Bus: https://docs.abp.io/en/abp/latest/Distributed-Event-Bus
///
/// 客户端上线
///
///
[EventName(MqttServerEventConsts.ClientConnected)]
public class ClientConnectedEto
{
public string ClientId { get; set; }
}
/// <summary>
/// Eto: Client下线
/// </summary>
[EventName(MqttServerEventConsts.ClientDisconnected)]
public class ClientDisconnectedEto
{
public string ClientId { get; set; }
}
}
## IMqttPublishingService:负责 `MqttServer `的发布相关初始化
代码清单:Artizan.Iot.Hub.Mqtt.Application/Server/MqttPublishingService.cs
```C#
using Artizan.Iot.Hub.Mqtt.Server.Etos;
using MQTTnet.Server;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus.Distributed;
using Artizan.Iot.Hub.Mqtt.Topics;
using MQTTnet;
using Microsoft.Extensions.Logging;
namespace Artizan.Iot.Hub.Mqtt.Server
{
public class MqttPublishingService : MqttServiceBase, IMqttPublishingService, ITransientDependency
{
private readonly ILogger<MqttPublishingService> _logger;
private readonly IDistributedEventBus _distributedEventBus;
public MqttPublishingService(
ILogger<MqttPublishingService> logger,
IDistributedEventBus distributedEventBus)
: base()
{
_logger = logger;
_distributedEventBus = distributedEventBus;
}
public override void ConfigureMqttServer(MqttServer mqttServer)
{
base.ConfigureMqttServer(mqttServer);
MqttServer.InterceptingPublishAsync += InterceptingPublishHandlerAsync;
}
private async Task InterceptingPublishHandlerAsync(InterceptingPublishEventArgs eventArgs)
{
var topic = eventArgs.ApplicationMessage.Topic;
_logger.LogDebug($"'{eventArgs.ClientId}' published '{eventArgs.ApplicationMessage.Topic}' > '{Encoding.UTF8.GetString(eventArgs.ApplicationMessage.Payload ?? new byte[0])}'");
#region TODO:根据ClientId,用户等判断是能发布这种主题 等具体的业务逻辑,慢慢规划
/// TDOD
if (topic == "not_allowed_topic") // TODO:
{
eventArgs.ProcessPublish = false;
eventArgs.CloseConnection = true;
}
if (MqttTopicFilterComparer.Compare(eventArgs.ApplicationMessage.Topic, "/myTopic/WithTimestamp/#") == MqttTopicFilterCompareResult.IsMatch)
{
// Replace the payload with the timestamp. But also extending a JSON
// based payload with the timestamp is a suitable use case.
eventArgs.ApplicationMessage.Payload = Encoding.UTF8.GetBytes(DateTime.Now.ToString("O"));
}
#endregion
// TODO:根据ClientId, 判断是否有权限去发布主题
if (MqttTopicHelper.IsSystemTopic(topic))
{
if (!MqttTopicHelper.IsBrokerItself(eventArgs.ClientId))
{
// 客户端要发布系统主题,不接受,而是由系统执行系统主题的发布
// await _mqttInternalService.ExecuteSystemCommandAsync(context);
eventArgs.ProcessPublish = false;
return;
}
}
else
{
if (!eventArgs.SessionItems.Contains(topic))
{
var payloadString = Encoding.UTF8.GetString(eventArgs.ApplicationMessage.Payload);
eventArgs.SessionItems.Add(eventArgs.ApplicationMessage.Topic, eventArgs.ApplicationMessage.Payload);
}
else
{
var retainPayload = (byte[])eventArgs.SessionItems[eventArgs.ApplicationMessage.Topic];
if (!retainPayload.SequenceEqual(eventArgs.ApplicationMessage.Payload))
{
}
}
}
if (eventArgs.ProcessPublish)
{
await _distributedEventBus.PublishAsync(
new ClientPublishTopicEto
{
ClientId = eventArgs.ClientId,
Topic = eventArgs.ApplicationMessage.Topic,
Payload = eventArgs.ApplicationMessage.Payload
}
);
}
}
}
}
IMqttSubscriptionService:负责 MqttServer
的订阅相关初始化
代码清单:Artizan.Iot.Hub.Mqtt.Application/Server/MqttPublishingService.cs
using Artizan.Iot.Hub.Mqtt.Topics;
using Microsoft.Extensions.Logging;
using MQTTnet.Protocol;
using MQTTnet.Server;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus.Distributed;
namespace Artizan.Iot.Hub.Mqtt.Server
{
/// <summary>
/// TODO: 加入权限系统
/// </summary>
public class MqttSubscriptionService : MqttServiceBase, IMqttSubscriptionService, ITransientDependency
{
private readonly ILogger<MqttPublishingService> _logger;
private readonly IDistributedEventBus _distributedEventBus;
public MqttSubscriptionService(
ILogger<MqttPublishingService> logger,
IDistributedEventBus distributedEventBus)
: base()
{
_logger= logger;
_distributedEventBus = distributedEventBus;
}
public override void ConfigureMqttServer(MqttServer mqttServer)
{
base.ConfigureMqttServer(mqttServer);
MqttServer.ClientSubscribedTopicAsync += ClientSubscribedTopicHandlerAsync;
MqttServer.InterceptingSubscriptionAsync += InterceptingSubscriptionHandlerAsync;
MqttServer.ClientUnsubscribedTopicAsync += ClientUnsubscribedTopicHandlerAsync;
MqttServer.InterceptingUnsubscriptionAsync += InterceptingUnsubscriptionHandlerAsync;
}
/// <summary>
/// 处理客户端订阅
/// </summary>
/// <param name="eventArgs"></param>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
private Task ClientSubscribedTopicHandlerAsync(ClientSubscribedTopicEventArgs eventArgs)
{
_logger.LogDebug($"'{eventArgs.ClientId}' subscribed '{eventArgs.TopicFilter.Topic}'");
return Task.CompletedTask;
}
/// <summary>
/// 拦截客户端订阅
/// </summary>
/// <param name="eventArgs"></param>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
private Task InterceptingSubscriptionHandlerAsync(InterceptingSubscriptionEventArgs eventArgs)
{
/// TODO: 使用数据库+缓存 Client 的订阅权限
if (MqttTopicHelper.IsSystemTopic(eventArgs.TopicFilter.Topic) && eventArgs.ClientId != "Administrator") // TODO:后续支持可配置是否可订阅指定的主题
{
eventArgs.Response.ReasonCode = MqttSubscribeReasonCode.ImplementationSpecificError;
}
if (eventArgs.TopicFilter.Topic.StartsWith(BrokerTopics.Secret.Base) && eventArgs.ClientId != "Imperator")
{
eventArgs.Response.ReasonCode = MqttSubscribeReasonCode.ImplementationSpecificError;
eventArgs.CloseConnection = true;
}
return Task.CompletedTask;
}
/// <summary>
/// 处理客户端取消订阅
/// </summary>
/// <param name="eventArgs "></param>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
private Task ClientUnsubscribedTopicHandlerAsync(ClientUnsubscribedTopicEventArgs eventArgs )
{
//_logger.LogDebug($"'{eventArgs.ClientId}' unsubscribed topic '{eventArgs.xxx}'");
return Task.CompletedTask;
}
/// <summary>
/// 拦截取消订阅
/// </summary>
/// <param name="eventArgs"></param>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
private Task InterceptingUnsubscriptionHandlerAsync(InterceptingUnsubscriptionEventArgs eventArgs)
{
_logger.LogDebug($"'{eventArgs.ClientId}' unsubscribed topic '{eventArgs.Topic}'");
return Task.CompletedTask;
}
}
}
MqttServer 初始化汇总
在项目【Artizan.Iot.Hub.Mqtt.Application.Contracts】中新建接口 IMqttServerService
,
代码清单:Artizan.Iot.Hub.Mqtt.Application.Contracts/Server/IMqttServerService.cs
using MQTTnet.Server;
namespace Artizan.Iot.Hub.Mqtt.Server
{
public interface IMqttServerService
{
MqttServer MqttServer { get; }
/// <summary>
/// 配置MqttServer
/// </summary>
/// <param name="mqttServer"></param>
void ConfigureMqttService(MqttServer mqttServer);
}
}
在项目【Artizan.Iot.Hub.Mqtt.Application】中实现接口 IMqttServerService
,
代码清单:Artizan.Iot.Hub.Mqtt.ApplicationServer/MqttServerService.cs
using MQTTnet.Server;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Volo.Abp.Application.Services;
using Volo.Abp.DependencyInjection;
namespace Artizan.Iot.Hub.Mqtt.Server
{
public class MqttServerService: IMqttServerService, ITransientDependency
{
private readonly IMqttConnectionService _mqttConnectionService;
private readonly IMqttPublishingService _mqttPublishingService;
private readonly IMqttSubscriptionService _mqttSubscriptionService;
private readonly IMqttInternalService _mqttInternalService;
public MqttServer MqttServer { get; private set; }
public MqttServerService(
IMqttConnectionService mqttConnectionService,
IMqttPublishingService mqttPublishingService,
IMqttSubscriptionService mqttSubscriptionService,
IMqttInternalService mqttInternalService)
{
_mqttConnectionService = mqttConnectionService;
_mqttPublishingService = mqttPublishingService;
_mqttSubscriptionService = mqttSubscriptionService;
_mqttInternalService = mqttInternalService;
}
public void ConfigureMqttService(MqttServer mqttServer)
{
MqttServer = mqttServer;
_mqttConnectionService.ConfigureMqttServer(mqttServer);
_mqttPublishingService.ConfigureMqttServer(mqttServer);
_mqttSubscriptionService.ConfigureMqttServer(mqttServer);
_mqttInternalService.ConfigureMqttServer(mqttServer);
}
}
}
MQTT集成 AspNet Core
MQTT集成 AspNet Core需要做一些额外的代码编写,参见:
https://github.com/dotnet/MQTTnet/wiki/Server#validating-mqtt-clients
下面编写一个类库【Artizan.Iot.Hub.Mqtt.AspNetCore】,封装这些初始化工作,
新建类库项目【Artizan.Iot.Hub.Mqtt.AspNetCore】,添加如下包引用:
<ItemGroup>
<PackageReference Include="MQTTnet.AspNetCore" Version="4.1.4.563" />
<PackageReference Include="Volo.Abp.AspNetCore.Mvc" Version="7.0.1" />
</ItemGroup>
新建接口IIotHubMqttServer
,注意这里继承接口:ISingletonDependency,使其实现类称为单例
代码清单:Artizan.Iot.Hub.Mqtt.AspNetCore/Server/IIotHubMqttServer
using MQTTnet.Server;
namespace Artizan.Iot.Hub.Mqtt.AspNetCore.Servser
{
public interface IIotHubMqttServer : ISingletonDependency
{
MqttServer MqttServer { get; }
void ConfigureMqttServer(MqttServer mqttServer);
}
}
其实现类:IotHubMqttServer,
代码清单:Artizan.Iot.Hub.Mqtt.AspNetCore/Server/IotHubMqttServer
using Artizan.Iot.Hub.Mqtt.Server;
using MQTTnet.Server;
using Volo.Abp.DependencyInjection;
namespace Artizan.Iot.Hub.Mqtt.AspNetCore.Servser
{
/// <summary>
/// MQTTnet 集成 AspNetCore:
/// https://github.com/dotnet/MQTTnet/wiki/Server#validating-mqtt-clients
///
/// 特别注意:这里使用的是单例接口 ISingletonDependency
///
/// ----------
/// 使用MQTTnet部署MQTT服务:
/// https://www.cnblogs.com/wx881208/p/14325011.html--+
/// C# MQTTnet 3.1升级到MQTTnet 4.0 Client编程变化:
/// https://blog.csdn.net/yuming/article/details/125834921?spm=1001.2101.3001.6650.7&utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7ERate-7-125834921-blog-127175694.pc_relevant_3mothn_strategy_recovery&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7ERate-7-125834921-blog-127175694.pc_relevant_3mothn_strategy_recovery&utm_relevant_index=12
/// </summary>
public class IotHubMqttServer : IIotHubMqttServer, ISingletonDependency
{
private readonly IMqttServerService _mqttServerService;
/// <summary>
/// </summary>
public MqttServer MqttServer { get; private set; }
public IotHubMqttServer(IMqttServerService mqttServerService)
{
_mqttServerService = mqttServerService;
}
public void ConfigureMqttServer(MqttServer mqttServer)
{
MqttServer = mqttServer;
_mqttServerService.ConfigureMqttService(mqttServer);
}
}
}
MQTTnet 与 AspNetCore 集成
新建 【AspNet Core WebApi】项目,名为【Artizan.Iot.Hub.Mqtt.HttpApi.Host】,添加如下包的引用
<ItemGroup>
<PackageReference Include="Serilog.AspNetCore" Version="6.1.0" />
<PackageReference Include="Serilog.Sinks.Async" Version="1.5.0" />
<PackageReference Include="Volo.Abp.AspNetCore.Serilog" Version="7.0.1" />
<PackageReference Include="Volo.Abp.Autofac" Version="7.0.1" />
<PackageReference Include="Volo.Abp.Swashbuckle" Version="7.0.1" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Artizan.Iot.Hub.Mqtt.Application\Artizan.Iot.Hub.Mqtt.Application.csproj" />
<ProjectReference Include="..\Artizan.Iot.Hub.Mqtt.AspNetCore\Artizan.Iot.Hub.Mqtt.AspNetCore.csproj" />
<ProjectReference Include="..\Artizan.Iot.Hub.Mqtt.HttpApi\Artizan.Iot.Hub.Mqtt.HttpApi.csproj" />
</ItemGroup>
配置 MqttServer
代码清单:Artizan.Iot.Hub.Mqtt.HttpApi.Host/ArtizanIotHubMqttHttpApiHostModule.cs
namespace Artizan.Iot.Hub.Mqtt.HttpApi.Host
{
[DependsOn(
typeof(AbpAutofacModule),
typeof(AbpAspNetCoreSerilogModule),
typeof(AbpSwashbuckleModule),
typeof(ArtizanIotHubMqttApplicationModule),
typeof(ArtizanIotHubMqttHttpApiModule),
typeof(ArtizanIotHubMqttAspNetCoreModule)
)]
public class ArtizanIotHubMqttHttpApiHostModule : AbpModule
{
...
public override void ConfigureServices(ServiceConfigurationContext context)
{
...
ConfigureIotHubMqttServer(context);
}
private void ConfigureIotHubMqttServer(ServiceConfigurationContext context)
{
context.Services.AddIotHubMqttServer(builder =>
{
builder.WithDefaultEndpoint();
});
}
public override void OnApplicationInitialization(ApplicationInitializationContext context)
{
app.UseRouting();
...
app.UseIotHubMqttServer();
...
}
}
其中,
AddIotHubMqttServer UseIotHubMqttServer
AddIotHubMqttServer()
是自定义扩展方法:
代码清单:Artizan.Iot.Hub.Mqtt.HttpApi.Host/Extensions/IotHubMqttServerExtensions.cs
using Artizan.Iot.Hub.Mqtt.AspNetCore.Servser;
using MQTTnet.AspNetCore;
namespace Artizan.Iot.Hub.Mqtt.HttpApi.Host.Extensions
{
public static class IotHubMqttServerExtensions
{
/// <summary>
/// MQTTnet 集成 AspNetCore:
/// https://github.com/dotnet/MQTTnet/wiki/Server#validating-mqtt-clients
/// </summary>
public static void AddIotHubMqttServer(this IServiceCollection services,
Action<AspNetMqttServerOptionsBuilder> configure)
{
services.AddHostedMqttServerWithServices(builder =>
{
configure(builder);
});
services.AddMqttConnectionHandler();
services.AddConnections();
}
public static void UseIotHubMqttServer(this IApplicationBuilder app)
{
app.UseMqttServer(mqttServer =>
{
app.ApplicationServices.GetRequiredService<IIotHubMqttServer>()
.ConfigureMqttServer(mqttServer);
});
app.UseEndpoints(endpoint =>
{
// endpoint.MapMqtt("/mqtt");
endpoint.MapConnectionHandler<MqttConnectionHandler>(
"/mqtt", // 设置MQTT的访问地址: localhost:端口/mqtt
httpConnectionDispatcherOptions => httpConnectionDispatcherOptions.WebSockets.SubProtocolSelector =
protocolList => protocolList.FirstOrDefault() ?? string.Empty); // MQTT 支持 HTTP WebSockets
});
}
}
}
扩展方法:UseIotHubMqttServer()
代码清单:Program.cs
using Artizan.Iot.Hub.Mqtt.HttpApi.Host;
using MQTTnet.AspNetCore;
using Serilog;
using Serilog.Events;
namespace AbpManual.BookStore;
public class Program
{
public async static Task<int> Main(string[] args)
{
Log.Logger = new LoggerConfiguration()
#if DEBUG
.MinimumLevel.Debug()
#else
.MinimumLevel.Information()
#endif
.MinimumLevel.Override("Microsoft", LogEventLevel.Information)
.MinimumLevel.Override("Microsoft.EntityFrameworkCore", LogEventLevel.Warning)
.Enrich.FromLogContext()
.WriteTo.Async(c => c.File("Logs/logs.txt"))
.WriteTo.Async(c => c.Console())
.CreateLogger();
try
{
Log.Information("Starting Artizan-IotHub HttpApi Host.");
var builder = WebApplication.CreateBuilder(args);
builder.Host.AddAppSettingsSecretsJson()
.UseAutofac()
.UseSerilog();
builder.WebHost.ConfigureKestrel(serverOptions =>
{
// This will allow MQTT connections based on TCP port 1883.
serverOptions.ListenAnyIP(2883, opts => opts.UseMqtt());
// This will allow MQTT connections based on HTTP WebSockets with URI "localhost:5883/mqtt"
// See code below for URI configuration.
serverOptions.ListenAnyIP(5883); // Default HTTP pipeline
});
await builder.AddApplicationAsync<ArtizanIotHubMqttHttpApiHostModule>();
var app = builder.Build();
await app.InitializeApplicationAsync();
await app.RunAsync();
return 0;
}
catch (Exception ex)
{
Log.Fatal(ex, "Host terminated unexpectedly!");
return 1;
}
finally
{
Log.CloseAndFlush();
}
}
}
其中,设置 MQTT的监听端口
builder.WebHost.ConfigureKestrel(serverOptions =>
{
// This will allow MQTT connections based on TCP port 2883.
serverOptions.ListenAnyIP(2883, opts => opts.UseMqtt());
// This will allow MQTT connections based on HTTP WebSockets with URI "localhost:5883/mqtt"
// See code below for URI configuration.
serverOptions.ListenAnyIP(5883); // Default HTTP pipeline
});
普通的TCP端口使用:2883,访问URL:localhost:2883/mqtt,
HTTP WebSockets端口:localhost:5883/mqtt