首页 > 其他分享 >理解Saga模式:分布式事务的优雅解决方案

理解Saga模式:分布式事务的优雅解决方案

时间:2024-03-07 22:00:43浏览次数:28  
标签:事务 Saga 步骤 优雅 补偿 事件 操作 分布式

理解Saga模式:分布式事务的优雅解决方案

在微服务架构中,系统通常被拆分成多个独立的服务,每个服务管理着自己的数据和逻辑。这种拆分带来了灵活性和可扩展性,但同时也引入了分布式事务管理的挑战。传统的事务管理方法,如数据库的ACID(原子性、一致性、隔离性、持久性)事务,不再适用于跨多个微服务的操作。这时,Saga模式应运而生,提供了一种解决分布式事务问题的有效策略。

什么是Saga模式?

Saga模式是一种在微服务架构中管理分布式事务的设计模式。它将一个长期运行的事务拆分成一系列较小的本地事务,每个本地事务由一个微服务管理。Saga保证,即使在分布式系统中,事务要么全部成功,要么在发生故障时通过执行补偿操作(即回滚)来保持数据的一致性。

Saga模式的工作原理

Saga模式通过定义每个事务步骤的补偿操作来管理事务的一致性。这些补偿操作是在某个步骤失败时触发的,用来撤销之前已经完成的步骤,从而保持整个系统的数据一致性。Saga可以是长期运行的,支持异步通信,并通过事件来协调各个微服务间的操作。

Saga的两种实现方式

  • 编程式补偿:直接在代码中管理每个步骤的执行和补偿逻辑。这种方式直观且易于实现,适用于较简单的事务场景。
  • 基于事件的Saga:通过发布和订阅事件来协调事务的执行和补偿。这种方式提高了系统的解耦度,更适合复杂的分布式环境。

理解Saga模式在微服务架构中的应用

Saga模式是处理分布式事务的一种有效策略,特别是在微服务架构中,它帮助保持系统的数据一致性和稳定性。Saga模式可以通过编程式补偿或基于事件的方式来实现,下面我们将通过Spring Cloud的应用案例来详细探讨这两种实现方式。

编程式补偿示例

编程式补偿是直接在代码中管理每个步骤的执行和补偿逻辑。这种方式直观且易于实现,适用于较简单的事务场景。以下是一个简单的Spring Boot服务示例,用于展示如何实现编程式补偿:

public class OrderSaga {
    private Deque<Runnable> compensations = new ArrayDeque<>();

    public void executeSaga() {
        try {
            // A:创建订单
            createOrder();
            compensations.push(() -> deleteOrder());
            
            // B:验证库存
            checkInventory();
            compensations.push(() -> restoreInventory());
            
            // C:扣除账户余额
            deductBalance();
            compensations.push(() -> refundBalance());
            
            // D:同步到DMS
            syncToDMS(); // 假设失败
        } catch (Exception e) {
            // 回滚
            rollback();
        }
    }

    private void rollback() {
        while (!compensations.isEmpty()) {
            compensations.pop().run();
        }
    }
}

在这个示例中,executeSaga 方法尝试创建一个订单。如果在创建过程中发生异常,则调用 rollback 方法来执行补偿逻辑,以确保系统的一致性。

基于事件的Saga模式

事件驱动的Saga模式,基于事件的Saga模式依赖于事件的发布和订阅来驱动事务的执行和补偿。每个微服务在完成其事务步骤后发布事件,其他服务订阅这些事件并根据事件类型执行下一步操作或补偿操作。

如何通过事件实现层层回滚?当某个事务步骤失败,触发一个失败事件,监听该事件的服务将执行对应的补偿操作,并可能进一步发布补偿事件以触发前面步骤的补偿操作,从而实现整个事务链的回滚。

基于事件的Saga模式依赖于事件的发布和订阅来驱动事务的执行和补偿。这种方式提高了系统的解耦度,更适合复杂的分布式环境。下面是一个使用Spring Cloud Stream实现的基于事件的Saga模式的简单例子:

