首页 > 数据库 >PythonNote042---pymysql使用

PythonNote042---pymysql使用

时间:2023-07-29 17:05:27浏览次数:65  
标签:cur 14 PythonNote042 pymysql --- sql spark con


  简单介绍pymysql的一些操作,增改删查

先建表,再写数据至表中
除查询操作外,增改删都需要commit操作,具体原理看ref.1

import pandas as pd
import pymysql
import time
import warnings
warnings.filterwarnings("ignore")

建表

con = pymysql.connect(host='localhost',
                      port=3306,
                      user='root',
                      password='12345',
                      db='ai',
                      charset="utf8")
# 创建游标(默认数据返回tuple,修改为dict)
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
create_sql = """
create table user(
    id int NOT NULL AUTO_INCREMENT,
    `name` varchar(50) NOT NULL,
    `age` int NOT NULL,
    PRIMARY KEY (`id`)
)ENGINE=InnoDB DEFAULT CHARSET=utf8;
"""
try:
    # 执行sql语句
    cur.execute(create_sql)
    # 执行sql语句
    con.commit()
except:
    # 发生错误时回滚
    print("发生错误,回滚")
    con.rollback()

# 关闭数据库连接
con.close()
con = pymysql.connect(host='localhost',
                      port=3306,
                      user='root',
                      password='12345',
                      db='ai',
                      charset="utf8")
# 创建游标(默认数据返回tuple,修改为dict)
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
sql = """
desc user;
"""
try:
    # 执行sql语句
    cur.execute(sql)
    get_df = pd.DataFrame(cur.fetchall())
    print(get_df)
    # 执行sql语句
    con.commit()
except:
    # 发生错误时回滚
    con.rollback()
# 关闭游标
cur.close
# 关闭数据库连接
con.close()
Field         Type Null  Key Default           Extra
0    id          int   NO  PRI    None  auto_increment
1  name  varchar(50)   NO         None                
2   age          int   NO         None

插入数据

con = pymysql.connect(host='localhost',
                      port=3306,
                      user='root',
                      password='12345',
                      db='ai',
                      charset="utf8")
# 创建游标(默认数据返回tuple,修改为dict)
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
row_nums = 500000
sql = "insert into user(name, age)values('小明', 14)"
try:
    # 执行sql语句
    t1 = time.time()
    for i in range(row_nums):
        cur.execute(sql)
    con.commit()  # 提交
    t2 = time.time()
    print(f"循环写入耗时:{t2 - t1}")  # 7s
except:
    # 发生错误时回滚
    con.rollback()
# 关闭游标
cur.close
# 关闭数据库连接
con.close()
循环写入耗时:39.632535457611084

批量写入

con = pymysql.connect(host='localhost',
                      port=3306,
                      user='root',
                      password='12345',
                      db='ai',
                      charset="utf8")
# 创建游标(默认数据返回tuple,修改为dict)
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
row_nums = 500000
sql = "insert into user(name, age) values(%s,%s)"
citys = [
    ('小明', 14) for i in range(row_nums)
]

try:
    # 执行sql语句
    t1 = time.time()
    # citys是参数组合,每个元素对应一行insert sql的对应字段,可以是元组,也可以是列表
    cur.executemany(sql, citys)  # 批量执行
    con.commit()  # 提交
    t2 = time.time()
    print(f"批量写入耗时:{t2 - t1}")  # 7s
except:
    # 发生错误时回滚
    con.rollback()
# 关闭游标
cur.close
# 关闭数据库连接
con.close()
批量写入耗时:5.722973823547363

批量写入有明显的速度优势,注意"insert into user(name, age) values(%s,%s)",values前面有空格,具体原因看ref.2

pyspark批量写入

数据量巨大时,可以结合spark的foreachPartition算子,并行写入

import pandas as pd
import time
import pymysql
import functools
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import *
def get_or_create_hudi(app_name):
    spark = SparkSession \
        .builder \
        .appName(app_name) \
        .config("spark.driver.maxResultSize", "10g") \
        .config("spark.sql.execution.arrow.enabled", "true") \
        .config("spark.dynamicAllocation.enabled", "false") \
        .config("spark.sql.crossJoin.enabled", "true") \
        .config("spark.kryoserializer.buffer.max", "512m") \
        .config("spark.io.compression.codec", "snappy") \
        .config("spark.sql.hive.convertMetastoreParquet", "false") \
        .config("spark.hadoop.dfs.namenode.acls.enabled", "false") \
        .config("spark.sql.hive.convertMetastoreParquet", "false") \
        .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .enableHiveSupport() \
        .getOrCreate()
    spark.sparkContext.setLogLevel('ERROR')
    print("\n")
    print("\n")
    return spark
