数据库连接池 + pymysql
最近想用小程序来做个移动BI, 然后涉及后端接口部分打算用 Python 的 flask 框架整一波, 主要听闻它比较轻量, 简单和可灵活配置, 这就和我很对味. 毕竟我主要搞数据用的就是 sql 而已, 只要有个服务器提供接口就行. 真正开始来写接口的时候, 就遇到这个数据库的问题, 关于查询效率, 和优雅.
我在工作中偶尔有一些数据处理或者逻辑分析的场景下, 即不通过BI平台, 大量用 pandas + sql 来操作. 我们数据的 ADS 层几乎都在 mysql 故用这个 pymysql 作为驱动连接工具是很好用的.
低频分析查询-随便搞
针对低频查询数据, 通常写法是这样的:
import pymysql
# 1.创建连接
conn = pymysql.connect(
host='127.0.0.1',
port=3306,
user='root',
passwd='123456',
db='cj'
)
# 3.创建游标
cursor = conn.cursor()
# 4. 执行sql语句和获取数据, 以经典的超市数据为例
cursor.execute('select order_id, province from market limit 2;')
data = cursor.fetchall()
print(data)
# 5. 关闭连接(同时也就关闭了cursor, 不关可能造成锁表)
conn.close()
(('US-2023-1357144', '浙江'), ('CN-2023-1973789', '四川'))
高频分析查询-连接池
但如果变成了web, 每次访问接口都要查询数据, 如果每次都要重新进行 connect 那这个就有点低效了.
若在外层只连接一次, 重复用 cursor 的话又可能会造成锁表和线程异常问题.
因此, 创建一个数据库连接池来进行任务管理是非常必要的. 即可通过一个 pool 来自动创建多个连接和资源回收, 这样就又高效又优雅.
在 pyhton 中, 我们用 dbutils 用作池管理, 驱动还是用 pymysql, 写法如下:
import pymysql
from pymysql import cursors
from dbutils.pooled_db import PooledDB
pool = PooledDB(
creator=pymysql,
# 创建最大连接
maxconnections=6,
mincached=2,
maxcached=3,
maxshared=4,
blocking=True,
maxusage=None,
setsession=[],
ping=0,
# 这一坨会传给上面的 pymysql
host='127.0.0.1',
port=3306,
user='root',
passwd='123456',
database='cj',
charset='utf8',
# 让查询结果是一个 dict
cursorclass=cursors.DictCursor
)
# 使用上和 mysql 是一样的
conn = pool.connection()
cursor = conn.cursor()
cursor.execute('select order_id, province from market limit 2;')
data = cursor.fetchall()
print(data)
# 这里并没有关闭连接, 而是放进了连接池 pool
conn.close()
结果也是一样的.
(('US-2023-1357144', '浙江'), ('CN-2023-1973789', '四川'))
模拟多用户请求
上面的作用不太直观, 这里用多线程 threading 来模拟多请求的情况. 假设每次最大连接 3次, 至少1次的配置,
通过 sleep 减速一下则可以看到每次 3个连接这样, 一波波处理:
import pymysql
from pymysql import cursors
from dbutils.pooled_db import PooledDB
pool = PooledDB(
creator=pymysql,
# 创建最大连接, 这里假设搞个3
maxconnections=3,
mincached=1,
maxcached=3,
maxshared=4,
blocking=True,
maxusage=None,
setsession=[],
ping=0,
# 这一坨会传给上面的 pymysql
host='127.0.0.1',
port=3306,
user='root',
passwd='123456',
database='cj',
charset='utf8',
# 让查询结果是一个 dict
cursorclass=cursors.DictCursor
)
# 模拟多个请求的情况
from threading import Thread
def task(num):
conn = pool.connection()
cursor = conn.cursor()
cursor.execute('select sleep(3);')
data = cursor.fetchall()
print(num, '-------', data)
conn.close()
# 多线程任务测试
for i in range(20):
t = Thread(target=task, args=(i,))
# 启动
t.start()
一波波的输出如下:
2 ------- [{'sleep(3)': 0}]
1 ------- [{'sleep(3)': 0}]
3 ------- [{'sleep(3)': 0}]
0 ------- [{'sleep(3)': 0}]
4 ------- [{'sleep(3)': 0}]
5 ------- [{'sleep(3)': 0}]
7 ------- [{'sleep(3)': 0}]
6 ------- [{'sleep(3)': 0}]
10 ------- [{'sleep(3)': 0}]
8 ------- [{'sleep(3)': 0}]
11 ------- [{'sleep(3)': 0}]
9 ------- [{'sleep(3)': 0}]
14 ------- [{'sleep(3)': 0}]
12 ------- [{'sleep(3)': 0}]
15 ------- [{'sleep(3)': 0}]
13 ------- [{'sleep(3)': 0}]
16 ------- [{'sleep(3)': 0}]
18 ------- [{'sleep(3)': 0}]
17 ------- [{'sleep(3)': 0}]
19 ------- [{'sleep(3)': 0}]
可以看到连接池简直太优秀了, 非常实用和优雅呀.
耐心和恒心, 总会获得回报的.