目录
一、数据表操作
在MySQL服务器上已有数据库test_database,且该数据库下已有数据表myelsa_table,此数据表包含4个字段:name,ID_Card,age,city,现借助第三方库对该数据表进行增、删、改、查等操作。
1、新增记录
1-1、用mysql-connector-python库
import mysql.connector
# 数据库连接配置
config = {
'user': 'root', # 替换为你的MySQL用户名
'password': '123456', # 替换为你的MySQL密码
'host': '127.0.0.1', # 如果数据库在远程服务器上,请替换为相应的主机名或IP地址
'database': 'test_database', # 数据库名
}
# 定义要插入的数据
new_record = {
'name': 'Myelsa',
'ID_Card': '443689564710526448',
'age': 18,
'city': 'Guangzhou'
}
# SQL插入语句
insert_query = """
INSERT INTO myelsa_table (name, ID_Card, age, city)
VALUES (%s, %s, %s, %s)
"""
try:
# 连接到数据库
cnx = mysql.connector.connect(**config)
cursor = cnx.cursor()
# 执行SQL插入语句
cursor.execute(insert_query, (new_record['name'], new_record['ID_Card'], new_record['age'], new_record['city']))
# 提交更改
cnx.commit()
print("Record inserted successfully!")
except mysql.connector.Error as err:
print(f"Error: '{err}'")
finally:
# 关闭游标和连接
if cursor:
cursor.close()
if cnx.is_connected():
cnx.close()
1-2、用PyMySQL库
import pymysql
# 数据库连接配置
config = {
'user': 'root', # 替换为你的MySQL用户名
'password': '123456', # 替换为你的MySQL密码
'host': '127.0.0.1', # 如果数据库在远程服务器上,请替换为相应的主机名或IP地址
'database': 'test_database', # 数据库名
'charset': 'utf8mb4', # 添加字符集配置,防止编码问题
'cursorclass': pymysql.cursors.DictCursor # 使用字典游标以便轻松访问列名
}
# 定义要插入的数据
new_record = {
'name': 'Myelsa',
'ID_Card': '443689564710526448',
'age': 18,
'city': 'Guangzhou'
}
# SQL插入语句
insert_query = """
INSERT INTO myelsa_table (name, ID_Card, age, city)
VALUES (%s, %s, %s, %s)
"""
try:
# 连接到数据库
cnx = pymysql.connect(**config)
with cnx.cursor() as cursor:
# 使用with语句确保游标在使用完毕后被关闭
# 执行SQL插入语句
cursor.execute(insert_query, (new_record['name'], new_record['ID_Card'], new_record['age'], new_record['city']))
# 提交更改
cnx.commit()
print("Record inserted successfully!")
except pymysql.Error as err:
print(f"Error: '{err}'")
# 使用with语句连接数据库时,在with块结束后连接将自动关闭
# 如果没有使用with语句,需要显式关闭连接
if cnx.open:
cnx.close()
1-3、用PeeWee库
from peewee import *
# 数据库连接配置
db = MySQLDatabase('test_database', user='root', password='123456', host='127.0.0.1')
# 定义模型
class MyelsaTable(Model):
name = CharField()
ID_Card = CharField()
age = IntegerField()
city = CharField()
class Meta:
database = db
table_name = 'myelsa_table'
# 连接到数据库
db.connect()
# 创建表(如果尚不存在)
db.create_tables([MyelsaTable])
# 插入数据
new_record = {
'name': 'Myelsa',
'ID_Card': '443689564710526448',
'age': 18,
'city': 'Guangzhou'
}
try:
MyelsaTable.create(
name=new_record['name'],
ID_Card=new_record['ID_Card'],
age=new_record['age'],
city=new_record['city']
)
print("Record inserted successfully!")
except IntegrityError as e:
print(f"Error: '{e}'")
finally:
# 关闭数据库连接
db.close()
1-4、用SQLAlchemy库
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import SQLAlchemyError
# 定义基类
Base = declarative_base()
# 定义数据库模型类
class MyElsaTable(Base):
__tablename__ = 'myelsa_table'
ID_Card = Column(String, primary_key=True)
name = Column(String)
age = Column(Integer)
city = Column(String)
def __repr__(self):
return f"<MyElsaTable(ID_Card={self.ID_Card}, name={self.name}, age={self.age}, city={self.city})>"
# 数据库连接配置
config = {
'username': 'root', # 替换为你的MySQL用户名
'password': '123456', # 替换为你的MySQL密码
'host': '127.0.0.1', # 如果数据库在远程服务器上,请替换为相应的主机名或IP地址
'database': 'test_database', # 数据库名
}
# 创建数据库引擎
engine = create_engine(
f'mysql+pymysql://{config["username"]}:{config["password"]}@{config["host"]}/{config["database"]}')
# 确保所有表都已创建(可选)
Base.metadata.create_all(engine)
# 创建会话类
Session = sessionmaker(bind=engine)
# 定义要插入的数据
new_record = {
'name': 'Myelsa',
'ID_Card': '443689564710526448',
'age': 18,
'city': 'Guangzhou'
}
try:
# 使用上下文管理器自动管理会话
with Session() as session:
# 创建新的模型实例
new_entry = MyElsaTable(**new_record)
# 将新实例添加到会话中
session.add(new_entry)
# 提交更改
session.commit()
print("Record inserted successfully!")
except SQLAlchemyError as e:
print(f"Error: '{e}'")
# 在使用上下文管理器时,无需显式回滚,因为上下文管理器会在退出时处理它
2、删除记录
2-1、用mysql-connector-python库
import mysql.connector
# 数据库连接配置
config = {
'user': 'root', # 替换为你的MySQL用户名
'password': '123456', # 替换为你的MySQL密码
'host': '127.0.0.1', # 如果数据库在远程服务器上,请替换为相应的主机名或IP地址
'database': 'test_database', # 数据库名
}
# 要删除的记录的ID_Card值
id_card_to_delete = '443689564710526448'
# SQL删除语句
delete_query = """
DELETE FROM myelsa_table
WHERE ID_Card = %s
"""
try:
# 连接到数据库
cnx = mysql.connector.connect(**config)
cursor = cnx.cursor()
# 执行SQL删除语句
cursor.execute(delete_query, (id_card_to_delete,))
# 提交更改
cnx.commit()
print(f"Record with ID_Card '{id_card_to_delete}' deleted successfully!")
except mysql.connector.Error as err:
print(f"Error: '{err}'")
finally:
# 关闭游标和连接
if cursor:
cursor.close()
if cnx.is_connected():
cnx.close()
2-2、用PyMySQL库
import pymysql
# 数据库连接配置
config = {
'user': 'root', # 替换为你的MySQL用户名
'password': '123456', # 替换为你的MySQL密码
'host': '127.0.0.1', # 如果数据库在远程服务器上,请替换为相应的主机名或IP地址
'database': 'test_database', # 数据库名
'charset': 'utf8mb4', # 添加字符集配置,防止编码问题
'cursorclass': pymysql.cursors.DictCursor # 使用字典游标(虽然在这个删除操作中不是必需的)
}
# 要删除的记录的ID_Card值
id_card_to_delete = '443689564710526448'
# SQL删除语句
delete_query = """
DELETE FROM myelsa_table
WHERE ID_Card = %s
"""
try:
# 连接到数据库
cnx = pymysql.connect(**config)
with cnx.cursor() as cursor:
# 使用with语句确保游标在使用完毕后被关闭
# 执行SQL删除语句
cursor.execute(delete_query, (id_card_to_delete,))
# 提交更改
cnx.commit()
print(f"Record with ID_Card '{id_card_to_delete}' deleted successfully!")
except pymysql.Error as err:
print(f"Error: '{err}'")
# 使用with语句连接数据库时,在with块结束后连接将自动关闭
# 如果没有使用with语句,需要显式关闭连接
if cnx.open:
cnx.close()
2-3、用PeeWee库
from peewee import *
# 数据库连接配置
db = MySQLDatabase('test_database', user='root', password='123456', host='127.0.0.1')
# 定义模型,映射到myelsa_table表
class Myelsa_Table(Model):
name = CharField()
ID_Card = CharField(unique=True) # 假设ID_Card是唯一的
age = IntegerField()
city = CharField()
class Meta:
database = db
# 连接到数据库
db.connect()
# 要删除的记录的ID_Card值
id_card_to_delete = '443689564710526448'
try:
# 使用模型来删除记录
query = Myelsa_Table.delete().where(Myelsa_Table.ID_Card == id_card_to_delete)
query.execute()
print(f"Record with ID_Card '{id_card_to_delete}' deleted successfully!")
except Exception as e:
print(f"Error: '{e}'")
finally:
# 关闭数据库连接(Peewee会在连接池中管理连接,通常不需要显式关闭)
# 但如果你确定不再需要连接,可以调用db.close()
# db.close()
pass
# 注意:在实际应用中,通常不需要显式关闭连接,因为Peewee会管理连接池
# 但在某些情况下,例如脚本结束时,你可能想要确保所有资源都被释放
2-4、用SQLAlchemy库
from sqlalchemy import create_engine, Column, Integer, String, MetaData, Table
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import SQLAlchemyError
# 数据库连接配置
config = {
'username': 'root', # 替换为你的MySQL用户名
'password': '123456', # 替换为你的MySQL密码
'host': '127.0.0.1', # 如果数据库在远程服务器上,请替换为相应的主机名或IP地址
'database': 'test_database', # 数据库名
'port': 3306, # 如果端口不是默认的3306,请添加此行并替换为正确的端口
}
# 创建数据库引擎
engine = create_engine(
f'mysql+pymysql://{config["username"]}:{config["password"]}@{config["host"]}:{config["port"]}/{config["database"]}')
# 要删除的记录的ID_Card值
id_card_to_delete = '443689564710526448'
# 定义元数据(如果使用ORM,则无需此步骤,但这里为简单起见使用Table对象)
metadata = MetaData()
myelsa_table = Table('myelsa_table', metadata, autoload_with=engine)
# 创建会话类
Session = sessionmaker(bind=engine)
try:
# 创建会话对象
session = Session()
# 执行SQL删除语句(这里使用session.execute而不是ORM方法)
session.execute(myelsa_table.delete().where(myelsa_table.c.ID_Card == id_card_to_delete))
# 提交更改
session.commit()
print(f"Record with ID_Card '{id_card_to_delete}' deleted successfully!")
except SQLAlchemyError as e:
print(f"Error: '{e}'")
# 如果出错,回滚更改
session.rollback()
finally:
# 关闭会话
session.close()
3、修改记录
3-1、用mysql-connector-python库
import mysql.connector
# 数据库连接配置
config = {
'user': 'root', # 替换为你的MySQL用户名
'password': '123456', # 替换为你的MySQL密码
'host': '127.0.0.1', # 如果数据库在远程服务器上,请替换为相应的主机名或IP地址
'database': 'test_database', # 数据库名
}
# 要更新的记录的ID_Card值
id_card_to_update = '443689564710526448'
# 新的记录值
new_values = {
'name': 'Jimmy',
'age': 15,
'city': 'Foshan'
}
# SQL更新语句
update_query = """
UPDATE myelsa_table
SET name = %s, age = %s, city = %s
WHERE ID_Card = %s
"""
try:
# 连接到数据库
cnx = mysql.connector.connect(**config)
cursor = cnx.cursor()
# 执行SQL更新语句
cursor.execute(update_query, (new_values['name'], new_values['age'], new_values['city'], id_card_to_update))
# 提交更改
cnx.commit()
print(f"Record with ID_Card '{id_card_to_update}' updated successfully!")
except mysql.connector.Error as err:
print(f"Error: '{err}'")
finally:
# 关闭游标和连接
if cursor:
cursor.close()
if cnx.is_connected():
cnx.close()
3-2、用PyMySQL库
import pymysql
# 数据库连接配置
config = {
'user': 'root', # 替换为你的MySQL用户名
'password': '123456', # 替换为你的MySQL密码
'host': '127.0.0.1', # 如果数据库在远程服务器上,请替换为相应的主机名或IP地址
'database': 'test_database', # 数据库名
'charset': 'utf8mb4', # 添加字符集配置,防止编码问题
'cursorclass': pymysql.cursors.DictCursor # 虽然在更新操作中不是必需的,但这里保持一致
}
# 要更新的记录的ID_Card值
id_card_to_update = '443689564710526448'
# 新的记录值
new_values = {
'name': 'Jimmy',
'age': 15,
'city': 'Foshan'
}
# SQL更新语句
update_query = """
UPDATE myelsa_table
SET name = %s, age = %s, city = %s
WHERE ID_Card = %s
"""
try:
# 连接到数据库
cnx = pymysql.connect(**config)
with cnx.cursor() as cursor:
# 使用with语句确保游标在使用完毕后被关闭
# 执行SQL更新语句
cursor.execute(update_query, (new_values['name'], new_values['age'], new_values['city'], id_card_to_update))
# 提交更改
cnx.commit()
print(f"Record with ID_Card '{id_card_to_update}' updated successfully!")
except pymysql.MySQLError as err:
print(f"Error: '{err}'")
finally:
# 使用with语句时,连接会在with块结束时自动关闭
# 如果连接没有通过with管理,需要在这里关闭
if cnx.open:
cnx.close()
3-3、用PeeWee库
from peewee import *
# 定义数据库连接
db = MySQLDatabase('test_database', user='root', password='123456', host='127.0.0.1')
# 定义数据库模型类
class MyElsa_Table(Model):
ID_Card = CharField(primary_key=True) # 注意:这里保留了你的原始字段命名
name = CharField()
age = IntegerField()
city = CharField()
class Meta:
database = db
# 连接到数据库
db.connect()
# 要更新的记录的ID_Card值
id_card_to_update = '443689564710526448'
# 新的记录值
new_values = {
'name': 'Jimmy',
'age': 15,
'city': 'Foshan'
}
try:
# 使用ORM方法执行更新操作
query = (MyElsa_Table
.update(name=new_values['name'], age=new_values['age'], city=new_values['city'])
.where(MyElsa_Table.ID_Card == id_card_to_update))
query.execute()
# 如果没有错误,打印成功消息
print(f"Record with ID_Card '{id_card_to_update}' updated successfully!")
except Exception as e:
# 捕获异常并打印错误信息
print(f"Error: '{e}'")
finally:
# 关闭数据库连接
db.close()
3-4、用SQLAlchemy库
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import SQLAlchemyError
# 数据库连接配置
config = {
'username': 'root', # 替换为你的MySQL用户名
'password': '123456', # 替换为你的MySQL密码
'host': '127.0.0.1', # 如果数据库在远程服务器上,请替换为相应的主机名或IP地址
'database': 'test_database', # 数据库名
'port': 3306, # 如果端口不是默认的3306,请添加此行并替换为正确的端口
'echo': False # 如果想看到执行的SQL语句,设置为True
}
# 创建数据库引擎
engine = create_engine(
f'mysql+pymysql://{config["username"]}:{config["password"]}@{config["host"]}:{config["port"]}/{config["database"]}',
echo=config['echo'])
# 定义元数据
metadata = MetaData()
# 假设你的表结构如下(需要根据实际情况调整列类型和名称)
myelsa_table = Table('myelsa_table', metadata,
Column('ID_Card', String, primary_key=True),
Column('name', String),
Column('age', Integer),
Column('city', String),
autoload_with=engine
)
# 要更新的记录的ID_Card值
id_card_to_update = '443689564710526448'
# 新的记录值
new_values = {
'name': 'Jimmy',
'age': 15,
'city': 'Foshan'
}
# 创建会话类
Session = sessionmaker(bind=engine)
try:
# 创建会话对象
session = Session()
# 构造更新语句
stmt = myelsa_table.update().where(myelsa_table.c.ID_Card == id_card_to_update).values(**new_values)
# 执行更新
session.execute(stmt)
# 提交更改
session.commit()
print(f"Record with ID_Card '{id_card_to_update}' updated successfully!")
except SQLAlchemyError as e:
print(f"Error: '{e}'")
# 如果出错,回滚更改
session.rollback()
finally:
# 关闭会话
session.close()
4、查询记录
4-1、用mysql-connector-python库
import mysql.connector
# 数据库连接配置
config = {
'user': 'root', # 替换为你的MySQL用户名
'password': '123456', # 替换为你的MySQL密码
'host': '127.0.0.1', # 如果数据库在远程服务器上,请替换为相应的主机名或IP地址
'database': 'test_database', # 数据库名
}
# 查询所有记录的SQL语句
query_all = """
SELECT * FROM myelsa_table
"""
# 查询特定记录的SQL语句(例如,根据ID_Card查询)
id_card_to_query = '443689564710526448'
query_by_id_card = """
SELECT * FROM myelsa_table WHERE ID_Card = %s
"""
try:
# 连接到数据库
cnx = mysql.connector.connect(**config)
cursor = cnx.cursor()
# 执行查询所有记录的SQL语句
cursor.execute(query_all)
# 获取所有记录
for (name, id_card, age, city) in cursor:
print(f"Name: {name}, ID Card: {id_card}, Age: {age}, City: {city}")
# 如果需要查询特定记录,取消注释以下代码
# cursor.execute(query_by_id_card, (id_card_to_query,))
# record = cursor.fetchone()
# if record:
# (name, id_card, age, city) = record
# print(f"Name: {name}, ID Card: {id_card}, Age: {age}, City: {city}")
# else:
# print(f"No record found for ID Card: {id_card_to_query}")
except mysql.connector.Error as err:
print(f"Error: '{err}'")
finally:
# 关闭游标和连接
if cursor:
cursor.close()
if cnx.is_connected():
cnx.close()
4-2、用PyMySQL库
import pymysql
# 数据库连接配置
config = {
'user': 'root', # 替换为你的MySQL用户名
'password': '123456', # 替换为你的MySQL密码
'host': '127.0.0.1', # 如果数据库在远程服务器上,请替换为相应的主机名或IP地址
'database': 'test_database', # 数据库名
'charset': 'utf8mb4', # 添加字符集配置,防止编码问题
'cursorclass': pymysql.cursors.DictCursor # 使用字典游标以便按列名访问数据
}
# 查询所有记录的SQL语句
query_all = """
SELECT * FROM myelsa_table
"""
# 查询特定记录的SQL语句(例如,根据ID_Card查询)
id_card_to_query = '443689564710526448'
query_by_id_card = """
SELECT * FROM myelsa_table WHERE ID_Card = %s
"""
try:
# 连接到数据库
cnx = pymysql.connect(**config)
with cnx.cursor(pymysql.cursors.DictCursor) as cursor: # 使用with语句自动管理游标
# 执行查询所有记录的SQL语句
cursor.execute(query_all)
# 获取所有记录
for row in cursor:
print(f"Name: {row['name']}, ID Card: {row['ID_Card']}, Age: {row['age']}, City: {row['city']}")
# 如果需要查询特定记录,取消注释以下代码
# cursor.execute(query_by_id_card, (id_card_to_query,))
# record = cursor.fetchone()
# if record:
# print(f"Name: {record['name']}, ID Card: {record['ID_Card']}, Age: {record['age']}, City: {record['city']}")
# else:
# print(f"No record found for ID Card: {id_card_to_query}")
# 注意:因为使用了with语句,所以不需要显式关闭游标
# 提交(在这里其实不需要,因为只是查询)
cnx.commit()
except pymysql.MySQLError as err:
print(f"Error: '{err}'")
finally:
# 使用with语句时,连接会在with块结束时自动关闭
# 如果连接没有通过with管理,需要在这里关闭
if cnx.open:
cnx.close()
4-3、用PeeWee库
from peewee import *
# 数据库连接配置
db = MySQLDatabase('test_database', user='root', password='123456', host='127.0.0.1')
# 定义数据库模型类
class MyElsa_Table(Model):
ID_Card = CharField(primary_key=True)
name = CharField()
age = IntegerField()
city = CharField()
class Meta:
database = db
# 连接到数据库
db.connect()
# 查询所有记录
def query_all_records():
for record in MyElsa_Table.select():
print(f"Name: {record.name}, ID Card: {record.ID_Card}, Age: {record.age}, City: {record.city}")
# 查询特定记录(根据ID_Card)
def query_record_by_id_card(id_card):
record = MyElsa_Table.get_or_none(MyElsa_Table.ID_Card == id_card)
if record:
print(f"Name: {record.name}, ID Card: {record.ID_Card}, Age: {record.age}, City: {record.city}")
else:
print(f"No record found for ID Card: {id_card}")
# ID_Card要查询的特定值
id_card_to_query = '443689564710526448'
try:
# 查询所有记录
query_all_records()
print("\n---\n")
# 查询特定记录
query_record_by_id_card(id_card_to_query)
except MyElsa_Table.DoesNotExist:
print(f"No record found for ID Card: {id_card_to_query}")
except Exception as e:
print(f"Error: '{e}'")
finally:
# 关闭数据库连接(如果使用了连接池,则可能不需要显式关闭)
db.close()
4-4、用SQLAlchemy库
from sqlalchemy import create_engine, Column, Integer, String, MetaData
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.exc import SQLAlchemyError
# 声明基础类
Base = declarative_base()
# 定义ORM模型
class MyElsaTable(Base):
__tablename__ = 'myelsa_table'
id_card = Column(String, primary_key=True)
name = Column(String)
age = Column(Integer)
city = Column(String)
# 数据库连接配置
config = {
'username': 'root', # 替换为你的MySQL用户名
'password': '123456', # 替换为你的MySQL密码
'host': '127.0.0.1', # 如果数据库在远程服务器上,请替换为相应的主机名或IP地址
'database': 'test_database', # 数据库名
'port': 3306, # 端口号,默认为3306
'echo': False # 是否打印SQL语句
}
# 创建数据库引擎
engine = create_engine(
f'mysql+pymysql://{config["username"]}:{config["password"]}@{config["host"]}:{config["port"]}/{config["database"]}',
echo=config['echo'])
# 创建Session类
Session = sessionmaker(bind=engine)
try:
# 创建Session对象
session = Session()
# 查询所有记录
all_records = session.query(MyElsaTable).all()
for record in all_records:
print(f"Name: {record.name}, ID Card: {record.id_card}, Age: {record.age}, City: {record.city}")
# 查询特定记录(例如,根据ID_Card查询)
id_card_to_query = '443689564710526448'
record = session.query(MyElsaTable).filter_by(id_card=id_card_to_query).first()
if record:
print(f"Name: {record.name}, ID Card: {record.id_card}, Age: {record.age}, City: {record.city}")
else:
print(f"No record found for ID Card: {id_card_to_query}")
except SQLAlchemyError as e:
print(f"Error: '{e}'")
finally:
# 关闭Session
session.close()