首页 > 其他分享 >14.分布式事件总线MassTransit的简单使用

14.分布式事件总线MassTransit的简单使用

时间:2024-03-01 11:55:24浏览次数:32  
标签:builder context string rabbitOptions app 总线 MassTransit public 14

简介:

        MassTransit,直译公共交通, 是由 Chris Patterson 开发的基于消息驱动的.NET 分布式应用框架,其核心思想是借助消息来实现服务之间的松耦合异步通信,进而确保应用更高的可用性、可靠性和可扩展性。通过对消息模型的高度抽象,以及对主流的消息代理(包括RabbitMQ、ActiveMQ、Kafaka、Azure Service Bus、Amazon SQS等)的集成,大大简化了基于消息驱动的开发门槛,同时内置了连接管理、消息序列化和消费者生命周期管理,以及诸如重试、限流、断路器等异常处理机制,让开发者更好的专注于业务实现。
       简而言之,MassTransit实现了消息代理透明化。无需面向消息代理编程进行诸如连接管理、队列的申明和绑定等操作,即可轻松实现应用间消息的传递和消费。

官网:Concepts · MassTransit

新添加一个项目EventBus.MassTraint(http://localhost:5129)【发布】

安装包:

MassTransit

MassTransit.RabbitMQ

Microsoft.AspNetCore.OpenApi

Swashbuckle.AspNetCore

 

添加一个MQ配置类RabbitMQOptions

public class RabbitMQOptions
{
    /// <summary>
        /// Default password (value: "guest").
        /// </summary>
        /// <remarks>PLEASE KEEP THIS MATCHING THE DOC ABOVE.</remarks>
        public const string DefaultPass = "guest";

        /// <summary>
        /// Default user name (value: "guest").
        /// </summary>
        /// <remarks>PLEASE KEEP THIS MATCHING THE DOC ABOVE.</remarks>
        public const string DefaultUser = "guest";

        /// <summary>
        /// Default virtual host (value: "/").
        /// </summary>
        /// <remarks> PLEASE KEEP THIS MATCHING THE DOC ABOVE.</remarks>
        public const string DefaultVHost = "/";
        
        /// <summary>
        /// The host to connect to.
        /// If you want connect to the cluster, you can assign like “192.168.1.111,192.168.1.112”
        /// </summary>
        public string HostName { get; set; } = "localhost";

        /// <summary>
        /// Password to use when authenticating to the server.
        /// </summary>
        public string Password { get; set; } = DefaultPass;

        /// <summary>
        /// Username to use when authenticating to the server.
        /// </summary>
        public string UserName { get; set; } = DefaultUser;

        /// <summary>
        /// Virtual host to access during this connection.
        /// </summary>
        public string VirtualHost { get; set; } = DefaultVHost;

    
        /// <summary>
        /// The port to connect on.
        /// </summary>
        public ushort Port { get; set; }

        
}

 

appsettings.Development.json配置

{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft.AspNetCore": "Warning"
    }
  },
  "RabbitMQ": {
    "HostName": "127.0.0.1",
    "VirtualHost": "/",
    "UserName": "admin",
    "Password": "你的密码",
    "Port": "5672"
  }
}

 

Program.cs注入

var builder = WebApplication.CreateBuilder(args);

// Add services to the container.

builder.Services.AddControllers();
builder.Services.AddSwaggerGen();

var rabbitConfig = builder.Configuration.GetSection("RabbitMQ");
builder.Services.Configure<RabbitMQOptions>(rabbitConfig);
var rabbitOptions = rabbitConfig.Get<RabbitMQOptions>();

builder.Services.AddMassTransit(p =>
{
    p.UsingRabbitMq((context, mqConfig) =>
    {
        mqConfig.Host(
            host: rabbitOptions.HostName, // 配置主机地址
            port: rabbitOptions.Port,
            virtualHost: rabbitOptions.VirtualHost, // 虚拟主机
            configure: hostConfig =>
            {
                hostConfig.Username(rabbitOptions.UserName);
                hostConfig.Password(rabbitOptions.Password);
            });
        mqConfig.ConfigureEndpoints(context);
    });
});


