首页 > 其他分享 >.net core使用RabbitMQ

.net core使用RabbitMQ

时间:2024-09-19 19:48:21浏览次数:1  
标签:core logger 队列 RabbitMQ options 消息 net public

目录

 

RabbitMQ 是一个可靠且成熟的消息传递和流代理,它很容易部署在云环境、内部部署和本地机器上。它目前被全世界数百万人使用。

1.基本概念

  • 生产者(Producer)

    生产者是一个发送消息的程序。发送消息的程序可以是任何语言编写的,只要它能够连接到RabbitMQ服务器,并且能够发送消息到RabbitMQ服务器。

  • 消费者(Consumer)

    消费者是一个接收消息的程序。接收消息的程序可以是任何语言编写的,只要它能够连接到RabbitMQ服务器,并且能够从RabbitMQ服务器接收消息。

  • 队列(Queue)

    队列是RabbitMQ的内部对象,用于存储消息。多个生产者可以向一个队列发送消息,多个消费者可以尝试从一个队列接收消息。队列支持多种消息分发策略。

  • 交换机(Exchange)

    交换机是消息的分发中心。它接收来自生产者的消息,然后将这些消息分发给队列。交换机有多种类型,包括直连交换机、主题交换机、扇形交换机、头交换机。

  • 绑定(Binding)

    绑定是交换机和队列之间的关联关系。绑定可以使用路由键进行绑定,也可以使用通配符进行绑定。

  • 路由键(Routing Key)

    路由键是生产者发送消息时附带的一个属性。路由键的作用是决定消息被分发到哪个队列。

  • 通配符(Wildcard)

    通配符是一种模式匹配的方式。RabbitMQ支持两种通配符:`*`和`#`。

  • 绑定键(Binding Key)

    绑定键是交换机和队列之间的关联关系。绑定键可以使用路由键进行绑定,也可以使用通配符进行绑定。

  • 持久化(Durable)

    持久化是指RabbitMQ服务器重启后,消息是否还存在。持久化可以应用到交换机、队列、绑定、消息等。

  • 确认机制(Acknowledge)

    确认机制是指消费者接收到消息后,向RabbitMQ服务器发送一个确认消息。RabbitMQ服务器收到确认消息后,会删除这条消息。

    • 自动确认

      消费者接收到消息后,RabbitMQ服务器会自动删除这条消息。

    • 手动确认

      消费者接收到消息后,需要向RabbitMQ服务器发送一个确认消息。RabbitMQ服务器收到确认消息后,会删除这条消息。

  • 拒绝机制(Reject)

    拒绝机制是指消费者接收到消息后,向RabbitMQ服务器发送一个拒绝消息。RabbitMQ服务器收到拒绝消息后,会将这条消息重新发送给其他消费者。

  • 死信队列(Dead Letter Queue)

    死信队列是指消息被拒绝、过期或者达到最大重试次数后,会被发送到死信队列。

  • 消息过期(Message TTL)

    消息过期是指消息在指定时间内没有被消费者消费,会被删除。

  • 消息优先级(Message Priority)

    消息优先级是指消息在队列中的优先级。消息优先级高的消息会被优先消费。

  • 消息分发

    消息分发是指消息在队列中的分发策略。消息分发策略包括轮询分发、公平分发、负载均衡分发。

2.环境搭建

  • Docker 安装 RabbitMQ

    docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 --restart=always --hostname my-rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -e TZ=Asia/Shanghai rabbitmq:management
    
    • -d:后台运行
    • --restart:重启策略
    • --name:容器名称
    • -p:端口映射
    • --hostname:主机名
    • -e:环境变量
      • RABBITMQ_DEFAULT_USER:默认用户名
      • RABBITMQ_DEFAULT_PASS:默认密码
      • TZ:时区
    • rabbitmq:management:镜像名称
  • Docker Compose 安装 RabbitMQ

    version: "3.1"
    services:
        rabbitmq:
            restart: always
            image: rabbitmq:management
            container_name: rabbitmq
            hostname: my-rabbit
            ports:
                - 5672:5672
                - 15672:15672 # RabbitMQ管理界面端口
            environment:
                TZ: Asia/Shanghai
                RABBITMQ_DEFAULT_USER: admin
                RABBITMQ_DEFAULT_PASS: admin
    
    • restart:重启策略
    • image:镜像名称
    • container_name:容器名称
    • hostname:主机名
    • ports:端口映射
    • environment:环境变量
      • TZ:时区
      • RABBITMQ_DEFAULT_USER:默认用户名
      • RABBITMQ_DEFAULT_PASS:默认密码
    • rabbitmq:management:镜像名称