def insert2mysql_partrdd(part, db_param="", value_cols=['name', 'age'], batch=40000):
    """

    @param part:
    @param db_param: mysql配置信息
    @param value_cols: insert 列名称
    @param batch: 批插入数据量
    @return:
    """
    con = pymysql.connect(host='localhost',
                          port=3306,
                          user='root',
                          password='12345',
                          db='ai',
                          charset="utf8")
    cur = con.cursor(cursor=pymysql.cursors.DictCursor)
    cnt = 0
    batch_list = []
    sql = sql = "insert into user(name, age) values(%s,%s)"
    for row in part:
        # 这个操作可能会比较耗时。。有没有好方法优化下?
        batch_list.append([row[i] for i in value_cols])
        cnt = cnt + 1
        if cnt > 0 and cnt % batch == 0:
            cur.executemany(sql, batch_list)
            con.commit()  # 提交
            batch_list = []
            print(f"第{cnt - batch}-{cnt}行数据插入MySql!")

    # 最后一波数据如果不是batch余数,也推过去
    if cnt % batch != 0:
        cur.executemany(sql, batch_list)
        con.commit()  # 提交
        print(f"第{cnt - cnt % batch}-{cnt}行数据插入MySql!")

    if cnt > 0:
        print(f"数据抽样-key:{row}")
        print(f"cnt:{cnt}")
    else:
        print("该分区无数据")

    cur.close()
    con.close()
row_nums = 500000

df = pd.DataFrame({"name": ['小明'] * row_nums, 'age': [14] * row_nums})
spark = get_or_create_hudi("test")
spark_df = spark.createDataFrame(df).repartition(10)
t1 = time.time()
spark_df.rdd.foreachPartition(
    functools.partial(insert2mysql_partrdd, batch=50000))
t2 = time.time()
print(f"spark批写入耗时:{t2 - t1}")  # 1.2s
spark批写入耗时:8.034992456436157
  • 速度上似乎没有更快
  • 可能数据量再大些,会有效果
  • 另,单机跑spark也可能有些影响

刚才搞了100w数据,删除些

con = pymysql.connect(host='localhost',
                      port=3306,
                      user='root',
                      password='12345',
                      db='ai',
                      charset="utf8")
# 创建游标(默认数据返回tuple,修改为dict)
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
sql = """
delete from user where id>10
"""
try:
    # 执行sql语句
    cur.execute(sql)
    # 执行sql语句
    con.commit()
except:
    # 发生错误时回滚
    print("发生错误,回滚")
    con.rollback()

# 关闭数据库连接
con.close()
con = pymysql.connect(host='localhost',
                      port=3306,
                      user='root',
                      password='12345',
                      db='ai',
                      charset="utf8")
# 创建游标(默认数据返回tuple,修改为dict)
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
sql = """
select count(*) as cnt from  user
"""
try:
    # 执行sql语句
    cur.execute(sql)
    get_df = pd.DataFrame(cur.fetchall())
    print(get_df)
    # 执行sql语句
    # con.commit()
except:
    # 发生错误时回滚
    print("发生错误,回滚")
    con.rollback()

# 关闭数据库连接
con.close()
cnt
0   10

还剩10条数据

结合pandas,把查询的数据转成df

con = pymysql.connect(host='localhost',
                      port=3306,
                      user='root',
                      password='12345',
                      db='ai',
                      charset="utf8")
# 创建游标(默认数据返回tuple,修改为dict)
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
sql = """
select * from  user limit 100
"""
try:
    # 执行sql语句
    cur.execute(sql)
    get_df = pd.DataFrame(cur.fetchall())
    print(get_df)
    # 执行sql语句
    # con.commit()
except:
    # 发生错误时回滚
    print("发生错误,回滚")
    con.rollback()

# 关闭数据库连接
con.close()
id name  age
0   1   小明   14
1   2   小明   14
2   3   小明   14
3   4   小明   14
4   5   小明   14
5   6   小明   14
6   7   小明   14
7   8   小明   14
8   9   小明   14
9  10   小明   14

con = pymysql.connect(host='localhost',
                      port=3306,
                      user='root',
                      password='12345',
                      db='ai',
                      charset="utf8")
# 创建游标(默认数据返回tuple,修改为dict)
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
sql = """
update user set name = '小红'  where id<=5
"""
try:
    # 执行sql语句
    cur.execute(sql)
    # 执行sql语句
    con.commit()
except:
    # 发生错误时回滚
    print("发生错误,回滚")
    con.rollback()

# 关闭数据库连接
con.close()
con = pymysql.connect(host='localhost',
                      port=3306,
                      user='root',
                      password='12345',
                      db='ai',
                      charset="utf8")
# 创建游标(默认数据返回tuple,修改为dict)
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
sql = """
select * from  user limit 100
"""
try:
    # 执行sql语句
    cur.execute(sql)
    get_df = pd.DataFrame(cur.fetchall())
    print(get_df)
    # 执行sql语句
    # con.commit()
