在分布式系统中,事务的处理分布在不同组件、服务中,因此分布式事务的ACID保障面临着一些特殊难点。本系列文章介绍了21种分布式事务设计模式,并分析其实现原理和优缺点,在面对具体分布式事务问题时,可以选择合适的模式进行处理。原文: Exploring Solutions for Distributed Transactions (4)
在不同业务场景下,可以有不同的解决方案,常见方法有:
- 阻塞重试(Blocking Retry)
- 二阶段和三阶段提交(Two-Phase Commit (2PC) and Three-Phase Commit (3PC))
- 基于后台队列的异步处理(Using Queues to Process Asynchronously in the Background)
- TCC补偿(TCC Compensation Matters)
- 本地消息表(异步保证)/发件箱模式(Local Message Table (Asynchronously Ensured)/Outbox Pattern)
- MQ事务(MQ Transaction)
- Saga模式(Saga Pattern)
- 事件驱动(Event Sourcing)
- 命令查询职责分离(Command Query Responsibility Segregation, CQRS)
- 原子提交(Atomic Commitment)
- 并行提交(Parallel Commits)
- 事务复制(Transactional Replication)
- 一致性算法(Consensus Algorithms)
- 时间戳排序(Timestamp Ordering)
- 乐观并发控制(Optimistic Concurrency Control)
- 拜占庭容错(Byzantine Fault Tolerance, BFT)
- 分布式锁(Distributed Locking)
- 分片(Sharding)
- 多版本并发控制(Multi-Version Concurrency Control, MVCC)
- 分布式快照(Distributed Snapshots)
- 主从复制(Leader-Follower Replication)
本文将介绍原子提交、并行提交以及事务复制三种模式。
10. 原子提交(Atomic Commitment)
- 原子提交是一种事务性流程,确保事务中的所有操作都成功完成,如果其中任何一个操作失败,则所有操作都失败。
- 单个事务中可能涉及多个流程。
- 涉及如下步骤:
- 开始事务(Begin Transaction) ,事务开始于客户端或用户执行一系列操作的请求,这些操作被视为单个事务。事务协调器管理整个事务,负责将事务标记为"正在进行中"。
- 准备阶段(Prepare Phase) ,事务协调器向事务中涉及的所有参与者发送消息,确保他们能够执行所请求的操作。参与者回复消息,表示是否可以执行操作。如果任何参与者不能执行操作,事务将被终止。
- 提交阶段(Commit Phase) ,如果事务中的所有参与者都可以执行操作,事务协调器将向所有参与者发送提交消息,参与者按照指示执行操作,并回复"确认"消息,表明已经成功完成了操作。
- 确认阶段(Finalize Phase) ,事务协调器等待所有参与者回复"确认"消息。如果所有参与者都响应"确认"消息,事务协调器将事务标记为"已提交"。如果任何参与者响应失败,事务将被终止。
- 中止阶段(Abort Phase) ,如果事务在任何阶段被中止,事务协调器将向所有参与者发送中止消息,要求撤消可能已经执行的任何操作。参与者用"确认"消息进行回复,表明已经成功撤消了操作。
import pymysql
# Connect to the database
connection = pymysql.connect(
host='localhost',
user='user',
password='password',
db='mydatabase',
autocommit=False
)
try:
# Start the transaction
with connection.cursor() as cursor:
cursor.execute("START TRANSACTION")
# Perform the SQL statements
with connection.cursor() as cursor:
cursor.execute("INSERT INTO products (name, price) VALUES ('Product A', 10)")
cursor.execute("UPDATE customers SET balance = balance - 10 WHERE id = 1")
# Commit the transaction
connection.commit()
except:
# Rollback the transaction if there is an error
connection.rollback()
finally:
# Close the database connection
connection.close()
示例代码
- 导入PyMySQL库,用于连接MySQL数据库。
- 基于指定的凭据连接到数据库,并将"auto-commit"参数设置为"False",这意味着将使用事务。
try
代码块将在事务中执行SQL语句。with
语句创建cursor对象,用于执行SQL语句。cursor.execute()
方法通过执行SQL命令start transaction
来启动事务。cursor.execute()
方法执行2条SQL语句: 一条INSERT
语句用于添加新产品,一条UPDATE
语句用于从客户余额中扣除价格。- 如果没有错误,则调用
connection.commit()
方法提交事务,也就是说,将更改永久保存到数据库中。 - 如果出现错误,则执行
except
代码块,该块通过connection.rollback()
方法回滚事务。在事务中所做的更改将被撤消,数据库将恢复到初始状态。 connection.close()
方法关闭数据库连接。
优点
- 一致性 —— 在事务执行后,数据库保持一致状态。要么保存所有更改,要么不保存任何更改,这有助于维护数据的完整性。
- 数据完整性 —— 防止出现不完整数据或者执行了部分事务。
- 回滚 —— 如果事务中出现错误,则回滚整个事务,也就是说,撤消所做的所有更改,并将数据库恢复到初始状态。
缺点
- 性能开销 —— 需要额外资源来确保事务自动执行
- 增加复杂性 —— 需要额外代码来确保事务自动执行,会增加应用程序的复杂性
适用场景
- 用于电子商务应用程序,以确保仅在付款成功时才下订单
- 用于金融应用程序,以确保只有在满足所有必需条件时才完成事务
挑战
- 死锁 —— 当多个事务试图同时获取相同的资源时
- 分布式事务 —— 当涉及多个数据库时,设计变得更加复杂
- 性能开销 —— 会导致性能开销
11. 并行提交(Parallel Commits)
- 允许多个事务并发提交更改
- 涉及如下步骤:
- 启动事务 —— 多个事务由不同的用户或进程发起。每个事务都有自己要执行的一组SQL语句。
- WAL(Write-Ahead Logging) —— 数据库系统维护WAL来记录对数据库的所有更改。在进行任何更改之前,事务将更改的记录写入WAL,确保在发生故障时可以将数据库恢复到以前的状态。
- 执行SQL语句 —— 每个事务执行自己的一组SQL语句,这些语句可以包括更新、插入和删除。
- 锁 —— 当一个事务修改某个数据库记录时,就获得了对该记录的锁,确保没有其他事务可以同时修改同一记录。
- 提交阶段 —— 一旦事务执行了所有SQL语句并释放了锁,就进入提交阶段。在此阶段,事务向WAL写入提交记录,表明已准备好提交。
- 并行提交 —— 一旦所有事务进入提交阶段,数据库系统就可以执行并行提交,这意味着系统可以并发将多个事务所做的更改写入数据库。
- 写入数据库 —— 数据库系统将每个事务所做的更改写入数据库。由于每个事务已经将其更改写入了WAL,因此系统可以快速的将更改写入数据库。
- 结束事务 —— 一旦更改被写入数据库,事务被认为已经完成,系统释放事务持有的所有锁以及使用的资源。
import psycopg2
from psycopg2.extras import RealDictCursor
# Establish connection to the database
conn = psycopg2.connect(
host="localhost",
database="ecommerce",
user="postgres",
password="password"
)
# Initialize two transactions for User A and User B
with conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute("BEGIN;")
cursor.execute("INSERT INTO cart (user_id, item_id) VALUES (1, 123);")
cursor.execute("UPDATE account SET balance = balance - 10 WHERE user_id = 1;")
cursor.execute("INSERT INTO orders (user_id, item_id, price) VALUES (1, 123, 10);")
cursor.execute("COMMIT;")
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute("BEGIN;")
cursor.execute("INSERT INTO cart (user_id, item_id) VALUES (2, 456);")
cursor.execute("UPDATE account SET balance = balance - 20 WHERE user_id = 2;")
cursor.execute("INSERT INTO orders (user_id, item_id, price) VALUES (2, 456, 20);")
cursor.execute("COMMIT;")
# Close the database connection
conn.close()
示例代码
- 用
psycopg2
库连接到PostgreSQL数据库,并执行SQL语句来更新数据库和创建新订单。 - 为用户A和用户B初始化2个事务,每个事务执行一组SQL语句来更新数据库并创建一个新订单。
BEGIN
语句标记事务的开始。COMMIT
语句标记事务的结束。INSERT
和UPDATE
语句修改数据库记录以反映用户所做的更改。- 一旦每个事务执行了SQL语句并释放了锁,就进入提交阶段,这就是并行提交过程发生的地方。
- 事务向WAL写入提交记录,表明已经准备好提交,数据库系统可以执行并行提交,将两个事务所做的更改并发写入数据库。
- 一旦将更改写入数据库,就认为每个事务都完成了,系统释放事务持有的所有锁和资源。
优点
- 并行提交允许并发同时执行多个事务
- 通过向系统中添加更多服务器或节点来横向扩展数据库系统规模
缺点
- 增加了数据库系统的复杂性
- 因为多个事务可以并发修改相同的数据,因此增加了冲突和数据不一致的风险
- 需要比串行提交更多的资源
适用场景
- 允许多个用户在电子商务平台中同时结帐
- 允许多个用户在财务系统中同时访问帐户余额
- 使多个医疗保健专业人员能够在医疗保健系统中并发访问患者记录
- 允许多个用户同时更新运输和交付信息
挑战
- 管理锁以确保数据一致性和完整性
- 跨多个节点或服务器同步事务执行可能是一个挑战
12. 事务复制(Transactional Replication)
- 数据库复制流程,涉及实时将事务从一个数据库复制和分发到另一个数据库。
- 通常在需要将数据从主数据库复制到一个或多个从数据库以进行备份、报告或其他目的时使用。
- 涉及如下步骤:
- 配置 —— 设置事务性复制以配置主数据库和从数据库,包括设置合适的发布、订阅和分发代理,包括选择要复制的表、列和数据类型,以及设置安全性和其他配置选项。
- 规划复制拓扑 —— 决定哪些数据库将充当发布者,哪些数据库将充当订阅者,以及将使用的复制类型(例如,单向或双向)。
- 配置主数据库 —— 设置合适的发布项,定义将要复制的表、列和数据类型,以及任何其他配置选项,如安全性和过滤。
- 配置从数据库 —— 设置合适的订阅项,定义复制数据的目标数据库,以及任何其他配置选项,例如安全性和错误处理。
- 设置分发代理 —— 将复制的数据从主数据库分发到从数据库,并且可以配置为连续运行或定时运行。
- 监控及排障 —— 一旦设置了复制,监控其性能并排除出现的任何问题,例如失败的事务或连接问题。
- 快照 —— 配置完数据库后,对主数据库中的数据进行快照。将主数据库中所选表中的所有数据复制到快照文件中,用该文件初始化从数据库。
- 分发 —— 快照完成后,分发过程开始。将发生在主数据库中的事务复制到分发数据库。分发数据库充当所有事务的中央存储库,并充当将事务分发到从数据库的源。
- 发布 —— 一旦事务被写入分发数据库,将被发布到对应的订阅。一个发布是一组包含一个或多个订阅的项目(即表、视图或存储过程),每个订阅都与某个特定的从数据库相关联。
- 订阅 —— 订阅发布并将复制的事务应用到从数据库。订阅过程包括设置订阅代理,该代理将事务从分发数据库复制到从数据库,并实时的将事务应用到从数据库。
- 配置 —— 设置事务性复制以配置主数据库和从数据库,包括设置合适的发布、订阅和分发代理,包括选择要复制的表、列和数据类型,以及设置安全性和其他配置选项。
import pyodbc
class Replication:
def __init__(self, publisher_conn_str, subscriber_conn_str):
self.publisher_conn = pyodbc.connect(publisher_conn_str)
self.subscriber_conn = pyodbc.connect(subscriber_conn_str)
def add_publication(self, pub_name, table_name):
with self.publisher_conn.cursor() as cursor:
# Create publication
cursor.execute("EXEC sp_addpublication @publication = ?, @description = 'Transaction Replication', @sync_method = 'native', @repl_freq = 'continuous'", pub_name)
# Add article to publication
cursor.execute("EXEC sp_addarticle @publication = ?, @article = ?, @source_owner = 'dbo', @source_object = ?, @type = 'logbased', @destination_table = ?, @pre_creation_cmd = 'truncate'", pub_name, table_name, table_name, table_name)
# Enable publication for subscription
cursor.execute("EXEC sp_addsubscription @publication = ?, @subscriber = ?, @destination_db = ?, @sync_type = 'initialize with backup', @backupdevicetype = 'disk', @backupdevicename = 'C:\\backup.bak', @update_mode = 'read only'", pub_name, self.subscriber_conn.database, self.subscriber_conn.database)
# Create snapshot agent
cursor.execute("EXEC sp_addpublication_snapshot @publication = ?, @frequency_type = 4, @frequency_interval = 1, @frequency_relative_interval = 1, @frequency_recurrence_factor = 0, @frequency_subday = 8, @frequency_subday_interval = 1, @active_start_time_of_day = 0, @active_end_time_of_day = 235959, @active_start_date = 0, @active_end_date = 0, @job_login = null, @job_password = null, @publisher_security_mode = 1", pub_name)
def add_subscription(self, sub_name, pub_name):
with self.subscriber_conn.cursor() as cursor:
# Add subscription to publication
cursor.execute("EXEC sp_addsubscription @publication = ?, @subscriber = ?, @destination_db = ?, @sync_type = 'initialize with backup', @backupdevicetype = 'disk', @backupdevicename = 'C:\\backup.bak', @update_mode = 'read only'", pub_name, sub_name, self.subscriber_conn.database)
# Create subscription agent
cursor.execute("EXEC sp_addsubscription_agent @publication = ?, @subscriber = ?, @subscriber_db = ?, @job_login = null, @job_password = null, @subscriber_security_mode = 1", pub_name, sub_name, self.subscriber_conn.database)
def start(self):
with self.publisher_conn.cursor() as cursor:
# Start snapshot agent
cursor.execute("EXEC sp_startpublication_snapshot @publication = ?", pub_name)
# Start distribution agent
cursor.execute("EXEC sp_startpublication_agent @publication = ?, @publisher_security_mode = 1, @publisher_login = null, @publisher_password = null", pub_name)
示例代码
- 基于
pyodbc
库连接到两个ODBC数据库 replication
类创建复制对象,用于管理复制过程add_publication()
方法在发布者数据库上创建发布,此发布包含在订阅者数据库中复制的数据,接受2个参数pub_name
和table_name
,分别表示发布的名称和要复制的表的名称,然后用pyodbc cursor
执行4条SQL命令。- 第1条命令: 用
sp_addpublication
存储过程创建发布,该存储过程有几个参数,包括发布名称、描述、同步方法和复制频率。 - 第2条命令: 用
sp_addarticle
存储过程向发布添加一条记录,该存储过程指定要复制的表、发布名称和其他参数。 - 第3条命令: 用
sp_addsubscription
存储过程启用订阅的发布,该存储过程指定发布名称、订阅者数据库和其他参数。 - 第4条命令: 用
sp_addpublication_snapshot
存储过程创建快照代理,该存储过程指定发布名称和其他参数。
- 第1条命令: 用
add_subscription()
方法在订阅者数据库上创建订阅。此订阅指定订阅者将接收的发布,接受两个参数sub_name
和pub_name
,分别表示订阅的名称和发布的名称。然后用pyodbc cursor
执行2条SQL命令。- 第1条命令: 用
sp_addsubscription
存储过程向发布添加订阅,该存储过程指定发布名称、订阅者数据库和其他参数。 - 第2条命令: 用
sp_addsubscription_agent
存储过程创建订阅代理,该存储过程指定发布名称、订阅数据库和其他参数。
- 第1条命令: 用
start()
方法通过启动快照代理和分发代理开始复制过程,不需要任何参数,用pyodbc cursor
执行2条SQL命令。- 第1条命令: 用
sp_startpublication_snapshot
存储过程启动快照代理,指定发布名称。 - 第2条命令: 用
sp_startpublication_agent
存储过程启动分发代理,指定发布名称和其他参数。
- 第1条命令: 用
优点
- 在数据库之间提供近乎实时的数据同步
- 通过保持多个数据库同步,支持高可用性和灾难恢复
- 允许多个订阅者从单个发布者接收更新
- 基于某些标准过滤数据,允许选择性复制数据
- 支持冲突检测和解决,允许自动或手动解决冲突
缺点
- 与其他复制方法相比,由于需要捕获和传播单个事务,需要更多资源
- 与其他复制方法相比,设置和管理非常复杂
- 由于捕获事务的开销,会在源数据库中引入延迟
适用场景
- 零售业用来同步不同商店和在线渠道的最新库存状态
- 金融机构在多个分支机构和在线平台上保持客户账户余额和交易同步
- 制造公司保持多个工厂和配送中心的生产计划和库存水平同步
挑战
- 配置和管理事务复制很复杂
- 网络延迟和带宽限制可能会影响复制性能,特别是在长距离或跨不可靠网络复制数据时
参考文献
Atomic Commit Protocol in Distributed System
Atomic Commitment: The Unscalability Protocol
What is Atomic Commit Protocols
Parallel Commits: An atomic commit protocol for globally distributed transactions
Parallel Commits: An Atomic Commit Protocol for Distributed Transactions
Parallel Commits for Transactions Using postgres_fdw on PostgreSQL 15
parallel commit in postgres fdw
Transactional Replication - SQL Server
Tutorial: Configure Transactional Replication - SQL Server
SQL Server replication: Overview of components and topography
How to Set Up Transactional Replication
你好,我是俞凡,在Motorola做过研发,现在在Mavenir做技术工作,对通信、网络、后端架构、云原生、DevOps、CICD、区块链、AI等技术始终保持着浓厚的兴趣,平时喜欢阅读、思考,相信持续学习、终身成长,欢迎一起交流学习。
微信公众号:DeepNoMind
本文由mdnice多平台发布
标签:事务,21,数据库,execute,name,cursor,conn,分布式 From: https://www.cnblogs.com/deepnomind/p/17430281.html