首页 > 其他分享 >本地消息表模式

本地消息表模式

时间:2023-05-05 21:22:11浏览次数:65  
标签:CAP 模式 CapDemo 消息 本地 using logger capPublisher public

分布式事务 | 使用 dotnetcore/CAP 的本地消息表模式

 

<iframe frameborder="no" height="400" scrolling="no" src="https://player.bilibili.com/player.html?aid=395449358&bvid=BV1Co4y1r76T&cid=1038507510&page=1" width="600"></iframe>

本地消息表模式

本地消息表模式,其作为柔性事务的一种,核心是将一个分布式事务拆分为多个本地事务,事务之间通过事件消息衔接,事件消息和上个事务共用一个本地事务存储到本地消息表,再通过定时任务轮询本地消息表进行消息投递,下游业务订阅消息进行消费,本质上是依靠消息的重试机制达到最终一致性。其示意图如下所示,主要分为以下三步:

  1. 本地业务数据和发布的事件消息共享同一个本地事务,进行数据落库,其中事件消息持久化到单独的事件发件箱表中。
  2. 单独的进程或线程不断查询发件箱表中未发布的事件消息。
  3. 将未发布的事件消息发布到消息代理,然后将消息的状态更新为已发布。

dotnetcore/CAP 简介

在《.NET 微服务:适用于容器化 .NET 应用程序的体系结构》电子书中,提及了如何设计兼具原子性和弹性的事件总线,其中提出了三种思路:使用完整的事件溯源模式,使用事务日志挖掘,使用发件箱模式(The outbox pattern)。其中事件溯源模式实现相对复杂,事务日志挖掘局限于特定类型数据库,而发件箱模式则是一种相对平衡的实现方式,其基于事务数据库表和简化的事件溯源模式。发件箱模式的示意图如下所示:

从上图可以看出,其实现原理与上面提及的本地消息表模式十分相似,我们可以理解其也是本地消息表模式的一种实现。作者Savorboard也正是受该电子书启发,实现了.NET版本的本地消息表模式,并命名为dotnetcore/CAP,其架构如下图所示。其同时也兼具EventBus的功能,其支持主流消息代理,如RabbitMQ、Redis、Kafka和Pulsar,同时支持多种持久化存储方式进行消息存储,包括MySQL、PostgreSQL、SQL Server和MongoDB。因此基于dotnetcore/CAP,.NET 开发者也可以快速实现微服务间的异步通信和解决分布式事务问题。

基于dotnetcore/CAP 实现分布式事务

那具体如何使用dotnetcore/CAP来解决分布式事务问题呢,基于本地消息表加补偿模式实现。dotnetcore/CAP的补偿模式比较巧妙,其基于发布事件的方法签名中提供了一个回调参数。发布方法的事件签名为:PublishAsync<T>(string name, T? contentObj, string? callbackName=null),第一个参数是事件名称,第二个参数为事件数据包,第三个参数用来指定于接收事件消费结果的回调地址(事件),但是否触发回调,取决于事件订阅方是否定义返回参数,若有则触发。如果基于CAP实现下单流程,则其流程如下所示:

接下来就来创建解决方案来实现以上下单流程示例。依次创建以下项目,订单服务、库存服务和支付服务均依赖共享类库项目,其中共享类库添加DotNetCore.CapDotNetCore.Cap.MySqlDotNetCore.Cap.RabbitMQNuGet包。

项目项目名项目类型
订单服务 CapDemo.OrderService ASP.NET Core Web API
库存服务 CapDemo.InventoryService Worker Service
支付服务 CapDemo.PaymentService Worker Service
共享类库 CapDemo.Shared Class Library

订单服务

订单服务首先需要暴露WebApi用于订单的创建,为了方便数据的持久化,首先添加Pomelo.EntityFrameworkCore.MySqlNuget包,然后创建OrderDbContext

  using System;
  using System.Collections.Generic;
  using System.Linq;
  using System.Threading.Tasks;
  using Microsoft.EntityFrameworkCore;
  using CapDemo.OrderService.Domains;
   
  namespace CapDemo.OrderService.Data
  {
  public class OrderDbContext : DbContext
  {
  public OrderDbContext (DbContextOptions<OrderDbContext> options)
  : base(options) {}
   
  public DbSet<CapDemo.OrderService.Domains.Order> Order { get; set; } = default!;
  }
  }

