首页 > 其他分享 >【系统设计】高效的分布式系统:使用 Spring Boot 和 Kafka 实现 Saga 模式

【系统设计】高效的分布式系统:使用 Spring Boot 和 Kafka 实现 Saga 模式

时间:2024-10-30 21:18:20浏览次数:7  
标签:event mapper String Spring class private Kafka 分布式系统 public

在现代分布式系统中,管理跨多个服务的长事务至关重要。传统的分布式事务解决方案往往面临性能瓶颈和复杂性问题,而 Saga 模式 作为一种灵活高效的解决方案,逐渐受到开发者的青睐。本文将探讨如何利用 Spring BootKafka 实现 Saga 模式,并详细介绍事务补偿机制,帮助你构建稳定可靠的分布式系统。

什么是 Saga 模式?

原理介绍

在微服务架构中,一个业务流程通常涉及多个独立的服务。这些服务必须协同工作以完成完整的业务操作。例如,用户下单可能需要订单服务、支付服务和库存服务的合作。然而,跨服务操作通常涉及复杂的事务管理,传统的分布式事务(如两阶段提交)不仅效率低下,还难以扩展和维护。

Saga 模式 提供了一种替代方案,通过将一个长事务分解为一系列的本地事务,并通过事件或命令进行协调,从而实现最终一致性。这种方法不仅提高了系统的可扩展性,还简化了事务管理。

解决的问题及其重要性

Saga 模式解决了以下问题:

  1. 分布式事务管理:通过拆分事务,避免了传统分布式事务的性能和复杂性问题。
  2. 系统可扩展性:各服务独立运行,易于扩展和维护。
  3. 错误恢复:通过补偿机制,确保在步骤失败时,系统能恢复到一致状态。

在现代微服务架构中,确保跨服务操作的可靠性和一致性至关重要。Saga 模式提供了一个高效且灵活的解决方案,使系统在面对复杂业务流程和潜在错误时能够稳定运行。

Saga 模式的组成部分与实现方法

Saga 模式主要有两种实现方式:Choreography(编排)Orchestration(指挥)。下面将详细介绍这两种模式,并展示如何使用 Spring Boot 和 Kafka 实现它们,包括事务补偿机制。

架构图

编排模式
+----------------+        +----------------+        +----------------+
|  OrderService  |        | PaymentService |        | InventoryService|
+----------------+        +----------------+        +----------------+
        |                          |                         |
        |  CreateOrderCommand      |                         |
        |------------------------->|                         |
        |                          |                         |
        |      OrderCreatedEvent   |                         |
        |<-------------------------|                         |
        |                          |                         |
        |                          |      PaymentCommand     |
        |                          |------------------------>|
        |                          |                         |
        |                          |    PaymentProcessedEvent |
        |                          |<------------------------|
        |                          |                         |
        |      InventoryCommand    |                         |
        |------------------------->|                         |
        |                          |                         |
        |                          |    InventoryUpdatedEvent|
        |<-------------------------|                         |
        |                          |                         |
指挥模式
+----------------+          +----------------+          +----------------+
| SagaOrchestrator|          | OrderService  |          | PaymentService |
+----------------+          +----------------+          +----------------+
        |                        |                           |
        |   CreateOrderCommand   |                           |
        |----------------------->|                           |
        |                        |                           |
        |   OrderCreatedEvent    |                           |
        |<-----------------------|                           |
        |                        |                           |
        |   PaymentCommand       |                           |
        |--------------------------------------------------->|
        |                        |                           |
        |   PaymentApprovedEvent |                           |
        |<---------------------------------------------------|
        |                        |                           |
        |                        |                           |
        |   InventoryCommand     |                           |
        |----------------------->|                           |
        |                        |                           |

1. 编排模式

编排模式 下,各服务通过事件进行通信和协调,没有中央控制器。每个服务独立地监听和发布事件,以完成整个业务流程。

1.1 组成部分
  • 事件定义:服务之间传递的消息,如 OrderCreatedEventPaymentProcessedEvent 等。
  • Kafka 生产者与消费者:用于事件的发布和订阅。
  • 各服务逻辑:根据收到的事件执行相应的操作,并发布下一个事件。
1.2 实现步骤与代码
1.2.1 定义事件
// OrderCreatedEvent.java
public class OrderCreatedEvent {
    private String orderId;
    private String product;
    private int quantity;
    // getters and setters
}