except:
    # 发生错误时回滚
    print("发生错误,回滚")
    con.rollback()

# 关闭数据库连接
con.close()
id name  age
0   1   小红   14
1   2   小红   14
2   3   小红   14
3   4   小红   14
4   5   小红   14
5   6   小明   14
6   7   小明   14
7   8   小明   14
8   9   小明   14
9  10   小明   14

Ref

[1] https://www.runoob.com/python3/python3-mysql.html[2] https://www.modb.pro/db/184700

                                2023-07-28 台风、大雨 于南京市江宁区


标签:cur,14,PythonNote042,pymysql,---,sql,spark,con
From: https://blog.51cto.com/u_15575753/6893729

相关文章

  • PysparkNote006---pycharm加载spark环境
    pycharm配置pyspark环境,本地执行pyspark代码spark安装、添加环境变量不提了File-Settings-Project-ProjectStructure-addcontentroot添加如下两个路径D:\code\spark\python\lib\py4j-0.10.7-src.zipD:\code\spark\python\lib\pyspark.zip                ......
  • Hive12---日期时间函数的操作
    Intro    时间函数的一些操作,记录之。备查当前时间frompyspark.sqlimportSparkSessionfrompyspark.sqlimportfunctionsasFfrompyspark.sql.typesimportDoubleType,IntegerType,StringTypedefget_or_create(app_name):spark=(SparkSession.buil......
  • PyPackage01---Pandas17_null、inf筛选
    判断数据集是否存在null、inf,快速定位所在列、行,方便分析原因无穷大、无穷小处理importpandasaspdimportnumpyasnp#Createdataframeusingdictionarydata={'StudentID':[10,11,12,13,14],'Age':[23,22,24,22,np.nan],'Weight':[66,72,np.in......
  • electron的electron-packager打包运行和electron-builder生产安装包过程,学透 Electron
    electron的electron-packager打包运行和electron-builder生产安装包过程开发electron客户端程序,打包是绕不开的问题。macOS应用构建,看似近在咫尺,实则坑坑致命。场景:mac笔记本打包,以及生产出可交付的软件安装包,如何避坑,如何理解app的产生过程!!!!可以按照我测试的路程来配置环境。包......
  • 【CMU15-445 FALL 2022】Project #1 - Buffer Pool
    About实验官网Project#1-BufferPool在线评测网站gradescopeLabTask#1-ExtendibleHashTable详见——【CMU15-445FALL2022】Project#1-ExtendableHashing如果链接失效,请查看当前平台我之前发布的文章。Task#2-LRU-KReplacementPolicyConcept相关参考LRU-K和2Q......
  • 面向对象编程的 SOLID 原则 - 里氏替换原则
    里氏替换原则里氏替换原则描述的是子类应该能替换为它的基类。意思是,给定classB是classA的子类,在预期传入classA的对象的任何方法传入classB的对象,方法都不应该有异常。这是一个预期的行为,因为继承假定子类继承了父类的一切。子类可以扩展行为但不会收窄。因此,当......
  • 关于vue element-admin 切换tag, 页面刷新 以及内存增加不释放问题
    1:切换tag,页面刷新,检查了路由,配置了  nocache:false,以及isKeep:true, 但是在页面tag切换时,还是会刷新,, 在生命周期中打印,发现能够打印,, 检查了代码,在组件引用中未发现v-if的使用, 最后竟查找,借鉴 https://blog.csdn.net/weixin_45616483/article/details/122959997 ......
  • 面向对象编程的 SOLID 原则 - 依赖倒置原则
    依赖倒置原则依赖倒置原则描述的是我们的class应该依赖接口和抽象类而不是具体的类和函数。在这篇文章(2000)里,Bob大叔如下总结该原则:“如果OCP声明了OO体系结构的目标,那么DIP则声明了主要机制”。这两个原则的确息息相关,我们在讨论开闭原则之前也要用到这一模式。......
  • 面向对象编程的 SOLID 原则 - 接口隔离原则
    接口隔离原则隔离意味着保持独立,接口隔离原则是关于接口的独立。该原则描述了很多客户端特定的接口优于一个多用途接口。客户端不应该强制实现他们不需要的函数。这是一个简单的原则,很好理解和实践,直接看例子。publicinterfaceParkingLot{ voidparkCar(); //Decrease......
  • v831-c-yolov2例程解析
    没错,自从把ubuntu搞坏之后无奈把之前的例程全删了,因此所有的笔记都没了,又得从新分析一遍main函数先从最简单的main分析此函数主要创建一个屏幕句柄用来显示,然后调用nn_test来开始yolov2的操作,并且传入画布,显示等都在里面操作,最后跳出来后摧毁屏幕nn_test函数此函数很长,一点......