然后创建OrdersController并添加PostOrder方法如下所示:

  using System;
  using System.Collections.Generic;
  using System.Linq;
  using System.Threading.Tasks;
  using Microsoft.AspNetCore.Http;
  using Microsoft.AspNetCore.Mvc;
  using Microsoft.EntityFrameworkCore;
  using CapDemo.OrderService.Data;
  using CapDemo.OrderService.Domains;
  using DotNetCore.CAP;
  using CapDemo.Shared;
  using CapDemo.Shared.Models;
   
  namespace CapDemo.OrderService.Controllers
  {
  [Route("api/[controller]")]
  [ApiController]
  public class OrdersController : ControllerBase
  {
  private readonly OrderDbContext _context;
  private readonly ICapPublisher _capPublisher;
  private readonly ILogger<OrdersController> _logger;
   
  public OrdersController(OrderDbContext context, ICapPublisher capPublisher,ILogger<OrdersController> logger)
  {
  _context = context;
  _capPublisher = capPublisher;
  _logger = logger;
  }
  [HttpPost]
  public async Task<ActionResult<Order>> PostOrder(CreateOrderDto orderDto)
  {
  var shoppingItems =
  orderDto.ShoppingCartItems.Select(item => new ShoppingCartItem(item.SkuId, item.Price, item.Qty));
  var order = new Order(orderDto.CustomerId).NewOrder(shoppingItems.ToArray());
   
  using (var trans = _context.Database.BeginTransaction(_capPublisher, autoCommit: false))
  {
  _context.Order.Add(order);
   
  var deduceDto = new DeduceInventoryDto()
  {
  OrderId = order.OrderId,
  DeduceStockItems = order.OrderItems.Select(
  item => new DeduceStockItem(item.SkuId, item.Qty, item.Price)).ToList()
  };
  await _capPublisher.PublishAsync(TopicConsts.DeduceInventoryCommand,deduceDto,
  callbackName: TopicConsts.CancelOrderCommand);
  await _context.SaveChangesAsync();
  await trans.CommitAsync();
  }
   
  _logger.LogInformation($"Order [{order.OrderId}] created successfully!");
   
  return CreatedAtAction("GetOrder", new { id = order.OrderId }, order);
  }
  }
  }

从代码中可以看出,在订单持久化和事件发布之前先行使用事务包裹:using (var trans = _context.Database.BeginTransaction(_capPublisher, autoCommit: false)) {},以确保订单和事件的持久化共享同一个事务,这一步是使用CAP的重中之重。订单服务通过注入了ICapPublisher服务,并通过PublishAsync方法发布扣减库存事件,并指定了callbackName: TopicConsts.CancelOrderCommand
订单服务还需要订阅取消订单和订单支付结果的事件,进行订单状态的更新,添加OrderConsumers如下所示,其中通过实现ICapSubscribe接口来显式标记为消费者,然后定义方法并在方法体上通过[CapSubscribe]特性指定订阅的事件名称来完成事件的消费。

  using CapDemo.OrderService.Data;
  using CapDemo.Shared;
  using DotNetCore.CAP;
   
  namespace CapDemo.OrderService.Consumers;
   
  public class OrderConsumers:ICapSubscribe
  {
  private readonly OrderDbContext _orderDbContext;
  private readonly ILogger<OrderConsumers> _logger;
   
  public OrderConsumers(OrderDbContext orderDbContext,ILogger<OrderConsumers> logger)
  {
  _orderDbContext = orderDbContext;
  _logger = logger;
  }
  [CapSubscribe(TopicConsts.CancelOrderCommand)]
  public async Task CancelOrder(string orderId)
  {
  if(string.IsNullOrEmpty(orderId)) return;
  var order = await _orderDbContext.Order.FindAsync(orderId);
  order?.CancelOrder();
  _logger.LogWarning($"Order [{orderId}] has been canceled!");
  await _orderDbContext.SaveChangesAsync();
  }
   
  [CapSubscribe(TopicConsts.PayOrderSucceedTopic)]
  public async Task MarkToPaid(string orderId)
  {
  var order = await _orderDbContext.Order.FindAsync(orderId);
   
  order?.UpdateToPaid();
   
  await _orderDbContext.SaveChangesAsync();
  }
  }

