首页 > 其他分享 >.NetCore中使用分布式事务DTM的二阶段消息

.NetCore中使用分布式事务DTM的二阶段消息

时间:2023-04-01 10:56:50浏览次数:72  
标签:事务 dtm DTM settings NetCore gid public 分布式

一、概述

二阶段消息是DTM新提出的,可以完美代替现有的事务消息和本地消息表架构。无论从复杂度、性能、便利性还是代码量都是完胜现有的方案。

相比现有的消息架构借助于各种消息中间件比如RocketMQ等,DTM自己实现了无需额外的学习成本。它能够保证本地事务的提交和全局事务提交是“原子的”,适合解决不需要回滚的分布式事务场景

二阶段消息保证提交的原子性和如何保证业务成功执行如下时序图:

 

 二阶段消息主要是指PrepareSubmit两个阶段,主程序向DTM服务发送Prepare消息,成功后执行本地事务,完成本地事务后发送Submit消息至DTM服务,之后DTM会调用分支事件执行其他服务,最后完成全局事务。

 当发送了Prepare但是Submit没有提交的话,会进行回调请求来确认消息的情况,具体工作过程如下:

 1、在处理本地事务时,会将gid插入到barrier表中,同时带上插入原因为committed。该表有一个唯一索引,主要字段为gid

 2、当进行回查时,二阶段消息的操作不是直接查gid是否存在,而是再insert ignore一条带有相同gid的数据,同时带上插入原因为rollbacked。此时如果表中如果已有gid的记录,那么新的插入操作就会被ignore,否则数据会被插入。

 3、然后再用gid查询表中的记录,如果查到记录的reasoncommitted,那么说明本地事务已提交;如果查到记录的reasonrollbacked,那么说明本地事务已回滚。

二、安装DTM

 我使用二进制包下载安装地址,我是Window环境所以下载后解压,点击dtm.exe进行运行即可,如下启动成功

启动成功后可以访问http://localhost:36789,进入管理后台

三、创建DTM所需的表

我们需要创建一个表处理消息的回查,表里保存全局事务ID,具体作用在后续说明,我这里用的SqlServer数据库,所以执行如下:

CREATE TABLE [dbo].[barrier]
(
    [id] bigint NOT NULL IDENTITY(1,1) PRIMARY KEY,
    [trans_type] varchar(45) NOT NULL DEFAULT(''),
    [gid] varchar(128) NOT NULL DEFAULT(''),
    [branch_id] varchar(128) NOT NULL DEFAULT(''),
    [op] varchar(45) NOT NULL DEFAULT(''),
    [barrier_id] varchar(45) NOT NULL DEFAULT(''),
    [reason] varchar(45) NOT NULL DEFAULT(''),
    [create_time] datetime NOT NULL DEFAULT(getdate()) ,
    [update_time] datetime NOT NULL DEFAULT(getdate())
)

GO

CREATE UNIQUE INDEX[ix_uniq_barrier] ON[dbo].[barrier]
        ([gid] ASC, [branch_id] ASC, [op] ASC, [barrier_id] ASC)
WITH(IGNORE_DUP_KEY = ON)

GO

这里比较关键的是那个唯一索引,有一个IGNORE_DUP_KEY = ON,这个其实就是为了等价mysqlinsert ignore表示存在相关字段的信息则不插入,否则就插入数据

当然还支持很多其他的数据库,建表语句可以从这里查看地址

 四、创建项目

 我们简单的创建两个.net core webapi项目进行测试,两个项目都进行相同的如下操作:

 1、安装Dtmcli和Microsoft.EntityFrameworkCore.SqlServer

 安装Dtmcli是因为其中已经帮我们集成了DTM客户端SDK HTTP版本,想要GRPC版本可以安装Dtmgrpc

 安装Microsoft.EntityFrameworkCore.SqlServer很显然是为了处理数据库。

Install-Package Dtmcli
Install-Package Microsoft.EntityFrameworkCore.SqlServer

2、配置

接下来我们配置服务,先在配置文件appsetting.json中添加如下

  "AppSettings": {
    "DtmUrl": "http://localhost:36789",
    "BusiUrl": "http://localhost:5056",
    "QueryPreparedUrl": "http://localhost:5046",
    "BarrierConn": "Data Source=.;Initial Catalog=HTGL;TrustServerCertificate=True;;Integrated Security=True"
  }

 DtmUrlDTM的监听地址,http的是36789grpc的是36790

 BusiUrl:访问其他服务的地址

 QueryPreparedUrl:回查的地址

 BarrierConn:数据库连接语句

 添加一个配置类:

    public class AppSettings
    {
        public string DtmUrl { get; set; }
        public string BusiUrl { get; set; }
        public string BarrierConn { get; set; }
        public string QueryPreparedUrl { get; set; }
    }

 之后注入服务如下:

