一、安装RabbitMQ
1. 下载安装包
1) 安装RabbitMQ需要依赖erlang语言环境,所以需要我们下载erlang的环境安装程序。
1) erlang环境安装程序下载路径:https://www.erlang.org/downloads
2) RabbitMQ安装程序下载路径:https://www.rabbitmq.com/install-windows.html
2. 安装erlang,配置erlang环境
1) 添加系统环境变量:变量名 ERLANG_HOME 变量值 C:\Program Files\Erlang OTP (erlang安装目录)
2) 添加系统环境变量:变量名 Path 变量值 %ERLANG_HOME%\bin
3) 验证是否配置成功:打开cmd,输入erl,看到我们的erlang版本号就说明配置成功了
3. 安装RabbitMQ并启动
(注:安装路径不允许带有空格或者文字,否则可能出现无法访问的问题)
1) 进入指定目录:{安装路径}\rabbitmq_server-3.11.10\sbin
2) 打开CMD执行命令 rabbitmq-plugins enable rabbitmq_management 看到启动了一下三项,然后执行 rabbitmqctl start_app,即启动成功
3) 浏览器访问 http://localhost:15672/ 查看是否访问成功,使用 账号密码进行登录,默认账号:guest 密码:guest
4) 常用命令:net start RabbitMQ(启动MQ)、net stop RabbitMQ(停止MQ)、rabbitmqctl status(查看MQ状态)
二、dotNet Core API 中添加 RabbitMQ
1. 新建MQ配置信息类 MqConfigInfo
详细代码:
MqConfigInfo.cs2. 创建ReadMqConfigHelper类,用于读取配置文件
详细代码:
1 private readonly ILogger<ReadMqConfigHelper> _logger; 2 public ReadMqConfigHelper(ILogger<ReadMqConfigHelper> logger) 3 { 4 _logger = logger; 5 } 6 /// <summary> 7 /// 读取配置文件 8 /// </summary> 9 /// <returns></returns> 10 public List<MqConfigInfo> ReadMqConfig() 11 { 12 try 13 { 14 //List<MqConfigInfo> config = AppHelper.ReadAppSettings<MqConfigInfo>(new string[] { "MQ" }); // 读取MQ配置信息 15 16 List<MqConfigInfo> config = new List<MqConfigInfo>(); 17 MqConfigInfo configInfo = new MqConfigInfo(); 18 configInfo.Host = "127.0.0.1"; 19 configInfo.Port = 5672; 20 configInfo.User = "admin"; 21 configInfo.Password = "123456"; 22 configInfo.ExchangeName = "ExchangeName_Demo"; 23 configInfo.Durable = true; 24 config.Add(configInfo); 25 26 if (config.Any()) 27 { 28 return config; 29 } 30 _logger.LogError($"获取MQ配置信息失败:没有可用数据集"); 31 return null; 32 } 33 catch (Exception ex) 34 { 35 _logger.LogError($"获取MQ配置信息失败:{ex.Message}"); 36 return null; 37 } 38 }ReadMqConfigHelper.cs
3. 创建MqConnectionHelper类,用于连接和消息发送接收
NuGet安装 RabbitMQ.Client
详细代码:
1 #region RabbitMQ参数定义 2 /* 3 备注:使用数组的部分,是给消费端用的。目前生产者只设置了一个,消费者可能存在多个。 4 当然,有条件的还可以上RabbitMQ集群进行处理,会更好玩一点。 5 */ 6 7 /// <summary> 8 /// RabbitMQ工厂:发送端 9 /// </summary> 10 private static IConnectionFactory _connectionSendFactory; 11 /// <summary> 12 /// RabbitMQ工厂:接收端 13 /// </summary> 14 private static IConnectionFactory[] _connectionReceiveFactory; 15 /// <summary> 16 /// 连接:发送端 17 /// </summary> 18 private static IConnection _connectionSend; 19 /// <summary> 20 /// 连接:消费端 21 /// </summary> 22 private static IConnection[] _connectionReceive; 23 /// <summary> 24 /// 配置信息 25 /// </summary> 26 public static List<MqConfigInfo> _mqConfig; 27 /// <summary> 28 /// 通道:发送端 29 /// </summary> 30 private static IModel _modelSend; 31 /// <summary> 32 /// 通道:消费端 33 /// </summary> 34 private static IModel[] _modelReceive; 35 36 /// <summary> 37 /// 事件 38 /// </summary> 39 private static EventingBasicConsumer[] _basicConsumer; 40 #endregion 41 42 private readonly ILogger<MqConnectionHelper> _logger; 43 public MqConnectionHelper(ILogger<MqConnectionHelper> logger) 44 { 45 _logger = logger; 46 47 _connectionReceiveFactory = new IConnectionFactory[_costomerCount]; 48 _connectionReceive = new IConnection[_costomerCount]; 49 _modelReceive = new IModel[_costomerCount]; 50 _basicConsumer = new EventingBasicConsumer[_costomerCount]; 51 52 } 53 54 /// <summary> 55 /// 消费者数量 56 /// </summary> 57 public static int _costomerCount = 2; 58 public static string[] _routingKey = new string[] { "WeskyNet001", "WeskyNet002" }; 59 public static string[] _queueName = new string[] { "Queue001", "Queue002" }; 60 61 /// <summary> 62 /// 生产者初始化连接配置 63 /// </summary> 64 public void SendFactoryConnectionInit() 65 { 66 _connectionSendFactory = new ConnectionFactory 67 { 68 HostName = _mqConfig.FirstOrDefault().Host, 69 Port = _mqConfig.FirstOrDefault().Port, 70 UserName = _mqConfig.FirstOrDefault().User, 71 Password = _mqConfig.FirstOrDefault().Password 72 }; 73 } 74 75 /// <summary> 76 /// 生产者连接 77 /// </summary> 78 public void SendFactoryConnection() 79 { 80 if (null != _connectionSend && _connectionSend.IsOpen) 81 { 82 // 已有连接 83 return; 84 } 85 // 创建生产者连接 86 _connectionSend = _connectionSendFactory.CreateConnection(); 87 88 if (null != _modelSend && _modelSend.IsOpen) 89 { 90 // 已有通道 91 return; 92 } 93 // 创建生产者通道 94 _modelSend = _connectionSend.CreateModel(); 95 // 定义交换机名称和类型(direct) 96 _modelSend.ExchangeDeclare(_mqConfig.FirstOrDefault().ExchangeName, ExchangeType.Direct); 97 98 } 99 100 /// <summary> 101 /// 消费者初始化连接配置 102 /// </summary> 103 public void ReceiveFactoryConnectionInit() 104 { 105 var factories = new ConnectionFactory 106 { 107 HostName = _mqConfig.FirstOrDefault().Host, 108 Port = _mqConfig.FirstOrDefault().Port, 109 UserName = _mqConfig.FirstOrDefault().User, 110 Password = _mqConfig.FirstOrDefault().Password 111 }; 112 113 for (int i = 0; i < _costomerCount; i++) 114 { 115 _connectionReceiveFactory[i] = factories; // 给每个消费者绑定一个连接工厂 116 } 117 } 118 119 /// <summary> 120 /// 消费者连接 121 /// </summary> 122 /// <param name="consumeIndex"></param> 123 /// <param name="exchangeName">交换机名称</param> 124 /// <param name="routeKey">消费者</param> 125 /// <param name="queueName">队列名称</param> 126 public void ConnectionReceive(int consumeIndex, string exchangeName, string routeKey, string queueName) 127 { 128 _logger.LogInformation($"开始连接RabbitMQ消费者:{routeKey}"); 129 130 if (null != _connectionReceive[consumeIndex] && _connectionReceive[consumeIndex].IsOpen) 131 { 132 //已有消费者连接 133 return; 134 } 135 // 创建消费者连接 136 _connectionReceive[consumeIndex] = _connectionReceiveFactory[consumeIndex].CreateConnection(); 137 138 if (null != _modelReceive[consumeIndex] && _modelReceive[consumeIndex].IsOpen) 139 { 140 //已有消费者通道 141 return; 142 } 143 // 创建消费者通道 144 _modelReceive[consumeIndex] = _connectionReceive[consumeIndex].CreateModel(); 145 146 _basicConsumer[consumeIndex] = new EventingBasicConsumer(_modelReceive[consumeIndex]); 147 // 定义交换机名称和类型 与生产者保持一致 148 _modelReceive[consumeIndex].ExchangeDeclare(exchangeName, ExchangeType.Direct); 149 // 定义消费者队列 150 _modelReceive[consumeIndex].QueueDeclare( 151 queue: queueName, //消息队列名称 152 durable: _mqConfig.FirstOrDefault().Durable, // 是否可持久化,此处配置在文件中,默认全局持久化(true),也可以自定义更改 153 exclusive: false, 154 autoDelete: false, 155 arguments: null 156 ); 157 158 // 队列绑定给指定的交换机 159 _modelReceive[consumeIndex].QueueBind(queueName, exchangeName, routeKey); 160 // 设置消费者每次只接收一条消息 161 _modelReceive[consumeIndex].BasicQos(0, 1, false); 162 163 StartListener((model, ea) => 164 { 165 // 接收到的消息 166 byte[] message = ea.Body.ToArray(); 167 168 string msg = Encoding.UTF8.GetString(message); 169 170 _logger.LogInformation($"队列{queueName}接收到消息:{msg}"); 171 Thread.Sleep(2000); 172 173 _modelReceive[consumeIndex].BasicAck(ea.DeliveryTag, true); 174 }, queueName, consumeIndex); 175 176 } 177 178 /// <summary> 179 /// 消费者接收消息的确认机制 180 /// </summary> 181 /// <param name="basicDeliverEventArgs"></param> 182 /// <param name="queueName">队列名</param> 183 /// <param name="consumeIndex"></param> 184 private static void StartListener(EventHandler<BasicDeliverEventArgs> basicDeliverEventArgs, string queueName, int consumeIndex) 185 { 186 _basicConsumer[consumeIndex].Received += basicDeliverEventArgs; 187 _modelReceive[consumeIndex].BasicConsume(queue: queueName, autoAck: false, consumer: _basicConsumer[consumeIndex]); // 设置手动确认。 188 } 189 190 /// <summary> 191 /// 消息发布 192 /// </summary> 193 /// <param name="message">消息</param> 194 /// <param name="exchangeName">交换机名</param> 195 /// <param name="routingKey"></param> 196 public static void PublishExchange(string message, string exchangeName, string routingKey = "") 197 { 198 byte[] body = Encoding.UTF8.GetBytes(message); 199 _modelSend.BasicPublish(exchangeName, routingKey, null, body); 200 }MqConnectionHelper.cs
三、dotNet Core Api 测试 使用RabbitMQ
1. 注入RabbitMQ项目
1 var builder = WebApplication.CreateBuilder(args); 2 builder.Services.AddScoped<IMqConnectionHelper, MqConnectionHelper>(); 3 builder.Services.AddScoped<IReadMqConfigHelper, ReadMqConfigHelper>();Program.cs
2. 使用API测试
详细代码:
1 private readonly IMqConnectionHelper _mqConnection; 2 private readonly IReadMqConfigHelper _mqConfigHelper; 3 public RabbitMQController(IMqConnectionHelper mqConnection, IReadMqConfigHelper mqConfigHelper) 4 { 5 _mqConnection = mqConnection; 6 _mqConfigHelper = mqConfigHelper; 7 } 8 9 [HttpGet] 10 public Task StartAsync(CancellationToken token) 11 { 12 InitRabbitMQConnection(); 13 return Task.CompletedTask; 14 } 15 16 private void InitRabbitMQConnection() 17 { 18 //读取配置信息 19 MqConnectionHelper._mqConfig = _mqConfigHelper.ReadMqConfig(); 20 21 Console.WriteLine("开始生产者连接"); 22 _mqConnection.SendFactoryConnectionInit(); 23 _mqConnection.SendFactoryConnection(); 24 25 Console.WriteLine("生产者连接完成,开始连接消费者"); 26 27 _mqConnection.ReceiveFactoryConnectionInit(); 28 for(int i=0;i<MqConnectionHelper._costomerCount;i++) 29 { 30 _mqConnection.ConnectionReceive(i, MqConnectionHelper._mqConfig.FirstOrDefault().ExchangeName, MqConnectionHelper._routingKey[i], MqConnectionHelper._queueName[i]); 31 } 32 Console.WriteLine("消费者连接完成,准备发送消息"); 33 34 for(int i=0;i<10;i++) 35 { 36 MqConnectionHelper.PublishExchange("Hello,MQ~~~" + i.ToString(), MqConnectionHelper._mqConfig.FirstOrDefault().ExchangeName, MqConnectionHelper._routingKey[i % 2]); 37 } 38 39 40 Console.WriteLine("消息发送完成"); 41 }View Code
3. 调用接口,查看效果
该方案来自网络加以修改,如有侵权,请联系删除
标签:string,队列,RabbitMQ,consumeIndex,static,private,NET,public From: https://www.cnblogs.com/2023-02-14/p/17227884.html