首页 > 其他分享 >.NET 中RabbitMQ消息队列的使用

.NET 中RabbitMQ消息队列的使用

时间:2023-03-20 11:58:32浏览次数:46  
标签:string 队列 RabbitMQ consumeIndex static private NET public

一、安装RabbitMQ

  1. 下载安装包

    1) 安装RabbitMQ需要依赖erlang语言环境,所以需要我们下载erlang的环境安装程序。

    1) erlang环境安装程序下载路径:https://www.erlang.org/downloads

    

    2) RabbitMQ安装程序下载路径:https://www.rabbitmq.com/install-windows.html

  2. 安装erlang,配置erlang环境

    1) 添加系统环境变量:变量名 ERLANG_HOME 变量值  C:\Program Files\Erlang OTP  (erlang安装目录)

    2) 添加系统环境变量:变量名 Path 变量值  %ERLANG_HOME%\bin

    3) 验证是否配置成功:打开cmd,输入erl,看到我们的erlang版本号就说明配置成功了
    

  3. 安装RabbitMQ并启动

    (注:安装路径不允许带有空格或者文字,否则可能出现无法访问的问题)

    1) 进入指定目录:{安装路径}\rabbitmq_server-3.11.10\sbin

    2) 打开CMD执行命令 rabbitmq-plugins enable rabbitmq_management  看到启动了一下三项,然后执行 rabbitmqctl start_app,即启动成功

      

    3) 浏览器访问 http://localhost:15672/ 查看是否访问成功,使用 账号密码进行登录,默认账号:guest  密码:guest

    4) 常用命令:net start RabbitMQ(启动MQ)、net stop RabbitMQ(停止MQ)、rabbitmqctl status(查看MQ状态)

二、dotNet Core API 中添加 RabbitMQ

  1. 新建MQ配置信息类 MqConfigInfo

  

  详细代码:

  MqConfigInfo.cs

  2. 创建ReadMqConfigHelper类,用于读取配置文件

  

   详细代码:

  
 1         private readonly ILogger<ReadMqConfigHelper> _logger;
 2         public ReadMqConfigHelper(ILogger<ReadMqConfigHelper> logger)
 3         {
 4             _logger = logger;
 5         }
 6         /// <summary>
 7         /// 读取配置文件
 8         /// </summary>
 9         /// <returns></returns>
