首页 > 其他分享 >13.分布式事件总线DotNetCore.CAP的简单使用

13.分布式事件总线DotNetCore.CAP的简单使用

时间:2024-02-27 12:44:34浏览次数:28  
标签:13 cap builder CAP tran DotNetCore var app public

DotNetCore.CAP框架提供了一个简单易用的API和多种消息传输协议支持(包括Redis、RabbitMQ等),可以让用户轻松地实现消息队列、事件发布/订阅、分布式事务等功能。它还具备自动重试、异常处理、数据序列化等高级特性,可以保证消息的可靠性和一致性。 使用DotNetCore.CAP框架,你可以: 1. 发布并订阅消息,支持同步和异步模式; 2. 实现分布式事务管理,确保各个步骤的一致性; 3. 建立多种消息传输协议,如Kafka、RabbitMQ等; 4. 支持多种数据库驱动程序,如MySql、Oracle等; 5. 灵活采用自定义消息格式,以满足你的需求。 除此之外,该框架还具有良好的可扩展性,支持在集群环境下进行部署,并与其他.NET Core框架(如Entity Framework Core)完美配合使用。   安装包: DotNetCore.CAP DotNetCore.CAP.Dashboard DotNetCore.CAP.MySql DotNetCore.CAP.RabbitMQ Microsoft.AspNetCore.OpenApi Newtonsoft.Json Pomelo.EntityFrameworkCore.MySql Swashbuckle.AspNetCore   添加一个项目EventBus.DotnetCap(http://localhost:5284)   appsettings.Development.json配置

{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft.AspNetCore": "Warning"
    }
  },
  "RabbitMQ": {
    "HostName": "127.0.0.1",
    "VirtualHost": "/",
    "UserName": "admin",
    "Password": "你的密码",
    "Port": "5672"
  },
  "ConnectionStrings": {
    "mysql": "server=127.0.0.1;uid=root;pwd=123456;database=DotnetCoreCap"
  }
}
安装RabbitMQ 1. 下载带web管理客户端的镜像:
docker pull rabbitmq:management
2. 创建容器并运行(15672是管理界面的端口,5672是服务的端口。这里顺便将管理系统的用户名和密码设置为admin admin)
docker run -d --name=rabbit --net=happy_travel --restart=always -e
RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -p 15672:15672 -p
5672:5672 rabbitmq:management
3. 若未设置,RABBITMQ_DEFAULT_USER,RABBITMQ_DEFAULT_PASS 时,默认账号密码都为:admin 4. 打开浏览器,输入网址:http://localhost:15672 ,输入账号与密码即可登录rabbitmq 客户端管理界面   添加一个EF链接Mysql的上下文MyDbContextContext.cs
public class MyDbContextContext:DbContext
{
    public MyDbContextContext(DbContextOptions<MyDbContextContext> options) : base(options)
    {
        
    }

    public DbSet<UserInfo> UserInfo { get; set; }
}

 

Program.cs配置
var builder = WebApplication.CreateBuilder(args);

builder.Services.AddControllers();
builder.Services.AddSwaggerGen();
builder.Services.AddDbContext<MyDbContextContext>(p =>
{
    p.UseMySql(builder.Configuration.GetConnectionString("mysql"),new MySqlServerVersion("5.7"));
});
builder.Services.AddTransient<ReceiveService>();


// 配置事件总线以及分布式事务
var rabbitConfig = builder.Configuration.GetSection("RabbitMQ");
builder.Services.Configure<RabbitMQOptions>(rabbitConfig);
var rabbitOptions = rabbitConfig.Get<RabbitMQOptions>();

