A quick introduction to some basic pymysql operations: insert, update, delete, and query.
Insert
First create the table, then write data into it.
Except for queries, insert/update/delete all need a commit; see ref. 1 for the underlying mechanism.
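The sections below all repeat the same connect / execute / commit / close boilerplate, so here is a minimal sketch of that lifecycle up front (assuming the same local MySQL instance and credentials used throughout this post). The cursor context-manager form is part of the pymysql API and closes the cursor automatically:

import pymysql

con = pymysql.connect(host='localhost', port=3306, user='root',
                      password='12345', db='ai', charset='utf8')
try:
    # Cursors support the context-manager protocol and close themselves on exit
    with con.cursor() as cur:
        cur.execute("select 1")
        print(cur.fetchone())
    con.commit()  # required for any statement that modifies data
finally:
    con.close()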
import pandas as pd
import pymysql
import time
import warnings
warnings.filterwarnings("ignore")
Create the table
con = pymysql.connect(host='localhost',
                      port=3306,
                      user='root',
                      password='12345',
                      db='ai',
                      charset="utf8")
# Create a cursor (rows come back as tuples by default; switch to dict)
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
create_sql = """
create table user(
id int NOT NULL AUTO_INCREMENT,
`name` varchar(50) NOT NULL,
`age` int NOT NULL,
PRIMARY KEY (`id`)
)ENGINE=InnoDB DEFAULT CHARSET=utf8;
"""
try:
    # Execute the SQL statement
    cur.execute(create_sql)
    # Commit the transaction
    con.commit()
except:
    # Roll back on error
    print("Error occurred, rolling back")
    con.rollback()
# Close the connection
con.close()
con = pymysql.connect(host='localhost',
                      port=3306,
                      user='root',
                      password='12345',
                      db='ai',
                      charset="utf8")
# Create a cursor (rows come back as tuples by default; switch to dict)
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
sql = """
desc user;
"""
try:
    # Execute the SQL statement
    cur.execute(sql)
    get_df = pd.DataFrame(cur.fetchall())
    print(get_df)
    con.commit()
except:
    # Roll back on error
    con.rollback()
# Close the cursor
cur.close()
# Close the connection
con.close()
   Field         Type Null Key Default           Extra
0     id          int   NO PRI    None  auto_increment
1   name  varchar(50)   NO        None
2    age          int   NO        None
Insert data
con = pymysql.connect(host='localhost',
                      port=3306,
                      user='root',
                      password='12345',
                      db='ai',
                      charset="utf8")
# Create a cursor (rows come back as tuples by default; switch to dict)
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
row_nums = 500000
sql = "insert into user(name, age)values('小明', 14)"
try:
    t1 = time.time()
    # One execute per row: one network round trip per statement
    for i in range(row_nums):
        cur.execute(sql)
    con.commit()  # commit once at the end
    t2 = time.time()
    print(f"Row-by-row insert took: {t2 - t1}")
except:
    # Roll back on error
    con.rollback()
# Close the cursor
cur.close()
# Close the connection
con.close()
Row-by-row insert took: 39.632535457611084
Batch insert
con = pymysql.connect(host='localhost',
                      port=3306,
                      user='root',
                      password='12345',
                      db='ai',
                      charset="utf8")
# Create a cursor (rows come back as tuples by default; switch to dict)
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
row_nums = 500000
sql = "insert into user(name, age) values(%s,%s)"
citys = [
    ('小明', 14) for i in range(row_nums)
]
try:
    t1 = time.time()
    # citys holds the parameter sets; each element supplies the fields for
    # one row of the insert, and may be a tuple or a list
    cur.executemany(sql, citys)  # batch execute
    con.commit()  # commit
    t2 = time.time()
    print(f"Batch insert took: {t2 - t1}")
except:
    # Roll back on error
    con.rollback()
# Close the cursor
cur.close()
# Close the connection
con.close()
Batch insert took: 5.722973823547363
Batch insert is clearly faster. Note the space before "values" in "insert into user(name, age) values(%s,%s)"; see ref. 2 for why it matters to executemany.
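One caveat worth adding (my note, not from the refs): a single executemany over all 500,000 parameter tuples holds the whole list in memory at once. A sketch of chunking the parameter list instead, assuming the open con, cur, sql, and citys from the block above (the helper name is mine):

def executemany_in_chunks(con, cur, sql, rows, chunk_size=50000):
    # Send the parameter list in fixed-size slices and commit per slice,
    # bounding both client-side memory and transaction size
    for start in range(0, len(rows), chunk_size):
        cur.executemany(sql, rows[start:start + chunk_size])
        con.commit()

executemany_in_chunks(con, cur, sql, citys)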
Batch insert with pyspark
When the data volume is huge, Spark's foreachPartition operator can be used to write the partitions in parallel.
import pandas as pd
import time
import pymysql
import functools
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import *

def get_or_create_hudi(app_name):
    spark = SparkSession \
        .builder \
        .appName(app_name) \
        .config("spark.driver.maxResultSize", "10g") \
        .config("spark.sql.execution.arrow.enabled", "true") \
        .config("spark.dynamicAllocation.enabled", "false") \
        .config("spark.sql.crossJoin.enabled", "true") \
        .config("spark.kryoserializer.buffer.max", "512m") \
        .config("spark.io.compression.codec", "snappy") \
        .config("spark.sql.hive.convertMetastoreParquet", "false") \
        .config("spark.hadoop.dfs.namenode.acls.enabled", "false") \
        .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .enableHiveSupport() \
        .getOrCreate()
    spark.sparkContext.setLogLevel('ERROR')
    print("\n")
    return spark
def insert2mysql_partrdd(part, db_param="", value_cols=['name', 'age'], batch=40000):
    """
    @param part: the partition iterator
    @param db_param: MySQL connection config (unused here; the connection is hard-coded for this demo)
    @param value_cols: column names for the insert
    @param batch: number of rows per batch insert
    @return:
    """
    con = pymysql.connect(host='localhost',
                          port=3306,
                          user='root',
                          password='12345',
                          db='ai',
                          charset="utf8")
    cur = con.cursor(cursor=pymysql.cursors.DictCursor)
    cnt = 0
    batch_list = []
    sql = "insert into user(name, age) values(%s,%s)"
    for row in part:
        # This per-row list build may be slow; is there a better way to optimize it?
        batch_list.append([row[i] for i in value_cols])
        cnt = cnt + 1
        if cnt > 0 and cnt % batch == 0:
            cur.executemany(sql, batch_list)
            con.commit()  # commit this batch
            batch_list = []
            print(f"Rows {cnt - batch}-{cnt} inserted into MySQL!")
    # Push the trailing rows that did not fill a whole batch
    if cnt % batch != 0:
        cur.executemany(sql, batch_list)
        con.commit()  # commit the remainder
        print(f"Rows {cnt - cnt % batch}-{cnt} inserted into MySQL!")
    if cnt > 0:
        print(f"Sample row: {row}")
        print(f"cnt: {cnt}")
    else:
        print("This partition is empty")
    cur.close()
    con.close()
row_nums = 500000
df = pd.DataFrame({"name": ['小明'] * row_nums, 'age': [14] * row_nums})
spark = get_or_create_hudi("test")
spark_df = spark.createDataFrame(df).repartition(10)
t1 = time.time()
spark_df.rdd.foreachPartition(
    functools.partial(insert2mysql_partrdd, batch=50000))
t2 = time.time()
print(f"spark batch insert took: {t2 - t1}")
spark batch insert took: 8.034992456436157
- Speed-wise it does not seem any faster
- The benefit would probably show with a larger data volume
- Also, running Spark on a single machine may distort the comparison (an alternative writer is sketched after this list)
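An aside (my addition, not benchmarked here): Spark also ships a built-in JDBC writer, which avoids hand-rolling the partition loop. A sketch, assuming the MySQL Connector/J driver jar is on the Spark classpath; url, mode, and the properties keys are standard Spark JDBC writer parameters:

spark_df.write.jdbc(
    url="jdbc:mysql://localhost:3306/ai?useUnicode=true&characterEncoding=utf8",
    table="user",
    mode="append",
    properties={
        "user": "root",
        "password": "12345",
        "driver": "com.mysql.cj.jdbc.Driver",
        "batchsize": "50000",  # rows per JDBC batch
    },
)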
Delete
We just created about 1,000,000 rows; let's delete some of them.
con = pymysql.connect(host='localhost',
                      port=3306,
                      user='root',
                      password='12345',
                      db='ai',
                      charset="utf8")
# Create a cursor (rows come back as tuples by default; switch to dict)
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
sql = """
delete from user where id>10
"""
try:
    # Execute the SQL statement
    cur.execute(sql)
    # Commit the transaction
    con.commit()
except:
    # Roll back on error
    print("Error occurred, rolling back")
    con.rollback()
# Close the connection
con.close()
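A side note (my addition): a single delete over hundreds of thousands of rows runs as one large transaction. A sketch of deleting in chunks instead, assuming an open con and cur as above; cur.rowcount is the standard DB-API attribute pymysql exposes for the affected row count:

while True:
    cur.execute("delete from user where id>10 limit 10000")
    con.commit()
    if cur.rowcount == 0:  # no rows matched this round: done
        break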
con = pymysql.connect(host='localhost',
                      port=3306,
                      user='root',
                      password='12345',
                      db='ai',
                      charset="utf8")
# Create a cursor (rows come back as tuples by default; switch to dict)
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
sql = """
select count(*) as cnt from user
"""
try:
    # Execute the SQL statement
    cur.execute(sql)
    get_df = pd.DataFrame(cur.fetchall())
    print(get_df)
    # No commit needed for a pure query
    # con.commit()
except:
    # Roll back on error
    print("Error occurred, rolling back")
    con.rollback()
# Close the connection
con.close()
   cnt
0   10
10 rows are left.
Query
Together with pandas, turn the query result into a DataFrame.
con = pymysql.connect(host='localhost',
                      port=3306,
                      user='root',
                      password='12345',
                      db='ai',
                      charset="utf8")
# Create a cursor (rows come back as tuples by default; switch to dict)
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
sql = """
select * from user limit 100
"""
try:
    # Execute the SQL statement
    cur.execute(sql)
    get_df = pd.DataFrame(cur.fetchall())
    print(get_df)
    # No commit needed for a pure query
    # con.commit()
except:
    # Roll back on error
    print("Error occurred, rolling back")
    con.rollback()
# Close the connection
con.close()
id name age
0 1 小明 14
1 2 小明 14
2 3 小明 14
3 4 小明 14
4 5 小明 14
5 6 小明 14
6 7 小明 14
7 8 小明 14
8 9 小明 14
9 10 小明 14
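For reference (my addition): pandas can also run the query directly over the connection, collapsing the execute + fetchall + DataFrame steps into one call. Newer pandas versions warn when handed a raw pymysql connection rather than an SQLAlchemy engine, but it works, assuming the open con from above:

get_df = pd.read_sql("select * from user limit 100", con)
print(get_df)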
Update
con = pymysql.connect(host='localhost',
                      port=3306,
                      user='root',
                      password='12345',
                      db='ai',
                      charset="utf8")
# Create a cursor (rows come back as tuples by default; switch to dict)
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
sql = """
update user set name = '小红' where id<=5
"""
try:
    # Execute the SQL statement
    cur.execute(sql)
    # Commit the transaction
    con.commit()
except:
    # Roll back on error
    print("Error occurred, rolling back")
    con.rollback()
# Close the connection
con.close()
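The update above hard-codes its values into the SQL string. As a sketch (my addition), execute also accepts a parameter tuple, using the same %s placeholders as the batch insert earlier, which avoids manual quoting and SQL injection when values come from outside; it assumes an open con and cur:

new_name, max_id = '小红', 5  # example values
cur.execute("update user set name = %s where id <= %s", (new_name, max_id))
con.commit()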
con = pymysql.connect(host='localhost',
                      port=3306,
                      user='root',
                      password='12345',
                      db='ai',
                      charset="utf8")
# Create a cursor (rows come back as tuples by default; switch to dict)
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
sql = """
select * from user limit 100
"""
try:
    # Execute the SQL statement
    cur.execute(sql)
    get_df = pd.DataFrame(cur.fetchall())
    print(get_df)
    # No commit needed for a pure query
    # con.commit()
except:
    # Roll back on error
    print("Error occurred, rolling back")
    con.rollback()
# Close the connection
con.close()
id name age
0 1 小红 14
1 2 小红 14
2 3 小红 14
3 4 小红 14
4 5 小红 14
5 6 小明 14
6 7 小明 14
7 8 小明 14
8 9 小明 14
9 10 小明 14
Ref
[1] https://www.runoob.com/python3/python3-mysql.html
[2] https://www.modb.pro/db/184700
2023-07-28, typhoon and heavy rain, Jiangning District, Nanjing