10         public List<MqConfigInfo> ReadMqConfig()
11         {
12             try
13             {
14                 //List<MqConfigInfo> config = AppHelper.ReadAppSettings<MqConfigInfo>(new string[] { "MQ" }); // 读取MQ配置信息
15 
16                 List<MqConfigInfo> config = new List<MqConfigInfo>();
17                 MqConfigInfo configInfo = new MqConfigInfo();
18                 configInfo.Host = "127.0.0.1";
19                 configInfo.Port = 5672;
20                 configInfo.User = "admin";
21                 configInfo.Password = "123456";
22                 configInfo.ExchangeName = "ExchangeName_Demo";
23                 configInfo.Durable = true;
24                 config.Add(configInfo);
25 
26                 if (config.Any())
27                 {
28                     return config;
29                 }
30                 _logger.LogError($"获取MQ配置信息失败:没有可用数据集");
31                 return null;
32             }
33             catch (Exception ex)
34             {
35                 _logger.LogError($"获取MQ配置信息失败:{ex.Message}");
36                 return null;
37             }
38         }
ReadMqConfigHelper.cs

  3. 创建MqConnectionHelper类,用于连接和消息发送接收

    NuGet安装 RabbitMQ.Client

  

  

   详细代码:

  
  1         #region RabbitMQ参数定义
  2         /*
  3          备注:使用数组的部分,是给消费端用的。目前生产者只设置了一个,消费者可能存在多个。
  4                      当然,有条件的还可以上RabbitMQ集群进行处理,会更好玩一点。
  5          */
  6 
  7         /// <summary>
  8         /// RabbitMQ工厂:发送端
  9         /// </summary>
 10         private static IConnectionFactory _connectionSendFactory;
 11         /// <summary>
 12         /// RabbitMQ工厂:接收端
 13         /// </summary>
 14         private static IConnectionFactory[] _connectionReceiveFactory;
 15         /// <summary>
 16         /// 连接:发送端
 17         /// </summary>
 18         private static IConnection _connectionSend;
 19         /// <summary>
 20         /// 连接:消费端
 21         /// </summary>
 22         private static IConnection[] _connectionReceive;
 23         /// <summary>
 24         /// 配置信息
 25         /// </summary>
 26         public static List<MqConfigInfo> _mqConfig;
 27         /// <summary>
 28         /// 通道:发送端
 29         /// </summary>
 30         private static IModel _modelSend;
 31         /// <summary>
 32         /// 通道:消费端
 33         /// </summary>
 34         private static IModel[] _modelReceive;
 35 
 36         /// <summary>
 37         /// 事件
 38         /// </summary>
 39         private static EventingBasicConsumer[] _basicConsumer;
 40         #endregion
 41 
 42         private readonly ILogger<MqConnectionHelper> _logger;
 43         public MqConnectionHelper(ILogger<MqConnectionHelper> logger)
 44         {
 45             _logger = logger;
 46 
 47             _connectionReceiveFactory = new IConnectionFactory[_costomerCount];
 48             _connectionReceive = new IConnection[_costomerCount];
 49             _modelReceive = new IModel[_costomerCount];
 50             _basicConsumer = new EventingBasicConsumer[_costomerCount];
 51 
 52         }
 53 
 54         /// <summary>
 55         /// 消费者数量
 56         /// </summary>
 57         public static int _costomerCount = 2;
 58         public static string[] _routingKey = new string[] { "WeskyNet001", "WeskyNet002" };
 59         public static string[] _queueName = new string[] { "Queue001", "Queue002" };
 60 
 61         /// <summary>
 62         /// 生产者初始化连接配置
 63         /// </summary>
 64         public void SendFactoryConnectionInit()
 65         {
 66             _connectionSendFactory = new ConnectionFactory
 67             {
 68                 HostName = _mqConfig.FirstOrDefault().Host,
 69                 Port = _mqConfig.FirstOrDefault().Port,
 70                 UserName = _mqConfig.FirstOrDefault().User,
 71                 Password = _mqConfig.FirstOrDefault().Password
 72             };
 73         }
 74 
 75         /// <summary>
 76         /// 生产者连接
 77         /// </summary>
 78         public void SendFactoryConnection()
 79         {
 80             if (null != _connectionSend && _connectionSend.IsOpen)
 81             {
 82                 // 已有连接
 83                 return; 
 84             }
 85             // 创建生产者连接
 86             _connectionSend = _connectionSendFactory.CreateConnection(); 
 87 
 88             if (null != _modelSend && _modelSend.IsOpen)
 89             {
 90                 // 已有通道
 91                 return; 
 92             }
 93             // 创建生产者通道
 94             _modelSend = _connectionSend.CreateModel(); 
 95             // 定义交换机名称和类型(direct)
 96             _modelSend.ExchangeDeclare(_mqConfig.FirstOrDefault().ExchangeName, ExchangeType.Direct); 
 97 
 98         }
 99 
100         /// <summary>
101         /// 消费者初始化连接配置
102         /// </summary>
103         public void ReceiveFactoryConnectionInit()
104         {
105             var factories = new ConnectionFactory
106             {
107                 HostName = _mqConfig.FirstOrDefault().Host,
108                 Port = _mqConfig.FirstOrDefault().Port,
109                 UserName = _mqConfig.FirstOrDefault().User,
110                 Password = _mqConfig.FirstOrDefault().Password
111             };
112 
113             for (int i = 0; i < _costomerCount; i++)
114             {
115                 _connectionReceiveFactory[i] = factories;  // 给每个消费者绑定一个连接工厂
116             }
117         }
118 
119         /// <summary>
120         /// 消费者连接
121         /// </summary>
122         /// <param name="consumeIndex"></param>
123         /// <param name="exchangeName">交换机名称</param>
124         /// <param name="routeKey">消费者</param>
125         /// <param name="queueName">队列名称</param>
126         public void ConnectionReceive(int consumeIndex, string exchangeName, string routeKey, string queueName)
127         {
128             _logger.LogInformation($"开始连接RabbitMQ消费者:{routeKey}");
129 
130             if (null != _connectionReceive[consumeIndex] && _connectionReceive[consumeIndex].IsOpen)
131             {
132                 //已有消费者连接
133                 return;
134             }
135             // 创建消费者连接
136             _connectionReceive[consumeIndex] = _connectionReceiveFactory[consumeIndex].CreateConnection(); 
137 
138             if (null != _modelReceive[consumeIndex] && _modelReceive[consumeIndex].IsOpen)
139             {
140                 //已有消费者通道
141                 return;
142             }
143             // 创建消费者通道
144             _modelReceive[consumeIndex] = _connectionReceive[consumeIndex].CreateModel();  
145 
146             _basicConsumer[consumeIndex] = new EventingBasicConsumer(_modelReceive[consumeIndex]);
147             // 定义交换机名称和类型  与生产者保持一致
148             _modelReceive[consumeIndex].ExchangeDeclare(exchangeName, ExchangeType.Direct); 
149             // 定义消费者队列
150             _modelReceive[consumeIndex].QueueDeclare(
151                          queue: queueName, //消息队列名称
152                          durable: _mqConfig.FirstOrDefault().Durable, // 是否可持久化,此处配置在文件中,默认全局持久化(true),也可以自定义更改
153                          exclusive: false,
154                          autoDelete: false,
155                          arguments: null
156            );  
157 
158             // 队列绑定给指定的交换机
159             _modelReceive[consumeIndex].QueueBind(queueName, exchangeName, routeKey); 
160             // 设置消费者每次只接收一条消息
161             _modelReceive[consumeIndex].BasicQos(0, 1, false); 
162 
163             StartListener((model, ea) =>
164             {
165                 // 接收到的消息
166                 byte[] message = ea.Body.ToArray(); 
167 
168                 string msg = Encoding.UTF8.GetString(message);
169 
170                 _logger.LogInformation($"队列{queueName}接收到消息:{msg}");
171                 Thread.Sleep(2000);
172 
173                 _modelReceive[consumeIndex].BasicAck(ea.DeliveryTag, true);
174             }, queueName, consumeIndex);
175 
176         }
177 
178         /// <summary>
179         /// 消费者接收消息的确认机制
180         /// </summary>
181         /// <param name="basicDeliverEventArgs"></param>
182         /// <param name="queueName">队列名</param>
183         /// <param name="consumeIndex"></param>
184         private static void StartListener(EventHandler<BasicDeliverEventArgs> basicDeliverEventArgs, string queueName, int consumeIndex)
185         {
186             _basicConsumer[consumeIndex].Received += basicDeliverEventArgs;
187             _modelReceive[consumeIndex].BasicConsume(queue: queueName, autoAck: false, consumer: _basicConsumer[consumeIndex]); // 设置手动确认。
188         }
189 
190         /// <summary>
191         /// 消息发布
192         /// </summary>
193         /// <param name="message">消息</param>
194         /// <param name="exchangeName">交换机名</param>
195         /// <param name="routingKey"></param>
196         public static void PublishExchange(string message, string exchangeName, string routingKey = "")
197         {
198             byte[] body = Encoding.UTF8.GetBytes(message);
199             _modelSend.BasicPublish(exchangeName, routingKey, null, body);
200         }
MqConnectionHelper.cs