builder.Services.AddCap(p =>
{
    // 在数据库中会生成cap.receive 与 cap.publish 表
    // 同时也支持分布式事务
    p.UseMySql(builder.Configuration.GetConnectionString("mysql") ?? string.Empty);
    p.UseEntityFramework<MyDbContextContext>();
    p.UseRabbitMQ(mq =>
    {
        mq.HostName = rabbitOptions.HostName;
        mq.VirtualHost = rabbitOptions.VirtualHost;
        mq.UserName = rabbitOptions.UserName;
        mq.Password = rabbitOptions.Password;
        mq.Port = rabbitOptions.Port;
    });
     p.UseDashboard(); // 注册仪表盘
    // 仪表盘默认的访问地址是:http://localhost:xxx/cap,你可以在d.MatchPath配置项中修改cap路径后缀为其他的名字。
    //设置处理成功的数据在数据库中保存的时间(秒),为保证系统新能,数据会定期清理。
    p.SucceedMessageExpiredAfter = 24 * 3600;

    //设置失败重试次数
    p.FailedRetryCount = 5;
});


var app = builder.Build();


if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}

app.UseHttpsRedirection();

app.UseAuthorization();

app.MapControllers();

app.Run();

 

添加一个控制器HomeController( 发布 )
[Route("[controller]/[action]")]
[ApiController]
public class HomeController:ControllerBase
{
    private readonly ICapPublisher _capPublisher;


    public HomeController(ICapPublisher capPublisher)
    {
        _capPublisher = capPublisher;
    }

    [HttpGet]
    public IActionResult TestPub()
    {
        _capPublisher.Publish("eventbus_Key01",new UserInfo{Id = 1,NickName = "张三"});
        return Ok("消息发布成功,请等待处理结果");
    }
    
//在EventBus.DotnetCoreCap.Subscribe项目中接收 [HttpGet] public IActionResult TestSubOnOtherProject() { _capPublisher.Publish("eventbus_Key02",new UserInfo{Id = 2,NickName = "李四"}); return Ok("消息发布成功,请等待处理结果"); } }

 

添加一个控制器ReceiveController( 接收、订阅 )

[Route("[controller]/[action]")]
[ApiController]
public class ReceiveController:ControllerBase
{
    [NonAction]
    [CapSubscribe("eventbus_Key01",Group = "eventbus_Key01")]
    public void CheckReceive(UserInfo info)
    {
        var json = JsonConvert.SerializeObject(info);
        Console.WriteLine(json);
    }
}

或者

public class ReceiveService:ICapSubscribe
{
    [CapSubscribe("eventbus_Key01",Group = "eventbus_Key01")]
    public void CheckReceive(UserInfo info)
    {
        var json = JsonConvert.SerializeObject(info);
        Console.WriteLine(json);
    }
}

 

事务:

[Route("[controller]/[action]")]
[ApiController]
public class TransactionController:ControllerBase
{
    private readonly ICapPublisher _cap;
    private readonly MyDbContextContext _context;

    public TransactionController(ICapPublisher cap, MyDbContextContext context)
    {
        _cap = cap;
        _context = context;
    }
    // 原理:拆分成多个本地事务来看待
    // 场景1:正常事务消息发布
    [HttpGet]
    public IActionResult Publish1()
    {
        var tran = _context.Database.BeginTransaction(_cap);
        try
        {
            _context.UserInfo.Add(new() {NickName = "张三"});
            _context.SaveChanges();
            
            _cap.Publish("tran.cap","李四");
            
            tran.Commit();
            return Ok("成功");
        }
        catch (Exception e)
        {
            tran.Rollback();
            return Ok("失败");
        }
    }
    
    
    // 场景2:发布者发生异常, 消费者根本就收不到消息
    [HttpGet]
    public IActionResult Publish2()
    {
        var tran = _context.Database.BeginTransaction(_cap);
        try
        {
            _context.UserInfo.Add(new() {NickName = "张三2"});
            _context.SaveChanges();
            
            _cap.Publish("tran.cap2","李四2");
            throw new Exception("测试发布者出异常");
            
            tran.Commit();
            return Ok("成功");
        }
        catch (Exception e)
        {
            tran.Rollback();
            return Ok("失败");
        }
    }
    
    
    // 场景3: 发布者正常,消费者发生异常,不会导致发布者回滚
    [HttpGet]
    public IActionResult Publish3()
    {
        var tran = _context.Database.BeginTransaction(_cap);
        try
        {
            _context.UserInfo.Add(new() {NickName = "张三3"});
            _context.SaveChanges();
            
            _cap.Publish("tran.cap3","李四3");
            
            tran.Commit();
            return Ok("成功");
        }
        catch (Exception e)
        {
            tran.Rollback();
            return Ok("失败");
        }
    }
}

 

添加另一个项目EventBus.DotnetCoreCap.Subscribe(http://localhost:5187) appsettings.Development.json配置
{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft.AspNetCore": "Warning"
    }
  },
  "RabbitMQ": {
    "HostName": "127.0.0.1",
    "VirtualHost": "/",
    "UserName": "admin",
    "Password": "你的密码",
    "Port": "5672"
  },
  "ConnectionStrings": {
    "mysql": "server=127.0.0.1;uid=root;pwd=123456;database=DotnetCoreCap"
  }
}

