首页 > 其他分享 >RabbitMQ在.net core中的应用

RabbitMQ在.net core中的应用

时间:2024-09-24 14:48:58浏览次数:8  
标签:core logger 队列 RabbitMQ options 消息 net public

RabbitMQ 是一个可靠且成熟的消息传递和流代理,它很容易部署在云环境、内部部署和本地机器上。它目前被全世界数百万人使用。

1.基本概念

生产者(Producer)

生产者是一个发送消息的程序。发送消息的程序可以是任何语言编写的,只要它能够连接到RabbitMQ服务器,并且能够发送消息到RabbitMQ服务器。

消费者(Consumer)

消费者是一个接收消息的程序。接收消息的程序可以是任何语言编写的,只要它能够连接到RabbitMQ服务器,并且能够从RabbitMQ服务器接收消息。

队列(Queue)

队列是RabbitMQ的内部对象,用于存储消息。多个生产者可以向一个队列发送消息,多个消费者可以尝试从一个队列接收消息。队列支持多种消息分发策略。

交换机(Exchange)

交换机是消息的分发中心。它接收来自生产者的消息,然后将这些消息分发给队列。交换机有多种类型,包括直连交换机、主题交换机、扇形交换机、头交换机。

绑定(Binding)

绑定是交换机和队列之间的关联关系。绑定可以使用路由键进行绑定,也可以使用通配符进行绑定。

路由键(Routing Key)

路由键是生产者发送消息时附带的一个属性。路由键的作用是决定消息被分发到哪个队列。

通配符(Wildcard)

通配符是一种模式匹配的方式。RabbitMQ支持两种通配符:`*`和`#`。

绑定键(Binding Key)

绑定键是交换机和队列之间的关联关系。绑定键可以使用路由键进行绑定,也可以使用通配符进行绑定。

持久化(Durable)

持久化是指RabbitMQ服务器重启后,消息是否还存在。持久化可以应用到交换机、队列、绑定、消息等。

确认机制(Acknowledge)

确认机制是指消费者接收到消息后,向RabbitMQ服务器发送一个确认消息。RabbitMQ服务器收到确认消息后,会删除这条消息。

自动确认
	消费者接收到消息后,RabbitMQ服务器会自动删除这条消息。

手动确认
	消费者接收到消息后,需要向RabbitMQ服务器发送一个确认消息。RabbitMQ服务器收到确认消息后,会删除这条消息。

拒绝机制(Reject)

拒绝机制是指消费者接收到消息后,向RabbitMQ服务器发送一个拒绝消息。RabbitMQ服务器收到拒绝消息后,会将这条消息重新发送给其他消费者。

死信队列(Dead Letter Queue)

死信队列是指消息被拒绝、过期或者达到最大重试次数后,会被发送到死信队列。

消息过期(Message TTL)

消息过期是指消息在指定时间内没有被消费者消费,会被删除。

消息优先级(Message Priority)

消息优先级是指消息在队列中的优先级。消息优先级高的消息会被优先消费。

消息分发

消息分发是指消息在队列中的分发策略。消息分发策略包括轮询分发、公平分发、负载均衡分发。

2.环境搭建

Docker 安装 RabbitMQ

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 --restart=always --hostname my-rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -e TZ=Asia/Shanghai rabbitmq:management
  • -d:后台运行
  • --restart:重启策略
  • --name:容器名称
  • -p:端口映射
  • --hostname:主机名
  • -e:环境变量
    • RABBITMQ_DEFAULT_USER:默认用户名
    • RABBITMQ_DEFAULT_PASS:默认密码
  • TZ:时区
  • rabbitmq:management:镜像名称

Docker Compose 安装 RabbitMQ

version: "3.1"
services:
rabbitmq:
    restart: always
    image: rabbitmq:management
    container_name: rabbitmq
    hostname: my-rabbit
    ports:
        - 5672:5672
        - 15672:15672 # RabbitMQ管理界面端口
    environment:
        TZ: Asia/Shanghai
            RABBITMQ_DEFAULT_USER: admin
            RABBITMQ_DEFAULT_PASS: admin
  • restart:重启策略
  • image:镜像名称
  • container_name:容器名称
  • hostname:主机名
  • ports:端口映射
  • environment:环境变量
    • TZ:时区
    • RABBITMQ_DEFAULT_USER:默认用户名
    • RABBITMQ_DEFAULT_PASS:默认密码
  • rabbitmq:management:镜像名称

3.使用