var app = builder.Build();

if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}

app.UseHttpsRedirection();

app.UseAuthorization();

app.MapControllers();

app.Run();

添加一个发布的控制器PublishController

[Route("[controller]/[action]")]
[ApiController]
public class PublishController:ControllerBase
{
    private readonly IBus _bus;

    public PublishController(IBus bus)
    {
        _bus = bus;
    }

    // 生产者、发布。执行1.0
    [HttpGet]
    public ActionResult PublishMsg()
    {
        _bus.Publish(new UserInfo {Id = 1, NickName = "张三"});
        return Ok("发布成功");
    }
    
//批量请求接口 [HttpGet] public ActionResult PublishBatchMsg() { _bus.Publish(new BatchMsg {Id = Thread.CurrentThread.ManagedThreadId}); return Ok("发布成功"); } }

 

 

 

 

新添加一个项目EventBus.MassTransit.Subscribe(http://localhost:5166)【订阅】

安装包:

MassTransit.RabbitMQ

Microsoft.AspNetCore.OpenApi

Swashbuckle.AspNetCore

 

MQ配置类RabbitMQOptions照抄下来

public class RabbitMQOptions
{
    /// <summary>
        /// Default password (value: "guest").
        /// </summary>
        /// <remarks>PLEASE KEEP THIS MATCHING THE DOC ABOVE.</remarks>
        public const string DefaultPass = "guest";

        /// <summary>
        /// Default user name (value: "guest").
        /// </summary>
        /// <remarks>PLEASE KEEP THIS MATCHING THE DOC ABOVE.</remarks>
        public const string DefaultUser = "guest";

        /// <summary>
        /// Default virtual host (value: "/").
        /// </summary>
        /// <remarks> PLEASE KEEP THIS MATCHING THE DOC ABOVE.</remarks>
        public const string DefaultVHost = "/";
        
        /// <summary>
        /// The host to connect to.
        /// If you want connect to the cluster, you can assign like “192.168.1.111,192.168.1.112”
        /// </summary>
        public string HostName { get; set; } = "localhost";

        /// <summary>
        /// Password to use when authenticating to the server.
        /// </summary>
        public string Password { get; set; } = DefaultPass;

        /// <summary>
        /// Username to use when authenticating to the server.
        /// </summary>
        public string UserName { get; set; } = DefaultUser;

        /// <summary>
        /// Virtual host to access during this connection.
        /// </summary>
        public string VirtualHost { get; set; } = DefaultVHost;

    
        /// <summary>
        /// The port to connect on.
        /// </summary>
        public ushort Port { get; set; }

        
}

 

 

appsettings.Development.json配置

{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft.AspNetCore": "Warning"
    }
  },
  "RabbitMQ": {
    "HostName": "127.0.0.1",
    "VirtualHost": "/",
    "UserName": "admin",
    "Password": "你的密码",
    "Port": "5672"
  }
}

 

Program.cs注入

var builder = WebApplication.CreateBuilder(args);

// Add services to the container.

builder.Services.AddControllers();
builder.Services.AddSwaggerGen();
var rabbitConfig = builder.Configuration.GetSection("RabbitMQ");
builder.Services.Configure<RabbitMQOptions>(rabbitConfig);
var rabbitOptions = rabbitConfig.Get<RabbitMQOptions>();

