原文:Transactional Outbox in .NET Cloud Native Development via Aspire
作者:Oleksii Nikiforov
总览
这篇文章提供了使用 Aspire、DotNetCore.CAP、Azure Service Bus、Azure SQL、Bicep 和 azd 实现 Outbox 模式的示例。
发件箱模式简介
发件箱模式是分布式系统领域中的一个重要组件。随着现代软件开发朝着更加分布式和解耦的架构发展,确保可靠的消息传递变得越来越重要。
在分布式系统中,不同的组件需要相互通信,通常通过异步消息传递。发件箱模式提供了一种可靠的方法来处理这些消息。它确保即使系统在执行本地事务之后但在发送消息之前发生故障,消息也不会丢失。相反,它会暂时存储在“发件箱”中,并在系统恢复时检索和发送。
通过使用发件箱模式,我们可以确保系统的所有组件以可靠的方式接收必要的消息,从而确保整个系统的完整性和一致性。
在没有发件箱模式的分布式系统中,有几种场景可能会出错,从而导致数据不一致或消息丢失。以下是几个示例:
- 事务提交和消息发送不是原子的:在通常情况下,Service 可能首先将事务提交到其数据库,然后向消息代理(Broker)发送消息。如果服务在事务提交之后但在消息发送之前崩溃,则消息将丢失。其他服务将不知道已提交到数据库的更改。
- 消息发送失败:即使服务没有崩溃,由于网络问题或消息代理问题,消息发送也可能失败。如果不重试消息发送操作,消息将丢失。
- 重复消息:如果服务在失败后重试消息发送操作,如果第一次发送实际上成功但确认丢失,则最终可能会多次发送同一条消息。如果消息使用者不是幂等的,这可能会导致重复处理。
- 顺序问题:如果单个事务发送了多条消息,并且这些发送不是原子性的,则这些消息可能会无序接收。如果消息的顺序很重要,这可能会导致不正确的处理。
发件箱模式通过确保事务提交和消息发送操作是原子的,并提供即使在出现故障时也能可靠地发送消息的机制来解决这些问题。
下面是一个序列图,说明了没有发件箱模式的系统所存在的的问题:
幂等消费者在发件箱模式中扮演着重要的角色。在分布式系统的背景下,幂等性是指系统无论执行特定操作多少次都能产生相同结果的能力。这对于确保分布式环境中的数据一致性和可靠性至关重要。
然而,这可能会导致同一条消息被多次发送,尤其是在系统在发送消息后但在发件箱中将消息标记为已发送之前发生故障的情况下。这就是幂等消费者发挥作用的地方。
幂等消费者旨在妥善处理重复消息。它们确保消除多次接收同一条消息的副作用。这通常通过跟踪所有已处理消息的 ID 来实现。当收到一条消息时,消费者会检查它是否已经处理了一条具有相同 ID 的消息。如果已经处理过,它就会忽略该消息。
下面是一个序列图,说明了发件箱模式如何解决问题:
发件箱模式的实现
现在您已经了解了发件箱模式的重要性和好处,让我们深入研究一下实现它需要什么:
发件箱模式的实现涉及以下步骤:
- 创建发件箱表:第一步是在数据库中创建发件箱表。此表将存储所有需要发送的消息。每条消息都应具有唯一的 ID 和一个指示消息是否已发送的状态字段。
- 修改应用程序代码:下一步是修改应用程序代码。每当您的应用程序需要将消息作为事务的一部分发送时,它都应将该消息作为同一事务的一部分添加到发件箱表中。
- 实现发件箱发布器:发件箱发布器是一个单独的组件,它会轮询发件箱表中未发送的消息。当它发现未发送的消息时,它会发送该消息并将发件箱表中该消息的状态更新为“已发送”。
DotNetCore.CAP 简介
幸运的是,有一个名为 DotNetCore.CAP 的 .NET 库可以为我们简化 Outbox 模式的实现。
DotNetCore.CAP 是一个开源库,它提供了一组 API,允许开发人员轻松地将消息作为数据库事务的一部分发送,将其存储在发件箱中,并确保即使在出现故障的情况下也能可靠地将它们传递给所有感兴趣的消费者。
该库还支持幂等消费者,这对于确保分布式环境中的数据一致性和可靠性至关重要。这意味着即使同一条消息被传递多次,也可以消除接收同一条消息的副作用。
通过使用DotNetCore.CAP,开发人员可以专注于其应用程序的业务逻辑,而库则负责确保分布式系统中可靠消息传递的复杂性。
例子
此代码演示了如何在 ASP.NET Core 应用程序中使用 CAP 库进行事件发布和处理。
在生产者中:
- 定义了“/send”端点的路由处理程序。
- 它启动一个事务,执行一个 SQL 命令来获取当前服务器时间,并将该时间的消息发布到“test.show.time”主题。
- 消息发布延迟500毫秒。
- 如果所有操作都成功,则事务被提交并返回响应。
// Producer/Program.cs
app.MapGet("/send", async (
SqlConnection connection,
ICapPublisher capPublisher,
TimeProvider timeProvider) =>
{
var startTime = timeProvider.GetTimestamp();
using var transaction = connection
.BeginTransaction(capPublisher, autoCommit: false);
var command = connection.CreateCommand();
command.Transaction = (SqlTransaction)transaction;
command.CommandText = "SELECT GETDATE()";
var serverTime = await command.ExecuteScalarAsync();
await capPublisher.PublishDelayAsync(
TimeSpan.FromMilliseconds(500),
"test.show.time",
new MyMessage(serverTime?.ToString()!));
transaction.Commit();
return TypedResults.Ok(new
{
Status = "Published",
Duration = timeProvider.GetElapsedTime(startTime)
});
});
标签:builder,发送,Azure,消息,Aspire,NET,发件箱
From: https://www.cnblogs.com/savorboard/p/18401708/aspire-cap