这是一个 ORM 框架(已经包含连接池)
安装插件
pip install pymysql sqlalchemy
参数配置
from sqlalchemy import create_engine, text
# 定义数据库连接字符串
DATABASE_URI = 'mysql+pymysql://{username}:{password}@{host}:{port}/{dbname}?charset=utf8mb4'
# 替换为你的数据库用户名、密码、主机、端口和数据库名
USERNAME = 'root'
PASSWORD = 'root'
HOST = 'localhost'
PORT = '3306'
DBNAME = 'med'
# 创建数据库引擎,使用连接池
engine = create_engine(
DATABASE_URI.format(
username=USERNAME,
password=PASSWORD,
host=HOST,
port=PORT,
dbname=DBNAME
),
echo=False, # 如果设置为True,SQLAlchemy将打印所有执行的SQL语句,通常用于调试
pool_size=10, # 连接池大小
max_overflow=20, # 超过连接池大小外最多创建的连接数
pool_timeout=30, # 连接池中没有线程可用时,在抛出异常前等待的时间
pool_recycle=3600 # 多少秒之后对连接进行一次回收(重置)
)
# do a test
with engine.connect() as con:
rs = con.execute(text('SELECT 1'))
rs.fetchone()
print('create engine succeed!')
数据库会话 session
数据库会话(session)可分为:普通会话、线程安全的会话。
线程安全的会话,已经实现了 threadlocal 功能,不同线程之间彼此隔离。
from sqlalchemy.orm import sessionmaker, scoped_session
# 普通 session
Session = sessionmaker(bind=engine)
# 线程安全的 session
DBSession = scoped_session(Session)
测试代码如下,运行后可以看到,两个通过 DBSession 创建的 session,他们的 id 是完全一致的。
print(id(DBSession()))
print(id(DBSession()))
session = Session()
print(id(session))
执行SQL
session = Session()
print(id(session))
# 定义SQL语句
sql = "SELECT * FROM `t_temp` WHERE (`id`= :id) LIMIT 1"
# 执行SQL语句
result = session.execute(text(sql), {'id': 18})
# 处理结果
for row in result:
print(row)
# 关闭会话
session.close()
事务注解
sqlalchemy 没有提供注解事务,不过我们可以参考 spring 中的做法,自己做一个。
def transactional(rollback: type = Exception):
r"""
注解式事务
用法类似于 spring 环境下的 @Transactional 注解
注意: 事务控制在 session 级别,不兼容事务嵌套的场景
推荐: 如果遇到很复杂的事务嵌套,显式调用 session,手动控制事务
改进方案:如果想进行改造,允许事务嵌套,可以通过 save-point 调整当前代码
:param rollback: 指定触发回滚的异常类型
:return: 装饰器函数
"""
def decorator(func):
def call(*args, **kwargs):
session = None
try:
session = DBSession()
ret = func(*args, **kwargs)
session.commit()
return ret
except rollback as e:
if session:
session.rollback()
logger.exception(f'transaction exception, rollback: {str(e)}')
raise
finally:
if session:
session.close()
return call
return decorator
pass
测试函数如下
@transactional()
def debug():
session = DBSession()
session.execute(text("UPDATE `t_temp` SET `desc`= :desc WHERE (`id`= :id) LIMIT 1"), {'id': 18, 'desc': 'OR 1=3'})
session.execute(text("UPDATE `t_temp` SET `desc`= :desc WHERE (`id`= :id) LIMIT 1"), {'id': 18, 'desc': 'OR 1=4'})
result = session.execute(text("SELECT * FROM `t_temp` WHERE (`id`= :id) LIMIT 1"), {'id': 18})
# 处理结果
for row in result:
print(row)
raise ValueError('db error')
保存点 save-point
和游戏的保存点概念一致,在一个事务中,可以设置多个保存点,出现异常,可以按需选择保存点进行回退。
保存点用于处理复杂的事务控制,通常情况下,整个职业生涯都不会直接接触这些代码。
session = DBSession()
connection = session.connection()
savepoint = connection.begin_nested()
try:
# do sth.
savepoint.commit()
except Exception as e:
if savepoint:
savepoint.rollback()
ORM 相关内容
因为只是想要连接池,相关需求暂无,以后再做补充……
标签:sqlachemy,DBSession,rollback,python,text,session,print,id From: https://www.cnblogs.com/chenss15060100790/p/18589082