首页 > 编程语言 >《ASP.NET Core技术内幕与项目实战》精简集-DDD准备5.5:集成事件RabbitMQ

《ASP.NET Core技术内幕与项目实战》精简集-DDD准备5.5:集成事件RabbitMQ

时间:2022-11-27 17:26:10浏览次数:71  
标签:Core ASP routingKey RabbitMQ 信道 交换机 消息 channel 5.5

 

本节内容,部分为补充内容,部分涉及到9.3.10-9.3.12(P335-342)。主要NuGet包:

  • RabbitMQ.Client

 

微服务间,跨进程的事件发布和订阅,需要借助第三方服务器作为事件总线,目前常用的有Redis、RabbitMQ、Kafka等,本章节介绍RabbitMQ。

一、基本过程

1、生产者(通常指一个微服务应用),将消息发布到RabbitMQ服务器的指定交换机,如果没有则新建一个交换机,每个消息都有一个routingKey(本质是一个事件)。

2、消费者(通常指另一个微服务应用),声明一个队列用于接收消息,队列通过交换机和routingKey进行匹配,监听指定交换机和routingKey的消息。

3、生产者发布消息或消费者接收消息,首先要与服务器建立TCP连接,TCP连接比较消耗资源,所以建立连接后,通过创建虚拟信道来发布和接收消息,发布或接收到消息后,信道关闭,但TCP连接会保留重复使用。

4、如果有A和B两个消费者,想读取同一条消息,应分别声明不同的队列名称,即同一条消息可以被多个消费者接收到。如果声明的队列名称相同,则同一条消息按先到先得的方式被其中一个消费者接收。

5、消息失败重发,消费者如果在接收消息进行处理时出错,可以通知队列“消息处理出错”,队列会重新发送消息。

 

 

 

 

 

二、两个控制台应用的事件发布和订阅案例。由于RabbitMQ的使用比较繁琐,所在大多数框架都会对RabbitMQ进行二次封装,并和领域事件进行集成,使我们在使用事件机制时,无论是领域事件,还是集成事件,API和用法都基本相同。但仍然需要了解一下RabbitMQ的原生使用方式。下面代码,是书中案例,但跑不起来,因为主要使用框架开发,都有集成事件的封装,所以懒得高度了,主要是了解一下集成事件底层的代码实现原理。

1、创建一个控制台应用,作为生产者

//声明RabbitMQ服务器,案例中以开发本机作为服务器
var factory = new ConnectionFactory();
factory.HostName = "localhost";
factory.DispatchConsumersAsync = true;

//命名交换机和路由名称,路由routinKey本质是一个事件,消费端通过routingKey来匹配队列
string exchangeName = "exchange1";
string routingKey = "event1";

//声明TCP连接,多个信道复用这个连接
using var conn = factory.CreateConnection();

//创建虚拟信道、连接交换机,发布信息
//本案例中,每延迟1秒,就发送一次信息
while (true)
{
    //定义传递的信息,每次发关事件
    string msg = DateTime.Now.TimeOfDay.ToString();

    //创建一个虚拟信道,连接交换机,发送消息
    using (var channel = conn.CreateModel())
    {
        //准备①:配置信道属性
        var properties = channel.CreateBasicProperties();
        properties.DeliveryMode = 2;

        //准备②:声明交接机。本例中,每次新的信道都使用相同的交接机
        channel.ExchangeDeclare(exchange:exchangeName,type:"direct");

        //准备③:RabbitMQ中的消息都是按照byte[]来传递,将消息转为byte[]格式
        byte[] body = Encoding.UTF8.GetBytes(msg);

        //发布消息,参数分别为交换机、routingKey(事件)、消息、信息属性
        channel.BasicPublish(exchange:exchangeName,routingKey:routingKey,body:body,basicProperties:properties,mandatory:true);
    }

    //每次发送完信息后,都关闭信道
    Console.WriteLine($"发布了消息:{msg}");

    //暂停1秒后,循环发送消息
    Thread.Sleep(1000);
}

 

2、创建另一个控制台应用,作为消费者

//声明RabbitMQ服务器,案例中以开发本机作为服务器
var factory = new ConnectionFactory();
factory.HostName = "127.0.0.1";
factory.DispatchConsumersAsync = true;

//定义交换机和路由名称
string exchangeName = "exchange1";
string routingKey = "event1";

//声明TCP连接,多个信道复用这个连接
using var conn = factory.CreateConnection();

//声明一个信道
using var channel = conn.CreateModel();

//声明交换机
channel.ExchangeDeclare(exchange:exchangeName,type:"direct");

//声明队列
string queueName = "queue1";
channel.QueueDeclare(queue:queueName,durable:true,exclusive:false,autoDelete:false,arguments:null);