最后修改Program.cs添加CAP服务和消费者的注册。

  using CapDemo.OrderService.Consumers;
  using CapDemo.OrderService.Data;
  using Microsoft.EntityFrameworkCore;
  using DotNetCore.CAP;
   
  var builder = WebApplication.CreateBuilder(args);
   
  // 注册 DbContext
  var connectionStr = builder.Configuration.GetConnectionString("Default");
  builder.Services.AddDbContext<OrderDbContext>(options =>
  options.UseMySql(connectionStr ?? throw new InvalidOperationException("Connection string 'OrderDbContext' not found."), ServerVersion.AutoDetect(connectionStr)));
  // 注册CAP
  builder.Services.AddCap(x =>
  {
  x.UseEntityFramework<OrderDbContext>();
  x.UseRabbitMQ("localhost");
  });
  // 注册消费者
  builder.Services.AddTransient<OrderConsumers>();

库存服务

库存服务在整个下单流程的职责主要是库存的扣减和返还,添加InventoryConsumer来消费库存扣减和返还事件即可。

  using System;
  using System.Collections.Generic;
  using System.Linq;
  using System.Text;
  using System.Text.Json;
  using System.Threading.Tasks;
  using CapDemo.Shared;
  using CapDemo.Shared.Models;
  using DotNetCore.CAP;
   
  namespace CapDemo.InventoryService.Consumers
  {
  public class InventoryConsumer : ICapSubscribe
  {
  private readonly ILogger<InventoryConsumer> _logger;
  private readonly ICapPublisher _capPublisher;
   
  public InventoryConsumer(ILogger<InventoryConsumer> logger, ICapPublisher capPublisher)
  {
  _logger = logger;
  _capPublisher = capPublisher;
  }
   
  [CapSubscribe(TopicConsts.DeduceInventoryCommand)]
  public async Task DeduceInventory(DeduceInventoryDto deduceStockDto)
  {
  // 省略扣减库存逻辑,直接成功
  _logger.LogInformation($"Inventory has been deducted for order [{deduceStockDto.OrderId}]!");
  var amount = deduceStockDto.DeduceStockItems.Sum(t => t.Price * t.Qty);
  await _capPublisher.PublishAsync(TopicConsts.PayOrderCommand, new PayDto(deduceStockDto.OrderId, amount),
  callbackName: TopicConsts.ReturnInventoryTopic);
  }
   
  [CapSubscribe(TopicConsts.ReturnInventoryTopic)]
  public void ReturnInventory(PayResult payResult)
  {
  // 若支付失败,则执行库存返还并发布取消订单命令
  if (!payResult.IsSucceed)
  {
  // 省略返还库存逻辑
  _logger.LogWarning($"Inventory has been returned for order [{payResult.OrderId}]");
  _capPublisher.PublishAsync(TopicConsts.CancelOrderCommand, payResult.OrderId);
  }
  }
  }
  }

以上的库存扣减实现中省略了扣减库存逻辑,直接模拟成功扣减,也就无需触发回调,那就可以通过将方法签名定义为public async Task DeduceInventory(DeduceInventoryDto deduceStockDto),这样就不会触发订单服务发布扣减库存事件时指定的回调。库存扣减成功随即发布支付订单的命令,由于不涉及其他数据持久化,因此无需手动开启事务。发布支付订单命令时指定了callbackName: TopicConsts.ReturnInventoryTopic,其将根据订单支付结果也就是ReturnInventory(PayResult payResult)中指定的入参决定是否返还库存。
最后同样需要在Program.cs中注入CAP服务和消费者:

  using CapDemo.InventoryService;
  using CapDemo.InventoryService.Consumers;
   
  IHost host = Host.CreateDefaultBuilder(args)
  .ConfigureServices((context, services) =>
  {
  var connStr = context.Configuration.GetConnectionString("Default");
  services.AddCap(x =>
  {
  x.UseMySql(connStr);
  x.UseRabbitMQ("localhost");
  });
   
  services.AddTransient<InventoryConsumer>();
  })
  .Build();
   
  await host.RunAsync();
   

