在分布式系统中,事务的处理分布在不同组件、服务中,因此分布式事务的ACID保障面临着一些特殊难点。本系列文章介绍了21种分布式事务设计模式,并分析其实现原理和优缺点,在面对具体分布式事务问题时,可以选择合适的模式进行处理。原文: Exploring Solutions for Distributed Transactions (3)
在不同业务场景下,可以有不同的解决方案,常见方法有:
- 阻塞重试(Blocking Retry)
- 二阶段和三阶段提交(Two-Phase Commit (2PC) and Three-Phase Commit (3PC))
- 基于后台队列的异步处理(Using Queues to Process Asynchronously in the Background)
- TCC补偿(TCC Compensation Matters)
- 本地消息表(异步保证)/发件箱模式(Local Message Table (Asynchronously Ensured)/Outbox Pattern)
- MQ事务(MQ Transaction)
- Saga模式(Saga Pattern)
- 事件驱动(Event Sourcing)
- 命令查询职责分离(Command Query Responsibility Segregation, CQRS)
- 原子提交(Atomic Commitment)
- 并行提交(Parallel Commits)
- 事务复制(Transactional Replication)
- 一致性算法(Consensus Algorithms)
- 时间戳排序(Timestamp Ordering)
- 乐观并发控制(Optimistic Concurrency Control)
- 拜占庭容错(Byzantine Fault Tolerance, BFT)
- 分布式锁(Distributed Locking)
- 分片(Sharding)
- 多版本并发控制(Multi-Version Concurrency Control, MVCC)
- 分布式快照(Distributed Snapshots)
- 主从复制(Leader-Follower Replication)
本文将介绍Saga、事件驱动以及CQRS三种模式。
7. Saga模式(Saga Pattern)
- 管理跨多个微服务的长时间事务。
- 将事务分解为一系列较小的、独立的步骤,每个步骤都由单独的微服务管理。
- 包含如下步骤:
- 协调微服务负责接收事务初始请求。
- 协调微服务通过向第一个负责处理事务的微服务发送消息来启动事务。
- 第一个微服务执行事务,并将消息发送回协调微服务,反馈其是否成功。
- 如果第一步成功,协调微服务将向负责事务下一步的微服务发送消息。
- 如果第一步失败,协调微服务发送补偿动作来撤消失败步骤的影响。
- 重复步骤3-5,直到每个微服务要么完成其步骤,要么在失败时触发补偿操作(回滚)。
- 一旦所有步骤都成功完成,协调微服务就会发送一条消息,表明整个事务已经成功。
- 如果任何步骤失败并且触发了补偿操作(回滚),则协调微服务将发送一条消息,指示整个事务失败。
import pika
import json
# Define the RabbitMQ connection parameters
credentials = pika.PlainCredentials('guest', 'guest')
parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
# Define the messages to be sent between services
start_order_message = {'order_id': '12345', 'items': [{'id': '1', 'name': 'item1'}, {'id': '2', 'name': 'item2'}]}
payment_message = {'order_id': '12345', 'amount': 100.0}
shipping_message = {'order_id': '12345', 'items': [{'id': '1', 'name': 'item1'}, {'id': '2', 'name': 'item2'}]}
# Define the compensation messages to be sent in case of failure
cancel_payment_message = {'order_id': '12345', 'amount': 100.0}
cancel_shipping_message = {'order_id': '12345', 'items': [{'id': '1', 'name': 'item1'}, {'id': '2', 'name': 'item2'}]}
# Define the function to send messages to the RabbitMQ broker
def send_message(queue_name, message):
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue=queue_name)
channel.basic_publish(exchange='', routing_key=queue_name, body=json.dumps(message))
connection.close()
# Define the function to handle the start of the order
def start_order():
# Send the start order message to the Order service
send_message('start_order', start_order_message)
# Define the function to handle the payment of the order
def payment():
try:
# Send the payment message to the Payment service
send_message('payment', payment_message)
except Exception as e:
# Send the cancel payment message to the Payment service in case of failure
send_message('cancel_payment', cancel_payment_message)
raise e
# Define the function to handle the shipping of the order
def shipping():
try:
# Send the shipping message to the Shipping service
send_message('shipping', shipping_message)
except Exception as e:
# Send the cancel shipping message to the Shipping service in case of failure
send_message('cancel_shipping', cancel_shipping_message)
raise e
# Define the function to handle the cancellation of the order
def cancel_order():
# Send the cancel payment message to the Payment service
send_message('cancel_payment', cancel_payment_message)
# Send the cancel shipping message to the Shipping service
send_message('cancel_shipping', cancel_shipping_message)
# Define the main function to execute the Saga
def execute_saga():
try:
# Start the order
start_order()
# Perform the payment
payment()
# Perform the shipping
shipping()
except Exception as e:
# Cancel the order in case of failure
cancel_order()
raise e
# Call the main function to execute the Saga
execute_saga()
示例代码
- 用RabbitMQ作为简单消息代理
- 定义了在服务之间发送的五条消息:
start_order_message
、payment_message
、shipping_message
、cancel_payment_message
以及cancel_shipping_message
。 start_order
函数将start_order_message
发送给order_service
。- 在收到来自
start_order
函数的消息后,order_service
创建订单,并发送回包含order_id
的确认消息。 - 一旦
start_order
函数接收到确认消息,将发送payment_message
给payment_service
来处理订单支付。 - 如果支付成功,
payment_service
将返回一条包含payment_id
的确认消息。 start_order
函数将shipping_message
发送给shipping_service
,以便在付款成功后发货。- 如果发货成功,
shipping_service
将返回一条包含shipping_id
的确认消息。 - 如果上述任何步骤失败,则回滚事务,分别给
shipping_service
和payment_service
发送cancel_shipping_message
和cancel_payment_message
,撤销所做的更改。 - 通过向
start_order
发送初始消息、监听确认消息以及在发生故障时处理补偿来处理整个Saga流程。换句话说,Saga模式涉及一系列补偿操作,以便在发生故障时撤消事务的影响。
优点
- 管理跨多个微服务的长时间事务
- 避免服务独立运行时出现不一致或数据损坏
- 如果事务中的某个步骤失败,则提供补偿操作
- 允许服务自主、独立运行
缺点
- 实现这种模式可能比较复杂
- 很难设计和测试补偿逻辑
- 会给系统增加额外的复杂性,使维护和故障排除变得更加困难
适用场景
- 涉及多个服务(如支付处理、订单履约、物流)的电子商务交易
- 涉及多个系统和服务的复杂金融交易
- 涉及多个供应商、制造商和物流供应商的供应链管理系统
尝试-确认-取消(TCC)模式与Saga模式的相似之处
- 两种模式都维护分布式事务中涉及多个微服务的数据一致性
- 两种模式都要求每个服务定义一组需要作为事务一部分的操作
尝试-确认-取消(TCC)模式与Saga模式的不同之处
- Saga模式使用前向恢复法,每个服务在出现故障时启动一个补偿事务,而TCC模式使用后向恢复法,每个服务验证事务是否可以继续,然后才确认或取消。
- Saga模式将事务表示为事件序列,事件由相关服务之间发送的消息表示。TCC模式将事务表示为由所涉及的每个服务执行的操作序列。
- Saga模式适用于涉及多个服务的长时间事务,而TCC模式适用于涉及较少服务的短时间事务。
- Saga模式的实现可能比TCC模式更复杂,要求每个服务能够发起补偿事务并处理潜在故障。
8. 事件驱动(Event Sourcing)
- 对应用程序状态所做的所有更改作为一系列事件。
- 将这些事件存储在数据库或事件日志中,从而提供应用程序状态随时间变化的完整审计跟踪。
- 涉及如下步骤:
- 每当应用程序状态发生变化时,就会捕获相应事件,事件包含所有更改相关信息(例如已修改的数据和进行更改的用户)。
- 事件存储在事件日志中,可以用数据库或消息代理实现,每个事件都有一个唯一标识符,并带有时间戳,以确保事件有序。
- 通过按时间顺序重播事件日志中的事件来重构应用程序当前状态。该过程包括将应用程序的状态初始化为其初始状态,然后依次应用每个事件来更新状态。
- 一旦状态被重构,就可以对其进行查询,以提供有关应用程序当前状态的信息。
- 可以实时处理事件,触发其他动作或更新。
- 事件处理完成后,可以将其归档或删除以释放存储空间。
import uuid
import json
import time
class BankAccount:
def __init__(self):
self.balance = 0
self.event_sourcing = EventSourcing()
def deposit(self, amount):
event = Event('deposit', {'amount': amount})
self.event_sourcing.add_event(event)
self.balance += amount
def withdraw(self, amount):
if self.balance < amount:
raise ValueError('Insufficient balance')
event = Event('withdraw', {'amount': amount})
self.event_sourcing.add_event(event)
self.balance -= amount
def get_balance(self):
return self.balance
def get_events(self):
return self.event_sourcing.get_events()
def get_event_by_id(self, event_id):
return self.event_sourcing.get_event_by_id(event_id)
def replay_events(self):
self.balance = 0
for event in self.event_sourcing.get_events():
if event.type == 'deposit':
self.balance += event.data['amount']
elif event.type == 'withdraw':
self.balance -= event.data['amount']
class Event:
def __init__(self, type, data):
self.id = uuid.uuid4()
self.timestamp = int(time.time())
self.type = type
self.data = data
class EventSourcing:
def __init__(self):
self.event_store = EventStore()
def add_event(self, event):
self.event_store.store_event(event)
def get_events(self):
return self.event_store.get_events()
def get_event_by_id(self, event_id):
return self.event_store.get_event_by_id(event_id)
class EventStore:
def __init__(self):
self.events = []
def store_event(self, event):
self.events.append(event)
def get_events(self):
return self.events
def get_event_by_id(self, event_id):
for event in self.events:
if event.id == event_id:
return event
raise ValueError('Event not found')
class Auditor:
def __init__(self, event_store):
self.event_store = event_store
def log_events(self):
for event in self.event_store.get_events():
print(json.dumps({'id': str(event.id), 'type': event.type, 'data': event.data}))
account = BankAccount()
auditor = Auditor(account.event_sourcing.event_store)
account.deposit(100)
account.withdraw(50)
account.deposit(75)
print('Current balance:', account.get_balance())
print('All events:')
auditor.log_events()
event_id = account.get_events()[1].id
event = account.get_event_by_id(event_id)
print('Event details:', json.dumps({'id': str(event.id), 'type': event.type, 'data': event.data}))
示例代码
BankAccount
类用来演示如何使用事件来重建实体状态,包含balance
属性,支持deposit
(存款)和withdraw
(取款)两种操作。Event
类有类型和数据属性。EventSourcing
类定义了BankAccount的event_sourcing
属性EventSourcing
类包含event_store
属性作为事件列表。EventStore
类有两个主要方法:store_event()
(在列表中存储事件)和get_events()
(从列表中检索事件)。add_event
方法向event_store
添加新事件。get_events
和get_event_by_id
方法可以通过ID检索所有事件或特定事件。deposit
和withdraw
方法创建具有唯一ID、时间戳和字典(包含操作信息,在本例中为操作类型和金额)的新Event
对象,事件被添加到BankAccount
实例的event_sourcing
属性中。- 每次进行
deposit
和withdraw
时,都会创建相应事件,并通过EventStore
类将其存储在事件存储中。 get_balance
方法返回帐户当前余额。replay_events()
方法从事件存储中检索所有事件,并计算当前余额。遍历事件存储中的所有事件,并根据每个事件的类型和数据更新BankAccount
的balance
属性。Auditor
类监听存储在事件存储库中的所有事件,并在终端上输出相应log。- 以JSON格式打印当前余额和所有事件,通过ID检索特定事件并打印其详细信息。
- 事件源模式是创建事件来表示对系统状态的更改,将这些事件存储在事件存储中,并重播事件以重建系统当前状态。
优点
- 捕获、存储所有事件,以便进行审计和保证一致性
- 可以重播所有事件以创建数据的不同视图
- 通过存储事件来处理大量数据
- 可以重播事件以将系统恢复到以前的状态
- 事件日志可以作为一段时间内系统行为的文档,使其更容易维护和扩展。
缺点
- 实现可能比较复杂,特别是在处理事件设计和数据迁移的复杂性时
- 存储所有事件需要更多存储空间
- 必须重放所有事件以确定当前状态,可能导致额外性能开销
适用场景
- 记录交易和财务事项
- 记录健康事件和医疗程序
- 记录订单事件和付款事件
注意事项
- 事件设计 —— 以细粒度方式捕捉系统状态的变化。事件应该是不可变的,这意味着事件被创建后不能被修改。事件的设计应该支持简单的查询和分析。
- 存储需求 —— 所有对系统状态的更改都以事件序列的形式存储。存储空间明显大于传统数据库。
- 数据迁移 —— 提前计划数据迁移,并考虑如何将数据从旧系统迁移到新的事件源系统。
9. 命令查询职责分离(Command Query Responsibility Segregation, CQRS)
- 将读写操作分离到单独的服务或模型中
- 命令: 改变系统状态
- 查询: 返回数据
- 涉及如下步骤:
- 用户向系统发出读取或写入数据的请求
- 如果请求是命令(写操作),则将该命令发送给命令服务,命令服务处理请求并更新系统状态。
- 命令服务更新写入模型,其中包含系统的当前状态,并创建描述更改的事件。事件被添加到事件流中,事件流是系统中发生的所有事件的日志。
- 命令服务将事件发布到消息代理,消息代理将事件传递给感兴趣的订阅者。
- 如果请求是查询(读操作),则将查询发送给查询服务,查询服务从读模型中检索数据。
- 查询服务从读模型中检索数据并将其返回给用户。
- 如果用户想执行另一个写操作,则从步骤2开始重复该过程。
- 如果用户想要执行读操作,则从步骤5开始重复该过程。
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional
class Command(ABC):
pass
class CreateProductCommand(Command):
def __init__(self, name: str, price: float):
self.name = name
self.price = price
class UpdateProductCommand(Command):
def __init__(self, product_id: str, name: Optional[str] = None, price: Optional[float] = None):
self.product_id = product_id
self.name = name
self.price = price
class DeleteProductCommand(Command):
def __init__(self, product_id: str):
self.product_id = product_id
class Query(ABC):
pass
class GetProductQuery(Query):
def __init__(self, product_id: str):
self.product_id = product_id
class GetAllProductsQuery(Query):
pass
class Product:
def __init__(self, id: str, name: str, price: float):
self.id = id
self.name = name
self.price = price
class ProductRepository:
def __init__(self):
self.products = []
def create(self, name: str, price: float) -> Product:
product = Product(str(len(self.products) + 1), name, price)
self.products.append(product)
return product
def get(self, id: str) -> Optional[Product]:
for product in self.products:
if product.id == id:
return product
return None
def get_all(self) -> List[Product]:
return self.products
def update(self, id: str, name: Optional[str] = None, price: Optional[float] = None) -> Optional[Product]:
for product in self.products:
if product.id == id:
if name is not None:
product.name = name
if price is not None:
product.price = price
return product
return None
def delete(self, id: str) -> bool:
for product in self.products:
if product.id == id:
self.products.remove(product)
return True
return False
class ProductCommandHandler:
def __init__(self, repository: ProductRepository):
self.repository = repository
def handle(self, command: Command) -> Optional[Product]:
if isinstance(command, CreateProductCommand):
return self.repository.create(command.name, command.price)
elif isinstance(command, UpdateProductCommand):
return self.repository.update(command.product_id, command.name, command.price)
elif isinstance(command, DeleteProductCommand):
success = self.repository.delete(command.product_id)
return success
class ProductQueryHandler:
def __init__(self, repository: ProductRepository):
self.repository = repository
def handle(self, query: Query) -> Optional[Any]:
if isinstance(query, GetProductQuery):
return self.repository.get(query.product_id)
elif isinstance(query, GetAllProductsQuery):
return self.repository.get_all()
class ProductService:
def __init__(self, command_handler: ProductCommandHandler, query_handler: ProductQueryHandler):
self.command_handler = command_handler
self.query_handler = query_handler
def create_product(self, name: str, price: float) -> Product:
command = CreateProductCommand(name, price)
return self.command_handler.handle(command)
def get_product(self, id: str) -> Optional[Product]:
query = GetProductQuery(id)
return self.query_handler.handle(query)
def get_all_products(self) -> List[Product]:
query = GetAllProductsQuery()
return self
示例代码
- 一个产品管理系统。
- 抽象类
Command
和Query
分别由具体类实现。 - 3个命令类的实现:
CreateProductCommand
、UpdateProductCommand
、DeleteProductCommand
CreateProductCommand
创建产品UpdateProductCommand
更新产品DeleteProductCommand
删除产品- 2个查询类的实现:
GetProductQuery
和GetAllProductQuery
GetProductQuery
检索关于特定产品的信息GetAllProductQuery
检索所有产品的信息Product
类表示一个产品,包含id
、name
和price
ProductRepository
类处理产品数据的持久性,具有创建、检索、更新和删除产品的方法ProductCommandHandler
类处理命令并将ProductRepository
作为依赖项ProductQueryHandler
类处理查询并将ProductRepository
作为依赖项- 两个
handle
方法负责接受命令或查询,并返回适当的响应 ProductService
类作为客户端与产品管理系统交互的入口,将ProductCommandHandler
和ProductQueryHandler
作为依赖项,并公开用于创建、检索和列出产品的方法,这些方法只是对适当命令或查询的包装,并将其传递给相应的处理程序。
优点
- 分离读写模型以分别优化两个过程,可以获得更好的性能。
- 分离读写模型使代码更容易维护。
- 此模式可针对特定用例进行优化。
缺点
- 系统的设计和实现比较复杂
- 比传统的整体架构需要更多时间和资源
- 在发生写操作和更新读模型之间可能存在延迟,从而导致只能保证最终一致性
适用场景
- 具有复杂域逻辑和高读写负载的应用程序
- 具有不同用户界面的系统,例如移动和web应用程序,其中每个界面都有自己特定的读取要求
- 电子商务系统中的产品目录管理和订单管理
- 医疗保健系统中的患者数据检索和数据输入
- 金融交易系统中的实时市场数据检索和订单执行
CQRS和事件驱动的结合
参考文献
Saga Pattern for Microservices Distributed Transactions
Microservice Design Pattern - Saga
How to Use Saga Pattern in Microservices
Saga Orchestration for Microservices Using the Outbox Pattern
Event Sourcing - why a dedicated event store?
Beginner's Guide to Event Sourcing
Microservices Pattern: Event Sourcing
Microservices Pattern: Command Query Responsibility Segregation (CQRS)
CQRS Desgin Pattern in Microservices Architecture
The Command and Query Responsibility Segregation(CQRS)
CQRS Software Architecture Pattern: The Good, the Bad, and the Ugly
你好,我是俞凡,在Motorola做过研发,现在在Mavenir做技术工作,对通信、网络、后端架构、云原生、DevOps、CICD、区块链、AI等技术始终保持着浓厚的兴趣,平时喜欢阅读、思考,相信持续学习、终身成长,欢迎一起交流学习。
微信公众号:DeepNoMind
本文由mdnice多平台发布
标签:事务,21,self,id,事件,message,event,def,分布式 From: https://www.cnblogs.com/deepnomind/p/17428957.html