// PaymentProcessedEvent.java
public class PaymentProcessedEvent {
    private String orderId;
    private boolean success;
    // getters and setters
}

// PaymentFailedEvent.java
public class PaymentFailedEvent {
    private String orderId;
    private String reason;
    // getters and setters
}

// OrderCancelledEvent.java
public class OrderCancelledEvent {
    private String orderId;
    // getters and setters
}
1.2.2 配置 Kafka
# application.yml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: saga-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
1.2.3 实现 OrderService
@Service
public class OrderService {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private ObjectMapper mapper = new ObjectMapper();

    @KafkaListener(topics = "create-order-command", groupId = "order-service-group")
    public void handleCreateOrder(String message) throws JsonProcessingException {
        CreateOrderCommand command = mapper.readValue(message, CreateOrderCommand.class);
        
        // 创建订单逻辑
        // TODO: 保存订单到数据库

        // 发布 OrderCreatedEvent
        OrderCreatedEvent event = new OrderCreatedEvent();
        event.setOrderId(command.getOrderId());
        event.setProduct(command.getProduct());
        event.setQuantity(command.getQuantity());
        String eventMsg = mapper.writeValueAsString(event);
        kafkaTemplate.send("order-created", eventMsg);
    }

    @KafkaListener(topics = "payment-failed", groupId = "saga-group")
    public void handlePaymentFailed(String message) throws JsonProcessingException {
        PaymentFailedEvent failedEvent = mapper.readValue(message, PaymentFailedEvent.class);
        
        // 取消订单逻辑
        cancelOrder(failedEvent.getOrderId());
        
        // 发布 OrderCancelledEvent
        OrderCancelledEvent cancelledEvent = new OrderCancelledEvent();
        cancelledEvent.setOrderId(failedEvent.getOrderId());
        String cancelledMsg = mapper.writeValueAsString(cancelledEvent);
        kafkaTemplate.send("order-cancelled", cancelledMsg);
    }

    private void cancelOrder(String orderId) {
        // TODO: 取消订单逻辑
    }
}
1.2.4 实现 PaymentService
@Service
public class PaymentService {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private ObjectMapper mapper = new ObjectMapper();

    @KafkaListener(topics = "order-created", groupId = "saga-group")
    public void handleOrderCreated(String message) throws JsonProcessingException {
        OrderCreatedEvent event = mapper.readValue(message, OrderCreatedEvent.class);
        
        // 处理支付逻辑
        boolean success = processPayment(event.getOrderId(), event.getQuantity());

        if (success) {
            // 发布 PaymentProcessedEvent
            PaymentProcessedEvent paymentEvent = new PaymentProcessedEvent(event.getOrderId(), true);
            String paymentMsg = mapper.writeValueAsString(paymentEvent);
            kafkaTemplate.send("payment-processed", paymentMsg);
        } else {
            // 发布 PaymentFailedEvent
            PaymentFailedEvent failedEvent = new PaymentFailedEvent();
            failedEvent.setOrderId(event.getOrderId());
            failedEvent.setReason("Payment processing failed.");
            String failedMsg = mapper.writeValueAsString(failedEvent);
            kafkaTemplate.send("payment-failed", failedMsg);
        }
    }

    private boolean processPayment(String orderId, int quantity) {
        // TODO: 实现支付逻辑,模拟支付失败
        return false;
    }
}
1.2.5 实现 InventoryService
@Service
public class InventoryService {
    @KafkaListener(topics = "order-cancelled", groupId = "saga-group")
    public void handleOrderCancelled(String message) throws JsonProcessingException {
        OrderCancelledEvent cancelledEvent = new ObjectMapper().readValue(message, OrderCancelledEvent.class);
        
        // 回滚库存逻辑
        rollbackInventory(cancelledEvent.getOrderId());
    }

    private void rollbackInventory(String orderId) {
        // TODO: 实现库存回滚逻辑
    }
}
1.3 事务补偿机制

编排模式 中,当某个服务的操作失败时,需要通过发布补偿事件来反向撤销之前的操作。上述代码中,PaymentService 在支付失败时发布 PaymentFailedEventOrderService 监听该事件并执行订单取消逻辑,随后发布 OrderCancelledEvent,最后 InventoryService 监听并回滚库存。

2. 指挥模式

