在现代分布式系统中,管理跨多个服务的长事务至关重要。传统的分布式事务解决方案往往面临性能瓶颈和复杂性问题,而 Saga 模式 作为一种灵活高效的解决方案,逐渐受到开发者的青睐。本文将探讨如何利用 Spring Boot 和 Kafka 实现 Saga 模式,并详细介绍事务补偿机制,帮助你构建稳定可靠的分布式系统。
什么是 Saga 模式?
原理介绍
在微服务架构中,一个业务流程通常涉及多个独立的服务。这些服务必须协同工作以完成完整的业务操作。例如,用户下单可能需要订单服务、支付服务和库存服务的合作。然而,跨服务操作通常涉及复杂的事务管理,传统的分布式事务(如两阶段提交)不仅效率低下,还难以扩展和维护。
Saga 模式 提供了一种替代方案,通过将一个长事务分解为一系列的本地事务,并通过事件或命令进行协调,从而实现最终一致性。这种方法不仅提高了系统的可扩展性,还简化了事务管理。
解决的问题及其重要性
Saga 模式解决了以下问题:
- 分布式事务管理:通过拆分事务,避免了传统分布式事务的性能和复杂性问题。
- 系统可扩展性:各服务独立运行,易于扩展和维护。
- 错误恢复:通过补偿机制,确保在步骤失败时,系统能恢复到一致状态。
在现代微服务架构中,确保跨服务操作的可靠性和一致性至关重要。Saga 模式提供了一个高效且灵活的解决方案,使系统在面对复杂业务流程和潜在错误时能够稳定运行。
Saga 模式的组成部分与实现方法
Saga 模式主要有两种实现方式:Choreography(编排) 和 Orchestration(指挥)。下面将详细介绍这两种模式,并展示如何使用 Spring Boot 和 Kafka 实现它们,包括事务补偿机制。
架构图
编排模式
+----------------+ +----------------+ +----------------+
| OrderService | | PaymentService | | InventoryService|
+----------------+ +----------------+ +----------------+
| | |
| CreateOrderCommand | |
|------------------------->| |
| | |
| OrderCreatedEvent | |
|<-------------------------| |
| | |
| | PaymentCommand |
| |------------------------>|
| | |
| | PaymentProcessedEvent |
| |<------------------------|
| | |
| InventoryCommand | |
|------------------------->| |
| | |
| | InventoryUpdatedEvent|
|<-------------------------| |
| | |
指挥模式
+----------------+ +----------------+ +----------------+
| SagaOrchestrator| | OrderService | | PaymentService |
+----------------+ +----------------+ +----------------+
| | |
| CreateOrderCommand | |
|----------------------->| |
| | |
| OrderCreatedEvent | |
|<-----------------------| |
| | |
| PaymentCommand | |
|--------------------------------------------------->|
| | |
| PaymentApprovedEvent | |
|<---------------------------------------------------|
| | |
| | |
| InventoryCommand | |
|----------------------->| |
| | |
1. 编排模式
在 编排模式 下,各服务通过事件进行通信和协调,没有中央控制器。每个服务独立地监听和发布事件,以完成整个业务流程。
1.1 组成部分
- 事件定义:服务之间传递的消息,如
OrderCreatedEvent
、PaymentProcessedEvent
等。 - Kafka 生产者与消费者:用于事件的发布和订阅。
- 各服务逻辑:根据收到的事件执行相应的操作,并发布下一个事件。
1.2 实现步骤与代码
1.2.1 定义事件
// OrderCreatedEvent.java
public class OrderCreatedEvent {
private String orderId;
private String product;
private int quantity;
// getters and setters
}
// PaymentProcessedEvent.java
public class PaymentProcessedEvent {
private String orderId;
private boolean success;
// getters and setters
}
// PaymentFailedEvent.java
public class PaymentFailedEvent {
private String orderId;
private String reason;
// getters and setters
}
// OrderCancelledEvent.java
public class OrderCancelledEvent {
private String orderId;
// getters and setters
}
1.2.2 配置 Kafka
# application.yml
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: saga-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
1.2.3 实现 OrderService
@Service
public class OrderService {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
private ObjectMapper mapper = new ObjectMapper();
@KafkaListener(topics = "create-order-command", groupId = "order-service-group")
public void handleCreateOrder(String message) throws JsonProcessingException {
CreateOrderCommand command = mapper.readValue(message, CreateOrderCommand.class);
// 创建订单逻辑
// TODO: 保存订单到数据库
// 发布 OrderCreatedEvent
OrderCreatedEvent event = new OrderCreatedEvent();
event.setOrderId(command.getOrderId());
event.setProduct(command.getProduct());
event.setQuantity(command.getQuantity());
String eventMsg = mapper.writeValueAsString(event);
kafkaTemplate.send("order-created", eventMsg);
}
@KafkaListener(topics = "payment-failed", groupId = "saga-group")
public void handlePaymentFailed(String message) throws JsonProcessingException {
PaymentFailedEvent failedEvent = mapper.readValue(message, PaymentFailedEvent.class);
// 取消订单逻辑
cancelOrder(failedEvent.getOrderId());
// 发布 OrderCancelledEvent
OrderCancelledEvent cancelledEvent = new OrderCancelledEvent();
cancelledEvent.setOrderId(failedEvent.getOrderId());
String cancelledMsg = mapper.writeValueAsString(cancelledEvent);
kafkaTemplate.send("order-cancelled", cancelledMsg);
}
private void cancelOrder(String orderId) {
// TODO: 取消订单逻辑
}
}
1.2.4 实现 PaymentService
@Service
public class PaymentService {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
private ObjectMapper mapper = new ObjectMapper();
@KafkaListener(topics = "order-created", groupId = "saga-group")
public void handleOrderCreated(String message) throws JsonProcessingException {
OrderCreatedEvent event = mapper.readValue(message, OrderCreatedEvent.class);
// 处理支付逻辑
boolean success = processPayment(event.getOrderId(), event.getQuantity());
if (success) {
// 发布 PaymentProcessedEvent
PaymentProcessedEvent paymentEvent = new PaymentProcessedEvent(event.getOrderId(), true);
String paymentMsg = mapper.writeValueAsString(paymentEvent);
kafkaTemplate.send("payment-processed", paymentMsg);
} else {
// 发布 PaymentFailedEvent
PaymentFailedEvent failedEvent = new PaymentFailedEvent();
failedEvent.setOrderId(event.getOrderId());
failedEvent.setReason("Payment processing failed.");
String failedMsg = mapper.writeValueAsString(failedEvent);
kafkaTemplate.send("payment-failed", failedMsg);
}
}
private boolean processPayment(String orderId, int quantity) {
// TODO: 实现支付逻辑,模拟支付失败
return false;
}
}
1.2.5 实现 InventoryService
@Service
public class InventoryService {
@KafkaListener(topics = "order-cancelled", groupId = "saga-group")
public void handleOrderCancelled(String message) throws JsonProcessingException {
OrderCancelledEvent cancelledEvent = new ObjectMapper().readValue(message, OrderCancelledEvent.class);
// 回滚库存逻辑
rollbackInventory(cancelledEvent.getOrderId());
}
private void rollbackInventory(String orderId) {
// TODO: 实现库存回滚逻辑
}
}
1.3 事务补偿机制
在 编排模式 中,当某个服务的操作失败时,需要通过发布补偿事件来反向撤销之前的操作。上述代码中,PaymentService
在支付失败时发布 PaymentFailedEvent
,OrderService
监听该事件并执行订单取消逻辑,随后发布 OrderCancelledEvent
,最后 InventoryService
监听并回滚库存。
2. 指挥模式
在 指挥模式 下,存在一个中央的 Saga 管理器(Orchestrator),负责调度和协调各个服务的操作,并在需要时触发补偿机制。
2.1 组成部分
- Saga Orchestrator:负责整个事务流程的控制和协调。
- 命令和事件定义:如
CreateOrderCommand
、PaymentCommand
等。 - Kafka 生产者与消费者:用于命令和事件的发布与订阅。
- 各服务逻辑:根据接收到的命令执行操作,并发布相应的事件。
2.2 实现步骤与代码
2.2.1 定义命令和事件
// CreateOrderCommand.java
public class CreateOrderCommand {
private String orderId;
private String product;
private int quantity;
// getters and setters
}
// OrderCreatedEvent.java
public class OrderCreatedEvent {
private String orderId;
// getters and setters
}
// PaymentCommand.java
public class PaymentCommand {
private String orderId;
private double amount;
// getters and setters
}
// PaymentApprovedEvent.java
public class PaymentApprovedEvent {
private String orderId;
// getters and setters
}
// PaymentRejectedEvent.java
public class PaymentRejectedEvent {
private String orderId;
private String reason;
// getters and setters
}
// CancelOrderCommand.java
public class CancelOrderCommand {
private String orderId;
// getters and setters
}
// OrderCancelledEvent.java
public class OrderCancelledEvent {
private String orderId;
// getters and setters
}
2.2.2 实现 SagaOrchestrator
@Service
public class SagaOrchestrator {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
private ObjectMapper mapper = new ObjectMapper();
@KafkaListener(topics = "order-created", groupId = "saga-orchestrator-group")
public void handleOrderCreated(String message) throws JsonProcessingException {
OrderCreatedEvent event = mapper.readValue(message, OrderCreatedEvent.class);
try {
// 发送 PaymentCommand
PaymentCommand paymentCommand = new PaymentCommand();
paymentCommand.setOrderId(event.getOrderId());
paymentCommand.setAmount(calculateAmount(event.getOrderId()));
String paymentCmd = mapper.writeValueAsString(paymentCommand);
kafkaTemplate.send("payment-command", paymentCmd);
} catch (Exception e) {
// 发送补偿命令
sendCancelOrderCommand(event.getOrderId());
}
}
@KafkaListener(topics = "payment-approved", groupId = "saga-orchestrator-group")
public void handlePaymentApproved(String message) throws JsonProcessingException {
PaymentApprovedEvent event = mapper.readValue(message, PaymentApprovedEvent.class);
// 继续后续操作,如库存更新
// TODO: 发送其他命令或处理逻辑
}
@KafkaListener(topics = "payment-rejected", groupId = "saga-orchestrator-group")
public void handlePaymentRejected(String message) throws JsonProcessingException {
PaymentRejectedEvent event = mapper.readValue(message, PaymentRejectedEvent.class);
// 发送补偿命令
sendCancelOrderCommand(event.getOrderId());
}
public void startSaga(Order order) throws JsonProcessingException {
// 发送 CreateOrderCommand
CreateOrderCommand createCommand = new CreateOrderCommand();
createCommand.setOrderId(order.getId());
createCommand.setProduct(order.getProduct());
createCommand.setQuantity(order.getQuantity());
String createCmd = mapper.writeValueAsString(createCommand);
kafkaTemplate.send("create-order-command", createCmd);
}
private void sendCancelOrderCommand(String orderId) throws JsonProcessingException {
CancelOrderCommand cancelCommand = new CancelOrderCommand();
cancelCommand.setOrderId(orderId);
String cancelMsg = mapper.writeValueAsString(cancelCommand);
kafkaTemplate.send("cancel-order-command", cancelMsg);
}
private double calculateAmount(String orderId) {
// TODO: 实现金额计算逻辑
return 100.0;
}
}
2.2.3 修改 OrderService
@Service
public class OrderService {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
private ObjectMapper mapper = new ObjectMapper();
@KafkaListener(topics = "create-order-command", groupId = "order-service-group")
public void handleCreateOrder(String message) throws JsonProcessingException {
CreateOrderCommand command = mapper.readValue(message, CreateOrderCommand.class);
// 创建订单逻辑
// TODO: 保存订单到数据库
// 发布 OrderCreatedEvent
OrderCreatedEvent event = new OrderCreatedEvent();
event.setOrderId(command.getOrderId());
String eventMsg = mapper.writeValueAsString(event);
kafkaTemplate.send("order-created", eventMsg);
}
@KafkaListener(topics = "cancel-order-command", groupId = "order-service-group")
public void handleCancelOrder(String message) throws JsonProcessingException {
CancelOrderCommand command = mapper.readValue(message, CancelOrderCommand.class);
// 取消订单逻辑
cancelOrder(command.getOrderId());
// 发布 OrderCancelledEvent
OrderCancelledEvent event = new OrderCancelledEvent();
event.setOrderId(command.getOrderId());
String cancelledMsg = mapper.writeValueAsString(event);
kafkaTemplate.send("order-cancelled", cancelledMsg);
}
private void cancelOrder(String orderId) {
// TODO: 实现取消订单逻辑
}
}
2.2.4 修改 PaymentService
@Service
public class PaymentService {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
private ObjectMapper mapper = new ObjectMapper();
@KafkaListener(topics = "payment-command", groupId = "payment-service-group")
public void handlePaymentCommand(String message) throws JsonProcessingException {
PaymentCommand command = mapper.readValue(message, PaymentCommand.class);
// 处理支付逻辑
boolean approved = processPayment(command.getOrderId(), command.getAmount());
if (approved) {
// 发布 PaymentApprovedEvent
PaymentApprovedEvent event = new PaymentApprovedEvent();
event.setOrderId(command.getOrderId());
String approvedMsg = mapper.writeValueAsString(event);
kafkaTemplate.send("payment-approved", approvedMsg);
} else {
// 发布 PaymentRejectedEvent
PaymentRejectedEvent rejectedEvent = new PaymentRejectedEvent();
rejectedEvent.setOrderId(command.getOrderId());
rejectedEvent.setReason("Payment was rejected.");
String rejectedMsg = mapper.writeValueAsString(rejectedEvent);
kafkaTemplate.send("payment-rejected", rejectedMsg);
}
}
private boolean processPayment(String orderId, double amount) {
// TODO: 实现支付逻辑,模拟支付失败
return false;
}
}
2.2.5 实现 InventoryService
@Service
public class InventoryService {
@KafkaListener(topics = "order-cancelled", groupId = "saga-orchestrator-group")
public void handleOrderCancelled(String message) throws JsonProcessingException {
OrderCancelledEvent cancelledEvent = new ObjectMapper().readValue(message, OrderCancelledEvent.class);
// 回滚库存逻辑
rollbackInventory(cancelledEvent.getOrderId());
}
private void rollbackInventory(String orderId) {
// TODO: 实现库存回滚逻辑
}
}
2.3 事务补偿机制
在 指挥模式 中,Saga Orchestrator 作为中央控制器,负责监控事务流程并在发生错误时触发补偿操作。例如,当 PaymentService
处理支付失败时,发布 PaymentRejectedEvent
,Saga Orchestrator 监听到该事件后,发送 CancelOrderCommand
给 OrderService
执行订单取消操作,确保系统一致性。
总结
Saga 模式为分布式系统中的长事务管理提供了一种高效且灵活的解决方案。通过 编排 与 指挥 两种实现方式,开发者可以根据具体业务需求和系统架构选择最合适的方式。
- 编排模式 适用于服务间关系较为松散、需要高扩展性的场景,各服务通过事件进行独立协调,但补偿逻辑较为分散。
- 指挥模式 适用于需要集中控制事务流程、易于追踪和调试的场景,Saga Orchestrator 作为中央管理者,补偿逻辑统一管理,但可能成为系统的瓶颈。
无论选择哪种模式,事务补偿机制 都是确保系统可靠性和一致性的关键。通过合理设计补偿逻辑,可以有效应对分布式环境下的各种故障和异常,提升系统的健壮性。
借助 Spring Boot 和 Kafka 强大的生态和工具支持,实现 Saga 模式变得更加便捷和高效。希望本文能够帮助你深入理解 Saga 模式,并在实际项目中灵活应用,打造稳定可靠的分布式系统。
参考资料
- Saga Patterns: Managing Data Consistency in Microservices
- Spring Cloud Stream 与 Kafka 集成
- Axon Framework 官方文档
版权声明
本文为原创内容,转载请注明出处。
标签:event,mapper,String,Spring,class,private,Kafka,分布式系统,public From: https://blog.csdn.net/yhkal/article/details/143373469