在分布式系统中,事务的处理分布在不同组件、服务中,因此分布式事务的ACID保障面临着一些特殊难点。本系列文章介绍了21种分布式事务设计模式,并分析其实现原理和优缺点,在面对具体分布式事务问题时,可以选择合适的模式进行处理。原文: Exploring Solutions for Distributed Transactions (1)
由于良好的可伸缩性和容错性,分布式系统现在变得越来越普遍,但在分布式系统中维护数据一致性可能非常困难,尤其是在在需要处理跨多个节点的事务时。接下来我们就来探讨那些可以确保分布式系统数据一致性的技术、模式和算法。
分布式事务涉及多个节点,它们一起工作以执行单个事务。确保所有节点对事务结果达成一致,同时保持数据一致性是一项具有挑战性的任务。传统事务的ACID(原子性、一致性、隔离性、持久性)属性在分布式环境中变得更加难以实现。
本文旨在全面介绍可用于维护分布式系统数据一致性的不同方法,讨论每种方法的优缺点,以及在什么场景下适用。
在不同业务场景下,可以有不同的解决方案,常见方法有:
- 阻塞重试(Blocking Retry)
- 二阶段和三阶段提交(Two-Phase Commit (2PC) and Three-Phase Commit (3PC))
- 基于后台队列的异步处理(Using Queues to Process Asynchronously in the Background)
- TCC补偿(TCC Compensation Matters)
- 本地消息表(异步保证)/发件箱模式(Local Message Table (Asynchronously Ensured)/Outbox Pattern)
- MQ事务(MQ Transaction)
- Saga模式(Saga Pattern)
- 事件驱动(Event Sourcing)
- 命令查询职责分离(Command Query Responsibility Segregation, CQRS)
- 原子提交(Atomic Commitment)
- 并行提交(Parallel Commits)
- 事务复制(Transactional Replication)
- 一致性算法(Consensus Algorithms)
- 时间戳排序(Timestamp Ordering)
- 乐观并发控制(Optimistic Concurrency Control)
- 拜占庭容错(Byzantine Fault Tolerance, BFT)
- 分布式锁(Distributed Locking)
- 分片(Sharding)
- 多版本并发控制(Multi-Version Concurrency Control, MVCC)
- 分布式快照(Distributed Snapshots)
- 主从复制(Leader-Follower Replication)
本文将介绍阻塞重试、2PC/3PC、后台队列三种模式。
1. 阻塞重试(Blocking Retry)
- 处理与远程服务或资源交互时可能发生的错误或失败。
- 当发生错误时,可以自动重试,通常在重试之前会等待一段时间,重试直到操作成功或达到最大重试次数为止。
- 业务执行代码同步等待操作完成(成功或失败),然后再尝试重试操作。这意味着执行操作的线程在等待操作完成时被阻塞,在操作完成或失败之前,不能在该线程上执行其他工作。
- 在Python中使用
retrying
库实现阻塞重试。
import retrying
@retrying.retry(wait_fixed=1000, stop_max_delay=10000)
def my_function():
# code to retry goes here
# if this function throws an exception, it will be retried according to the parameters specified above
示例代码:
- 使用
retrying
库实现阻塞重试行为。 - 用
@retrying.retry
装饰想要重试的函数,并传入参数指定重试行为逻辑。 - 用
wait_fixed
参数指定重试之间等待1秒,用stop_max_delay
参数指定在10秒后停止重试。 - 如果
my_function
抛出异常,将会触发重试,直到成功或10秒超时为止。
优点
- 通过应用程序或系统自动重试失败的操作,直到操作成功,从而提高应用程序或系统的可靠性,防止由瞬时问题引起的错误和故障。
- 减少停机时间,确保关键系统对用户的可用性。
- 自动处理失败操作,减少系统管理员和支持团队的工作量。
缺点
- 当重试很频繁或需要很长时间才能完成时,会在系统中引入额外延迟。
- 当重试机制比较敏感并且频繁重试时,会消耗额外资源。
- 阻塞重试机制可能陷入无限循环或导致死锁,导致不断重试失败的操作而无法成功。
适用场景
- 在执行的操作非常关键且必须成功完成的场景中非常有用。
- 电子商务 —— 可以用来自动重试失败的交易,如购买或退款,以提高可靠性并减少收入损失的风险。
- 企业软件 —— 可以自动重试企业软件中失败的数据库或API操作,提高可靠性并减少IT支持团队的工作量。
- 云应用程序 —— 阻塞重试机制可用于自动重试失败的云API请求或服务调用,提高可靠性并降低基于云的应用程序的停机风险。
挑战
- 确定适当的重试参数,如重试次数、重试间隔和回退策略。
- 处理无法通过重试机制自动解决的错误,例如由网络问题、数据损坏或无效输入引起的错误。
- 跨多个节点协调重试可能具有挑战性,特别是当节点具有不同的故障模式或重试机制时。
2. 二阶段和三阶段提交(Two-Phase Commit (2PC) and Three-Phase Commit (3PC))
- 两阶段提交(two-phase commit, 2PC)协议是一种分布式算法,用于确保分布式系统中多个数据库或资源之间事务的原子性和一致性。
- 第一阶段,协调器向事务中所有参与者发送消息,要求他们为提交做准备。然后,每个参与者将在本地执行事务,并以"yes"(表示准备提交)或"no"(表示不能提交)投票进行响应。
- 第二阶段,如果所有参与者投票"yes",协调器将向所有参与者发送提交消息,要求他们提交事务。如果有参与者投票"no",或者协调器在指定时间内没有收到所有参与者的响应,协调器将向所有参与者发送中止消息,要求他们回滚事务。
- 提交操作要么成功完成,要么终止,所有参与者都会收到相应通知。协调器必须等待直到收到来自所有参与者的确认,才能宣布事务完成。
# Coordinator code
def two_phase_commit(coordinator, participants, data):
# Phase 1: Prepare phase
prepare_ok = True
for participant in participants:
try:
participant.prepare(data)
except:
prepare_ok = False
break
if not prepare_ok:
# Phase 1b: Abort
for participant in participants:
participant.abort()
return False
else:
# Phase 2: Commit phase
commit_ok = True
for participant in participants:
try:
participant.commit()
except:
commit_ok = False
break
if not commit_ok:
# Phase 2b: Rollback
for participant in participants:
participant.rollback()
return False
else:
return True
# Participant code
class Participant:
def prepare(self, data):
# Perform prepare operations on local data
# If prepare fails, raise an exception
pass
def commit(self):
# Perform commit operations on local data
pass
def rollback(self):
# Perform rollback operations on local data
pass
示例代码:
- 涉及2个组件(协调器和参与者)
- 协调器向参与者发送"准备(prepare)"请求,以确保参与者已经准备好提交事务。如果参与者回答"Yes",协调器将向参与者发送"提交(commit)"请求。如果参与者再次响应"Yes",协调器将向参与者和协调器发送全局提交消息。
- 如果任何参与者响应"No"或在处理过程中出现错误,协调器将向参与者和协调器发送中止消息以回滚事务。
优点
- 确保事务的原子性和一致性
- 提供了处理失败以及中止事务的机制
- 实现多节点的协调,而不需要人工干预
缺点
- 可能需要很长时间等待所有节点的确认,并可能导致阻塞
- 协调器节点可能造成单点故障
- 实现可能比较复杂,需要仔细设计以避免死锁和竞态条件
适用场景
- 事务涉及多个账户和系统的银行、金融系统
- 需要跨多个仓库和系统更新库存的电子商务系统
- 订单需要跨多个供应商和系统进行处理和跟踪的供应链管理系统
挑战
- 确保所有参与节点都接收到"准备"和"提交"消息
- 通知所有参与节点中止事务
- 确保协调器节点可用并正常工作
- 三阶段提交(3PC) 是2PC协议的扩展,解决了2PC中准备阶段的阻塞。3PC引入了第三个阶段,称为"预提交(pre-commit)"阶段,降低了阻塞的可能性。
- CanCommit阶段 —— 事务协调器向每个参与者发送消息,询问是否准备好提交事务。如果参与者准备好了,向协调器发送"Yes"消息,如果没有准备好,发送"No"消息。如果所有参与者都回答"Yes",协调器将进入预提交阶段。但是,如果一个或多个参与者响应"No",协调器立即向所有参与者发送中止消息,回滚事务。
- Pre-Commit阶段 —— 协调器向所有参与者发送预提交消息,为提交做准备。如果所有参与者都确认预提交消息,协调器将切换到DoCommit阶段。但是,如果一个或多个参与者没有响应,协调器就认为失败了,并向所有参与者发送中止消息。
- DoCommit阶段 —— 协调器向所有参与者发送Commit消息,要求提交事务。如果所有参与者都接收到Commit消息并成功提交事务,将向协调器发送确认消息。如果任何参与者未能提交事务,将向协调器发送中止消息。如果协调器从任何参与者接收到中止消息,将向所有参与者发送中止消息,回滚事务。
# Coordinator code
def three_phase_commit(coordinator, participants, data):
# Phase 1: CanCommit phase
can_commit_ok = True
for participant in participants:
try:
participant.can_commit(data)
except:
can_commit_ok = False
break
if not can_commit_ok:
# Phase 1b: Abort
for participant in participants:
participant.abort()
return False
else:
# Phase 2: PreCommit phase
pre_commit_ok = True
for participant in participants:
try:
participant.pre_commit()
except:
pre_commit_ok = False
break
if not pre_commit_ok:
# Phase 2b: Abort
for participant in participants:
participant.abort()
return False
else:
# Phase 3: DoCommit phase
do_commit_ok = True
for participant in participants:
try:
participant.do_commit()
except:
do_commit_ok = False
break
if not do_commit_ok:
# Phase 3b: Rollback
for participant in participants:
participant.rollback()
return False
else:
return True
# Participant code
class Participant:
def can_commit(self, data):
# Determine if participant can commit
# If participant cannot commit, raise an exception
pass
def pre_commit(self):
# Perform pre-commit operations on local data
pass
def do_commit(self):
# Perform commit operations on local data
pass
def rollback(self):
# Perform rollback operations on local data
pass
示例代码:
- 涉及3个组件(1个协调器和2个参与者)。
- 协调器向两个参与者发送"can-commit"消息,确保已经准备好提交事务。如果两个参与者都响应"Yes",协调器将向两个参与者发送"pre-commit"消息,通知他们事务即将提交。如果两个参与者都响应确认,协调器将向两个参与者发送"commit"消息以提交事务。
- 如果任何参与者响应"No"或在处理过程中出现错误,协调器将向两个参与者发送中止消息以回滚事务。
优点
- 由于决策过程分布在3个阶段,因此缩短了阻塞时间
- 在协调器失败的情况下,参与者仍然可以做出决定并提交或终止事务
- 允许多个参与者独自决策,避免了由单个协调器引起的瓶颈
缺点
- 很难实现和维护
- 引入额外的消息交换,可能会增加事务的延迟
- 不是分布式事务的完整解决方案
适用场景
- 需要高可用性、容错、涉及多方的事务和高可伸缩性的场景
- 需要高可用性、容错和一致性的金融事务
- 涉及多方的电子商务交易
- 涉及敏感数据的医疗保健事务
挑战
- 比2PC更复杂
- 会导致网络延迟增加,从而影响事务的整体性能
- 3PC容易受到系统故障的影响,可能导致数据不一致
3. 基于后台队列的异步处理(Using Queues to Process Asynchronously in the Background)
- 基于消息代理或消息队列在后台异步处理任务,主线程可以继续执行其他任务,而不会被耗时的任务阻塞。
- 任务被推送到队列中,单独的工作进程从队列中读取并执行任务。
- 通常包括以下步骤:
- 创建队列,将异步任务存储在队列中。
- 向队列中添加需要异步处理的任务,如发送电子邮件、处理文件或生成报告。
- 创建工作进程或线程,负责执行队列中的任务,工作进程/线程可以使用多进程或线程库创建。
- 一旦工作进程/线程被创建,使用
start()
方法。 - 工作进程/线程将从队列中检索任务,并在后台异步执行,反复执行直到队列中没有更多的任务。
- 一旦任务完成,工作进程/线程将标记该任务已完成,并将结果存储在单独的输出队列中。
- 使用
get()
方法从输出队列中检索结果。 - 使用
try-except
块处理任务执行过程中可能发生的任何异常。 - 所有任务一旦完成,并且能够检索到结果,就用
join()
方法阻塞工作线程。 - 通过关闭队列并终止工作进程/线程来清理资源
import multiprocessing
# Create a Queue
task_queue = multiprocessing.Queue()
result_queue = multiprocessing.Queue()
# Define a task function
def do_task(task):
# Code to perform the task
result = task.upper()
# Store the result in the output queue
result_queue.put(result)
# Add tasks to the queue
tasks = ['task1', 'task2', 'task3']
for task in tasks:
task_queue.put(task)
# Create workers
num_workers = multiprocessing.cpu_count()
workers = [multiprocessing.Process(target=do_task, args=(task_queue,)) for i in range(num_workers)]
# Start workers
for worker in workers:
worker.start()
# Retrieve tasks from the queue
while not task_queue.empty():
task = task_queue.get()
# Process completed tasks
while not result_queue.empty():
result = result_queue.get()
print(result)
# Join workers
for worker in workers:
worker.join()
# Clean up
task_queue.close()
result_queue.close()
示例代码
- 通过Python的
multiprocessing
模块创建任务队列和输出队列。 - 任务队列用于存储需要异步执行的任务,而输出队列用于存储任务的结果。
- 通过
put()
方法将任务添加到任务队列。 - 通过
cpu_count()
方法确定工作进程/线程数量。 - 通过
multiprocessing
模块的Process类创建工作进程对象,并将do_tasks()
函数分配为工作进程对象的目标。 - 通过
start()
方法启动工作进程,通过get()
方法从任务队列中检索任务,直到队列中没有更多任务为止。 - 检索到任务后,
do_task()
函数将对其进行处理,并使用put()
方法将结果存储在输出队列中。 - 完成所有任务后,使用
get()
方法从输出队列检索结果。 - 通过
join()
方法阻塞工作进程,并使用close()
方法关闭任务和结果队列。
优点
- 队列可以支持同时执行多个任务
- 队列提供了处理错误和重试的方法,确保任务在失败时不会被丢弃
- 队列有助于解耦系统的不同组件
缺点
- 因为队列需要额外组件(如消息代理或消息队列),增加了额外的复杂性
- 在消息序列化和网络通信方面引入额外开销,可能会影响性能
适用场景
- 需要处理大量任务并对任务进行异步处理的系统
- 图像和视频处理、支付和电子邮件发送等任务的异步处理
- 可以用来实现微服务之间的通信
- 可用于批量处理大量数据,如数据清洗和转换
挑战
- 维护队列中消息的顺序可能会有问题
- 消息被传递到正确的工作进程/线程,没有消息丢失或重复消息
- 大量消息的处理具有挑战性
参考文献
Implementing Retry In Kafka Consumer
retry logic blocks the main consumer while its waiting for the retry in spring
Main Difference between 2PC and 3PC Protocols
Distributed DBMS - Commit Protocols
Asynchronous Task Processing in Cloud
Asynchronous Messaging, Part 3: Backend Service
Async and Background Processing
Queue instruction-run an activity asynchronously
你好,我是俞凡,在Motorola做过研发,现在在Mavenir做技术工作,对通信、网络、后端架构、云原生、DevOps、CICD、区块链、AI等技术始终保持着浓厚的兴趣,平时喜欢阅读、思考,相信持续学习、终身成长,欢迎一起交流学习。
微信公众号:DeepNoMind
本文由mdnice多平台发布
标签:事务,participant,21,队列,重试,commit,参与者,分布式 From: https://www.cnblogs.com/deepnomind/p/17430294.html