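Publish/subscribe comes up a lot in my development work. I used Redis for it for a long time and ran into some problems along the way, so I later switched to RabbitMQ, which I find more comfortable to work with. Here are some brief notes.
On to the main content: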
RabbitMQ is an open-source, reusable, enterprise-grade message queue built on AMQP (Advanced Message Queuing Protocol). It supports messaging patterns such as point-to-point and publish/subscribe.
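RabbitMQ has five common working modes:
- Simple mode (one-to-one): one producer, one queue, one consumer.
- Work mode (one-to-many): one producer, one queue, multiple consumers. Each message is handled by only one of the consumers.
- Publish/subscribe mode (Fanout): one producer, one exchange, multiple queues, multiple consumers. Every bound queue receives a copy, so one message can be consumed by multiple consumers.
- Routing mode (Direct): one producer, one exchange, multiple queues, multiple consumers, plus a routing key. A message is delivered only to the queues whose binding key equals the routing key.
- Wildcard mode (Topic): similar to routing mode, except that routing mode matches the key exactly while topic mode matches it against a pattern. Any queue whose pattern matches receives the message.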
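To make the difference between exact and wildcard matching concrete, here is a small sketch of the Topic matching rule: words are separated by dots, `*` stands for exactly one word and `#` for zero or more words. The `TopicMatcher` class and the sample keys are my own illustration, not the broker's actual implementation.

```csharp
using System;

public static class TopicMatcher
{
    // Returns true when routingKey satisfies the binding pattern.
    // '*' matches exactly one word, '#' matches zero or more words.
    public static bool Matches(string pattern, string routingKey)
    {
        string[] patternWords = pattern.Split('.');
        string[] keyWords = routingKey.Split('.');
        return Match(patternWords, 0, keyWords, 0);
    }

    private static bool Match(string[] p, int pi, string[] k, int ki)
    {
        // Pattern exhausted: it is a match only if the key is exhausted too.
        if (pi == p.Length) return ki == k.Length;

        if (p[pi] == "#")
        {
            // Let '#' swallow 0, 1, 2, ... of the remaining words.
            for (int next = ki; next <= k.Length; next++)
            {
                if (Match(p, pi + 1, k, next)) return true;
            }
            return false;
        }

        // Any other pattern word needs one key word to compare against.
        if (ki == k.Length) return false;

        if (p[pi] == "*" || p[pi] == k[ki])
        {
            return Match(p, pi + 1, k, ki + 1);
        }
        return false;
    }
}
```

With this sketch, `TopicMatcher.Matches("order.*", "order.created")` is true, while `TopicMatcher.Matches("order.*", "order.created.eu")` is false because `*` covers only one word. A pattern without wildcards behaves like Direct mode.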
Below is some of the RabbitMQ-related code used in my project.
1. Set the RabbitMQ connection URL in the configuration file
    {
      "Logging": {
        "LogLevel": {
          "Default": "Information",
          "Microsoft.AspNetCore": "Warning"
        }
      },
      "AllowedHosts": "*",
      "ConnectionStrings": {
        "MySql": "server=192.168.0.140;port=3306;userid=root;password=1qaz@WSX3edc;database=netsituation",
        "RedisConnection": "127.0.0.1:6379,defaultDatabase=0"
      },
      "rabbit": {
        "uri": "amqp://guest:[email protected]:5672/"
      },
      "Kestrel": {
        "EndPoints": {
          "Http": {
            "Url": "http://*:8001"
          }
        }
      }
    }
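The `uri` value follows the shape `amqp://user:password@host:port/`. As a rough sketch of how those parts break down, the helper below uses `System.Uri` to pull them apart. The `AmqpUriParts` class and the sample URI with `guest`/`localhost` are assumptions of mine for illustration, not the values from my project.

```csharp
using System;

public static class AmqpUriParts
{
    // Splits an AMQP URI into user, password, host and port.
    public static (string User, string Password, string Host, int Port) Parse(string uri)
    {
        var parsed = new Uri(uri);

        // UserInfo has the form "user:password"; split on the first colon only.
        string[] userInfo = parsed.UserInfo.Split(new[] { ':' }, 2);
        string user = Uri.UnescapeDataString(userInfo[0]);
        string password = userInfo.Length > 1 ? Uri.UnescapeDataString(userInfo[1]) : "";

        return (user, password, parsed.Host, parsed.Port);
    }
}
```

For example, `AmqpUriParts.Parse("amqp://guest:guest@localhost:5672/")` yields the user `guest`, the password `guest`, the host `localhost` and the port `5672`.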
2. Connection string class
    using System;
    using Microsoft.Extensions.Configuration;

    /// <summary>
    /// Connection string class
    /// </summary>
    public class RabbitOption
    {
        public RabbitOption(IConfiguration config)
        {
            if (config == null)
                throw new ArgumentNullException(nameof(config));

            // Bind the "rabbit" section onto this instance, which fills Uri.
            var section = config.GetSection("rabbit");
            section.Bind(this);
        }

        public string Uri { get; set; }
    }
3. Create a singleton RabbitMQ client through IoC
    using System;
    using Microsoft.Extensions.Configuration;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Logging;
    using RabbitMQ.Client;

    /// <summary>
    /// Creates a singleton RabbitMQ client through IoC
    /// </summary>
    public static class ServiceCollectionExtensions
    {
        public static IServiceCollection AddeRabbitMQConnection(this IServiceCollection services, IConfiguration configuration)
        {
            if (services == null)
                throw new ArgumentNullException(nameof(services));
            if (configuration == null)
                throw new ArgumentNullException(nameof(configuration));

            var rabbitOption = new RabbitOption(configuration);
            var factory = new ConnectionFactory
            {
                Uri = new Uri(rabbitOption.Uri),
                AutomaticRecoveryEnabled = true,
                NetworkRecoveryInterval = TimeSpan.FromSeconds(60)
            };

            // One shared connection wrapper for the whole application.
            services.AddSingleton<IRabbitMQPersistentConnection>(x =>
            {
                var logger = x.GetRequiredService<ILogger<RabbitMQPersistentConnection>>();
                return new RabbitMQPersistentConnection(factory, logger);
            });
            return services;
        }
    }
4. Define the interface for connecting to RabbitMQ
    using RabbitMQ.Client;

    public interface IRabbitMQPersistentConnection
    {
        /// <summary>
        /// Whether the connection is currently open
        /// </summary>
        bool IsConnected { get; }

        /// <summary>
        /// Connect
        /// </summary>
        /// <returns></returns>
        bool TryConnect();

        /// <summary>
        /// Create a model (channel)
        /// </summary>
        /// <returns></returns>
        IModel CreateModel();
    }
5. Interface implementation
    using System;
    using System.IO;
    using System.Net.Sockets;
    using Microsoft.Extensions.Logging;
    using Polly;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using RabbitMQ.Client.Exceptions;

    public class RabbitMQPersistentConnection : IRabbitMQPersistentConnection
    {
        private readonly IConnectionFactory connectionFactory;
        private readonly ILogger<RabbitMQPersistentConnection> logger;
        private IConnection connection;
        private const int RetryCount = 6;
        private static readonly object lockObj = new object();
        public static bool IsBreak = false;

        public RabbitMQPersistentConnection(IConnectionFactory connectionFactory, ILogger<RabbitMQPersistentConnection> logger)
        {
            this.connectionFactory = connectionFactory;
            this.logger = logger;
        }

        public bool IsConnected
        {
            get { return connection != null && connection.IsOpen; }
        }

        public void Cleanup()
        {
            var old = connection;
            connection = null;
            if (old == null)
                return;

            try
            {
                // Unsubscribe first so that disposing does not re-enter the handlers below.
                old.ConnectionShutdown -= OnConnectionShutdown;
                old.CallbackException -= OnCallbackException;
                old.ConnectionBlocked -= OnConnectionBlocked;
                old.Dispose();
            }
            catch (IOException ex)
            {
                logger.LogCritical(ex.ToString());
            }
        }

        public IModel CreateModel()
        {
            // The connection may be null here, so there is nothing to close.
            if (!IsConnected)
                throw new InvalidOperationException("Cannot connect to RabbitMQ");
            return connection.CreateModel();
        }

        public bool TryConnect()
        {
            logger.LogInformation("RabbitMQ client is trying to connect");
            lock (lockObj)
            {
                if (connection == null)
                {
                    // Retry with exponential backoff: 2, 4, 8, ... seconds.
                    var policy = Policy.Handle<SocketException>()
                        .Or<BrokerUnreachableException>()
                        .WaitAndRetry(RetryCount,
                            retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
                            (ex, time) => { logger.LogWarning(ex.ToString()); });

                    // Once the retries are used up, Execute rethrows the last exception.
                    policy.Execute(() => { connection = connectionFactory.CreateConnection(); });

                    // Subscribe once per new connection to avoid duplicate handlers.
                    connection.ConnectionShutdown += OnConnectionShutdown;
                    connection.CallbackException += OnCallbackException;
                    connection.ConnectionBlocked += OnConnectionBlocked;
                }

                if (IsConnected)
                {
                    logger.LogInformation($"RabbitMQ connection to {connection.Endpoint.HostName} acquired");
                    return true;
                }

                logger.LogCritical("Unable to create and open a RabbitMQ connection");
                return false;
            }
        }

        private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e)
        {
            logger.LogWarning("RabbitMQ connection blocked, trying to reconnect...");
            Cleanup();
            TryConnect();
        }

        private void OnCallbackException(object sender, CallbackExceptionEventArgs e)
        {
            logger.LogWarning("RabbitMQ callback exception, trying to reconnect...");
            Cleanup();
            TryConnect();
        }

        private void OnConnectionShutdown(object sender, ShutdownEventArgs reason)
        {
            logger.LogWarning("RabbitMQ connection shut down, trying to reconnect...");
            IsBreak = true;
            Cleanup();
            TryConnect();
        }
    }
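The retry policy in `TryConnect` waits `2^attempt` seconds before each retry, with the attempt number starting at 1. To get a feel for how long six retries can take, here is a small sketch that computes the same schedule without Polly. The `BackoffSchedule` class is a hypothetical helper for illustration only.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class BackoffSchedule
{
    // Delay before retry number "attempt" (1-based) is 2^attempt seconds,
    // the same formula passed to WaitAndRetry in TryConnect.
    public static IReadOnlyList<TimeSpan> Delays(int retryCount)
    {
        return Enumerable.Range(1, retryCount)
            .Select(attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)))
            .ToList();
    }

    // Total time spent waiting if every retry fails.
    public static TimeSpan TotalWait(int retryCount)
    {
        return TimeSpan.FromSeconds(Delays(retryCount).Sum(d => d.TotalSeconds));
    }
}
```

For six retries the delays are 2, 4, 8, 16, 32 and 64 seconds, so a caller of `TryConnect` can be blocked for a little over two minutes of waiting (126 seconds) when the broker stays unreachable.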
6. Define the message publishing interface
    public interface IMessageService
    {
        /// <summary>
        /// Send a message (work queue mode)
        /// </summary>
        /// <param name="queueName"></param>
        /// <param name="body"></param>
        void SendMessage(string queueName, byte[] body);

        /// <summary>
        /// Send a message (publish/subscribe mode)
        /// </summary>
        /// <param name="exchangeName"></param>
        /// <param name="body"></param>
        void SendFanoutMessage(string exchangeName, byte[] body);
    }
7. Implement the message publishing interface
    using System;
    using RabbitMQ.Client;

    /// <summary>
    /// RabbitMQ implementation
    /// </summary>
    public class MessageService : IMessageService
    {
        private readonly IRabbitMQPersistentConnection rabbitMQPersistentConnection;

        public MessageService(IRabbitMQPersistentConnection rabbitMQPersistentConnection)
        {
            this.rabbitMQPersistentConnection = rabbitMQPersistentConnection;
        }

        public void SendFanoutMessage(string exchangeName, byte[] body)
        {
            if (!rabbitMQPersistentConnection.IsConnected)
            {
                rabbitMQPersistentConnection.TryConnect();
            }

            using (var channel = rabbitMQPersistentConnection.CreateModel())
            {
                // Declare the exchange as Fanout (non-durable, not auto-deleted).
                channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, false);

                var properties = channel.CreateBasicProperties();
                // AMQP timestamps are expressed in seconds since the Unix epoch.
                properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds());
                properties.Persistent = true;

                // A fanout exchange ignores the routing key, so it is left empty.
                channel.BasicPublish(exchangeName, "", properties, body);
            }
        }

        public void SendMessage(string queueName, byte[] body)
        {
            if (!rabbitMQPersistentConnection.IsConnected)
            {
                rabbitMQPersistentConnection.TryConnect();
            }

            using (var channel = rabbitMQPersistentConnection.CreateModel())
            {
                // Durable, non-exclusive queue that is not auto-deleted.
                channel.QueueDeclare(queueName, true, false, false, null);

                var properties = channel.CreateBasicProperties();
                // AMQP timestamps are expressed in seconds since the Unix epoch.
                properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds());
                properties.Persistent = true;

                // Publish through the default exchange, which routes by queue name.
                channel.BasicPublish("", queueName, properties, body);
            }
        }
    }
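The code above only covers the publishing side. For completeness, here is a minimal sketch of what a subscriber for the Fanout exchange might look like. It is not taken from my project: the `FanoutSubscriber` class and its `Subscribe` method are hypothetical, and I am assuming the 6.x `RabbitMQ.Client` package, where `ea.Body` is a `ReadOnlyMemory<byte>`.

```csharp
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

public static class FanoutSubscriber
{
    // Binds a fresh broker-named queue to the fanout exchange and calls
    // handler for every message. The caller owns the channel and has to
    // keep it open for as long as messages should arrive.
    // Returns the consumer tag assigned by the broker.
    public static string Subscribe(IModel channel, string exchangeName, Action<byte[]> handler)
    {
        // Must match the publisher's declaration (non-durable, not auto-deleted).
        channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, false);

        // With no arguments this declares an exclusive, auto-delete queue
        // whose name is generated by the broker.
        string queueName = channel.QueueDeclare().QueueName;
        channel.QueueBind(queueName, exchangeName, "");

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (sender, ea) =>
        {
            handler(ea.Body.ToArray());
            // Acknowledge manually, only after the handler has returned.
            channel.BasicAck(ea.DeliveryTag, false);
        };

        return channel.BasicConsume(queueName, false, consumer);
    }
}
```

Each subscriber gets its own queue, which is why every subscriber sees every message. A channel obtained from `IRabbitMQPersistentConnection.CreateModel()` could be passed in, as long as it is not disposed while the subscription is in use.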
8. RabbitMQ connection parameters
    /// <summary>
    /// RabbitMQ connection parameters
    /// </summary>
    public class RabbitMQModel
    {
        /// <summary>
        /// Exchange name
        /// </summary>
        public string ExchangeName { get; set; } = "pvm";

        /// <summary>
        /// Queue name
        /// </summary>
        public string QueueName { get; set; }

        /// <summary>
        /// Message content
        /// </summary>
        public byte[] Body { get; set; }
    }
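Since the message body is a raw `byte[]`, something has to turn an object into bytes before publishing and back again on the consumer side. One possible approach, sketched here with `System.Text.Json`, is to serialize to UTF-8 JSON. The `MessageBody` helper is hypothetical, and JSON is only one of several reasonable encodings.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

public static class MessageBody
{
    // Serializes any payload to UTF-8 encoded JSON bytes.
    public static byte[] Encode<T>(T payload)
    {
        return JsonSerializer.SerializeToUtf8Bytes(payload);
    }

    // Reads UTF-8 encoded JSON bytes back into an object of type T.
    public static T Decode<T>(byte[] body)
    {
        return JsonSerializer.Deserialize<T>(body);
    }
}
```

A publisher could then fill `Body` with `MessageBody.Encode(payload)` and hand it to `SendFanoutMessage` or `SendMessage`, and a consumer would call `MessageBody.Decode<T>` on the bytes it receives.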
From: https://www.cnblogs.com/zhangjd/p/18179313