指挥模式 下,存在一个中央的 Saga 管理器(Orchestrator),负责调度和协调各个服务的操作,并在需要时触发补偿机制。

2.1 组成部分
  • Saga Orchestrator:负责整个事务流程的控制和协调。
  • 命令和事件定义:如 CreateOrderCommandPaymentCommand 等。
  • Kafka 生产者与消费者:用于命令和事件的发布与订阅。
  • 各服务逻辑:根据接收到的命令执行操作,并发布相应的事件。
2.2 实现步骤与代码
2.2.1 定义命令和事件
// CreateOrderCommand.java
public class CreateOrderCommand {
    private String orderId;
    private String product;
    private int quantity;
    // getters and setters
}

// OrderCreatedEvent.java
public class OrderCreatedEvent {
    private String orderId;
    // getters and setters
}

// PaymentCommand.java
public class PaymentCommand {
    private String orderId;
    private double amount;
    // getters and setters
}

// PaymentApprovedEvent.java
public class PaymentApprovedEvent {
    private String orderId;
    // getters and setters
}

// PaymentRejectedEvent.java
public class PaymentRejectedEvent {
    private String orderId;
    private String reason;
    // getters and setters
}

// CancelOrderCommand.java
public class CancelOrderCommand {
    private String orderId;
    // getters and setters
}

// OrderCancelledEvent.java
public class OrderCancelledEvent {
    private String orderId;
    // getters and setters
}
2.2.2 实现 SagaOrchestrator
@Service
public class SagaOrchestrator {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    private ObjectMapper mapper = new ObjectMapper();

    @KafkaListener(topics = "order-created", groupId = "saga-orchestrator-group")
    public void handleOrderCreated(String message) throws JsonProcessingException {
        OrderCreatedEvent event = mapper.readValue(message, OrderCreatedEvent.class);
        
        try {
            // 发送 PaymentCommand
            PaymentCommand paymentCommand = new PaymentCommand();
            paymentCommand.setOrderId(event.getOrderId());
            paymentCommand.setAmount(calculateAmount(event.getOrderId()));
            String paymentCmd = mapper.writeValueAsString(paymentCommand);
            kafkaTemplate.send("payment-command", paymentCmd);
        } catch (Exception e) {
            // 发送补偿命令
            sendCancelOrderCommand(event.getOrderId());
        }
    }

    @KafkaListener(topics = "payment-approved", groupId = "saga-orchestrator-group")
    public void handlePaymentApproved(String message) throws JsonProcessingException {
        PaymentApprovedEvent event = mapper.readValue(message, PaymentApprovedEvent.class);
        
        // 继续后续操作,如库存更新
        // TODO: 发送其他命令或处理逻辑
    }

    @KafkaListener(topics = "payment-rejected", groupId = "saga-orchestrator-group")
    public void handlePaymentRejected(String message) throws JsonProcessingException {
        PaymentRejectedEvent event = mapper.readValue(message, PaymentRejectedEvent.class);
        
        // 发送补偿命令
        sendCancelOrderCommand(event.getOrderId());
    }

    public void startSaga(Order order) throws JsonProcessingException {
        // 发送 CreateOrderCommand
        CreateOrderCommand createCommand = new CreateOrderCommand();
        createCommand.setOrderId(order.getId());
        createCommand.setProduct(order.getProduct());
        createCommand.setQuantity(order.getQuantity());
        String createCmd = mapper.writeValueAsString(createCommand);
        kafkaTemplate.send("create-order-command", createCmd);
    }

    private void sendCancelOrderCommand(String orderId) throws JsonProcessingException {
        CancelOrderCommand cancelCommand = new CancelOrderCommand();
        cancelCommand.setOrderId(orderId);
        String cancelMsg = mapper.writeValueAsString(cancelCommand);
        kafkaTemplate.send("cancel-order-command", cancelMsg);
    }

    private double calculateAmount(String orderId) {
        // TODO: 实现金额计算逻辑
        return 100.0;
    }
}
2.2.3 修改 OrderService
@Service
public class OrderService {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    private ObjectMapper mapper = new ObjectMapper();