//将队列和指定交换机及routingKey绑定
channel.QueueBind(queue: queueName, exchange:exchangeName,routingKey:routingKey);

//通过信道,从队列中接收消息
var consumer = new AsyncEventingBasicConsumer(channel);

//定义一个事件处理函数
consumer.Received += ConsumerReceived ;

//执行事件处理
channel.BasicConsume(queue:queueName,autoAck:false,consumer:consumer);
Console.ReadLine();

//收到消息后的事件处理函数
async Task ConsumerReceived(object sender,BasicDeliverEventArgs args)
{
    try
    {
        var bytes = args.Body.ToArray();
        string msg = Encoding.UTF8.GetString(bytes);
        Console.WriteLine(DateTime.Now + "收到了消息" + msg);
        channel.BasicAck(args.DeliveryTag, multiple: false); //通知队外,事件处理成功
        await Task.Delay(1000);
    }
    catch (Exception ex)
    {
        channel.BasicReject(args.DeliveryTag, true); //通知队列,事件处理失败,请重发
        Console.WriteLine("处理消息出错"+ex);
    }
}

 

 

三、书中有提供一个Zack.EventBus库,提供集成事件的再封装,极大的简化了集成事件的使用,详见P338-342

 

 

特别说明:
1、本系列内容主要基于杨中科老师的书籍《ASP.NET Core技术内幕与项目实战》及配套的B站视频视频教程,同时会增加极少部分的小知识点
2、本系列教程主要目的是提炼知识点,追求快准狠,以求快速复习,如果说书籍学习的效率是视频的2倍,那么“简读系列”应该做到再快3-5倍

 

标签:Core,ASP,routingKey,RabbitMQ,信道,交换机,消息,channel,5.5
From: https://www.cnblogs.com/functionMC/p/16930116.html

相关文章

  • 基于.NetCore开发博客项目 StarBlog - (20) 图片显示优化
    前言我的服务器带宽比较高,博客部署在上面访问的时候几乎没感觉有加载延迟,就没做图片这块的优化,不过最近有小伙伴说博客的图片加载比较慢,那就来把图片优化完善一下吧~目前......
  • asp.net中的报销多级审批工作流 (状态机版本)
    asp.net中的报销多级审批工作流(状态机版本)     上篇​​asp.net中的报销多级审批工作流​​,提到参考了网上一个具体的项目,项目中用状态机工作流完成,基于学习......
  • .NET Core中的AOP
    1.AOP的应用场景AOP全称AspectOrientedProgarmming(面向切面编程),其实AOP对ASP.NET程序员来说一点都不神秘,你也许早就通过Filter来完成一些通用的功能,例如你使用Authori......
  • ASP.NET获取远程网页下载到本地文件
    通过ASP.NET生成静态文件的文章网上有好多文章,而本站也有不少的相关文章教程,通常ASP.NET生成静态文件的做法是使用文件流读取模板内容,之后替换模板内容中相关关键字,再生成静......
  • ASP.NET获取远程网页下载到本地文件
    通过ASP.NET生成静态文件的文章网上有好多文章,而本站也有不少的相关文章教程,通常ASP.NET生成静态文件的做法是使用文件流读取模板内容,之后替换模板内容中相关关键字,再生成静......
  • ASP.NET Core教程-Model Binding(模型绑定)
    更新记录转载请注明出处:2022年11月27日发布。2022年11月25日从笔记迁移到博客。模型绑定是什么模型绑定是指:使用来自HTTP请求的值来创建.NET对象的过程。模型绑......
  • 会话丢失-NGINX配置之underscores_in_headers
    1.描述问题NGINX代理某个web服务时,单机情况下也出现不停的要求认证的情况初步分析去掉NGINX代理,直接访问服务,未出现上述情况;进一步分析:查看经过NGINX的请求和直接访问服......
  • 1527_AURIX_TriCore内核架构开篇与架构概述
    全部学习汇总:​​GreyZhang/g_tricore_architecture:somelearningnoteabouttricorearchitecture.(github.com)​​看文档的时候,引用了内核架构的内容。这方面我没有......
  • 跨平台网页开发框架ASP.NET Core 7性能大提升,正式支持HTTP/3
     微软跨平台网页开发框架ASP.NETCore即将迎来第7个主要版本,由于.NET7对性能的诸多改善,ASP.NETCore7也同时受益,官方解释,由于.NET7在性能部分的提升,许多方面直接或间接......
  • .NetCore【工作应用】AutoMapper
    AutoMapperOOM(Object-Object-Mapping)组件为了实现实体间的相互转换,从而避免我们每次采用手工的方式进行转换。使用安装nuget包install-packageAutoMapperinstall-......