客户端SDK代码在GitHub:https://github.com/Tangtang1997/IKunLibrary

新建 TestRequest 类,实现 IRabbitMqRequest 接口,定义消息体

public class TestRequest : IRabbitMqRequest
{
/// <summary>
/// 重试次数
/// </summary>
public int RetryCount { get; set; }

#region 自定义字段

/// <summary>
/// id
/// </summary>
public string Id { get; set; } = default!;

/// <summary>
/// 名称
/// </summary>
public string Name { get; set; } = default!;

/// <summary>
/// 年龄
/// </summary>
public int Age { get; set; }

#endregion
}

新建TestRequestHandler类,实现IRabbitMqRequestHandler接口,处理消息

public class TestRequestHanlder : IRequestProcessorHandler<TestRequest>
{
private readonly ILogger<TestRequestHanlder> _logger;

public TestRequestHanlder(ILogger<TestRequestHanlder> logger)
{
    _logger = logger;
}

public Task StartAsync(CancellationToken cancellationToken)
{
    return Task.CompletedTask;
}

public Task StopAsync(int milliseconds, CancellationToken cancellationToken = default)
{
    return Task.CompletedTask;
}

public async Task HandleAsync(TestRequest request, CancellationToken cancellationToken = default)
{
    _logger.LogInformation($"开始处理消息: {request.Id}");

    //模拟处理消息耗时操作
    await Task.Delay(1000, cancellationToken);

    _logger.LogInformation($"消息处理完成: {request.Id}");
    }
}

使用 IHostedService 来托管服务

public class SampleHostedService : IHostedService
{
private readonly IConsumerProcessorManager<TestRequest> _consumerProcessorManager;
private readonly IHostApplicationLifetime _applicationLifetime;
private readonly ILogger<SampleHostedService> _logger;

public SampleHostedService(
    IConsumerProcessorManager<TestRequest> consumerProcessorManager,
    IHostApplicationLifetime applicationLifetime,
    ILogger<SampleHostedService> logger)
{
    _consumerProcessorManager = consumerProcessorManager;
    _applicationLifetime = applicationLifetime;
    _logger = logger;
}

public async Task StartAsync(CancellationToken cancellationToken)
{
    _applicationLifetime.ApplicationStarted.Register(() =>
    {
        _logger.LogInformation("SampleHostedService is starting.");
        _consumerProcessorManager.StartAsync(cancellationToken);
    });

    _applicationLifetime.ApplicationStopping.Register(() =>
    {
        _logger.LogInformation("SampleHostedService is stopping.");
        _consumerProcessorManager.StopAsync(3000, cancellationToken);
    });

    await Task.CompletedTask;
}

public async Task StopAsync(CancellationToken cancellationToken)
{
    await Task.CompletedTask;
    }
}

注册并启用服务

IHost host = Host.CreateDefaultBuilder(args)
    .ConfigureServices(services =>
{
    services.AddHostedService<SampleHostedService>();

    var configuration = services.BuildServiceProvider().GetRequiredService<IConfiguration>();

    var hostName = configuration["RabbitMq:Host"] ?? throw new Exception("HostName is not configured");
    var port = int.Parse(configuration["RabbitMq:Port"] ?? throw new Exception("Port is not configured"));
    var userName = configuration["RabbitMq:Username"] ?? throw new Exception("Username is not configured");
    var password = configuration["RabbitMq:Password"] ?? throw new Exception("Password is not configured");
    var queueName = configuration["RabbitMq:QueueName"] ?? throw new Exception("QueueName is not configured");

    services.AddRabbitMq<TestRequest, TestRequestHanlder>(options =>
    {
        options.UseSsl = false;
        options.HostName = hostName;
        options.Port = port;
        options.UserName = userName;
        options.Password = password;
        options.Durable = true;
        options.NetworkRecoveryInterval = 10000;
        options.ExchangeType = ExchangeType.Direct;
        options.QueueName = queueName;
        options.Exchange = $"{queueName}_SERVICE_EXCHANGE";
        options.RoutingKey = $"{queueName}_ROUTING_KEY";
        options.DeadLetterExchange = $"{queueName}_SERVICE_EXCHANGE_DEAD";
        options.DeadLetterQueueName = $"{queueName}_DEAD";
        options.DeadLetterRoutingKey = $"{queueName}_ROUTING_KEY";
    });
})
.Build();

await host.RunAsync();

4.参考资料

https://www.cnblogs.com/Tangtang1997/p/18067763