    @KafkaListener(topics = "create-order-command", groupId = "order-service-group")
    public void handleCreateOrder(String message) throws JsonProcessingException {
        CreateOrderCommand command = mapper.readValue(message, CreateOrderCommand.class);
        
        // 创建订单逻辑
        // TODO: 保存订单到数据库

        // 发布 OrderCreatedEvent
        OrderCreatedEvent event = new OrderCreatedEvent();
        event.setOrderId(command.getOrderId());
        String eventMsg = mapper.writeValueAsString(event);
        kafkaTemplate.send("order-created", eventMsg);
    }

    @KafkaListener(topics = "cancel-order-command", groupId = "order-service-group")
    public void handleCancelOrder(String message) throws JsonProcessingException {
        CancelOrderCommand command = mapper.readValue(message, CancelOrderCommand.class);
        
        // 取消订单逻辑
        cancelOrder(command.getOrderId());
        
        // 发布 OrderCancelledEvent
        OrderCancelledEvent event = new OrderCancelledEvent();
        event.setOrderId(command.getOrderId());
        String cancelledMsg = mapper.writeValueAsString(event);
        kafkaTemplate.send("order-cancelled", cancelledMsg);
    }

    private void cancelOrder(String orderId) {
        // TODO: 实现取消订单逻辑
    }
}
2.2.4 修改 PaymentService
@Service
public class PaymentService {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    private ObjectMapper mapper = new ObjectMapper();

    @KafkaListener(topics = "payment-command", groupId = "payment-service-group")
    public void handlePaymentCommand(String message) throws JsonProcessingException {
        PaymentCommand command = mapper.readValue(message, PaymentCommand.class);
        
        // 处理支付逻辑
        boolean approved = processPayment(command.getOrderId(), command.getAmount());

        if (approved) {
            // 发布 PaymentApprovedEvent
            PaymentApprovedEvent event = new PaymentApprovedEvent();
            event.setOrderId(command.getOrderId());
            String approvedMsg = mapper.writeValueAsString(event);
            kafkaTemplate.send("payment-approved", approvedMsg);
        } else {
            // 发布 PaymentRejectedEvent
            PaymentRejectedEvent rejectedEvent = new PaymentRejectedEvent();
            rejectedEvent.setOrderId(command.getOrderId());
            rejectedEvent.setReason("Payment was rejected.");
            String rejectedMsg = mapper.writeValueAsString(rejectedEvent);
            kafkaTemplate.send("payment-rejected", rejectedMsg);
        }
    }

    private boolean processPayment(String orderId, double amount) {
        // TODO: 实现支付逻辑,模拟支付失败
        return false;
    }
}
2.2.5 实现 InventoryService
@Service
public class InventoryService {
    @KafkaListener(topics = "order-cancelled", groupId = "saga-orchestrator-group")
    public void handleOrderCancelled(String message) throws JsonProcessingException {
        OrderCancelledEvent cancelledEvent = new ObjectMapper().readValue(message, OrderCancelledEvent.class);
        
        // 回滚库存逻辑
        rollbackInventory(cancelledEvent.getOrderId());
    }

    private void rollbackInventory(String orderId) {
        // TODO: 实现库存回滚逻辑
    }
}
2.3 事务补偿机制

指挥模式 中,Saga Orchestrator 作为中央控制器,负责监控事务流程并在发生错误时触发补偿操作。例如,当 PaymentService 处理支付失败时,发布 PaymentRejectedEvent,Saga Orchestrator 监听到该事件后,发送 CancelOrderCommandOrderService 执行订单取消操作,确保系统一致性。

总结

Saga 模式为分布式系统中的长事务管理提供了一种高效且灵活的解决方案。通过 编排指挥 两种实现方式,开发者可以根据具体业务需求和系统架构选择最合适的方式。

  • 编排模式 适用于服务间关系较为松散、需要高扩展性的场景,各服务通过事件进行独立协调,但补偿逻辑较为分散。
  • 指挥模式 适用于需要集中控制事务流程、易于追踪和调试的场景,Saga Orchestrator 作为中央管理者,补偿逻辑统一管理,但可能成为系统的瓶颈。

无论选择哪种模式,事务补偿机制 都是确保系统可靠性和一致性的关键。通过合理设计补偿逻辑,可以有效应对分布式环境下的各种故障和异常,提升系统的健壮性。

借助 Spring BootKafka 强大的生态和工具支持,实现 Saga 模式变得更加便捷和高效。希望本文能够帮助你深入理解 Saga 模式,并在实际项目中灵活应用,打造稳定可靠的分布式系统。

参考资料

版权声明

本文为原创内容,转载请注明出处。

