# sqlite3
import sqlite3
# sqlite3 walkthrough: create a table, insert one row, read it back.
# Both handles start as None so the cleanup in ``finally`` is safe even when
# connect() or cursor() itself raises.
conn = None
cursor = None
try:
    # Connect to the SQLite database file test.db; it is created in the
    # current directory when it does not exist yet.
    conn = sqlite3.connect("test.db")
    # Create a cursor
    cursor = conn.cursor()
    # IF NOT EXISTS / OR REPLACE keep the script re-runnable against an
    # already populated test.db instead of failing on the second run.
    cursor.execute("create table if not exists user (id varchar(20) primary key, name varchar(20))")
    # Values are bound as parameters rather than inlined into the SQL text.
    cursor.execute("insert or replace into user (id, name) values (?, ?)", ("1", "Michael"))
    # Commit the transaction
    conn.commit()
    # Number of rows affected by the insert
    print(cursor.rowcount)
    # Query the row back
    cursor.execute("select * from user where id=?", ("1",))
    values = cursor.fetchall()
    print(values)
except sqlite3.Error as exc:
    # Report database failures instead of silently swallowing them; closing
    # the connection below discards any uncommitted work.
    print(f"sqlite3 error: {exc}")
finally:
    # Close the cursor, then the connection, but only if they were opened.
    if cursor is not None:
        cursor.close()
    if conn is not None:
        conn.close()
# mysql
# pip install mysql-connector-python
import mysql.connector
# MySQL walkthrough: create a table, insert, delete + rollback, read back.
# Both handles start as None so the cleanup in ``finally`` is safe even when
# connect() or cursor() itself raises.
conn = None
cursor = None
try:
    # Connect to the MySQL database
    conn = mysql.connector.connect(user="root", password="", database="test")
    # Create a cursor
    cursor = conn.cursor()
    # IF NOT EXISTS / REPLACE keep the script re-runnable against a database
    # that already contains the table and the row.
    cursor.execute("create table if not exists test (id varchar(20) primary key, name varchar(20))")
    cursor.execute("replace into test (id, name) values (%s, %s)", ("1", "Michael"))
    # Commit the transaction
    conn.commit()
    # Affected rows: 1 for a fresh insert, 2 when REPLACE overwrote a row
    print(cursor.rowcount)
    # id is a varchar column, so bind a string instead of comparing to int 1
    cursor.execute("delete from test where id = %s", ("1",))
    # Roll the delete back
    conn.rollback()
    # Query the table this script populated; after the rollback the row
    # must still be there.
    cursor.execute("select * from test where id = %s", ("1",))
    values = cursor.fetchall()
    print(values)
except mysql.connector.Error as exc:
    # Report database failures instead of silently swallowing them; closing
    # the connection below discards any uncommitted work.
    print(f"mysql error: {exc}")
finally:
    # Close the cursor, then the connection, but only if they were opened.
    if cursor is not None:
        cursor.close()
    if conn is not None:
        conn.close()
# ORM
# SQLAlchemy是一个基于Python实现的ORM框架。该框架建立在 DB API之上,使用关系对象映射进行数据库操作,
# 简言之便是:将类和对象转换成SQL,然后使用 DB API执行SQL并获取执行结果
# pip install sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy import Column, Integer, String, DateTime, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import declarative_base, scoped_session
from sqlalchemy.sql import text, func
from sqlalchemy import and_, or_
import datetime
# Initialise the database connection (engine with a bounded connection pool):
engine = create_engine(
    "mysql+mysqlconnector://root:@localhost:3306/test?charset=utf8mb4",
    max_overflow=0, # extra connections allowed beyond pool_size (0 = none)
    pool_size=5, # number of connections kept in the pool
    pool_timeout=30, # seconds to wait for a free pooled connection before raising
    pool_recycle=-1, # max connection age in seconds before it is replaced; -1 disables recycling
)
# Obtain a raw DB-API connection
# conn = engine.raw_connection()
# Base class for the declarative models
Base = declarative_base()
# Declarative model for the "user" table.
class User(Base):
    """ORM mapping of the ``user`` table."""

    # Table name
    __tablename__ = "user"

    # Table structure
    id = Column(Integer, primary_key=True)
    name = Column(String(20), nullable=False, index=True)
    age = Column(Integer)
    email = Column(String(32), nullable=False, unique=True)
    # The callable itself is passed as the default, not a timestamp value.
    ctime = Column(DateTime, default=datetime.datetime.now)
    group = Column(String(50), nullable=False)

    # Table-level constraints and indexes, followed by the MySQL charset and
    # storage-engine options.
    # NOTE(review): the constraint is named "uidx_id_name" but covers
    # (group, name) -- confirm the name is intentional.
    __table_args__ = (
        UniqueConstraint("group", "name", name="uidx_id_name"),  # composite unique
        Index("idx_name_email", "name", "email"),  # composite index
        {"mysql_charset": "utf8mb4", "mysql_engine": "InnoDB"},
    )