3.使用

客户端SDK代码在GitHub:https://github.com/Tangtang1997/IKunLibrary

  • 新建 TestRequest 类,实现 IRabbitMqRequest 接口,定义消息体
public class TestRequest : IRabbitMqRequest
{
    /// <summary>
    /// 重试次数
    /// </summary>
    public int RetryCount { get; set; }

    #region 自定义字段

    /// <summary>
    /// id
    /// </summary>
    public string Id { get; set; } = default!;

    /// <summary>
    /// 名称
    /// </summary>
    public string Name { get; set; } = default!;

    /// <summary>
    /// 年龄
    /// </summary>
    public int Age { get; set; }

    #endregion
}
  • 新建TestRequestHandler类,实现IRabbitMqRequestHandler<TestRequest>接口,处理消息
public class TestRequestHanlder : IRequestProcessorHandler<TestRequest>
{
    private readonly ILogger<TestRequestHanlder> _logger;

    public TestRequestHanlder(ILogger<TestRequestHanlder> logger)
    {
        _logger = logger;
    }

    public Task StartAsync(CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }

    public Task StopAsync(int milliseconds, CancellationToken cancellationToken = default)
    {
        return Task.CompletedTask;
    }

    public async Task HandleAsync(TestRequest request, CancellationToken cancellationToken = default)
    {
        _logger.LogInformation($"开始处理消息: {request.Id}");

        //模拟处理消息耗时操作
        await Task.Delay(1000, cancellationToken);

        _logger.LogInformation($"消息处理完成: {request.Id}");
    }
}
  • 使用 IHostedService 来托管服务
public class SampleHostedService : IHostedService
{
    private readonly IConsumerProcessorManager<TestRequest> _consumerProcessorManager;
    private readonly IHostApplicationLifetime _applicationLifetime;
    private readonly ILogger<SampleHostedService> _logger;

    public SampleHostedService(
        IConsumerProcessorManager<TestRequest> consumerProcessorManager,
        IHostApplicationLifetime applicationLifetime,
        ILogger<SampleHostedService> logger)
    {
        _consumerProcessorManager = consumerProcessorManager;
        _applicationLifetime = applicationLifetime;
        _logger = logger;
    }

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        _applicationLifetime.ApplicationStarted.Register(() =>
        {
            _logger.LogInformation("SampleHostedService is starting.");
            _consumerProcessorManager.StartAsync(cancellationToken);
        });

        _applicationLifetime.ApplicationStopping.Register(() =>
        {
            _logger.LogInformation("SampleHostedService is stopping.");
            _consumerProcessorManager.StopAsync(3000, cancellationToken);
        });

        await Task.CompletedTask;
    }

    public async Task StopAsync(CancellationToken cancellationToken)
    {
        await Task.CompletedTask;
    }
}
  • 注册并启用服务
IHost host = Host.CreateDefaultBuilder(args)
    .ConfigureServices(services =>
    {
        services.AddHostedService<SampleHostedService>();

        var configuration = services.BuildServiceProvider().GetRequiredService<IConfiguration>();

        var hostName = configuration["RabbitMq:Host"] ?? throw new Exception("HostName is not configured");
        var port = int.Parse(configuration["RabbitMq:Port"] ?? throw new Exception("Port is not configured"));
        var userName = configuration["RabbitMq:Username"] ?? throw new Exception("Username is not configured");
        var password = configuration["RabbitMq:Password"] ?? throw new Exception("Password is not configured");
        var queueName = configuration["RabbitMq:QueueName"] ?? throw new Exception("QueueName is not configured");

        services.AddRabbitMq<TestRequest, TestRequestHanlder>(options =>
        {
            options.UseSsl = false;
            options.HostName = hostName;
            options.Port = port;
            options.UserName = userName;
            options.Password = password;
            options.Durable = true;
            options.NetworkRecoveryInterval = 10000;
            options.ExchangeType = ExchangeType.Direct;
            options.QueueName = queueName;
            options.Exchange = $"{queueName}_SERVICE_EXCHANGE";
            options.RoutingKey = $"{queueName}_ROUTING_KEY";
            options.DeadLetterExchange = $"{queueName}_SERVICE_EXCHANGE_DEAD";
            options.DeadLetterQueueName = $"{queueName}_DEAD";
            options.DeadLetterRoutingKey = $"{queueName}_ROUTING_KEY";
        });
    })
    .Build();

await host.RunAsync();

 