首先,定义一个简单的订单服务和库存服务,使用Spring Cloud Stream进行通信:
对于基于事件流的Saga模式实现,考虑到之前的四个步骤(A, B, C, D),并且步骤D失败需要触发回滚,我们可以利用Spring Cloud Stream来实现跨服务的事件发布和订阅。这样的实现允许每个微服务独立处理它们的业务逻辑并在必要时触发补偿操作。

实现基于事件流的Saga

在基于事件流的Saga中,每个步骤的完成或失败会发布相应的事件,而其他服务会订阅这些事件并据此执行下一步或补偿操作。

假设步骤A, B, C, D分别对应于订单创建、库存检查、账户扣费、同步到DMS。每个操作成功完成后会发布一个成功事件,如果操作失败,则发布一个失败事件。

Spring Cloud Stream配置

首先,确保你的Spring Boot应用集成了Spring Cloud Stream,并配置好了消息中间件(如RabbitMQ或Kafka)。

spring:
  cloud:
    stream:
      bindings:
        orderCreatedOutput:
          destination: order-created-topic
        inventoryCheckedOutput:
          destination: inventory-checked-topic
        paymentProcessedOutput:
          destination: payment-processed-topic
        dmsSyncedOutput:
          destination: dms-synced-topic
        dmsSyncFailedOutput:
          destination: dms-sync-failed-topic

基本假设

  • 使用Spring Cloud Stream作为事件驱动的通信方式。
  • 每个步骤都有对应的成功和失败事件,以及一个补偿操作。

步骤和事件定义

  1. 创建订单 (步骤A)

    • 成功事件: OrderCreatedEvent
    • 补偿事件: OrderCreationCompensatedEvent (用于回滚订单创建)
  2. 验证库存 (步骤B)

    • 成功事件: InventoryCheckedEvent
    • 补偿事件: InventoryCheckCompensatedEvent (用于恢复库存)
  3. 扣除账户余额 (步骤C)

    • 成功事件: PaymentProcessedEvent
    • 补偿事件: PaymentProcessingCompensatedEvent (用于退款)
  4. 同步到DMS (步骤D)

    • 成功事件: DmsSyncedEvent
    • 失败事件: DmsSyncFailedEvent (触发回滚)

步骤A: 创建订单

Service

@EnableBinding(Source.class)
public class OrderService {

    @Autowired
    private MessageChannel output;

    public void createOrder(Order order) {
        // 创建订单逻辑...
        output.send(MessageBuilder.withPayload(new OrderCreatedEvent(order)).build());
    }

    // 监听DMS同步失败事件,触发订单创建的补偿操作
    @StreamListener(target = Sink.INPUT, condition = "headers['type']=='DmsSyncFailedEvent'")
    public void compensateOrderCreation(OrderFailedEvent event) {
        // 执行订单创建的补偿操作,如删除已创建的订单
        output.send(MessageBuilder.withPayload(new OrderCreationCompensatedEvent(event.getOrderId())).build());
    }
}

步骤B: 验证库存

Listener & Compensate Logic

假设库存服务监听到OrderCreatedEvent,进行库存验证:

@StreamListener(target = Sink.INPUT, condition = "headers['type']=='OrderCreatedEvent'")
public void checkInventory(OrderCreatedEvent event) {
    try {
        // 验证库存逻辑...
        output.send(MessageBuilder.withPayload(new InventoryCheckedEvent(event.getOrder())).build());
    } catch (Exception e) {
        // 库存验证失败逻辑,触发库存验证补偿操作
    }
}

步骤C: 扣除账户余额

Listener & Compensate Logic

账户服务监听到InventoryCheckedEvent,进行账户扣费:

@StreamListener(target = Sink.INPUT, condition = "headers['type']=='InventoryCheckedEvent'")
public void processPayment(InventoryCheckedEvent event) {
    try {
        // 扣除账户余额逻辑...
        output.send(MessageBuilder.withPayload(new PaymentProcessedEvent(event.getOrder())).build());
    } catch (Exception e) {
        // 账户扣费失败逻辑,触发账户扣费补偿操作
    }
}