public class MyDbContextContext:DbContext
{
    public MyDbContextContext(DbContextOptions<MyDbContextContext> options) : base(options)
    {
        
    }

    public DbSet<UserInfo> UserInfo { get; set; }
}

Program.cs配置

var builder = WebApplication.CreateBuilder(args);

// Add services to the container.

builder.Services.AddControllers();
builder.Services.AddSwaggerGen();
builder.Services.AddDbContext<MyDbContextContext>(p =>
{
    p.UseMySql(builder.Configuration.GetConnectionString("mysql"),new MySqlServerVersion("5.7"));
});

var rabbitConfig = builder.Configuration.GetSection("RabbitMQ");
builder.Services.Configure<RabbitMQOptions>(rabbitConfig);
var rabbitOptions = rabbitConfig.Get<RabbitMQOptions>();

builder.Services.AddCap(p =>
{
    // 在数据库中会生成cap.receive 与 cap.publish 表
    p.UseMySql(builder.Configuration.GetConnectionString("mysql") ?? string.Empty);
    //p.UseEntityFramework<MyDbContextContext>();
    p.UseRabbitMQ(mq =>
    {
        mq.HostName = rabbitOptions.HostName;
        mq.VirtualHost = rabbitOptions.VirtualHost;
        mq.UserName = rabbitOptions.UserName;
        mq.Password = rabbitOptions.Password;
        mq.Port = rabbitOptions.Port;
    });
     p.UseDashboard(); // 注册仪表盘
    // 仪表盘默认的访问地址是:http://localhost:xxx/cap,你可以在d.MatchPath配置项中修改cap路径后缀为其他的名字。
    //设置处理成功的数据在数据库中保存的时间(秒),为保证系统新能,数据会定期清理。
    p.SucceedMessageExpiredAfter = 24 * 3600;

    //设置失败重试次数
    p.FailedRetryCount = 5;
});

var app = builder.Build();

if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}

app.UseHttpsRedirection();

app.UseAuthorization();

app.MapControllers();

app.Run();

添加一个控制器SubController用于订阅和接收

[Route("[controller]/[action]")]
[ApiController]
public class SubController:ControllerBase
{
    [NonAction]
    [CapSubscribe("eventbus_Key02",Group = "eventbus_Key02")]
    public void Receive(UserInfo info)
    {
        var json = JsonConvert.SerializeObject(info);
        Console.WriteLine(json);
    }
    
    
    [NonAction]
    [CapSubscribe("tran.cap",Group = "tran.cap")]
    public void ReceiveTran1(string nickName)
    {
        Console.WriteLine(nickName);
    }
    
    [NonAction]
    [CapSubscribe("tran.cap2",Group = "tran.cap2")]
    public void ReceiveTran2(string nickName)
    {
        Console.WriteLine(nickName);
    }
    
    [NonAction]
    [CapSubscribe("tran.cap3",Group = "tran.cap3")]
    public void ReceiveTran3(string nickName)
    {
        Console.WriteLine(nickName);
        // throw new Exception("消费者异常");
    }
}

 

