首页 > 其他分享 >基于donetcore/CAP实现分布式事务一致性

基于donetcore/CAP实现分布式事务一致性

时间:2024-04-03 14:22:36浏览次数:28  
标签:OrderId cAPDbContext CAP donetcore var new capPublisher 分布式

官网:https://cap.dotnetcore.xyz

相关介绍

CAP 是一个EventBus,同时也是一个在微服务或者SOA系统中解决分布式事务问题的一个框架。它有助于创建可扩展,可靠并且易于更改的微服务系统。

在微软的 eShop 微服务示例项目中,推荐使用 CAP 作为生产环境可用的 EventBus。

什么是 EventBus?

事件总线是一种机制,它允许不同的组件彼此通信而不彼此了解。 组件可以将事件发送到Eventbus,而无需知道是谁来接听或有多少其他人来接听。 组件也可以侦听Eventbus上的事件,而无需知道谁发送了事件。 这样,组件可以相互通信而无需相互依赖。 同样,很容易替换一个组件。 只要新组件了解正在发送和接收的事件,其他组件就永远不会知道.

相对于其他的 Service Bus 或者 Event Bus, CAP 拥有自己的特色,它不要求使用者发送消息或者处理消息的时候实现或者继承任何接口,拥有非常高的灵活性。我们一直坚信约定大于配置,所以CAP使用起来非常简单,对于新手非常友好,并且拥有轻量级。

CAP 采用模块化设计,具有高度的可扩展性。你有许多选项可以选择,包括消息队列,存储,序列化方式等,系统的许多元素内容可以替换为自定义实现。

简单实战

使用CAP需要依赖的第三方库,这里我们持久化选用了SqlServer数据库,消息队列使用了Rabbitmq

 

相关配置(上游系统、下游系统配置相同)

// 注入EF上下文对象 
var CapConnectionString = builder.Configuration.GetSection("CAPConnectionStrings").GetValue<string>("DefaultConnection");
builder.Services.AddDbContext<CAPDbContext>(options =>
    options.UseSqlServer(CapConnectionString));
//注入CAP
builder.Services.AddCap(x =>
{
    // 使用RabbitMQ作为消息队列  
    x.UseRabbitMQ(opt =>
    {
        opt.HostName = "192.168.3.128";
        opt.Port = 5674;
        opt.UserName = "guest";
        opt.Password = "guest";
        opt.VirtualHost = "/";
    });
    // 使用SqlServer作为CAP的存储  
    x.UseSqlServer(opt =>
    {
        opt.ConnectionString = CapConnectionString;
    });
    // 设置CAP的其他选项  
    x.FailedRetryCount = 5; // 失败重试次数  
    x.FailedRetryInterval = 60; // 失败重试间隔(秒)
    x.UseDashboard(dashoptions =>
    {
        dashoptions.PathMatch = "/cap";  //面板地址
    });
});

上游系统发布

 /// <summary>
 /// CAP:基于消息队列通过事件驱动+回调方法补偿的机制实现分布式事务的最终一致性
 /// 1、消息被消费者至少接收到一次,需要通过接口幂等性/redis唯一ID实现消息不会重复消费
 /// </summary>
 [Route("api/[controller]")]
 [ApiController]
 public class CAPController : ControllerBase
 {
     private readonly ICapPublisher capPublisher;
     private readonly CAPDbContext cAPDbContext;
     public CAPController(ICapPublisher capPublisher, CAPDbContext cAPDbContext)
     {
         this.capPublisher = capPublisher;
         this.cAPDbContext = cAPDbContext;
     }

     [HttpGet("CreateOrder")]
     public IActionResult CreateOrder()
     {
         using (var trans = cAPDbContext.Database.BeginTransaction(capPublisher, autoCommit: true))
         {
             //业务代码
             try
             {
                 var order = new OrderInfo { OrderId = Guid.NewGuid().ToString(), ProId= "Pro_13327530-e706-4ef3-aa5b-02aac9227499", Status = 0 };
                 cAPDbContext.orderInfos.Add(order);
                 capPublisher.Publish("test_createorder_v1", order, "test_createorder_callback_v1");
                 cAPDbContext.SaveChanges();
             }
             catch (Exception ex)
             {
                 Console.WriteLine(ex);
             }
         }
         return Ok();
     }
     [CapSubscribe("test_createorder_callback_v1")]
     private void CreateOrderCallback(JsonElement param)
     {
         var isSuccess = param.GetProperty("IsSuccess").GetBoolean();
         var OrderId = param.GetProperty("OrderId").GetString();
         if (isSuccess)
         {
             var order = cAPDbContext.orderInfos.FirstOrDefault(c => c.OrderId == OrderId);
             order.Status = 1;
             cAPDbContext.Update(order);
             cAPDbContext.SaveChanges();
             Console.WriteLine("修改订单状态成功");
         }
         else
         {
             var order = cAPDbContext.orderInfos.FirstOrDefault(c => c.OrderId == OrderId);
             order.Status = -1;
             cAPDbContext.Update(order);
             cAPDbContext.SaveChanges();
             Console.WriteLine("下单失败,执行补偿");
         }
     }
 }