支付服务

对于下单流程的支付用例来说,要么成功要么失败,并不需要像以上两个服务一样定义补偿逻辑,因此仅需要订阅支付订单命令即可,定义PaymentConsumers如下所示,因为库存服务发布支付订单命令时指定的回调依赖支付结果,因此该方法必须指定与回调匹配的返回参数类型,也就是PayResult

  using CapDemo.Shared;
  using CapDemo.Shared.Models;
  using DotNetCore.CAP;
   
  namespace CapDemo.PaymentService.Consumers;
   
  public class PaymentConsumers:ICapSubscribe
  {
  private readonly ICapPublisher _capPublisher;
  private readonly ILogger<PaymentConsumers> _logger;
   
  public PaymentConsumers(ICapPublisher capPublisher,ILogger<PaymentConsumers> logger)
  {
  _capPublisher = capPublisher;
  _logger = logger;
  }
  [CapSubscribe(TopicConsts.PayOrderCommand)]
  public async Task<PayResult> Pay(PayDto payDto)
  {
  bool isSucceed = false;
  if (payDto.Amount % 2 == 0)
  {
  isSucceed = true;
  _logger.LogInformation($"Order [{payDto.OrderId}] paid successfully!");
  await _capPublisher.PublishAsync(TopicConsts.PayOrderSucceedTopic, payDto.OrderId);
  }
  else
  {
  isSucceed = false;
  _logger.LogWarning($"Order [{payDto.OrderId}] payment failed!");
  }
   
  return new PayResult(payDto.OrderId, isSucceed);
  }
  }

最后同样需要在Program.cs中注入CAP服务和消费者:

  using CapDemo.PaymentService;
  using CapDemo.PaymentService.Consumers;
   
  IHost host = Host.CreateDefaultBuilder(args)
  .ConfigureServices((context, services) =>
  {
  var connStr = context.Configuration.GetConnectionString("Default");
  services.AddCap(x =>
  {
  x.UseMySql(connStr);
  x.UseRabbitMQ("localhost");
  });
  services.AddTransient<PaymentConsumers>();
  })
  .Build();
   
  await host.RunAsync();
   

运行结果

使用docker启动MySQL和RabbitMQ,然后再启动三个服务,并在订单服务的Swagger中发起订单创建请求,如下图所示:

最终执行结果如下图所示:

打开RabbitMQ后台,可以看见CAP为每个服务创建了一个唯一队列接收消息,并通过创建的名为cap.default.router的Exchange根据事件名称作为RoutingKey进行消息路由。

其中通过dotnetcore/CAP发布的消息结构如下图所示,该图是订单服务发布的扣减库存的消息。

打开MySQL,可以发现dotnetcore/CAP 根据配置的连接字符串,分别为各个服务创建了cap.publishedcap.received消息表,如下图所示:

小结

通过以上示例,可以发现dotnetcore/CAP无疑是一个出色的事件总线,简单易用且能确保事件的有效送达。同时基于dotnetcore/CAP的本地消息表模式和补偿模式,也可以有效的实现分布式事务。但相较而言,补偿仅限于直接上下游服务之间,不能链式反向补偿,控制逻辑比较分散,属于协同式事务,各个服务需要订阅自己关注的事件并实现,适用于小中型项目,对于大型项目而言尤其需要注意事件的流转,以避免陷入事件漩涡。

推荐链接:你必须知道的.NET Core开发指南
推荐链接:你必须知道的ML.NET开发指南
推荐链接:你必须知道的Office开发指南
推荐链接:你必须知道的IOT开发指南
推荐链接:你必须知道的Azure基础知识
推荐链接:你必须知道的PowerBI基础知识