builder.Services.AddMassTransit(p =>
{
   
    // 批量注册消费者,类名以Consumer结尾和此类继承IConsumer接口,才能注册
    p.AddConsumers(t=>t.Name.EndsWith("Consumer") && t.GetInterface("IConsumer")!=null
        ,Assembly.GetExecutingAssembly());

    // 添加消费者与ConsumerDefinition
    // p.AddConsumer<SubscribeConsumer, SubscribeConsumerDefinition>(p =>
    // {
//     //里面可以做并发数,消息,过滤,限流,超时,事务,重试,熔断,并发限制,延迟 // p.UseMessageRetry(r=>r.Interval(5,TimeSpan.FromSeconds(3))); // // });
//批量请求处理数设置 // p.AddConsumer<BatchMsgConsumer>(b => // { // b.Options<BatchOptions>(opt => // { // opt.SetMessageLimit(30); // }); // }); p.UsingRabbitMq((context, mqConfig) => { mqConfig.Host( host: rabbitOptions.HostName, // 配置主机地址 port: rabbitOptions.Port, virtualHost: rabbitOptions.VirtualHost, // 虚拟主机 configure: hostConfig => { hostConfig.Username(rabbitOptions.UserName); hostConfig.Password(rabbitOptions.Password); }); mqConfig.ConfigureEndpoints(context); }); }); var app = builder.Build(); // Configure the HTTP request pipeline. if (app.Environment.IsDevelopment()) { app.UseSwagger(); app.UseSwaggerUI(); } app.UseAuthorization(); app.MapControllers(); app.Run();

 

 

添加一个类SubscribeConsumer用于消费PublishController控制器里的PublishMsg方法。

// 消费者、订阅。执行1.1
// Message : 只能是接口,记录(record),类, 换句话说,只能是引用类型
public class SubscribeConsumer:IConsumer<UserInfo>
{
    // 由MQ自动调用
    public Task Consume(ConsumeContext<UserInfo> context)
    {
        UserInfo user = context.Message;
        Console.WriteLine(user.NickName);
        return Task.CompletedTask;
    }
}

 

批量处理

添加一个类,用于批量处理类型。

/// <summary>
/// MassTransit 批量处理消息的类型
/// </summary>
[EntityName("batch-msg")]
public class BatchMsg
{
    public int Id { get; set; }
}

 

消费、订阅

/// <summary>
/// 批量处理消息
/// </summary>
public class BatchMsgConsumer:IConsumer<Batch<BatchMsg>>
{
    public Task Consume(ConsumeContext<Batch<BatchMsg>> context)
    {
        foreach (var consumeContext in context.Message)
        {
            Console.WriteLine(consumeContext.Message.Id);
        }

        Console.WriteLine($"{context.Message.Length}===================================");
        
        return Task.CompletedTask;
    }
}

 

打开jmeter工具做批量请求。

设置批量请求数,此方法内可以做并发数,消息,过滤,限流,超时,重试,事务,熔断,并发限制,延迟等。

public class BatchMsgConsumerDefinition:ConsumerDefinition<BatchMsgConsumer>
{
    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator
        , IConsumerConfigurator<BatchMsgConsumer> consumerConfigurator,
        IRegistrationContext context)
    {
        // endpointConfigurator.ConcurrentMessageLimit = 8;// 并发数据限制,默认是1
        // endpointConfigurator.PrefetchCount = 5; // 每次同时可以执行多少条(取决于CPU的核数)
        
        consumerConfigurator.Options<BatchOptions>(opt =>
        {
            opt.MessageLimit = 30;
        });
    }
}

 

重试

 

// 因为Program中AddConsumers 会将这个ConsumerDefinition一起添加至容器中来
public class SubscribeConsumerDefinition:ConsumerDefinition<SubscribeConsumer>
{
    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<SubscribeConsumer> consumerConfigurator,
        IRegistrationContext context)
    {
        // 重试(可以解决数据库/网络异常   等 短暂性的异常),重试2次,一秒执行一次
        endpointConfigurator.UseMessageRetry(r=>r.Interval(2,TimeSpan.FromSeconds(1)));

        /**
         * 如果执行失败,则会先进行重试,如果重试次数用完了,则会在设置的时间内进行消息重发
         * 使用场景:为了防止服务器宕机的情况
         */
        // endpointConfigurator.UseDelayedRedelivery(r=>r.Interval(1,TimeSpan.FromSeconds(5)));
        
    }
}

 

脏数据,补偿机制

 