下游订阅处理

[Route("api/[controller]")]
[ApiController]
public class CapConsumerController : ControllerBase
{
    private readonly ICapPublisher capPublisher;
    private readonly CAPDbContext cAPDbContext;
    public CapConsumerController(ICapPublisher capPublisher, CAPDbContext cAPDbContext)
    {
        this.capPublisher = capPublisher;
        this.cAPDbContext = cAPDbContext;
    }

    [HttpGet("CreatePro")]
    public IActionResult CreatePro()
    {
        cAPDbContext.proInfos.Add(new ProInfo { ProId = "Pro_" + Guid.NewGuid().ToString(), Count = 10, ProName = "商品1" });
        cAPDbContext.proInfos.Add(new ProInfo { ProId = "Pro_" + Guid.NewGuid().ToString(), Count = 10, ProName = "商品2" });
        cAPDbContext.proInfos.Add(new ProInfo { ProId = "Pro_" + Guid.NewGuid().ToString(), Count = 10, ProName = "商品3" });
        cAPDbContext.proInfos.Add(new ProInfo { ProId = "Pro_" + Guid.NewGuid().ToString(), Count = 10, ProName = "商品4" });
        cAPDbContext.SaveChanges();
        return Ok();
    }

    [CapSubscribe("test_createorder_v1")]
    private object KjProCount(JsonElement param)
    {
        var obj = param.Deserialize<OrderInfo>();
        try
        {
            if (obj != null)
            {
                using (var trans = cAPDbContext.Database.BeginTransaction(capPublisher, autoCommit: true))
                {
                    var pro = cAPDbContext.proInfos.FirstOrDefault(c => c.ProId == obj.ProId);
                    if (pro != null && pro.Count > 0)
                    {
                        pro.Count--;
                        cAPDbContext.Update(pro);
                        cAPDbContext.proOrderRecords.Add(new ProOrderRecord { RecordId = Guid.NewGuid().ToString(), OrderId = obj.OrderId, ProId = pro.ProId });
                        cAPDbContext.SaveChanges();
                        throw new Exception("Error");
                        Console.WriteLine($"下单成功{JsonConvert.SerializeObject(obj)}");
                        return new { IsSuccess = true, OrderId = obj.OrderId };
                    }
                    else
                    {
                        Console.WriteLine($"库存不足{JsonConvert.SerializeObject(obj)}");
                    }
                }
            }
            return new { IsSuccess = false, OrderId = obj.OrderId };
        }
        catch (Exception ex)
        {
            Console.WriteLine("发生异常回滚");
            return new { IsSuccess = false, OrderId = obj.OrderId };
        }
    }
}

在上游和下游系统中会自动创建2个表(发布、接收的消息)

监控界面

 

标签:OrderId,cAPDbContext,CAP,donetcore,var,new,capPublisher,分布式
From: https://www.cnblogs.com/daiwk/p/18112622