2024-09-19 19:31:56【出处】:https://www.cnblogs.com/Tangtang1997/p/18067763

=======================================================================================

标签:core,logger,队列,RabbitMQ,options,消息,net,public
From: https://www.cnblogs.com/mq0036/p/18421206

相关文章

  • FastReport.OpenSource .Net下开源免费报表打印组件
    解决了这个问题:《winForm下,fastReport.net从.netframework升级到.net5遇到的错误“Operationisnotsupportedonthisplatform.”》本文内容转载自:https://www.fcnsoft.com/Home/ShowArticleNews/473最近被fastreport.net搞得有点烦躁,网上有很多破解版本下载可以下载使用,......
  • dotnet framework 4.7.2 webapi 配置的swagger添加登录验证
    项目是.netframework4.7.2加webapi写的接口,使用Swashbuckle包添加的swagger支持 App_Start\SwaggerConfig.cs中加c.CustomAsset("index",thisAssembly,"WebApi.Jwt.SwaggerExtensions.index.html",false);1usingSystem.Web.Http;2usingWebActivato......
  • 论文阅读:Unsupervised Representation Learning with Deep Convolutional Generative
    Abstract背景:希望能缩小CNN在监督学习和无监督学习之间成功应用的差距。贡献:引入了一类称为深度卷积生成对抗网络(DCGAN)的CNN。结果:DCGAN在生成器和判别器中都能从对象到场景学习表示层次结构。1.Introduction贡献:提出DCGAN用于图像分类任务,展示其性能对滤波器......
  • 从零开始掌握 Kubernetes:Pod 和 Deployment 的幕后故事
     1.引言在如今的技术世界中,随着微服务架构的广泛应用和云原生理念的兴起,应用程序的开发、部署和管理发生了翻天覆地的变化。容器技术的出现使得开发者可以轻松地将应用及其所有依赖打包在一个轻量级、可移植的容器中,这种方式大大提升了应用的部署效率和一致性。然而,随着应......
  • ASP.NET Core中如何对不同类型的用户进行区别限流
    老板提出了一个新需求,从某某天起,免费用户每天只能查询100次,收费用户100W次。这是一个限流问题,聪明的你也一定想到了如何去做:记录用户每一天的查询次数,然后根据当前用户的类型使用不同的数字做比较,超过指定的数字就返回错误。嗯,原理就是这么简单。不过真正写起来还要考虑更多问题......
  • SVN的安装和使用手册 https://blog.csdn.net/sinat_37812785/article/det
    下载`TortoiseSVN官网下载址:https://www.visualsvn.com/visualsvn/download/tortoisesvn/ 下载完成后是这样的安装TortoiseSVN:  此处的安装地址建议不动,当然你也可以选择你要安装的地址    安装完成后在桌面点击右键查看 如果有标记的两个文件说明已经安装......
  • 易优eyoucms网站报错 \core\library\think\App.php Fatal error: Call to undefin
    当你遇到 Fatalerror:Calltoundefinedfunctionthink\switch_citysite() 这样的错误时,说明在代码中调用了一个未定义的函数 think\switch_citysite()。这种情况通常是因为函数没有被正确地引入或者该函数根本不存在于当前的代码库中。解决方案确认函数的存在检查 s......
  • 易优eyoucms网站详情页报错报错 \core\library\think\Loader.php 类不存在:app\co
    类不存在:app\common\model\Pic,这个错误表明PHP无法找到类 app\common\model\Pic。这通常是因为类文件未被正确加载或命名空间配置不正确导致的。以下是一些可能的解决步骤:1.确认类文件路径确保类文件 Pic 的路径正确并且文件存在。检查文件路径确认 app\common\model......
  • 易优eyoucms网站报错,\\core\\library\\think\\db\\Connection.php
     报错\\\\core\\\\library\\\\think\\\\db\\\\Connection.php第380行左右数据表或视图不存在,请联系技术处理。[错误代码]SQLSTATE[42S02]:Basetableorviewnotfound:1146Table'eyoucms.ey_channeltype'doesn'texist根据提供的错误信息 SQLSTATE[42S02]:......
  • 易优eyoucms网站报错 \core\library\think\db\Connection.php 第 307 行左右,SQLS
    根据提供的错误信息 SQLSTATE[HY000][1045]Accessdeniedforuser'cs2021'@'localhost'(usingpassword:YES),这个错误表明数据库访问被拒绝了,通常是因为用户名或密码不正确导致的。以下是几个可能的解决步骤:检查数据库连接配置:确认数据库连接配置文件中的用户名和密......