首页 > 其他分享 >.Net Core中使用RabbitMQ

.Net Core中使用RabbitMQ

时间:2024-05-08 11:22:21浏览次数:20  
标签:Core rabbitMQPersistentConnection RabbitMQ public connection new Net logger

开发中经常用到发布订阅的功能,之前一直用的Redis,使用过程中也出现了一些问题,后来换了RabbitMQ,用上去更顺手,简单记录一下。

正文开始:

RabbitMQ是一个开源的,基于AMQP(Advanced Message Queuing Protocol)协议的完整的可复用的企业级消息队,RabbitMQ可以实现点对点,发布订阅等消息处理模式。

RabbitMQ有五种模式

  1. 简单工作模式(一对一):一个生产者,一个队列,一个消费者
  2. 工作模式(一对多):一个生产者,一个队列,多个消费者
  3. 发布订阅模式(Fanout):一个生产者,一个交换机,多个队列,多个消费者,一个消息可以被多个消费者消费
  4. 路由模式(Direct):一个生产者,一个交换机,多个队列,多个消费者,key,消息只发送给符合条件的消息队列
  5. 通配符模式(Topic):通配符和路由模式类似,路由是匹配Key,通配符是模糊匹配,主要符合模糊匹配条件,都可以收到消息

下面粘贴一下,项目中用到的一些RabbitMQ相关内容。

1、配置文件设置RabbitMQ的URL链接

{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft.AspNetCore": "Warning"
    }
  },
  "AllowedHosts": "*",
  "ConnectionStrings": {
    "MySql": "server=192.168.0.140;port=3306;userid=root;password=1qaz@WSX3edc;database=netsituation",
    "RedisConnection": "127.0.0.1:6379,defaultDatabase=0"
  },
  "rabbit": {
    "uri": "amqp://guest:[email protected]:5672/"
  },
  "Kestrel": {
    "EndPoints": {
      "Http": {
        "Url": "http://*:8001"
      }
    }
  }
}
View Code

2、连接字符串类

    /// <summary>
    /// 连接字符串类
    /// </summary>
    public class RabbitOption
    {
        public RabbitOption(IConfiguration config)
        {
            if (config == null)
                throw new ArgumentException(nameof(config));

            var section = config.GetSection("rabbit");
            section.Bind(this);
        }

        public string Uri { get; set; }
    }
View Code

3、通过IOC创建单例的RabbitMQ的客户端

    /// <summary>
    /// 通过IOC创建单例的RabbitMQ的客户端
    /// </summary>
    public static class ServiceCollectionExtensions
    {
        public static IServiceCollection AddeRabbitMQConnection(this IServiceCollection services, IConfiguration configuration)
        {
            if (services == null)
                throw new ArgumentException(nameof(services));

            if (configuration == null)
                throw new ArgumentException(nameof(configuration));

            var rabbitOption = new RabbitOption(configuration);

            var factory = new ConnectionFactory { Uri = new Uri(rabbitOption.Uri), AutomaticRecoveryEnabled = true, NetworkRecoveryInterval = TimeSpan.FromSeconds(60) };

            services.AddSingleton<IRabbitMQPersistentConnection>(x =>
            {
                var logger = x.GetRequiredService<ILogger<RabbitMQPersistentConnection>>();
                return new RabbitMQPersistentConnection(factory, logger);
            });

            return services;
        }
    }
View Code

4、定义连接RabbitMQ的接口

    public interface IRabbitMQPersistentConnection
    {
        /// <summary>
        /// 判断是否连接
        /// </summary>
        bool IsConnected { get; }
        /// <summary>
        /// 连接
        /// </summary>
        /// <returns></returns>
        bool TryConnect();
        /// <summary>
        /// 创建模型
        /// </summary>
        /// <returns></returns>
        IModel CreateModel();
    }
View Code