标签:event,mapper,String,Spring,class,private,Kafka,分布式系统,public
From: https://blog.csdn.net/yhkal/article/details/143373469

相关文章

  • Java项目:232基于Springboot+vue图书个性化推荐系统的设计与实现(含论文+答辩PPT)
    作者主页:夜未央5788 简介:Java领域优质创作者、Java项目、学习资料、技术互助文末获取源码项目介绍基于Springboot+vue图书个性化推荐系统的设计与实现本系统为分为前后台,包含管理员、学生两种角色,前台为学生登录,后台为管理员登录。学生:首页、图书信息、好书推荐、图......
  • Spring Cache
    1.介绍SpringCache是Spring提供的一个缓存框架,基于AOP原理,实现了基于注解的缓存功能,只需要简单地加一个注解就能实现缓存功能,对业务代码的侵入性很小。1.常用注解@EnableCaching:开启缓存注解功能@Cacheable:查询数据时缓存,将方法的返回值进行缓存。@CacheEvict:用于删除缓存,将......
  • 基于SpringBoot + Vue的在线项目管理与任务分配中的应用
    文章目录前言一、详细操作演示视频二、具体实现截图三、技术栈1.前端-Vue.js2.后端-SpringBoot3.数据库-MySQL4.系统架构-B/S四、系统测试1.系统测试概述2.系统功能测试3.系统测试结论五、项目代码参考六、数据库代码参考七、项目论文示例结语前言......
  • 基于SpringBoot + Vue的高性能集群共享平台(角色:用户、教师、管理员)
    文章目录前言一、详细操作演示视频二、具体实现截图三、技术栈1.前端-Vue.js2.后端-SpringBoot3.数据库-MySQL4.系统架构-B/S四、系统测试1.系统测试概述2.系统功能测试3.系统测试结论五、项目代码参考六、数据库代码参考七、项目论文示例结语前言......
  • 不使用docker-compose不使用zookeeper启动ApacheKafka3.8.0单机运行KRAFT模式
    dockerrun-d-v/kafka_data:/opt/kafka-logs-eKAFKA_ENABLE_KRAFT=yes-eKAFKA_PROCESS_ROLES=broker,controller-eKAFKANODEID=1-eKAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093-eKAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.1......
  • 基于SpringBoot + Vue的语言的医疗设备管理系统
    具体实现截图系统测试从多个角度进行测试找到系统中存在的问题是本系统首要的测试目的,通过功能测试寻找出系统缺陷并改正,确保系统没有缺陷。在测试过程中证明系统满足客户需求,发现问题和不足及时改正。测试完成之后得出测试结论。系统测试目的在酒店管理系统的开发......
  • 基于SpringBoot + Vue的健身房管理系统(角色:用户、健身教练、管理员)
    具体实现截图系统测试从多个角度进行测试找到系统中存在的问题是本系统首要的测试目的,通过功能测试寻找出系统缺陷并改正,确保系统没有缺陷。在测试过程中证明系统满足客户需求,发现问题和不足及时改正。测试完成之后得出测试结论。系统测试目的在酒店管理系......
  • 基于SpringBoot + Vue的口腔诊所系统
    具体实现截图系统测试从多个角度进行测试找到系统中存在的问题是本系统首要的测试目的,通过功能测试寻找出系统缺陷并改正,确保系统没有缺陷。在测试过程中证明系统满足客户需求,发现问题和不足及时改正。测试完成之后得出测试结论。系统测试目的在酒店管理系......
  • 基于springboot小区物联网平台源码
    小区物联网平台是一个专为小区硬件管理设计的物联网管理平台,其核心功能在于与各大厂商的门禁设备、道闸设备、监控设备、智能锁以及充电桩等进行高效对接。该平台支持HTTP、MQTT、ComNet等多种协议,以便轻松实现与各大小区云平台的互联互通。目前,该平台已成功对接了包括月轮......
  • 基于SpringBoot的项目工时统计成本核算管理源码带教程
    该系统是基于若依前后端分离的架构,前端使用vue2,后端使用SpringBoot2。技术框架:SpringBoot2.0.0+Mybatis1.3.2+Shiro+swagger-ui+jpa+lombok+Vue2+Mysql5.7运行环境:jdk8+IntelliJIDEA+maven+宝塔面板系统与功能介绍这是一款轻量级工时记录和管理工......