# SQLAlchemy ORM walkthrough: seed a few users, then demonstrate querying,
# updating, deleting and bulk-inserting through a scoped session.

# Drop all tables (uncomment to reset the schema)
# Base.metadata.drop_all(engine)
# Create any tables that do not exist yet
Base.metadata.create_all(engine)
# Session factory bound to the engine
DBSession = sessionmaker(bind=engine)
# Thread-local session registry
session = scoped_session(DBSession)


def _show_user(user):
    """Print the id and name of *user*, or a notice when nothing matched."""
    if user is None:
        print("no matching user")
    else:
        print(user.id, user.name)


try:
    # Seed the demo rows only into an empty table so the script can be
    # re-run without tripping the unique constraints. Every user needs a
    # distinct e-mail (the column is unique), and the names must match the
    # ones the queries below look for.
    if session.query(User).count() == 0:
        session.add_all(
            [
                User(name="susie", age=22, email="susie@test.com", group="A"),
                User(name="daisy", age=25, email="daisy@test.com", group="A"),
                User(name="lily", age=24, email="lily@test.com", group="B"),
                User(name="bob", age=23, email="bob@test.com", group="B"),
            ]
        )
        session.commit()

    # Raw SQL query
    user = (
        session.query(User)
        .from_statement(text("SELECT * FROM user where email=:email"))
        .params(email="susie@test.com")
        .first()
    )
    # Fetch everything (a plain list)
    users = session.query(User).all()
    # Select only some columns
    user = session.query(User.name.label("Name"), User.email).first()

    # Query with filter expressions.
    # one_or_none() returns None when nothing matches, whereas one() would
    # raise NoResultFound; both raise when several rows match.
    user = session.query(User).filter(User.name == "susie").one_or_none()
    _show_user(user)
    user = session.query(User).filter(User.name == "susie").first()
    _show_user(user)
    users = session.query(User).filter(User.name == "susie").all()
    user = users[0] if users else None
    _show_user(user)
    user_name = session.query(User.name).select_from(User).filter(User.id == 1).scalar()
    print(user_name)
    user_num = session.query(func.count("*")).select_from(User).scalar()
    print(user_num)

    # Query with filter_by keyword arguments
    user = session.query(User).filter_by(name="lily").first()
    user = session.query(User).filter_by(id=6).first()
    # Bound placeholders in a textual filter
    user = session.query(User).filter(text("id<:value or name=:name")).params(value=10, name="lily").first()
    # Several criteria are combined with AND
    user = session.query(User).filter(User.name == "susie", User.email == "susie@test.com").first()
    # BETWEEN
    users = session.query(User).filter(User.id.between(1, 8)).all()
    # IN
    users = session.query(User).filter(User.id.in_([2, 3, 4])).all()
    # NOT IN
    users = session.query(User).filter(~User.id.in_([1, 3, 4])).all()
    # Mixed AND / OR
    users = (
        session.query(User)
        .filter(or_(User.id < 10, and_(User.name == "lily", User.id > 3), User.email.like("%@test.com")))
        .all()
    )
    # Ordering
    users = session.query(User).order_by(User.email.desc()).all()
    # Ordering by several columns (.all() so this is a list like the others)
    users = session.query(User).order_by(User.name.desc(), User.id.asc()).all()
    # Paging
    users = session.query(User)[0:10]
    users = session.query(User).offset(1).limit(4).all()
    # Grouping with aggregate functions
    users = (
        session.query(User.group, func.max(User.id), func.sum(User.id), func.min(User.id))
        .group_by(User.group)
        .all()
    )
    # HAVING
    users = (
        session.query(User.group, func.max(User.id), func.sum(User.id), func.min(User.id))
        .group_by(User.group)
        .having(User.group == "A")
        .all()
    )
    # Filtering by a sub-query
    users = session.query(User).filter(User.id.in_(session.query(User.id).filter_by(name="lily"))).all()

    # Update
    res = session.query(User).filter_by(id=1).update({"age": 24})
    session.commit()
    print(res)  # number of affected rows
    # Two equivalent spellings of an expression-based update; each one
    # increments age, and both are committed by the next commit().
    session.query(User).filter(User.id > 0).update({User.age: User.age + 1})
    session.query(User).filter(User.id > 0).update({"age": User.age + 1})

    # Delete
    res = session.query(User).filter_by(id=6).delete()
    session.commit()
    print(res)  # number of affected rows
    user = session.query(User).filter_by(id=7).first()
    if user is not None:  # session.delete(None) would raise
        session.delete(user)
        session.flush()  # send the DELETE to the database without committing
    session.rollback()  # undo it

    # Bulk insert through the Core table; skipped when the row already
    # exists because email is unique.
    if session.query(User).filter_by(email="john@test.com").first() is None:
        session.execute(
            User.__table__.insert(),
            [
                {"name": "john", "age": 34, "email": "john@test.com", "group": "D"},
            ],
        )
    # Commit to persist the changes
    session.commit()