5、接口实现

    public class RabbitMQPersistentConnection : IRabbitMQPersistentConnection
    {
        private readonly IConnectionFactory connectionFactory;
        private readonly ILogger<RabbitMQPersistentConnection> logger;

        private IConnection connection;

        private const int RETTRYCOUNT = 6;

        private static readonly object lockObj = new object();
        public static bool IsBreak = false;
        public RabbitMQPersistentConnection(IConnectionFactory connectionFactory, ILogger<RabbitMQPersistentConnection> logger)
        {
            this.connectionFactory = connectionFactory;
            this.logger = logger;
        }

        public bool IsConnected
        {
            get
            {
                return connection != null && connection.IsOpen;
            }
        }

        public void Cleanup()
        {
            try
            {
                connection.Dispose();
                connection = null;

            }
            catch (IOException ex)
            {
                logger.LogCritical(ex.ToString());
            }
        }

        public IModel CreateModel()
        {
            if (!IsConnected)
            {
                connection.Close();
                throw new InvalidOperationException("连接不到rabbitmq");
            }
            return connection.CreateModel();
        }

        public bool TryConnect()
        {
            logger.LogInformation("RabbitMQ客户端尝试连接");

            lock (lockObj)
            {
                if (connection == null)
                {
                    var policy = Policy.Handle<SocketException>()
                        .Or<BrokerUnreachableException>()
                        .WaitAndRetry(RETTRYCOUNT, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
                        {
                            logger.LogWarning(ex.ToString());
                        });

                    policy.Execute(() =>
                    {
                        connection = connectionFactory.CreateConnection();
                    });
                }

                if (IsConnected)
                {
                    connection.ConnectionShutdown += OnConnectionShutdown;
                    connection.CallbackException += OnCallbackException;
                    connection.ConnectionBlocked += OnConnectionBlocked;

                    logger.LogInformation($"RabbitMQ{connection.Endpoint.HostName}获取了连接");
                    return true;
                }
                else
                {
                    logger.LogCritical("无法创建和打开RabbitMQ连接");
                    return false;
                }
            }
        }

        private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e)
        {

            logger.LogWarning("RabbitMQ连接异常,尝试重连...");

            Cleanup();
            TryConnect();
        }

        private void OnCallbackException(object sender, CallbackExceptionEventArgs e)
        {

            logger.LogWarning("RabbitMQ连接异常,尝试重连...");

            Cleanup();
            TryConnect();
        }

        private void OnConnectionShutdown(object sender, ShutdownEventArgs reason)
        {

            logger.LogWarning("RabbitMQ连接异常,尝试重连...");
            IsBreak = true;
            Cleanup();
            TryConnect();
        }
    }
View Code

6、定义发布消息接口

    public interface IMessageService
    {
        /// <summary>
        /// 发送消息(工作队列模式)
        /// </summary>
        /// <param name="queueName"></param>
        /// <param name="body"></param>
        void SendMessage(string queueName, byte[] body);

        /// <summary>
        /// 发送消息(发布订阅模式)
        /// </summary>
        /// <param name="exchangeName"></param>
        /// <param name="body"></param>
        void SendFanoutMessage(string exchangeName, byte[] body);
    }
View Code

7、实现发布消息接口

    /// <summary>
    ///  RabbitMQ实现
    /// </summary>
    public class MessageService : IMessageService
    {
        private readonly IRabbitMQPersistentConnection rabbitMQPersistentConnection;
        public MessageService(IRabbitMQPersistentConnection rabbitMQPersistentConnection)
        {
            this.rabbitMQPersistentConnection = rabbitMQPersistentConnection;
        }

        public void SendFanoutMessage(string exchangeName,byte[] body)
        {
            if (!rabbitMQPersistentConnection.IsConnected)
            {
                rabbitMQPersistentConnection.TryConnect();
            }

            using (var channel = rabbitMQPersistentConnection.CreateModel())
            {
                //把交换机设置成Fanout模式
                channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, false);
                var properties = channel.CreateBasicProperties();
                properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());
                properties.Persistent = true;
                channel.BasicPublish(exchangeName, "", properties, body);
            }
        }

        public void SendMessage(string queueName, byte[] body)
        {
            if (!rabbitMQPersistentConnection.IsConnected)
            {
                rabbitMQPersistentConnection.TryConnect();
            }

            using (var channel = rabbitMQPersistentConnection.CreateModel())
            {
                channel.QueueDeclare(queueName, true, false, false, null);
                var properties = channel.CreateBasicProperties();
                properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());
                properties.Persistent = true;
                channel.BasicPublish("", queueName, properties, body);
            }
        }
    }
View Code

8、RabbitMQ连接参数

    /// <summary>
    /// RabbitMQ连接参数
    /// </summary>
    public class RabbitMQModel
    {
        /// <summary>
        /// 交换机名称
        /// </summary>
        public string ExchangeName = "pvm";
        /// <summary>
        /// 消息队列名称
        /// </summary>
        public string QueueName { get; set; }
        /// <summary>
        /// 消息内容
        /// </summary>
        public byte[] Body { get; set; }
    }
