一、在Nuget中引用以下包:
dotnetcore.cap DotNetCore.CAP.Dashboard DotNetCore.CAP.RabbitMQ DotNetCore.CAP.SqlServer
二、在Program.cs中注册服务
//配置CAP builder.Services.AddCap(x => { x.UseEntityFramework<AppDbContext>();//从AppDbContext获取数据库连接字符串 x.UseRabbitMQ("localhost"); // Register Dashboard x.UseDashboard(); //使用仪表盘,默认网址:http://localhost:当前网站端口/cap //x.UseDashboard(opt => { opt.PathMatch = "/mycap"; });//配置仪表盘为新的网址 // Register to Consul x.UseDiscovery(d => { d.DiscoveryServerHostName = "localhost"; d.DiscoveryServerPort = 8500; d.CurrentNodeHostName = "localhost"; d.CurrentNodePort = 5800; d.NodeId = 1.ToString(); d.NodeName = "CAP No.1 Node"; }); });
三、发布消息
using CAPWebApplication.Entities; using DotNetCore.CAP; using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Mvc; using Microsoft.Data.SqlClient; using Microsoft.EntityFrameworkCore; namespace CAPWebApplication.Controllers { [Route("api/[controller]")] [ApiController] public class PublishController : ControllerBase { private readonly ILogger<PublishController> _logger; private readonly ICapPublisher _capPublisher; private readonly AppDbContext _dbContext; public PublishController(ILogger<PublishController> logger, ICapPublisher capPublisher, AppDbContext dbContext) { _logger = logger; _capPublisher = capPublisher; _dbContext = dbContext; } //发送消息 [HttpGet("send")] public async Task<ActionResult> SendMessage() {//发布无事务的消息,消息重要性低时可以使用 await _capPublisher.PublishAsync("test.show.time", DateTime.Now); return Ok(); } [HttpGet("sendheader")] public async Task SendHeaderMessage() {//发布带有包头的消息 var header = new Dictionary<string, string> { ["my.header.first"] = "first", ["my.header.second"] = "second" }; _capPublisher.Publish("test.show.time", DateTime.Now, header); } [Route("adonet/transaction")] [HttpGet] public async Task AdonetWithTransaction() {//ADO.NET 事务处理 操作数据库并且发送消息 using (var conn = new SqlConnection(_dbContext.Database.GetConnectionString())) { using (var trans = conn.BeginTransaction(_capPublisher)) { await _capPublisher.PublishAsync("test.order", DateTime.Now); trans.Commit(); } } } [Route("efcore/transaction")] [HttpGet] public async Task EfcoreWithTransaction() {//EFCore 事件处理 操作数据库并且发送消息 using (var trans = _dbContext.Database.BeginTransaction(_capPublisher)) { Order order = new Order { Name = "赵六", Address = "湖南岳阳临湘" }; _dbContext.Add(order); await _dbContext.SaveChangesAsync(); await _capPublisher.PublishAsync("test.order", order); await trans.CommitAsync(); } } } }
四、订阅消息
4.1 在控制器中订阅消息:
using CAPWebApplication.Entities; using DotNetCore.CAP; using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Mvc; using Newtonsoft.Json; namespace CAPWebApplication.Controllers { [Route("api/[controller]")] [ApiController] public class ConsumerController : ControllerBase { //private readonly ILogger _logger; //public ConsumerController(ILogger logger) //{ // _logger = logger; //} //[NonAction] //[CapSubscribe("test.show.time")] //public async Task ReceiveMessage(DateTime time) //{ // Console.WriteLine($"接收到的时间 :{time}"); //} [NonAction] [CapSubscribe("test.show.time")] public async Task ReceiveHeaderMessage(DateTime time, [FromCap] CapHeader header) { Console.WriteLine($"接收到的时间:{time}"); Console.WriteLine($"第一个头部:{header["my.header.first"]}"); Console.WriteLine($"第二个头部:{header["my.header.second"]}"); } [NonAction] [CapSubscribe("test.order")] public async Task CheckReceivedMessage(Order order) { //string msg= System.Text.Json.JsonSerializer.Serialize(order); Console.WriteLine($"接受来自[test.order]的订单:{order}"); } } }
标签:CAP,RabbitMQ,order,header,using,capPublisher,public,分布式 From: https://www.cnblogs.com/friend/p/16756316.html