步骤D: 同步到DMS

Listener & Failure Logic

DMS同步服务监听到PaymentProcessedEvent,尝试同步到DMS:

@StreamListener(target = Sink.INPUT, condition = "headers['type']=='PaymentProcessedEvent'")
public void syncToDms(PaymentProcessedEvent event) {
    try {
        // 同步到DMS逻辑...
    } catch (Exception e) {
        output.send(MessageBuilder.withPayload(new DmsSyncFailedEvent(event.getOrder())).build());
    }
}

补偿逻辑

在基于事件流的Saga模式中,补偿操作的执行顺序是通过事件的发布和订阅机制自然形成的。当某个步骤失败时,它会发布一个失败事件,触发需要回滚的前一步骤发布其补偿事件。这种方式确保了即使在复杂的分布式事务中,每个步骤的补偿操作也能按照正确的顺序执行,从而实现整个业务流程的回滚。

步骤D失败时的补偿流程

当步骤D(同步到DMS)失败时,它应该发布一个DmsSyncFailedEvent。此事件的接收者应触发步骤C的补偿操作,即发布PaymentProcessingCompensatedEvent。相应地,监听到PaymentProcessingCompensatedEvent的服务将触发步骤B的补偿操作,发布InventoryCheckCompensatedEvent,以此类推,直至整个事务链被回滚。

步骤D: 同步到DMS失败时触发补偿

@EnableBinding(Source.class)
public class DmsSyncService {

    @Autowired
    private MessageChannel output;

    public void syncToDms(Order order) {
        try {
            // 尝试同步到DMS
            // 假设失败,抛出异常
            throw new RuntimeException("DMS Sync Failed");
        } catch (Exception e) {
            // 发布DMS同步失败事件,触发步骤C的补偿操作
            output.send(MessageBuilder.withPayload(new DmsSyncFailedEvent(order)).build());
        }
    }
}

监听DmsSyncFailedEvent,发布PaymentProcessingCompensatedEvent

@StreamListener(target = Sink.INPUT, condition = "headers['type']=='DmsSyncFailedEvent'")
public void onDmsSyncFailed(DmsSyncFailedEvent event) {
    // 执行步骤C的补偿操作,如退款,并发布PaymentProcessingCompensatedEvent
    output.send(MessageBuilder.withPayload(new PaymentProcessingCompensatedEvent(event.getOrder())).build());
}

监听PaymentProcessingCompensatedEvent,发布InventoryCheckCompensatedEvent

@StreamListener(target = Sink.INPUT, condition = "headers['type']=='PaymentProcessingCompensatedEvent'")
public void onPaymentProcessingCompensated(PaymentProcessingCompensatedEvent event) {
    // 执行步骤B的补偿操作,如恢复库存,并发布InventoryCheckCompensatedEvent
    output.send(MessageBuilder.withPayload(new InventoryCheckCompensatedEvent(event.getOrder())).build());
}

监听InventoryCheckCompensatedEvent,发布OrderCreationCompensatedEvent

@StreamListener(target = Sink.INPUT, condition = "headers['type']=='InventoryCheckCompensatedEvent'")
public void onInventoryCheckCompensated(InventoryCheckCompensatedEvent event) {
    // 执行步骤A的补偿操作,如删除订单,并发布OrderCreationCompensatedEvent
    output.send(MessageBuilder.withPayload(new OrderCreationCompensatedEvent(event.getOrder())).build());
}

注意

按照上述实现,一旦DMS同步失败,系统将通过一系列事件的发布和订阅,逐步激活之前步骤的补偿操作,以确保整个业务流程能够安全、有序地回滚到初始状态。这种基于事件流的Saga模式实现方式不仅保证了事务的最终一致性,而且通过解耦各个微服务间的直接依赖,提高了系统的健壮性和可维护性。

