首页 > 其他分享 >用CAP操作RabbitMQ 处理分布式事务的解决方案

用CAP操作RabbitMQ 处理分布式事务的解决方案

时间:2022-10-05 20:44:53浏览次数:55  
标签:CAP RabbitMQ order header using capPublisher public 分布式

一、在Nuget中引用以下包:

dotnetcore.cap
DotNetCore.CAP.Dashboard
DotNetCore.CAP.RabbitMQ
DotNetCore.CAP.SqlServer

二、在Program.cs中注册服务

//配置CAP
builder.Services.AddCap(x =>
{
    x.UseEntityFramework<AppDbContext>();//从AppDbContext获取数据库连接字符串
    x.UseRabbitMQ("localhost");
    // Register Dashboard 
    x.UseDashboard(); //使用仪表盘,默认网址:http://localhost:当前网站端口/cap
    //x.UseDashboard(opt => { opt.PathMatch = "/mycap"; });//配置仪表盘为新的网址
    // Register to Consul
    x.UseDiscovery(d =>
    {
        d.DiscoveryServerHostName = "localhost";
        d.DiscoveryServerPort = 8500;
        d.CurrentNodeHostName = "localhost";
        d.CurrentNodePort = 5800;
        d.NodeId = 1.ToString();
        d.NodeName = "CAP No.1 Node";
    });
});

三、发布消息

using CAPWebApplication.Entities;
using DotNetCore.CAP;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Data.SqlClient;
using Microsoft.EntityFrameworkCore;

namespace CAPWebApplication.Controllers
{

    [Route("api/[controller]")]
    [ApiController]
    public class PublishController : ControllerBase
    {
        private readonly ILogger<PublishController> _logger;
        private readonly ICapPublisher _capPublisher;
        private readonly AppDbContext _dbContext;

        public PublishController(ILogger<PublishController> logger, ICapPublisher capPublisher, AppDbContext dbContext)
        {
            _logger = logger;
            _capPublisher = capPublisher;
            _dbContext = dbContext;
        }
        //发送消息
        [HttpGet("send")]
        public async Task<ActionResult> SendMessage()
        {//发布无事务的消息,消息重要性低时可以使用
            await _capPublisher.PublishAsync("test.show.time", DateTime.Now);
            return Ok();
        }

        [HttpGet("sendheader")]
        public async Task SendHeaderMessage()
        {//发布带有包头的消息
            var header = new Dictionary<string, string>
            {
                ["my.header.first"] = "first",
                ["my.header.second"] = "second"
            };
            _capPublisher.Publish("test.show.time", DateTime.Now, header);
        }
        [Route("adonet/transaction")]
        [HttpGet]
        public async Task AdonetWithTransaction()
        {//ADO.NET 事务处理 操作数据库并且发送消息
            using (var conn = new SqlConnection(_dbContext.Database.GetConnectionString()))
            {
                using (var trans = conn.BeginTransaction(_capPublisher))
                {

                    await _capPublisher.PublishAsync("test.order", DateTime.Now);
                    trans.Commit();
                }
            }
        }
        [Route("efcore/transaction")]
        [HttpGet]
        public async Task EfcoreWithTransaction()
        {//EFCore 事件处理 操作数据库并且发送消息
            using (var trans = _dbContext.Database.BeginTransaction(_capPublisher))
            {
                Order order = new Order { Name = "赵六", Address = "湖南岳阳临湘" };
                _dbContext.Add(order);
                await _dbContext.SaveChangesAsync();
                await _capPublisher.PublishAsync("test.order", order);
                await trans.CommitAsync();
            }
        }

    }
}

四、订阅消息 

4.1 在控制器中订阅消息:

using CAPWebApplication.Entities;
using DotNetCore.CAP;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Newtonsoft.Json;

namespace CAPWebApplication.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    public class ConsumerController : ControllerBase
    {
        //private readonly ILogger _logger;
        //public ConsumerController(ILogger logger)
        //{
        //    _logger = logger;
        //}
        //[NonAction]
        //[CapSubscribe("test.show.time")]
        //public async Task ReceiveMessage(DateTime time)
        //{
        //    Console.WriteLine($"接收到的时间 :{time}");
        //}
        [NonAction]
        [CapSubscribe("test.show.time")]
        public async Task ReceiveHeaderMessage(DateTime time, [FromCap] CapHeader header)
        {
            Console.WriteLine($"接收到的时间:{time}");
            Console.WriteLine($"第一个头部:{header["my.header.first"]}");
            Console.WriteLine($"第二个头部:{header["my.header.second"]}");
        }
        [NonAction]
        [CapSubscribe("test.order")]
        public async Task CheckReceivedMessage(Order order)
        {
            //string msg= System.Text.Json.JsonSerializer.Serialize(order);
            Console.WriteLine($"接受来自[test.order]的订单:{order}");
        }
    }
}

  

标签:CAP,RabbitMQ,order,header,using,capPublisher,public,分布式
From: https://www.cnblogs.com/friend/p/16756316.html

相关文章

  • 【微电网优化】基于粒子群实现含分布式电源日前两阶段优化调度模型附matlab代码
    ✅作者简介:热爱科研的Matlab仿真开发者,修心和技术同步精进,matlab项目合作可私信。......
  • Docker 安装部署RabbitMQ 的management版本
    这里注意获取镜像的时候要获取management版本的,不要获取last版本的,management版本的才带有管理界面。获查询镜像dockersearchrabbitmq:management获取镜像docke......
  • Spring整合RabbitMQ
    ✨Spring整合RabbitMQ​​1.简单消息模式​​​​1.1生产者​​​​1.1.1创建生产者工程spring-rabbitmq-producer​​​​1.1.2引入依赖​​​​1.1.3属性配置文件​​......
  • CAP 介绍
    https://github.com/dotnetcore/caphttps://www.cnblogs.com/savorboard/作者博客CAP是一个基于.Net标准的库,是处理分布式事务的解决方案,具有EventBus的功能,轻量级、易......
  • 19-RabbitMQ消息一致性问题
    消息一致性问题在使用rabbitmq中,消息的一致性是非常重要的一个话题。在数据一致性方面,发送者发送消息出来,在数据一致性的要求下,我们通常认为必须达到以下条件broke......
  • 20-SpringBoot整合RabbitMQ
    SpringBoot整合RabbitMQ整合就直接使用单机版的了,一直开着5个虚拟机,我电脑不太行新建SpringBoot工程你已经是一个长大的IDEA了,要学会自己新建工程,然后IDEA自己......
  • 21-RabbitMQ延迟队列插件
    RabbitMQ延迟队列插件下载官网https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases我用的是3.10.7的RabbitMQ,但是官网没有这么新版本的,只......
  • 22-FAQ登录RabbitMQ控制台提示不是私密连接
    登录RabbitMQ控制台提示不是私密连接将启动的时候的赋权操作再执行一遍rabbitmqctladd_userrootrabbitmqctlset_permissions-p/root".*"".*"".*"rabbitmqct......
  • 15-RabbitMQ高级特性-消费端限流
    消费端限流什么是消费端限流假设一个场景,首先,我们RabbitMQ服务器有上万条消息未处理的消息,我们随机打开一个消费者客户端,会出现下面情况巨量的消息瞬间全......
  • 16-RabbitMQ高级特性-消费端的消息ACK与重回队列
    消费端的消息ACK与重回队列消费端的手工ACK和NACKACK分为自动和手动消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿如果由于服务器宕......