官网:https://cap.dotnetcore.xyz
相关介绍
CAP 是一个EventBus,同时也是一个在微服务或者SOA系统中解决分布式事务问题的一个框架。它有助于创建可扩展,可靠并且易于更改的微服务系统。
在微软的 eShop 微服务示例项目中,推荐使用 CAP 作为生产环境可用的 EventBus。
什么是 EventBus?
事件总线是一种机制,它允许不同的组件彼此通信而不彼此了解。 组件可以将事件发送到Eventbus,而无需知道是谁来接听或有多少其他人来接听。 组件也可以侦听Eventbus上的事件,而无需知道谁发送了事件。 这样,组件可以相互通信而无需相互依赖。 同样,很容易替换一个组件。 只要新组件了解正在发送和接收的事件,其他组件就永远不会知道.
相对于其他的 Service Bus 或者 Event Bus, CAP 拥有自己的特色,它不要求使用者发送消息或者处理消息的时候实现或者继承任何接口,拥有非常高的灵活性。我们一直坚信约定大于配置,所以CAP使用起来非常简单,对于新手非常友好,并且拥有轻量级。
CAP 采用模块化设计,具有高度的可扩展性。你有许多选项可以选择,包括消息队列,存储,序列化方式等,系统的许多元素内容可以替换为自定义实现。
简单实战
使用CAP需要依赖的第三方库,这里我们持久化选用了SqlServer数据库,消息队列使用了Rabbitmq
相关配置(上游系统、下游系统配置相同)
// 注入EF上下文对象 var CapConnectionString = builder.Configuration.GetSection("CAPConnectionStrings").GetValue<string>("DefaultConnection"); builder.Services.AddDbContext<CAPDbContext>(options => options.UseSqlServer(CapConnectionString)); //注入CAP builder.Services.AddCap(x => { // 使用RabbitMQ作为消息队列 x.UseRabbitMQ(opt => { opt.HostName = "192.168.3.128"; opt.Port = 5674; opt.UserName = "guest"; opt.Password = "guest"; opt.VirtualHost = "/"; }); // 使用SqlServer作为CAP的存储 x.UseSqlServer(opt => { opt.ConnectionString = CapConnectionString; }); // 设置CAP的其他选项 x.FailedRetryCount = 5; // 失败重试次数 x.FailedRetryInterval = 60; // 失败重试间隔(秒) x.UseDashboard(dashoptions => { dashoptions.PathMatch = "/cap"; //面板地址 }); });
上游系统发布
/// <summary> /// CAP:基于消息队列通过事件驱动+回调方法补偿的机制实现分布式事务的最终一致性 /// 1、消息被消费者至少接收到一次,需要通过接口幂等性/redis唯一ID实现消息不会重复消费 /// </summary> [Route("api/[controller]")] [ApiController] public class CAPController : ControllerBase { private readonly ICapPublisher capPublisher; private readonly CAPDbContext cAPDbContext; public CAPController(ICapPublisher capPublisher, CAPDbContext cAPDbContext) { this.capPublisher = capPublisher; this.cAPDbContext = cAPDbContext; } [HttpGet("CreateOrder")] public IActionResult CreateOrder() { using (var trans = cAPDbContext.Database.BeginTransaction(capPublisher, autoCommit: true)) { //业务代码 try { var order = new OrderInfo { OrderId = Guid.NewGuid().ToString(), ProId= "Pro_13327530-e706-4ef3-aa5b-02aac9227499", Status = 0 }; cAPDbContext.orderInfos.Add(order); capPublisher.Publish("test_createorder_v1", order, "test_createorder_callback_v1"); cAPDbContext.SaveChanges(); } catch (Exception ex) { Console.WriteLine(ex); } } return Ok(); } [CapSubscribe("test_createorder_callback_v1")] private void CreateOrderCallback(JsonElement param) { var isSuccess = param.GetProperty("IsSuccess").GetBoolean(); var OrderId = param.GetProperty("OrderId").GetString(); if (isSuccess) { var order = cAPDbContext.orderInfos.FirstOrDefault(c => c.OrderId == OrderId); order.Status = 1; cAPDbContext.Update(order); cAPDbContext.SaveChanges(); Console.WriteLine("修改订单状态成功"); } else { var order = cAPDbContext.orderInfos.FirstOrDefault(c => c.OrderId == OrderId); order.Status = -1; cAPDbContext.Update(order); cAPDbContext.SaveChanges(); Console.WriteLine("下单失败,执行补偿"); } } }
下游订阅处理
[Route("api/[controller]")] [ApiController] public class CapConsumerController : ControllerBase { private readonly ICapPublisher capPublisher; private readonly CAPDbContext cAPDbContext; public CapConsumerController(ICapPublisher capPublisher, CAPDbContext cAPDbContext) { this.capPublisher = capPublisher; this.cAPDbContext = cAPDbContext; } [HttpGet("CreatePro")] public IActionResult CreatePro() { cAPDbContext.proInfos.Add(new ProInfo { ProId = "Pro_" + Guid.NewGuid().ToString(), Count = 10, ProName = "商品1" }); cAPDbContext.proInfos.Add(new ProInfo { ProId = "Pro_" + Guid.NewGuid().ToString(), Count = 10, ProName = "商品2" }); cAPDbContext.proInfos.Add(new ProInfo { ProId = "Pro_" + Guid.NewGuid().ToString(), Count = 10, ProName = "商品3" }); cAPDbContext.proInfos.Add(new ProInfo { ProId = "Pro_" + Guid.NewGuid().ToString(), Count = 10, ProName = "商品4" }); cAPDbContext.SaveChanges(); return Ok(); } [CapSubscribe("test_createorder_v1")] private object KjProCount(JsonElement param) { var obj = param.Deserialize<OrderInfo>(); try { if (obj != null) { using (var trans = cAPDbContext.Database.BeginTransaction(capPublisher, autoCommit: true)) { var pro = cAPDbContext.proInfos.FirstOrDefault(c => c.ProId == obj.ProId); if (pro != null && pro.Count > 0) { pro.Count--; cAPDbContext.Update(pro); cAPDbContext.proOrderRecords.Add(new ProOrderRecord { RecordId = Guid.NewGuid().ToString(), OrderId = obj.OrderId, ProId = pro.ProId }); cAPDbContext.SaveChanges(); throw new Exception("Error"); Console.WriteLine($"下单成功{JsonConvert.SerializeObject(obj)}"); return new { IsSuccess = true, OrderId = obj.OrderId }; } else { Console.WriteLine($"库存不足{JsonConvert.SerializeObject(obj)}"); } } } return new { IsSuccess = false, OrderId = obj.OrderId }; } catch (Exception ex) { Console.WriteLine("发生异常回滚"); return new { IsSuccess = false, OrderId = obj.OrderId }; } } }
在上游和下游系统中会自动创建2个表(发布、接收的消息)
监控界面
标签:OrderId,cAPDbContext,CAP,donetcore,var,new,capPublisher,分布式 From: https://www.cnblogs.com/daiwk/p/18112622