相关文章

  • 天翼云充值:分布式消息服务RabbitMQ支持AMQP协议,兼容RabbitMQ生态
    天翼云充值:分布式消息服务RabbitMQ支持AMQP协议,兼容RabbitMQ生态简介:飞机@luotuoemo本文由(天翼云代理商:【金推云】www.jintui.cn)撰写天翼云:支持AMQP协议的RabbitMQ服务分布式消息服务RabbitMQ是一种开源的、基于Erlang语言的消息代理和队列服务器。RabbitMQ最大的特性就......
  • 重庆天翼云代理商:分布式容器云平台面向多云、多集群等场景推出的企业级容器云平台
    重庆天翼云代理商:分布式容器云平台面向多云、多集群等场景推出的企业级容器云平台简介:飞机@luotuoemo本文由(天翼云代理商:【金推云】www.jintui.cn)撰写重庆天翼云代理商:分布式容器云平台在当今的信息化社会,一种名为“云计算”的技术正在逐渐改变我们生活和工作的方式。云......
  • 北京天翼云代理商:分布式消息服务MQTT面向终端设备的轻量级消息产品
    北京天翼云代理商:分布式消息服务MQTT面向终端设备的轻量级消息产品简介:飞机@luotuoemo本文由(天翼云代理商:【金推云】www.jintui.cn)撰写北京天翼云代理商:分布式消息服务MQTT面向终端设备的轻量级消息产品一、天翼云的优势天翼云是中国电信旗下的云计算和大数据服务品牌,依......
  • 【STM32嵌入式系统设计与开发】——16InputCapture(输入捕获应用)
    这里写目录标题STM32资料包:百度网盘下载链接:链接:https://pan.baidu.com/s/1mWx9Asaipk-2z9HY17wYXQ?pwd=8888提取码:8888一、任务描述二、任务实施1、工程文件夹创建2、函数编辑(1)主函数编辑(2)USART1初始化函数(usart1_init())(3)USART数据发送函数(USART1_Send_Data())(4)USART......
  • MySQL、Redis 和 Zookeeper 实现分布式锁方法及优缺点
    MySQL、Redis和Zookeeper都可以用来实现分布式锁,每种技术都有其特定的实现方法以及各自的优缺点。MySQL分布式锁实现方法在MySQL中实现分布式锁通常涉及到使用数据库表。可以创建一个专用的锁表,并利用行的唯一性(例如利用唯一索引)来实现锁机制。使用基于事务的 FORUP......
  • Pod安全上下文与Linux Capabilities浅析
    目录前言一、Pod安全上下文介绍二、使用方法与应用场景2.1以普通用户运行容器2.2限制特权容器的使用2.3设置文件系统只读三、LinuxCapabilities概念使用方式使用示例四、总结前言        在云原生时代,Kubernetes已经成为容器编排的事实标准,提供了强......
  • .NET分布式Orleans - 9 - 贪吃蛇项目演示
    首先看完成效果一个玩家的效果多个玩家的效果 源码地址https://gitee.com/chesterdotchen/snake-with-orleans项目介绍Snake.Common项目IGameGrain:游戏的Grain定义,与State定义ISnakeGrain:蛇的Grain定义,与State定义另外包含了游戏界面的宽高,蛇的初始长度,蛇的四个方向......
  • Redission分布式锁介绍和配置引入
        本人在实际项目用于确保Key一致性经常使用的一种加锁方式,帮助分布式环境中互斥访问。很多人问不用锁不是一样完成目标吗?但需要清楚的是这是在高并发的场景下,多节点同时访问缓存的场景,是一般单体项目所无法比拟的,使用锁方式可以控制并发访问,避免缓存击穿和雪崩等问......
  • hadoop3.0高可用分布式集群安装
    hadoop高可用,依赖于zookeeper。用于生产环境,企业部署必须的模式. 1.部署环境规划1.1.虚拟机及hadoop角色划分主机名称namenodedatanoderesourcemanagernodemanagerzkfcjournalnodezookeepermasterslave1slave21.2.软件......
  • Springboot + redis分布式锁
    1.引入redis和redisson<!--redis--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><!......