except Exception:
    # Leave the transaction clean before propagating the error.
    session.rollback()
    raise
finally:
    # remove() closes the current session and discards it from the
    # scoped_session registry.
    session.remove()
# 关联关系 (Relationships)
from sqlalchemy import create_engine
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy.orm import declarative_base, scoped_session
import datetime
# Initialise the database connection:
engine = create_engine("mysql+mysqlconnector://root:@localhost:3306/test?charset=utf8mb4")
# Base class for the declarative models
Base = declarative_base()
# Declarative model for the "user" table, including its relationships.
class User(Base):
    """ORM mapping of the ``user`` table with links to books and hobbies."""

    # Table name
    __tablename__ = "user"

    # Table structure
    id = Column(Integer, autoincrement=True, primary_key=True)
    name = Column(String(20), nullable=False, index=True, comment="名字")
    age = Column(Integer)
    email = Column(String(32), nullable=False, unique=True)
    # The callable itself is passed as the default, not a timestamp value.
    ctime = Column(DateTime, default=datetime.datetime.now)
    group = Column(String(50), nullable=False)

    # One-to-many: Book rows referencing this user; each Book gets a
    # ``person`` attribute pointing back here.
    books = relationship("Book", backref="person")
    # Many-to-many through the "user_bobby" association table; each Hobby
    # gets a ``pers`` attribute listing its users.
    hobbys = relationship("Hobby", backref="pers", secondary="user_bobby")

    # Table-level constraints and indexes, followed by the MySQL charset and
    # storage-engine options.
    __table_args__ = (
        UniqueConstraint("group", "name", name="uidx_id_name"),  # composite unique
        Index("idx_name_email", "name", "email"),  # composite index
        {"mysql_charset": "utf8mb4", "mysql_engine": "InnoDB"},
    )
class Book(Base):
    """ORM mapping of the ``book`` table; each book may belong to one user."""

    __tablename__ = "book"

    id = Column(Integer, autoincrement=True, primary_key=True)
    name = Column(String(20), nullable=False, unique=True)
    # Foreign key linking the book to its owner in the user table
    user_id = Column(Integer, ForeignKey("user.id"))

    # MySQL charset and storage engine
    __table_args__ = (
        {"mysql_charset": "utf8mb4", "mysql_engine": "InnoDB"},
    )