标签:CAP,模式,CapDemo,消息,本地,using,logger,capPublisher,public
From: https://www.cnblogs.com/Leo_wl/p/17375391.html

相关文章

  • Rabbitmq 介绍 、安装、基于Queue实现生产者消费者模型、基本使用、消息安全之ack、du
    师承老刘llnb一、消息队列介绍1.1介绍消息队列就是基础数据结构中的“先进先出”的一种数据机构。想一下,生活中买东西,需要排队,先排的人先买消费,就是典型的“先进先出”1.2MQ解决什么问题MQ是一直存在,不过随着微服务架构的流行,成了解决微服务之间问题的常用工具。应用解耦......
  • 本地新项目上传到git的详细步骤
    通过命令gitinit把这个目录变成git可以管理的仓库gitinit 把文件添加到暂存区中gitadd.//'.'意思是添加文件下的所有文件 把文件提到仓库gitcommit-m'类似备注,自由发挥' 关联到远程仓库gitremoteaddorigin远程仓库地址 拉取远程仓库到代码......
  • 开源项目消息推送平台Austin
    开源项目消息推送平台Austin终于要上线了,迎来在线演示的第一版!......
  • 本地jar包导入maven有两种方法
    本地jar包导入maven有两种方法第一种,安装该jar包到maven的本地仓库中,主要用到maven的命令:mvninstall:install-file-Dfile=C:\Users\Administrator\Desktop\test.jar-DgroupId=com.test-DartifactId=test-Dversion=1.0-Dpackaging=jar上面的命令解释:-Dfi......
  • CentOS虚拟机连接外网,NET模式
    大致思路,将主机和CentOS的ip设置成同一网段,并且网关相同,虚拟网络编辑器中的网关也与主机相同1、主机VMnet8的ip设置  2、CentOS的网关和ip设置1)查看网络设备名称 2)进入编辑IP和网关、DNS等进入network-scripts,网络相关的配置文件存放位置/etc/sysconfig/network-scr......
  • 《3D编程模式》写书-第5次记录
    大家好,这段时间我完成了对初稿的第一轮修改,即将开始第二轮的修改这里是所有的的写书记录:《3D编程模式》写书记录本轮修改主要进行了下面方面的修改:修改错误修改了UML错误、文字错误、代码错误等错误隐藏代码的实现细节,进行抽象这一步修改的目的在于使案例中的代码只展示说......
  • 【解决】mysql本地计算机上的MySQL服务启动后停止。某些服务在未由其他服务或程序使用
    在计算机管理中启动时,发现mysql报错:mysql本地计算机上的MySQL服务启动后停止。某些服务在未由其他服务或程序使用时将自动停止。该问题的解决方式为:1找到Mysql的安装路径,看看有没有data文件夹,如果没有data文件夹,自己重新建一个;如果有的话,就把里面的内容全部清空,但保留该目录......
  • vim编辑器模式和命令
    输入命令:yum-yinstallvim*​或者sudoapt-getinstallvim 命令模式按dd键删除当前光标所在行按D键删除当前光标所在行按G键将光标移动到文件的最后按dG键删除当前光标所在行到最后一行按d1G键删除当前光标所在行到第一......
  • 架构师日记-深入理解软件设计模式
    作者:京东零售刘慧卿一设计模式与编程语言1.1什么是设计模式设计模式(Designpattern):由软件开发人员在软件开发中面临常见问题的解决方案,是经过长时间的试验积累总结出来的,它使设计更加灵活和优雅,复用性更好。从实用的角度来看,它代表了某一类问题的最佳实践。设计模式到底解......
  • C# 获取本地共享目录和网络共享目录
    1.在工程添加对应的cs文件usingSystem;usingSystem.Collections;usingSystem.Collections.Generic;usingSystem.ComponentModel;usingSystem.IO;usingSystem.Linq;usingSystem.Net;usingSystem.Runtime.InteropServices;usingSystem.Text;usingSystem.Web;......