builder.Services.AddDtmcli(dtm => { 
    dtm.DtmUrl = builder.Configuration.GetValue<string>("AppSettings:DtmUrl");
    dtm.SqlDbType = DtmCommon.Constant.Barrier.DBTYPE_SQLSERVER;
    dtm.BarrierSqlTableName = "[HTGL].[dbo].[barrier]";
});
builder.Services.Configure<AppSettings>(builder.Configuration.GetSection("AppSettings"));

SqlDbType:表示使用的数据库类型

BarrierSqlTableNameBarrier表的名字

3、添加代码

我们在其中一个项目添加主程序代码如下:

    [ApiController]public class DtmController : ControllerBase
    {

        private readonly ILogger<DtmController> _logger;
        private readonly IDtmClient _dtmClient;
        private readonly IDtmTransFactory _transFactory;
        private readonly AppSettings _settings;
        private readonly IBranchBarrierFactory _factory;
        public DtmController(ILogger<DtmController> logger, IDtmClient dtmClient,IDtmTransFactory transFactory, IOptions<AppSettings> settings, IBranchBarrierFactory factory)
        {
            _logger = logger;
            _dtmClient = dtmClient;
            _transFactory = transFactory;
            _settings = settings.Value;
            _factory = factory;
        }
        private DbConnection GetConn() => new Microsoft.Data.SqlClient.SqlConnection(_settings.BarrierConn);
        [HttpPost("post-dtm-msg")]
        public async Task<IActionResult> Get(CancellationToken cancellationToken)
        {
            //1、创建gid
            var gid = await _dtmClient.GenGid(cancellationToken);
            //2、设置分支事务
            var msg = _transFactory.NewMsg(gid)
                .Add(_settings.BusiUrl + "/TransOut", new { id = 123 })
                .Add(_settings.BusiUrl + "/TransIn", new { id = 321 });//3、执行submit
            using (DbConnection conn = GetConn())
            {
                await msg.DoAndSubmitDB(_settings.QueryPreparedUrl + "/msg-queryprepared", conn, async tx =>
                {
                    //4、执行本地事务
                    await Task.CompletedTask;
                });
            }
            _logger.LogInformation("result gid is {0}", gid);
            return Content("SUCCESS");
        }
        [HttpGet("msg-queryprepared")]
        public async Task<IActionResult> QueryPrepared(CancellationToken cancellationToken)
        {
            var bb = _factory.CreateBranchBarrier(Request.Query);
            _logger.LogInformation("bb {0}", bb);
            using (DbConnection conn = GetConn())
            {
                //回调查询消息状态
                var res = await bb.QueryPrepared(conn);
                return Ok(new { dtm_result = res });
            }
        }
    }

然后我们向另一个服务项目添加如下代码,作为一个简单的服务方法,没有任何操作只是返回成功:

[ApiController]
    public class TransController : ControllerBase
    {
        private readonly ILogger<TransController> _logger;
        private readonly IBranchBarrierFactory _factory;
        private readonly AppSettings _settings;
        private DbConnection GetConn() => new Microsoft.Data.SqlClient.SqlConnection(_settings.BarrierConn);
        public TransController(ILogger<TransController> logger, IBranchBarrierFactory factory, IOptions<AppSettings> settings)
        {
            _logger = logger;
            _factory = factory;
            _settings = settings.Value;
        }
        [HttpPost("TransIn")]
        public async Task<IResult> In()
        {
            return Results.Ok(new { dtm_result = "SUCCESS" });
            //return Results.Ok(new { dtm_result = "FAILURE" });
        }
        [HttpPost("TransOut")]
        public async Task<IResult> Out()
        {
            return Results.Ok(new { dtm_result = "SUCCESS" });
        }
    }

五、执行查看结果

我们正常执行,可以看到下面的动图结果,在执行完本地事务后会访问分支事务,然后数据库表中添加了一条记录

可以在管理后台看到我们请求成功的信息

 如果要演示失败,需要做以下修改直接报错,我们可以看到访问了回调方法,然后数据库中看到rollback标记的消息