三、dotNet Core Api 测试 使用RabbitMQ

  1. 注入RabbitMQ项目

  

  
1 var builder = WebApplication.CreateBuilder(args);
2 builder.Services.AddScoped<IMqConnectionHelper, MqConnectionHelper>();
3 builder.Services.AddScoped<IReadMqConfigHelper, ReadMqConfigHelper>();
Program.cs

  2. 使用API测试

  

 

   详细代码:

  
 1         private readonly IMqConnectionHelper _mqConnection;
 2         private readonly IReadMqConfigHelper _mqConfigHelper;
 3         public RabbitMQController(IMqConnectionHelper mqConnection, IReadMqConfigHelper mqConfigHelper)
 4         {
 5             _mqConnection = mqConnection;
 6             _mqConfigHelper = mqConfigHelper;
 7         }
 8 
 9         [HttpGet]
10         public Task StartAsync(CancellationToken token)
11         {
12             InitRabbitMQConnection();
13             return Task.CompletedTask;
14         }
15 
16         private void InitRabbitMQConnection()
17         {
18             //读取配置信息
19             MqConnectionHelper._mqConfig = _mqConfigHelper.ReadMqConfig();
20 
21             Console.WriteLine("开始生产者连接");
22             _mqConnection.SendFactoryConnectionInit();
23             _mqConnection.SendFactoryConnection();
24 
25             Console.WriteLine("生产者连接完成,开始连接消费者");
26 
27             _mqConnection.ReceiveFactoryConnectionInit();
28             for(int i=0;i<MqConnectionHelper._costomerCount;i++)
29             {
30                 _mqConnection.ConnectionReceive(i, MqConnectionHelper._mqConfig.FirstOrDefault().ExchangeName, MqConnectionHelper._routingKey[i], MqConnectionHelper._queueName[i]);
31             }
32             Console.WriteLine("消费者连接完成,准备发送消息");
33 
34             for(int i=0;i<10;i++)
35             {
36                     MqConnectionHelper.PublishExchange("Hello,MQ~~~" + i.ToString(), MqConnectionHelper._mqConfig.FirstOrDefault().ExchangeName, MqConnectionHelper._routingKey[i % 2]);
37             }
38 
39 
40             Console.WriteLine("消息发送完成");
41         }
View Code

  3. 调用接口,查看效果

  

 

 

 

该方案来自网络加以修改,如有侵权,请联系删除

 

标签:string,队列,RabbitMQ,consumeIndex,static,private,NET,public
From: https://www.cnblogs.com/2023-02-14/p/17227884.html

相关文章