class Hobby(Base):
    """ORM mapping of the ``hobby`` table."""

    __tablename__ = "hobby"

    id = Column(Integer, autoincrement=True, primary_key=True)
    name = Column(String(20), nullable=False, unique=True)

    # MySQL charset and storage engine
    __table_args__ = (
        {"mysql_charset": "utf8mb4", "mysql_engine": "InnoDB"},
    )
class UserHobby(Base):
    """Association table pairing a user id with a hobby id."""

    # NOTE(review): "user_bobby" looks like a typo for "user_hobby"; it is
    # kept because User.hobbys names the same table in ``secondary=``.
    __tablename__ = "user_bobby"

    id = Column(Integer, autoincrement=True, primary_key=True)
    user_id = Column(Integer, ForeignKey("user.id"))
    hobby_id = Column(Integer, ForeignKey("hobby.id"))
# Relationship walkthrough: rebuild the schema, seed users/books/hobbies,
# then navigate the relationships and run several joins.

# Drop all tables -- destructive: every run starts from an empty schema
Base.metadata.drop_all(engine)
# Create the tables in the database
Base.metadata.create_all(engine)
# Session factory bound to the engine
DBSession = sessionmaker(bind=engine)
# Thread-local session registry
session = scoped_session(DBSession)

try:
    # Create the new objects. Every user needs a distinct e-mail because the
    # column is declared unique.
    new_user = User(name="susie", age=22, email="susie@test.com", group="A")
    new_user2 = User(name="daisy", age=25, email="daisy@test.com", group="A")
    new_user3 = User(name="lily", age=24, email="lily@test.com", group="B")
    new_user4 = User(name="bob", age=23, email="bob@test.com", group="B")
    # Link the books through the relationship rather than hard-coding
    # user_id values, which would rely on the auto-increment ids being 1..4.
    book = Book(name="dd")
    book2 = Book(name="ee")
    new_user3.books.append(book)
    new_user2.books.append(book2)
    # Add to the session
    session.add_all([new_user, new_user2, new_user3, new_user4])
    session.add_all([book, book2])

    user = session.query(User).filter_by(name="lily").first()
    user.hobbys = [Hobby(name="足球"), Hobby(name="篮球")]
    # Assigning a new list REPLACES the collection: book "dd" is detached
    # from lily and ends up without an owner.
    user.books = [Book(name="执行力"), Book(name="纸飞机")]
    session.add(user)
    session.commit()

    # Navigate the relationships
    user = session.query(User).filter_by(name="lily").first()
    for row in user.books:
        print(row.name)
    for row in user.hobbys:
        print(row.name)
    print("-------------------")
    book = session.query(Book).filter_by(name="ee").first()
    print(book.person.name)
    print("-------------------")
    hobby = session.query(Hobby).filter_by(name="足球").first()
    for row in hobby.pers:
        print(row.name)
    print("-------------------")

    # Query two tables with an explicit join condition in the filter
    res = session.query(User, Book).filter(Book.user_id == User.id).all()
    for row in res:
        print(row[0].name, row[1].name)
    print("-------------------")
    # join(): inner join by default, joined on the foreign key automatically
    res = session.query(User).join(Book).all()
    for row in res:
        print(row.name)
    print("-------------------")
    # left join
    res = session.query(User).join(Book, isouter=True).all()
    for row in res:
        print(row.name)
    print("-------------------")
    # left join with an explicit ON clause
    res = session.query(User).join(Book, Book.user_id == User.id, isouter=True).all()
    for row in res:
        print(row.name)
    print("-------------------")

    # Many-to-many across the association table: first via filter
    # conditions, then via chained joins (the second result replaces the
    # first).
    res = (
        session.query(User, Hobby, UserHobby)
        .filter(User.id == UserHobby.user_id, Hobby.id == UserHobby.hobby_id)
        .all()
    )
    res = session.query(User).join(UserHobby).join(Hobby).filter(User.id >= 2).all()
except Exception:
    # Leave the transaction clean before propagating the error.
    session.rollback()
    raise
finally:
    # remove() closes the current session and discards it from the
    # scoped_session registry.
    session.remove()
# 标签:session,sqlalchemy,name,ORM,User,query,id,python3,user From: https://www.cnblogs.com/caroline2016/p/18454654