补偿操作的实现

在实际的补偿操作实现中,你需要确保每个操作都是幂等的,以便在失败时可以安全地执行补偿操作。每个服务在处理补偿逻辑时,可能需要访问之前操作的状态信息,这通常意味着需要额外的存储来保存操作的上下文或状态快照。
为了支持基于事件流的Saga模式中的回滚操作,我们需要设计一张表来记录每个事务步骤及其补偿信息。这样,在任何步骤失败时,我们可以依据这张表中的信息来触发相应的补偿操作。以下是一种简单的表设计示例,旨在捕捉每个事务步骤的关键信息:

事件和补偿操作记录表设计

假设这张表的名称为 saga_event_log:

字段名 类型 描述
id BIGINT 主键,自增
transaction_id VARCHAR 事务的唯一标识符
event_type VARCHAR 事件类型(如OrderCreated, DmsSyncFailed等)
payload TEXT 事件的具体内容,通常为JSON格式
status VARCHAR 事件状态(如pending, completed, compensated等)
create_time TIMESTAMP 事件创建时间
update_time TIMESTAMP 事件更新时间

表设计说明

  • id: 每条记录的唯一标识。
  • transaction_id: 关联同一事务中的所有事件,以便于跟踪和回滚整个事务链。
  • event_type: 标识事件的类型,这对于触发相应的业务逻辑和补偿操作非常关键。
  • payload: 存储与事件相关的数据,如订单编号、用户ID等,格式为JSON以保持灵活性。
  • status: 记录事件的当前状态,指示是否需要执行补偿操作。
  • create_time, update_time: 记录事件的创建和最后更新时间,有助于事务的时间线追踪。

使用场景

当发生一个业务操作时(如创建订单),系统将在该表中插入一条记录,记录下该操作的事件类型和相关数据。如果后续步骤成功,更新该事件的状态为completed;如果某一步骤失败,触发回滚,那么通过transaction_id查找到之前的步骤,逐个执行补偿操作,并将相应事件的状态更新为compensated

实现补偿逻辑

补偿逻辑的实现将依赖于读取saga_event_log表中的记录,根据transaction_idevent_type确定哪些步骤需要被补偿,然后按逆序执行每个步骤的补偿操作,最终达到回滚整个事务链的目的。

这种表设计为基于事件的Saga模式提供了数据支撑,使得事务的每个步骤都可以被可靠地跟踪和管理,确保了分布式事务处理的一致性和可回溯性。

Saga模式提供了一种在微服务架构中管理分布式事务的有效策略。无论是采用编程式补偿还是基于事件的Saga模式,它们都能帮助开发者在分布式系统中维护数据的一致性和稳定性。通过合理地应用Saga模式,开发者可以在保证系统解耦的同时,有效地管理跨服务的业务

与数据库事务的对比

不同于数据库事务的即时性和原子性,Saga模式提供了一种最终一致性的保证。这意味着系统允许在短时间内存在不一致状态,但保证最终通过补偿操作达到一致状态。虽然这增加了实现的复杂度和存储成本(例如,需要记录补偿操作的事件和状态),Saga模式为分布式系统提供了高度的灵活性和可靠性。

总结

Saga模式是解决微服务架构中分布式事务问题的有效方法。通过拆分长事务为多个本地事务,并定义相应的补偿操作,Saga模式确保了分布式系统中事务的一致性和可靠性。无论是采用编程式补偿还是基于事件的实现,Saga模式都为分布式事务管理提供了一种灵活而强大的解决方案。对于正在

标签:事务,Saga,步骤,优雅,补偿,事件,操作,分布式
From: https://www.cnblogs.com/irobotzz/p/18059856