// 如果消息重试之后,还是执行失败,则会进入此类来
public class SubscribeFaultConsumer:IConsumer<Fault<UserInfo>>
{
    // 我们可以在这里做一些补偿,或者记录/存储这条失败的数据
    public Task Consume(ConsumeContext<Fault<UserInfo>> context)
    {
        var user = context.Message.Message;
        Console.WriteLine(user.Id);
        return Task.CompletedTask;
    }
}

 

标签:builder,context,string,rabbitOptions,app,总线,MassTransit,public,14
From: https://www.cnblogs.com/MingQiu/p/18046660

相关文章

  • Codeforces 1446D1 Frequency Problem (Easy Version)
    考虑求出全局的众数\(A\)。那么有一个结论,就是答案区间的众数中绝对有\(A\)。考虑反证法,如果没有\(A\),\(A\)在序列中出现的个数一定\(\ge\)区间内众数的出现个数,所以可以一直往外扩展直到\(A\)出现的个数与区间内非\(A\)众数的个数持平,这样肯定更优。于是可以考虑钦......
  • Codeforces 1446D2 Frequency Problem (Hard Version)
    考虑求出全局的众数\(A\)。那么有一个结论,就是答案区间的众数中绝对有\(A\)。考虑反证法,如果没有\(A\),\(A\)在序列中出现的个数一定\(\ge\)区间内众数的出现个数,所以可以一直往外扩展直到\(A\)出现的个数与区间内非\(A\)众数的个数持平,这样肯定更优。于是可以考虑钦......
  • p3214-solution
    P3214Solutionlink为了方便,我们求有序的答案最后再除掉\(m!\)。题目的限制包括:每种元素总共出现偶数次不存在相同的两个集合没有空集考虑偶数的限制,你发现每个集合中元素出现次数要么\(0\)要么\(1\)。于是如果你确定了前\(m-1\)个集合,最后一个集合会被唯一......
  • 14
    uml作业:部署视图建模: 充值消费子系统(1)确定节点并描述数据库服务器: 描述:运行校园网系统,为充值消费子系统提供数据库支持。用途:存储和管理与充值、消费、账户信息等相关的数据。充值管理: 描述:驻留的构件是充值管理子系统。用途:处理用户充值流程,包括充值申请的生成......
  • 145. 二叉树的后序遍历c
    /***Definitionforabinarytreenode.*structTreeNode{*intval;*structTreeNode*left;*structTreeNode*right;*};*//***Note:Thereturnedarraymustbemalloced,assumecallercallsfree().*/voidpostorder(structT......
  • 144. 二叉树的前序遍历c
    /***Definitionforabinarytreenode.*structTreeNode{*intval;*structTreeNode*left;*structTreeNode*right;*};*//***Note:Thereturnedarraymustbemalloced,assumecallercallsfree().*/voidpreorder(structTr......
  • 龙年惠来首聚|20240214|周三
    今年没失约在培民的组织下,我们决定年初五出来惠来聚一下。初五9点多起床吃了早餐后,开着小电驴就去找培民,车放某广场然后我们扫了共享电动,志豪早早就到了,因此我们要去车站集合。首先是我们仨集合后去了奶茶店喝奶茶聊天然后时间来到了中午决定去吃早茶类的午饭后浩沛就从家......
  • Vue CLI 系列之(十二)全局事件总线
    全局事件总线【GlobalEventBus】......
  • Codeforces 1455E Four Points
    首先确定每个点最后走到的是哪一个点。这部分可以枚举全排列。假设左上角的点为\((x_0,y_0)\),右上角的点为\((x_1,y_1)\),左下角的点为\((x_2,y_2)\),右下角的点为\((x_3,y_3)\)。令最终的点为\((x'_0,y'_0)\),以此类推。那么最终的答案就是\(\sum\limits_{i=0}^3|......
  • 视频监控平台Easy1400视图库平台使用订阅功能的详细步骤
    Easy1400是一个智慧安防平台的视图平台设备级联操作指南。该平台提供内容图库作为信息存储和展示的核心,并应用设备级联技术实现不同平台和设备之间的高效协同工作。通过内容图库,企业和组织能够更高效地管理和展示视觉内容,提升品牌形象和用户体验。设备级联技术则使得不同平台和设......