标签:core,logger,队列,RabbitMQ,options,消息,net,public
From: https://www.cnblogs.com/chenshibao/p/18429050

相关文章

  • .net core开源工作流程框架elsa源码阅读之容器的理解
    官方文档:https://v3.elsaworkflows.io/这个框架的依赖注入容器,底层是靠原生的IServiceCollection,没有使用其他的三方容器;然后在这个基础上,作者进行了封装。主要是用了Module类和继承了IFeature接口的类完成了依赖注入容器的封装。Module是用来管理feature和依赖的。Module我称......
  • 论文速递!时序预测!DCSDNet:双卷积季节性分解网络,应用于天然气消费预测过程
    本期推文将介绍一种新的时序预测方法:双卷积季节性分解网络(DualConvolutionwithSeasonalDecompositionNetwork,DCSDNet)在天然气消费预测的应用,这项研究发表于《AppliedEnergy》期刊。针对天然气消费的多重季节性和非规律性,推荐的文献提出了一种新的预测方法:双卷积季节性分解......
  • DsExcel,GcExcel .NET 7.2.2 Crack
    DsExcel,GcExcel.NET 高速C#.NETExcel电子表格API库Excel文档解决方案(DsExcel,以前称为GcExcel).NET版本允许您使用此快速电子表格API在C#.NET6+、.NETCore、.NETFramework和Xamarin跨平台应用程序中以编程方式创建、编辑、导入和导出Excel电子表格。......
  • asp.net core webapi 获取请求头token
    usingMicrosoft.AspNetCore.Mvc;usingMicrosoft.Extensions.Primitives;usingSystem.Collections.Generic;[ApiController][Route("[controller]")]publicclassMyController:ControllerBase{[HttpGet]publicIActionResultGet(){......
  • CodeMaid:一款基于.NET开发的Visual Studio代码简化和整理实用插件
    前言今天大姚给大家分享一款由.NET开源、免费、强大的VisualStudio代码简化、整理、格式化实用插件:CodeMaid。工具介绍CodeMaid是一款由.NET开源、免费、强大的VisualStudio实用插件,旨在帮助开发者简化、清理和格式化他们的C#、C++、VB.NET、F#、XAML、CSS、LESS、SCSS、Java......
  • CodeMaid:一款基于.NET开发的Visual Studio代码简化和整理实用插件
    前言今天大姚给大家分享一款由.NET开源、免费、强大的VisualStudio代码简化、整理、格式化实用插件:CodeMaid。工具介绍CodeMaid是一款由.NET开源、免费、强大的VisualStudio实用插件,旨在帮助开发者简化、清理和格式化他们的C#、C++、VB.NET、F#、XAML、CSS、LESS、SCSS、JavaScri......
  • 基于ASP.NET+SQLServer的美妆网站的设计与实现
    ASP.NETMVC美妆商城项目文档计算机毕业设计案例C#社团软件CS基于Java的商品评价系统Java北方民族大学停车场管理系统PHP教学管理系统基于Java的报考指南微信小程序基于MVC的高校学生成果管理系统的设计与实现C#社团软件CSJava鲜花购物商城基于PHP的家居交流设计......
  • 基于ASP.NET的人事管理系统的设计与实现
    ASP.NET人事管理系统毕业设计案例Java北方民族大学停车场管理系统基于Java的电子产品比价系统基于C#的月子网站开发基于Java的在线问答学习系统基于MVC的高校学生成果管理系统的设计与实现基于Java电商管理平台Java鲜花购物商城基于Java的美容护理预约系统基于Jav......
  • 网口环保212设备数据 转 profinet IO项目案例
    目录1 案例说明 12 VFBOX网关工作原理 13 准备工作 24 电脑上采集环保HJ212设备的数据 25 配置网关参数 46 用PROFINETIO协议转发数据 77 其他事项 98 案例总结 101 案例说明设置网关采集环保212设备数据把采集的数据转成profinetIO从站协议转发给其他系统。2 VFB......
  • 基于卷积神经网络的布料、布匹原料识别系统,resnet50,mobilenet模型【pytorch框架+pytho
       更多目标检测和图像分类识别项目可看我主页其他文章功能演示:基于卷积神经网络的布料、布匹原料识别系统,resnet50,mobilenet【pytorch框架,python,tkinter】_哔哩哔哩_bilibili(一)简介基于卷积神经网络的布料、布匹原料识别系统是在pytorch框架下实现的,这是一个完整的项目......