相关文章

  • Jmeter性能测试:高并发分布式性能测试
    一、为什么要进行分布式性能测试当进行高并发性能测试的时候,受限于Jmeter工具本身和电脑硬件的原因,无法满足我们对大并发性能测试的要求。基于这种场景下,我们就需要采用分布式的方式来实现我们高并发的性能测试要求。二、分布式性能测试原理要进行分布式性能测试,我们首先要一......
  • 开源.NET8.0小项目伪微服务框架(分布式、EFCore、Redis、RabbitMQ、Mysql等)
    1、前言为什么说是伪微服务框架,常见微服务框架可能还包括服务容错、服务间的通信、服务追踪和监控、服务注册和发现等等,而我这里为了在使用中的更简单,将很多东西进行了简化或者省略了。年前到现在在开发一个新的小项目,刚好项目最初的很多功能是比较通用的,所以就想着将这些功能抽......
  • 分布式锁实现方案
    一基于Redis实现分布式锁如何基于Redis实现一个最简易的分布式锁?不论是本地锁还是分布式锁,核心都在于“互斥”。在Redis中,SETNX命令是可以帮助我们实现互斥。SETNX即SETifNoteXists(对应Java中的setIfAbsent方法),如果key不存在的话,才会设置key的值。如......
  • 分布式事务解决方案详解
    1:分布式事务简介大多数场景下,我们的应用都只需要操作单一的数据库,这种情况下的事务称之为本地事务(LocalTransaction)。本地事务的ACID特性是数据库直接提供支持。本地事务应用架构如下所示:但是在微服务架构中,完成某一个业务功能可能需要横跨多个服务,操作多个数据库。这就涉......
  • 开源:Taurus.Idempotent 分布式幂等性锁框架,支持 .Net 和 .Net Core 双系列版本
    分布式幂等性锁介绍:分布式幂等性框架的作用是确保在分布式系统中的操作具有幂等性,即无论操作被重复执行多少次,最终的结果都是一致的。幂等性是指对同一操作的多次执行所产生的效果与仅执行一次的效果相同。以下是分布式幂等性框架的主要作用:避免重复操作:在分布式系统中,由于......
  • Python:如何风骚而又不失优雅的使用Switch...Case
    本渣PHP屌丝一枚,最近在用Python做一个东西碰到了这个问题,在这里给没踩过坑的朋友分享下以Python2.7为例从前有座山碰到参数特别多的情况的时候总是看着一堆if...elseif抓狂,斩不断,理还乱,幸好有Switch...Case可以很方便的处理多种情况的参数但是在Python中没有Switch...Case......
  • TransmittableThreadLocal 如何解决在分布式环境下线程池中使用ThreadLocal的问题
    在分布式环境下,线程池中使用ThreadLocal会出现线程安全问题,因为线程池中的线程是可以被多个请求共享的,当多个请求同时访问同一个ThreadLocal变量时,会出现数据互相干扰的问题。为了解决这个问题,Java提供了TransmittableThreadLocal类。TransmittableThreadLocal是ThreadLocal的一......
  • JMeter分布式安装和HTTP
    JMeter分布式安装分布式JMeter环境主要由一个JMeter控制器(Control)和多个JMeter代理(Agent)组成。被测试的计算机JMeter控制器为控制计算机,JMeter代理为工作计算机。现在每台计算机上安装JMeter。在每太运行JMeter代理的计算机中打开%JMETER_HOME%\bin\jmeter.properties文件。修改se......
  • 微服务(Java分布式)详解
    1.概念微服务是一种软件架构模式,它将应用程序分解为一组小型、自治的服务单元。个人理解上:微服务就是将服务拆分,让一种服务在一台或者多台电脑上运行,如下图微服务技术栈所示:注册中心可以配置在一台或者多台电脑上,将功能拆分,n台电脑共同实现一个软件单体架构:将业务的所有功能......
  • selenium-grid分布式测试环境
    1.什么时候使用grid针对不同的操作系统、浏览器类型、浏览器版本并发执行用例,缩短用例执行时间(单台电脑性能不足)grid是一个庞大的、复杂的分布式组件,本身有额外的开销2.启动grid下载地址:https://www.selenium.dev/downloads/下载jar包配置jdk执行命令,启动jar包启动命......