View Code

 

 

 

 

 

 

 

 

 

 

标签:Core,rabbitMQPersistentConnection,RabbitMQ,public,connection,new,Net,logger
From: https://www.cnblogs.com/zhangjd/p/18179313

相关文章

  • 在Windows运行Gitlab Runner打包基于.NET Framework 4.6.1的项目
    摘要本文详细描述了运行在Windows商的GitlabRunner,如何自动集成.NETFramework的项目。Gitlab中的变量变量1:NUPKG_OUTPUT_ROOT这个目录是在git获取的解决方案根目录之外,因为stages变了以后,当前GitlabRunner工作的当前解决方案根目录下会被清空。我们希望build了以后经过单元......
  • Kubernetes脚本——检查K8S组件/服务/配置/POD
    #!/bin/bash#echo"运维账号是否有执行常用kubectl运维命令的权限,期望结果:输出/apps/bin/kubelet-----------------------------------------">k8s_check_result.txt#ansible-i./hostsk8s-mshell-a"foriin\`sudo-l\`;doecho\$i|grep-Eikubectl;done"&......
  • Kubernetes脚本——K8s日志检查
    #!/bin/bashecho"docker日志采用建议的syslog收集检查,期望结果:1---------------------------------------------------------------------">log_check_result.txtansible-i./hostsall-mshell-a'sudofind/apps/logs/docker/-namedockerd.log|wc-l'&......
  • Kubernetes脚本——检查K8s基础信息
    #!/bin/sh#version#node,master,slave#arch#kernelversion#dockerversion#image#cpu,memandusage#pod,podlimit#service,nodeport,lb#deploy,statefulset,deamonset#cm,secret#namespaces#set-xecho_left(){if["$2&q......
  • DeepFilterNet复现
    大概框架有两路特征,一个ERB特征,另外一个是STFT之后的复数特征。整体时延最低可达5ms。这里提到的DeepFilter,其实就是说用神经网络对TF谱进行操作。因为这篇文章比较早,所以叫这么一个名字。ERB特征ERB(EquivalentRectangularBandwidth)是一个与人耳听觉敏感性密切相关的概念,......
  • 解决远程调用三方接口:javax.net.ssl.SSLHandshakeException:sun.security.validator.Va
    一、前言最近在对接腾讯会议API接口,在鉴权完成后开始调用对方的接口,在此过程中出现调用报错:javax.net.ssl.SSLHandshakeException。二、出现原因当你在进行https请求时,JDK中不存在三方服务的信任证书,导致出现错误javax.net.ssl.SSLHandshakeException:sun.security.validator.Va......
  • FiddlerCore源码
    这是FiddlerCore5.0.2版本源码,支持.netframework和.netcore,百分百c#源码,功能全而且免费,无功能限制,还修复了多处bug。介绍:FiddlerCore可捕获和修改HTTP和HTTPS流量,而无需任何FiddlerUI。特点:HTTP和HTTPS流量捕获和修改。用于内容过滤和修改的强大对象模型。存......
  • .net 8中使用过滤器记录系统日志 ActionFilter+Serilog
    1、添加自定义日志过滤器类usingSerilog;usingMicrosoft.AspNetCore.Mvc.Filters;namespaceADTO.CMS.Common.Filter{///<summary>///日志记录过滤器///</summary>publicclassLogActionFilter:IActionFilter{///<summary>///......
  • Springboot+Netty实现http和ws统一端口处理
    http:/localhost:8080/apiws:/localhost:8080/ws核心就是两个channel处理器,http和wswebsocketpackagecom.example.netty;importio.netty.channel.ChannelHandlerContext;importio.netty.channel.SimpleChannelInboundHandler;importio.netty.handler.codec.http.HttpH......
  • .NET 9 的几个新特性,新颖吗?
    简介继.NET8之后,.NET9在云原生应用程序得到了增强和性能得到提升。它是STS版本,将获得为期18个月的标准支持服务。你可以到官网下载.NET9。它的几个改进如下:序列化在System.Text.Json中,.NET9为序列化JSON提供了新的选项和一个新的单例,使得使用Web默认值进行序列化变得更加容......