理解Saga模式:分布式事务的优雅解决方案
在微服务架构中,系统通常被拆分成多个独立的服务,每个服务管理着自己的数据和逻辑。这种拆分带来了灵活性和可扩展性,但同时也引入了分布式事务管理的挑战。传统的事务管理方法,如数据库的ACID(原子性、一致性、隔离性、持久性)事务,不再适用于跨多个微服务的操作。这时,Saga模式应运而生,提供了一种解决分布式事务问题的有效策略。
什么是Saga模式?
Saga模式是一种在微服务架构中管理分布式事务的设计模式。它将一个长期运行的事务拆分成一系列较小的本地事务,每个本地事务由一个微服务管理。Saga保证,即使在分布式系统中,事务要么全部成功,要么在发生故障时通过执行补偿操作(即回滚)来保持数据的一致性。
Saga模式的工作原理
Saga模式通过定义每个事务步骤的补偿操作来管理事务的一致性。这些补偿操作是在某个步骤失败时触发的,用来撤销之前已经完成的步骤,从而保持整个系统的数据一致性。Saga可以是长期运行的,支持异步通信,并通过事件来协调各个微服务间的操作。
Saga的两种实现方式
- 编程式补偿:直接在代码中管理每个步骤的执行和补偿逻辑。这种方式直观且易于实现,适用于较简单的事务场景。
- 基于事件的Saga:通过发布和订阅事件来协调事务的执行和补偿。这种方式提高了系统的解耦度,更适合复杂的分布式环境。
理解Saga模式在微服务架构中的应用
Saga模式是处理分布式事务的一种有效策略,特别是在微服务架构中,它帮助保持系统的数据一致性和稳定性。Saga模式可以通过编程式补偿或基于事件的方式来实现,下面我们将通过Spring Cloud的应用案例来详细探讨这两种实现方式。
编程式补偿示例
编程式补偿是直接在代码中管理每个步骤的执行和补偿逻辑。这种方式直观且易于实现,适用于较简单的事务场景。以下是一个简单的Spring Boot服务示例,用于展示如何实现编程式补偿:
public class OrderSaga {
private Deque<Runnable> compensations = new ArrayDeque<>();
public void executeSaga() {
try {
// A:创建订单
createOrder();
compensations.push(() -> deleteOrder());
// B:验证库存
checkInventory();
compensations.push(() -> restoreInventory());
// C:扣除账户余额
deductBalance();
compensations.push(() -> refundBalance());
// D:同步到DMS
syncToDMS(); // 假设失败
} catch (Exception e) {
// 回滚
rollback();
}
}
private void rollback() {
while (!compensations.isEmpty()) {
compensations.pop().run();
}
}
}
在这个示例中,executeSaga
方法尝试创建一个订单。如果在创建过程中发生异常,则调用 rollback
方法来执行补偿逻辑,以确保系统的一致性。
基于事件的Saga模式
事件驱动的Saga模式,基于事件的Saga模式依赖于事件的发布和订阅来驱动事务的执行和补偿。每个微服务在完成其事务步骤后发布事件,其他服务订阅这些事件并根据事件类型执行下一步操作或补偿操作。
如何通过事件实现层层回滚?当某个事务步骤失败,触发一个失败事件,监听该事件的服务将执行对应的补偿操作,并可能进一步发布补偿事件以触发前面步骤的补偿操作,从而实现整个事务链的回滚。
基于事件的Saga模式依赖于事件的发布和订阅来驱动事务的执行和补偿。这种方式提高了系统的解耦度,更适合复杂的分布式环境。下面是一个使用Spring Cloud Stream实现的基于事件的Saga模式的简单例子:
首先,定义一个简单的订单服务和库存服务,使用Spring Cloud Stream进行通信:
对于基于事件流的Saga模式实现,考虑到之前的四个步骤(A, B, C, D),并且步骤D失败需要触发回滚,我们可以利用Spring Cloud Stream来实现跨服务的事件发布和订阅。这样的实现允许每个微服务独立处理它们的业务逻辑并在必要时触发补偿操作。
实现基于事件流的Saga
在基于事件流的Saga中,每个步骤的完成或失败会发布相应的事件,而其他服务会订阅这些事件并据此执行下一步或补偿操作。
假设步骤A, B, C, D分别对应于订单创建、库存检查、账户扣费、同步到DMS。每个操作成功完成后会发布一个成功事件,如果操作失败,则发布一个失败事件。
Spring Cloud Stream配置
首先,确保你的Spring Boot应用集成了Spring Cloud Stream,并配置好了消息中间件(如RabbitMQ或Kafka)。
spring:
cloud:
stream:
bindings:
orderCreatedOutput:
destination: order-created-topic
inventoryCheckedOutput:
destination: inventory-checked-topic
paymentProcessedOutput:
destination: payment-processed-topic
dmsSyncedOutput:
destination: dms-synced-topic
dmsSyncFailedOutput:
destination: dms-sync-failed-topic
基本假设
- 使用Spring Cloud Stream作为事件驱动的通信方式。
- 每个步骤都有对应的成功和失败事件,以及一个补偿操作。
步骤和事件定义
-
创建订单 (步骤A)
- 成功事件:
OrderCreatedEvent
- 补偿事件:
OrderCreationCompensatedEvent
(用于回滚订单创建)
- 成功事件:
-
验证库存 (步骤B)
- 成功事件:
InventoryCheckedEvent
- 补偿事件:
InventoryCheckCompensatedEvent
(用于恢复库存)
- 成功事件:
-
扣除账户余额 (步骤C)
- 成功事件:
PaymentProcessedEvent
- 补偿事件:
PaymentProcessingCompensatedEvent
(用于退款)
- 成功事件:
-
同步到DMS (步骤D)
- 成功事件:
DmsSyncedEvent
- 失败事件:
DmsSyncFailedEvent
(触发回滚)
- 成功事件:
步骤A: 创建订单
Service
@EnableBinding(Source.class)
public class OrderService {
@Autowired
private MessageChannel output;
public void createOrder(Order order) {
// 创建订单逻辑...
output.send(MessageBuilder.withPayload(new OrderCreatedEvent(order)).build());
}
// 监听DMS同步失败事件,触发订单创建的补偿操作
@StreamListener(target = Sink.INPUT, condition = "headers['type']=='DmsSyncFailedEvent'")
public void compensateOrderCreation(OrderFailedEvent event) {
// 执行订单创建的补偿操作,如删除已创建的订单
output.send(MessageBuilder.withPayload(new OrderCreationCompensatedEvent(event.getOrderId())).build());
}
}
步骤B: 验证库存
Listener & Compensate Logic
假设库存服务监听到OrderCreatedEvent
,进行库存验证:
@StreamListener(target = Sink.INPUT, condition = "headers['type']=='OrderCreatedEvent'")
public void checkInventory(OrderCreatedEvent event) {
try {
// 验证库存逻辑...
output.send(MessageBuilder.withPayload(new InventoryCheckedEvent(event.getOrder())).build());
} catch (Exception e) {
// 库存验证失败逻辑,触发库存验证补偿操作
}
}
步骤C: 扣除账户余额
Listener & Compensate Logic
账户服务监听到InventoryCheckedEvent
,进行账户扣费:
@StreamListener(target = Sink.INPUT, condition = "headers['type']=='InventoryCheckedEvent'")
public void processPayment(InventoryCheckedEvent event) {
try {
// 扣除账户余额逻辑...
output.send(MessageBuilder.withPayload(new PaymentProcessedEvent(event.getOrder())).build());
} catch (Exception e) {
// 账户扣费失败逻辑,触发账户扣费补偿操作
}
}
步骤D: 同步到DMS
Listener & Failure Logic
DMS同步服务监听到PaymentProcessedEvent
,尝试同步到DMS:
@StreamListener(target = Sink.INPUT, condition = "headers['type']=='PaymentProcessedEvent'")
public void syncToDms(PaymentProcessedEvent event) {
try {
// 同步到DMS逻辑...
} catch (Exception e) {
output.send(MessageBuilder.withPayload(new DmsSyncFailedEvent(event.getOrder())).build());
}
}
补偿逻辑
在基于事件流的Saga模式中,补偿操作的执行顺序是通过事件的发布和订阅机制自然形成的。当某个步骤失败时,它会发布一个失败事件,触发需要回滚的前一步骤发布其补偿事件。这种方式确保了即使在复杂的分布式事务中,每个步骤的补偿操作也能按照正确的顺序执行,从而实现整个业务流程的回滚。
步骤D失败时的补偿流程
当步骤D(同步到DMS)失败时,它应该发布一个DmsSyncFailedEvent
。此事件的接收者应触发步骤C的补偿操作,即发布PaymentProcessingCompensatedEvent
。相应地,监听到PaymentProcessingCompensatedEvent
的服务将触发步骤B的补偿操作,发布InventoryCheckCompensatedEvent
,以此类推,直至整个事务链被回滚。
步骤D: 同步到DMS失败时触发补偿
@EnableBinding(Source.class)
public class DmsSyncService {
@Autowired
private MessageChannel output;
public void syncToDms(Order order) {
try {
// 尝试同步到DMS
// 假设失败,抛出异常
throw new RuntimeException("DMS Sync Failed");
} catch (Exception e) {
// 发布DMS同步失败事件,触发步骤C的补偿操作
output.send(MessageBuilder.withPayload(new DmsSyncFailedEvent(order)).build());
}
}
}
监听DmsSyncFailedEvent,发布PaymentProcessingCompensatedEvent
@StreamListener(target = Sink.INPUT, condition = "headers['type']=='DmsSyncFailedEvent'")
public void onDmsSyncFailed(DmsSyncFailedEvent event) {
// 执行步骤C的补偿操作,如退款,并发布PaymentProcessingCompensatedEvent
output.send(MessageBuilder.withPayload(new PaymentProcessingCompensatedEvent(event.getOrder())).build());
}
监听PaymentProcessingCompensatedEvent,发布InventoryCheckCompensatedEvent
@StreamListener(target = Sink.INPUT, condition = "headers['type']=='PaymentProcessingCompensatedEvent'")
public void onPaymentProcessingCompensated(PaymentProcessingCompensatedEvent event) {
// 执行步骤B的补偿操作,如恢复库存,并发布InventoryCheckCompensatedEvent
output.send(MessageBuilder.withPayload(new InventoryCheckCompensatedEvent(event.getOrder())).build());
}
监听InventoryCheckCompensatedEvent,发布OrderCreationCompensatedEvent
@StreamListener(target = Sink.INPUT, condition = "headers['type']=='InventoryCheckCompensatedEvent'")
public void onInventoryCheckCompensated(InventoryCheckCompensatedEvent event) {
// 执行步骤A的补偿操作,如删除订单,并发布OrderCreationCompensatedEvent
output.send(MessageBuilder.withPayload(new OrderCreationCompensatedEvent(event.getOrder())).build());
}
注意
按照上述实现,一旦DMS同步失败,系统将通过一系列事件的发布和订阅,逐步激活之前步骤的补偿操作,以确保整个业务流程能够安全、有序地回滚到初始状态。这种基于事件流的Saga模式实现方式不仅保证了事务的最终一致性,而且通过解耦各个微服务间的直接依赖,提高了系统的健壮性和可维护性。
补偿操作的实现
在实际的补偿操作实现中,你需要确保每个操作都是幂等的,以便在失败时可以安全地执行补偿操作。每个服务在处理补偿逻辑时,可能需要访问之前操作的状态信息,这通常意味着需要额外的存储来保存操作的上下文或状态快照。
为了支持基于事件流的Saga模式中的回滚操作,我们需要设计一张表来记录每个事务步骤及其补偿信息。这样,在任何步骤失败时,我们可以依据这张表中的信息来触发相应的补偿操作。以下是一种简单的表设计示例,旨在捕捉每个事务步骤的关键信息:
事件和补偿操作记录表设计
假设这张表的名称为 saga_event_log
:
字段名 | 类型 | 描述 |
---|---|---|
id |
BIGINT | 主键,自增 |
transaction_id |
VARCHAR | 事务的唯一标识符 |
event_type |
VARCHAR | 事件类型(如OrderCreated , DmsSyncFailed 等) |
payload |
TEXT | 事件的具体内容,通常为JSON格式 |
status |
VARCHAR | 事件状态(如pending , completed , compensated 等) |
create_time |
TIMESTAMP | 事件创建时间 |
update_time |
TIMESTAMP | 事件更新时间 |
表设计说明
- id: 每条记录的唯一标识。
- transaction_id: 关联同一事务中的所有事件,以便于跟踪和回滚整个事务链。
- event_type: 标识事件的类型,这对于触发相应的业务逻辑和补偿操作非常关键。
- payload: 存储与事件相关的数据,如订单编号、用户ID等,格式为JSON以保持灵活性。
- status: 记录事件的当前状态,指示是否需要执行补偿操作。
- create_time, update_time: 记录事件的创建和最后更新时间,有助于事务的时间线追踪。
使用场景
当发生一个业务操作时(如创建订单),系统将在该表中插入一条记录,记录下该操作的事件类型和相关数据。如果后续步骤成功,更新该事件的状态为completed
;如果某一步骤失败,触发回滚,那么通过transaction_id
查找到之前的步骤,逐个执行补偿操作,并将相应事件的状态更新为compensated
。
实现补偿逻辑
补偿逻辑的实现将依赖于读取saga_event_log
表中的记录,根据transaction_id
和event_type
确定哪些步骤需要被补偿,然后按逆序执行每个步骤的补偿操作,最终达到回滚整个事务链的目的。
这种表设计为基于事件的Saga模式提供了数据支撑,使得事务的每个步骤都可以被可靠地跟踪和管理,确保了分布式事务处理的一致性和可回溯性。
Saga模式提供了一种在微服务架构中管理分布式事务的有效策略。无论是采用编程式补偿还是基于事件的Saga模式,它们都能帮助开发者在分布式系统中维护数据的一致性和稳定性。通过合理地应用Saga模式,开发者可以在保证系统解耦的同时,有效地管理跨服务的业务
与数据库事务的对比
不同于数据库事务的即时性和原子性,Saga模式提供了一种最终一致性的保证。这意味着系统允许在短时间内存在不一致状态,但保证最终通过补偿操作达到一致状态。虽然这增加了实现的复杂度和存储成本(例如,需要记录补偿操作的事件和状态),Saga模式为分布式系统提供了高度的灵活性和可靠性。
总结
Saga模式是解决微服务架构中分布式事务问题的有效方法。通过拆分长事务为多个本地事务,并定义相应的补偿操作,Saga模式确保了分布式系统中事务的一致性和可靠性。无论是采用编程式补偿还是基于事件的实现,Saga模式都为分布式事务管理提供了一种灵活而强大的解决方案。对于正在
标签:事务,Saga,步骤,优雅,补偿,事件,操作,分布式 From: https://www.cnblogs.com/irobotzz/p/18059856