本节内容,部分为补充内容,部分涉及到9.3.10-9.3.12(P335-342)。主要NuGet包:
- RabbitMQ.Client
微服务间,跨进程的事件发布和订阅,需要借助第三方服务器作为事件总线,目前常用的有Redis、RabbitMQ、Kafka等,本章节介绍RabbitMQ。
一、基本过程
1、生产者(通常指一个微服务应用),将消息发布到RabbitMQ服务器的指定交换机,如果没有则新建一个交换机,每个消息都有一个routingKey(本质是一个事件)。
2、消费者(通常指另一个微服务应用),声明一个队列用于接收消息,队列通过交换机和routingKey进行匹配,监听指定交换机和routingKey的消息。
3、生产者发布消息或消费者接收消息,首先要与服务器建立TCP连接,TCP连接比较消耗资源,所以建立连接后,通过创建虚拟信道来发布和接收消息,发布或接收到消息后,信道关闭,但TCP连接会保留重复使用。
4、如果有A和B两个消费者,想读取同一条消息,应分别声明不同的队列名称,即同一条消息可以被多个消费者接收到。如果声明的队列名称相同,则同一条消息按先到先得的方式被其中一个消费者接收。
5、消息失败重发,消费者如果在接收消息进行处理时出错,可以通知队列“消息处理出错”,队列会重新发送消息。
二、两个控制台应用的事件发布和订阅案例。由于RabbitMQ的使用比较繁琐,所在大多数框架都会对RabbitMQ进行二次封装,并和领域事件进行集成,使我们在使用事件机制时,无论是领域事件,还是集成事件,API和用法都基本相同。但仍然需要了解一下RabbitMQ的原生使用方式。下面代码,是书中案例,但跑不起来,因为主要使用框架开发,都有集成事件的封装,所以懒得高度了,主要是了解一下集成事件底层的代码实现原理。
1、创建一个控制台应用,作为生产者
//声明RabbitMQ服务器,案例中以开发本机作为服务器 var factory = new ConnectionFactory(); factory.HostName = "localhost"; factory.DispatchConsumersAsync = true; //命名交换机和路由名称,路由routinKey本质是一个事件,消费端通过routingKey来匹配队列 string exchangeName = "exchange1"; string routingKey = "event1"; //声明TCP连接,多个信道复用这个连接 using var conn = factory.CreateConnection(); //创建虚拟信道、连接交换机,发布信息 //本案例中,每延迟1秒,就发送一次信息 while (true) { //定义传递的信息,每次发关事件 string msg = DateTime.Now.TimeOfDay.ToString(); //创建一个虚拟信道,连接交换机,发送消息 using (var channel = conn.CreateModel()) { //准备①:配置信道属性 var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; //准备②:声明交接机。本例中,每次新的信道都使用相同的交接机 channel.ExchangeDeclare(exchange:exchangeName,type:"direct"); //准备③:RabbitMQ中的消息都是按照byte[]来传递,将消息转为byte[]格式 byte[] body = Encoding.UTF8.GetBytes(msg); //发布消息,参数分别为交换机、routingKey(事件)、消息、信息属性 channel.BasicPublish(exchange:exchangeName,routingKey:routingKey,body:body,basicProperties:properties,mandatory:true); } //每次发送完信息后,都关闭信道 Console.WriteLine($"发布了消息:{msg}"); //暂停1秒后,循环发送消息 Thread.Sleep(1000); }
2、创建另一个控制台应用,作为消费者
//声明RabbitMQ服务器,案例中以开发本机作为服务器 var factory = new ConnectionFactory(); factory.HostName = "127.0.0.1"; factory.DispatchConsumersAsync = true; //定义交换机和路由名称 string exchangeName = "exchange1"; string routingKey = "event1"; //声明TCP连接,多个信道复用这个连接 using var conn = factory.CreateConnection(); //声明一个信道 using var channel = conn.CreateModel(); //声明交换机 channel.ExchangeDeclare(exchange:exchangeName,type:"direct"); //声明队列 string queueName = "queue1"; channel.QueueDeclare(queue:queueName,durable:true,exclusive:false,autoDelete:false,arguments:null); //将队列和指定交换机及routingKey绑定 channel.QueueBind(queue: queueName, exchange:exchangeName,routingKey:routingKey); //通过信道,从队列中接收消息 var consumer = new AsyncEventingBasicConsumer(channel); //定义一个事件处理函数 consumer.Received += ConsumerReceived ; //执行事件处理 channel.BasicConsume(queue:queueName,autoAck:false,consumer:consumer); Console.ReadLine(); //收到消息后的事件处理函数 async Task ConsumerReceived(object sender,BasicDeliverEventArgs args) { try { var bytes = args.Body.ToArray(); string msg = Encoding.UTF8.GetString(bytes); Console.WriteLine(DateTime.Now + "收到了消息" + msg); channel.BasicAck(args.DeliveryTag, multiple: false); //通知队外,事件处理成功 await Task.Delay(1000); } catch (Exception ex) { channel.BasicReject(args.DeliveryTag, true); //通知队列,事件处理失败,请重发 Console.WriteLine("处理消息出错"+ex); } }
三、书中有提供一个Zack.EventBus库,提供集成事件的再封装,极大的简化了集成事件的使用,详见P338-342
特别说明:
1、本系列内容主要基于杨中科老师的书籍《ASP.NET Core技术内幕与项目实战》及配套的B站视频视频教程,同时会增加极少部分的小知识点
2、本系列教程主要目的是提炼知识点,追求快准狠,以求快速复习,如果说书籍学习的效率是视频的2倍,那么“简读系列”应该做到再快3-5倍
标签:Core,ASP,routingKey,RabbitMQ,信道,交换机,消息,channel,5.5 From: https://www.cnblogs.com/functionMC/p/16930116.html