标签:13,cap,builder,CAP,tran,DotNetCore,var,app,public
From: https://www.cnblogs.com/MingQiu/p/18036641

相关文章

  • 【13.0】JavaScript之流程控制
    【一】if判断【1】语法//if-elseif(条件){条件成立执行的代码块}else{条件不成立时执行的代码块}//if-elseif-elseif(条件){条件成立执行的代码块}elseif(条件){条件成立执行的代码块}else{条件不成立时执行的代码块}//()条件{}执行的代码块【2】if~e......
  • P1240+P1350+ AT_abc282_g得出的一些dp技巧
    在遇到一些题目在设状态时,前面的状态对后面的有影响,比如在P1240和P1350中前面的放置会对后面的有影响,正常的状态是不行的。以前可能考虑状态压缩,但现在我们可以通过发现前面状态的一些共性,比如在P1240+P1350中前面放了k个車那么一定有k行被占用,所以就不用记录之前那些行被占用了。......
  • I recommend a very small Linux, it is Watt OS version 13
    Dearall,MyfirsttimeusingLinuxWattOSversion12,itisverynice. Superfast!However,fornewusers,youneedthesecommandtostart:sudopasswdsudodate--setmm/dd/yyyysudoaptinstallgdebiItisworthytostudythesecommandline,because......
  • Exception in thread "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThr
    这个问题集合遍历修改了集合结构,这样是不被允许的,需要换种方式报错示意图 第一可以采用for(inti=0;i<registryList.size();i++)解决第二采用迭代处理Iterator<XxlJobRegistry>iterator=registryList.iterator();while(iterator.hasNext()){XxlJobRegist......
  • 【2024-02-13】连岳摘抄
    23:59你微微地笑着,不同我说什么话,而我觉得,为了这个,我已等待得久了。                                                 ——泰戈尔多数家庭冲突都是这样,谁都占一点理,......
  • 寒假学习 13 使用Avro数据源测试Flume
    1.1  创建avro.conf#Namethecomponentsonthisagenta1.sources=r1a1.sinks=k1a1.channels=c1#Describe/configurethesourcea1.sources.r1.type=avroa1.sources.r1.channels=c1a1.sources.r1.bind=0.0.0.0a1.sources.r1.port=4141#Describet......
  • AT_abc213_d [ABC213D] Takahashi Tour 题解(图&深搜)
    传送门题意有一个\(n\)个点的无向图。从根节点\(1\)开始,按如下规则遍历整个图:如果有连接这个点的其他点没有走过,则到这个点。如果有多个点,那么按从小到大的顺序走。如果有这个点没有其他点或者连接这个点的其他点都走过了,那么:如果这个点是根节点\(1\),结束。否则回......
  • Crypto( 13 )
    [WUSTCTF2020]B@se题目给出的提示是base编码,尝试base64行不通,看了大佬wp说是base64变表,下面是脚本运行后的结果flag{base64_1s_v3ry_e@sy_and_fuN}[网鼎杯2020青龙组]you_raise_me_up直接运行报错,说的是语法无效,大佬是这么写的:从附件的代码我们可以找到关键字:flag(c=......
  • P1137 旅行计划
    原题链接题解拓扑排序+dp。首先以入度为零的结点为起始结点,其游览城市数量为1,接下来每到下一结点,游览城市数++;即当前结点的游览城市数是上一结点的游览数+1,并取最大值。code #include<bits/stdc++.h>usingnamespacestd;constintN=1e5+5;inthead[N],Next[N*2],to[N......
  • 2-13. 实装攻击判定
    为三段攻击分别添加对应的触发器用同样的方式给Attack2和Attack3动画也添加触发器设置完触发器攻击野猪不掉血因为野猪身上有两个碰撞体,我们希望capsulecollider2D起作用,所以需要将CapsuleCollider2D的LayerOverridePriority调高,使其优先触发让人物在攻击的......