using (DbConnection conn = GetConn())
            {
                await msg.DoAndSubmitDB(_settings.QueryPreparedUrl + "/msg-queryprepared", conn, async tx =>
                {
                    throw new Exception("报错了");
                    //4、执行本地事务
                    await Task.CompletedTask;
                });
            }

 提交后再宕机演示比较麻烦,我就不演示了,大家意会即可。

 如果分支事务返回的不是SUCCESS而是FAILURE会由DTM隔一段时间重新请求,dtm对每个事务的重试是指数退避策略,具体为间隔是每失败一次,间隔加倍,避免过多的重试,导致系统负载异常上升。

 如果您经过长时间的的宕机,因指数退避算法导致要很久才会重试。如果您想要手动触发立即重试,您可以手动把相应事务的next_cron_time(Redis存储引擎的该功能还在开发中)修改为当前时间,就会在数秒内被定时轮询,事务就会继续往前执行。

 

标签:事务,dtm,DTM,settings,NetCore,gid,public,分布式
From: https://www.cnblogs.com/xwc1996/p/17252311.html

相关文章

  • 分布式事务讲解之CAP,2PC,3PC,TCC
    目录1CAP1.1CAP原则1.1.1数据一致性1.1.2图示讲解1.1.2.1一致性1.1.2.2可用性1.1.2.3分区容错性1.2CAP如何舍弃1.3eureka与zookeeper区别1.4CAP对应的模型和应用1.4.1CAwithoutP1.4.2CPwithoutA1.4.3APwihtoutC1.4.4常见注册中心1.5BASE理论2分布式事务2.1......
  • SequoiaDB分布式数据库2023.3月刊
    本月看点速览赋能行业,参编《分布式数据库金融应用发展报告》脱颖而出,入选2022专精特新黑马大赛年度十强激烈角逐,成功晋级全国信创优秀解决方案决赛新穗新彩,多家权威媒体走进巨杉青杉计划2023持续进行,一起攀登更高的“杉”赋能行业,参编《分布式数据库金融应用发......
  • RocketMQ x OpenTelemetry 分布式全链路追踪最佳实践
    作者简介:艾阳坤,ApacheRocketMQPMCMember/Committer,CNCFOpenTelemetryMember,CNCFEnvoycontributor。在分布式系统中,多个服务之间的交互涉及到复杂的网络通信和数据传输,其中每个服务可能由不同的团队或组织负责维护和开发。因此,在这样的环境下,当一个请求被发出并经过多个......
  • 分布式学习笔记-zookeeper以及kafka
    zookeeper所谓分布式系统就是在不同的地域分布的多个服务器,共同组成一个应用系统来为用户提供服务,在分布式系统中最重要的是进程调度。多个进程的应用需要竞争资源,此时需要......
  • Eureka NetCore 服务注册与发现
    1创建springbooteureka项目。<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-b......
  • 分布式技术原理与算法解析 04 - 存储&高可靠
    分布式存储分布式数据复制技术常用于数据备份同步复制技术注重一致性,用户请求更新数据库时,主数据库要同步到备数据库后才结束阻塞返回给用户异步复制技术注重可用......
  • Redis分布式Session和普通的cookie session有什么区别?
    Redis是一种高性能的缓存和key-value存储系统,常被用来实现分布式Session的方案。在这种方案中,用户的登录信息存储在Redis中,而不是存储在本地的cookie或session......
  • 配电网分布式电源和储能选址定容 以配电网总成本最低为目标函数
    配电网分布式电源和储能选址定容以配电网总成本最低为目标函数,其中包括年运行成本,设备维护折损成本、环境成本;以系统潮流运行为约束条件,采用粒子群算法求解,实现光伏、风电......
  • Matlab风电光伏储能分布式能源微电网运行,并网运行,虚拟同步机控制策略
    Matlab风电光伏储能分布式能源微电网运行,并网运行,虚拟同步机控制策略,VSG风电,储能,光伏封不是电源带直流负载独立运行断开直流负载后,将模型转换为风光储+VSG+交流负载引入......
  • 浅谈分布式事务-Seata
    传统的单机事务。在传统数据库事务中,必须要满足四个原则(ACID):ACID:Atomic(原子性)、Consistency(一致性)、Isolation(隔离性)和Durability(持久性)原子性(Atomicity)一个事务(transact......