SQLAlchemy
-
简单介绍
ORM较全的框架,可以使用ORM操作表,比较方便
-
ORM
Object Relation MApping
对象关系映射 通过操作对象的方式来操纵表之间的关系
一. 单表操作
1.1 创建表
- 下载
sqlalchemy
模块
create_table.py
'''
ORM
1.Class - Obj
2.创建数据库引擎/链接
3.将所有的Class序列化成数据表
4.ORM操作 - CRUD
'''
# 1.导入declarative_base
from sqlalchemy.ext.declarative import declarative_base
# 2.实例化该类,Base现在 是ORM 的基类,也可以理解为django中的Model
Base = declarative_base()
# 导入需要使用的字段类型等....
from sqlalchemy import String, INT, Column
# 3.创建类,并且该类继承与Base基类
class User(Base):
__tablename__ = 'user'
id = Column(INT, primary_key=True, autoincrement=True)
name = Column(String(32))
# 4.创建数据库引擎,并且让该引擎链接我们需要使用的数据库
from sqlalchemy import create_engine
# engine = create_engine("使用的数据库+py模块://数据库用户名:密码@127.0.0.1:3306/数据库?charset=utf8")
engine = create_engine("mysql+pymysql://root:root@127.0.0.1:3306/sqlalchemy?charset=utf8")
# 5.将所有继承基类的类创建为数据表
Base.metadata.create_all(engine)
# 删除所有创建的表
Base.metadata.drop_all(engine)
1.2 插入数据
crud_insert.py
'''
·····参考navicat使用-----
1.选中数据库 - 创建数据库引擎 导入数据库引擎
2.创建查询窗口,必须是选中数据库的查询窗口
3.创建sql语句
4.点击运行
'''
# 1.选中数据库 - 创建数据库引擎 导入数据库引擎
from create_table import engine
# 2.创建查询窗口,必须是选中数据库的查询窗口
from sqlalchemy.orm import sessionmaker
Session_Window = sessionmaker(engine)
# 打开查询窗口
db_session = Session_Window()
from create_table import User
# 3.创建sql语句 -- 增加单条数据
user_obj = User(name='xhm')
# 将语句写入到查询窗口
db_session.add(user_obj)
# 4.执行sql语句
db_session.commit()
# 关闭会话窗口
db_session.close()
# 增加多条数据
user_obj_list = [User(name='林允儿'), User(name='赵丽颖')]
db_session.add_all(user_obj_list)
db_session.commit()
db_session.close()
1.3 查询数据
crud_select.py
# 创建查询窗口
from create_table import engine
from sqlalchemy.orm import sessionmaker
Session = sessionmaker(engine)
db_session = Session()
from create_table import User
# 查询数据
user_obj = db_session.query(User).filter(User.id == 1).first()
# print(user_obj)
print(user_obj.name)
user_obj_list = db_session.query(User).all()
for user in user_obj_list:
print(user.name)
# 条件查询
# user_obj = db_session.query(User).filter(User.id>2).first()
# print(user_obj.name)
user_obj = db_session.query(User).filter(User.id>2,User.name=='666').first()
print(user_obj.name)
1.4 更新数据
'''
导入数据库引擎
导入并打开查询窗口
'''
# 这里我就不在创建会话窗口了,直接从插入数据那导入过来(不建议,多练练)
from crud_insert import db_session
from create_table import User
# 1.修改一条数据
# user_obj = db_session.query(User).filter(User.id == 1).update({'name':'杨幂'})
# db_session.commit()
# 注意注意注意
# 这里一定要将db_session中的执行语句进行提交,因为你这是要对数据中的数据进行操作
# 数据库中 增 改 删 都是操作,也就是说执行以上三种操作的时候一定要commit
# 2.修改多条数据
user_obj = db_session.query(User).filter(User.id > 1).update({'name':'666'})
db_session.commit()
1.5 删除数据
from create_table import engine, User
from sqlalchemy.orm import sessionmaker
Session = sessionmaker(engine)
db_session = Session()
# 删除数据
# res = db_session.query(User).filter(User.name == '666').delete()
# db_session.commit()
res = db_session.query(User).filter(User.id>=12).delete()
db_session.commit()
二. Foreign一对多操作
2.1 创建表
- create_table_ForeignKey.py
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
from sqlalchemy import Column, VARCHAR, INT, ForeignKey
from sqlalchemy.orm import relationship
# 创建Class
class Student(Base):
# 继承基类
__tablename__ = 'student'
id = Column(INT, primary_key=True)
name = Column(VARCHAR(32))
# 在这里Foreign只是对应了表之间的关系,无法进行ORM操作. Foreign('表名.字段')
school_id = Column(ForeignKey('school.id'))
# 所以,这里新导入了relationship,对应ORM中的R,这样就可以通过对象进行ORM操作了
# backref你可以理解为Model中的relate_name,用于反向查询(单纯我的理解)。relationship('需要关联的对象',backref...)
stu2sch = relationship('School', backref='sch2stu')
class School(Base):
__tablename__ = 'school'
id = Column(INT, primary_key=True)
name = Column(VARCHAR(32))
# 创建数据库引擎
from sqlalchemy import create_engine
engine = create_engine("mysql+pymysql://root:root@127.0.0.1:3306/sqlalchemy?charset=utf8")
# 将所有class创建为表
Base.metadata.create_all(engine)
2.2 插入数据
# 导入引擎
from create_table_ForeignKey import engine
# 创建查询窗口
from sqlalchemy.orm import sessionmaker
Session = sessionmaker(engine)
db_session = Session() # 打开创建的查询窗口
from create_table_ForeignKey import Student, School
# 插入数据 (笨方法)
sch_obj = School(name='南昌大学')
db_session.add(sch_obj)
db_session.commit()
sch_obj = db_session.query(School).filter(School.name=='南昌大学').first()
stu_obj = Student(name='小明',school_id=sch_obj.id)
db_session.add(stu_obj)
# db_session.commit()
# -----使用ORM----
# 插入数据 -- relationship版 正向添加
stu_obj = Student(name='小豪',stu2sch=School(name='西北工业大学'))
db_session.add(stu_obj)
db_session.commit()
# 反向添加
sch_obj = School(name='南昌航天大学')
sch_obj.sch2stu = [Student(name='小危')] # 因为一对多,所以是一个列表
db_session.add(sch_obj)
db_session.commit()
# --------------分割线----------------
stu_obj = Student(name='小晖', school_id=2)
db_session.add(stu_obj)
db_session.commit()
2.3 查询数据
from create_table_ForeignKey import engine, Student, School
from sqlalchemy.orm import sessionmaker
Session = sessionmaker(engine)
db_session = Session()
# 查询 笨方法
sch_obj = db_session.query(School).filter(School.name == '西北工业大学').first()
stu_obj = db_session.query(Student).filter(Student.school_id == sch_obj.id).first()
print(stu_obj.name)
stu_obj_list = db_session.query(Student).filter(Student.school_id == sch_obj.id).all()
for row in stu_obj_list:
print(row.name)
# 正向查询
stu_obj = db_session.query(Student).filter(Student.name=='小豪').first()
print(stu_obj.stu2sch.name)
# 反向查询
sch_obj_list = db_session.query(School).filter(School.name=='西北工业大学')
for sch_obj in sch_obj_list:
for row in sch_obj.sch2stu:
print(sch_obj.name,row.name)
2.4 更新和删除
from create_table_ForeignKey import engine, Student, School
from sqlalchemy.orm import sessionmaker
Session = sessionmaker(engine)
db_session = Session()
# 更新数据
class_info = db_session.query(School).filter(School.name=="西北工业大学").first()
db_session.query(Student).filter(Student.school_id == class_info.id).update({"name":"小南"})
# 删除数据
class_info = db_session.query(School).filter(School.name=="西北工业大学").first()
db_session.query(Student).filter(Student.school_id == class_info.id).delete()
db_session.commit()
db_session.close()
三. 多对多
3.1 创建表
- create_table_M2M.py
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, VARCHAR, INT, ForeignKey
from sqlalchemy.orm import relationship
Base = declarative_base()
class Girl(Base):
__tablename__ = 'girl'
id = Column(INT, primary_key=True)
name = Column(VARCHAR(32))
# relationship可以在对象层面创建关系,但表关系无法创建。所以这里我们引出一个参数:secondary=`表名`
# 通过这个,我们就可以在表层面也创建关系
g2b = relationship('Boy', backref='b2g', secondary='hotel')
class Boy(Base):
__tablename__ = 'boy'
id = Column(INT, primary_key=True)
name = Column(VARCHAR(32))
# 创建第三张表,存储关系。上面secondary设置值后就可以访问到需要的表(boy、girl)
class Hotel(Base):
__tablename__ = 'hotel'
id = Column(INT, primary_key=True)
boy_id = Column(ForeignKey('boy.id'))
girl_id = Column(ForeignKey('girl.id'))
# 创建数据库引擎
from sqlalchemy import create_engine
engine = create_engine("mysql+pymysql://root:root@127.0.0.1:3306/sqlalchemy?charset=utf8")
# 将class创建为表
Base.metadata.create_all(engine)
3.2 插入数据
from create_table_M2M import engine
from sqlalchemy.orm import sessionmaker
from create_table_M2M import Boy, Girl
Session = sessionmaker(engine)
db_session = Session()
# 插入数据 正向插入
girl_obj = Girl(name='')
girl_obj.g2b = [Boy(name='小果')]
db_session.add(girl_obj)
db_session.commit()
# 反向插入数据
boy_obj = Boy(name='小伟')
boy_obj.b2g = [Girl(name='小芳'),Girl(name='小红')]
db_session.add(boy_obj)
db_session.commit()
3.3 查询数据
from create_table_M2M import engine
from sqlalchemy.orm import sessionmaker
from create_table_M2M import Boy, Girl
Session = sessionmaker(engine)
db_session = Session()
# 正向查询
girl_obj = db_session.query(Girl).filter(Girl.id == 2).first()
print(girl_obj.name, girl_obj.g2b[0].name)
boy_obj_list = db_session.query(Boy).filter(Boy.id == 2).all()
for boy_obj in boy_obj_list:
for girl in boy_obj.b2g:
print(boy_obj.name, girl.name)
3.4 更新和删除
与一对多类似....就不写了....
四. 高级操作
- 是不是觉得查询的时候太低级了
- 都没有什么聚合和分组
- 这里我们就来 :高级操作
4.1 创建表
- 先创建表,再取进行相应的操作
- create_table.py
from sqlalchemy.ext.declarative import declarative_base
# 创建基类
Base = declarative_base()
from sqlalchemy import Column, VARCHAR, INT
# 创建类
class Host(Base):
# 继承基类
__tablename__ = 'host'
id = Column(INT, primary_key=True)
name = Column(VARCHAR(32))
age = Column(INT)
gender = Column(VARCHAR(32))
# 创建数据库引擎
from sqlalchemy import create_engine
engine = create_engine("mysql+pymysql://root:root@127.0.0.1:3306/sqlalchemy?charset=utf8")
# 将所有的class创建为表
Base.metadata.create_all(engine)
4.2 插入数据
# 导入数据库引擎
from create_table import engine
# 创建查询窗口
from sqlalchemy.orm import sessionmaker
Session = sessionmaker(engine)
# 打开查询窗口
db_session = Session()
from create_table import Host
# 插入数据
# host_obj = Host(name='小明', gender='男', age=19)
# db_session.add(host_obj)
# db_session.commit()
# 插入多条数据
host_obj_list = [
Host(name="小林",age=16,gender="女"),
Host(name="小杨",age=16,gender="女"),
Host(name="小刚",age=15,gender="男"),
Host(name="小舒",age=16,gender="女")
]
db_session.add_all(host_obj_list)
db_session.commit()
4.3 高级查询
- 里面含有可以支持sql原生语句查询的
- 好好看,好好学...
from create_table import engine
from sqlalchemy.orm import sessionmaker
Session = sessionmaker(engine)
db_session = Session()
from create_table import Host
from sqlalchemy.sql import and_, or_
# 1.与或查询 -- and_ or_
# host_obj = db_session.query(Host).filter(Host.name == '小明', Host.age == 19).first() # 默认and
# print(host_obj.name, host_obj.age, host_obj.gender)
# and_
# host_obj = db_session.query(Host).filter(and_(Host.name == '小明', Host.age == 19)).first()
# print(host_obj.name, host_obj.age, host_obj.gender)
# or_
# host_obj = db_session.query(Host).filter(or_(Host.name == '小明', Host.age >= 16)).all()
# for host in host_obj:
# print(host.name, host.age, host.gender)
# 筛选:只能拿出筛选的字段
# res = db_session.query(Host.gender,Host.name).filter(Host.id == 2,Host.name=="小杨").first()
# print(res.gender)
# print(res.age) # 报错,只能拿出gender和name字段
# 2.lable
# res = db_session.query(Host.gender.label("xb"),Host.name).filter(Host.id == 2,Host.name=="小杨").first()
# print(res.gender) # 别名 -->xb
# print(res.xb)
# 3.order_by
# res = db_session.query(Host).order_by(Host.id.desc()).all() # 按照id倒序排序
# res = db_session.query(Host).order_by(Host.name).all() # 按照名字排序
# for row in res:
# print(row.id,row.name)
# 4.between
# res = db_session.query(Host).filter(Host.id.between(2, 4)).order_by(Host.id.desc()).all()
# for row in res:
# print(row.id)
# notin_,~, in_
# res = db_session.query(Host).filter(Host.id.notin_([1,4,3])).all()
# res = db_session.query(Host).filter(~Host.id.in_([1,4,3])).all() # 与notin_一致
# res = db_session.query(Host).filter(Host.id.in_([1,4,3])).all()
# for row in res:
# print(row.id)
# 复杂查询
from sqlalchemy.sql import text
user_list = db_session.query(Host).filter(text("id<:value and name=:name")).params(value=3,name="小杨")
# 查询语句(sql语句)
from sqlalchemy.sql import text
user_list = db_session.query(User).filter(text("select * from User id<:value and
name=:name")).params(value=3,name="小杨")
# 通配符
# ret = db_session.query(Host).filter(Host.name.like('w__')).all()
# print(ret[0].name)
# ret = db_session.query(Host).filter(~Host.name.like('小%')).all()
# for row in ret:
# print(row.name)
# 限制
# ret = db_session.query(Host)[1:3]
# print(ret)
# 聚合函数
from sqlalchemy.sql import func
ret = db_session.query(func.max(Host.id),
func.sum(Host.id),
func.min(Host.id)).group_by(Host.gender).all()
print(ret[0][0]) # -- 取出一组中最大的id值
# ret = db_session.query(
# func.max(Host.id), func.sum(Host.id),
# func.min(Host.id)).group_by(Host.name).having(func.min(Host.id) > 2).all()
4.4 高级更新
- 我不太懂....
- 参考博客
from create_table import engine
from sqlalchemy.orm import sessionmaker
Session = sessionmaker(engine)
db_session = Session()
from create_table import Host
# 引用增加(说实话,我不太懂synchronize_session这个参数干嘛的,建议查一下)
res = db_session.query(Host).update({Host.age: Host.age + 1}, synchronize_session=False) # 将所有年龄都加一
print(res)
db_session.query(Host).filter(Host.id > 0).update({"age": Host.age + 1}, synchronize_session="evaluate")
db_session.commit()
Flask-SQLAlchemy
- 安装:
pip install Flask-SQLAlchemy
1.创建一个标准的蓝图目录
- 格式如下:
- 像不像django项目哈哈,就是要这种效果
- app01为一个python包
- views目录存储蓝图
- templates存储模板
- models中存储要建的表类
- init用来初始化flask的实例
- manager用来启动项目
2.基本使用
-
init.py
from flask import Flask # 1.从flask_sqlalchemy导入SQLAlchemy from flask_sqlalchemy import SQLAlchemy # 2.实例化SQLAlchemy对象,该实例放置在注册蓝图之前 db = SQLAlchemy() # 导入蓝图 from Flask_SQLAlchemy.app01.views.user import user # 该函数用于app的配置 def create_app(): app = Flask(__name__) # 3.基于上下文进行SQLAlchemy的配置 # SQLALCHEMY_DATABASE_URI 配置 SQLAlchemy 的链接字符串儿 app.config["SQLALCHEMY_DATABASE_URI"] = "mysql+pymysql://root:root@127.0.0.1:3306/sqlalchemy?charset=utf8" app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False # 这两个参数必须配置,否则会报错 # SQLALCHEMY_POOL_SIZE 配置 SQLAlchemy 的连接池大小,还记得DBUtils不? # app.config["SQLALCHEMY_POOL_SIZE"] = 5 # SQLALCHEMY_POOL_TIMEOUT 配置 SQLAlchemy 的连接超时时间 # app.config["SQLALCHEMY_POOL_TIMEOUT"] = 15 app.config['DEBUG'] = True # 4.SQLAlchemy初始化App的相关配置,可点进去看相关配置 db.init_app(app) app.register_blueprint(user) return app
-
models.py
# 5.创建ORM对象及关联数据表 from Flask_SQLAlchemy.app01 import db Base = db.Model # 相当于⬇ ''' from sqlalchemy.ext.declarative import declarative_base Base = declarative_base() 每一次我们在创建数据表的时候都要做这样一件事 然而Flask-SQLAlchemy已经为我们把 Base 封装好了 ''' class Fsk(Base): __tablename__ = 'fsk' # 在SQLAlchemy 中我们是导入了Column和数据类型 Integer 在这里 # 就和db.Model一样,已经封装好了 id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(32)) password = db.Column(db.String(32)) if __name__ == '__main__': from Flask_SQLAlchemy.app01 import create_app # 6.绕过应用上下文创建 关联数据表,剩下的就是应用了 app = create_app() # 该代码会将所有继承Base的类创建为表(Base = db.Model) db.create_all(app=app)
-
这就是Flask-SQLAlchemy的基本使用,之后只需要run就能创建成表
-
不过是不是感觉太少了?还有一些操作没弄呢是吧
-
比如增删改查啊....
3.启动项目
-
增删改查
-
views/user.py
from flask import Blueprint, render_template, request from Flask_SQLAlchemy.app01 import db from Flask_SQLAlchemy.app01.models import Fsk user = Blueprint('user', __name__) @user.route('/login', methods=['POST', 'GET']) def login_func(): if request.method == 'POST': print(111) username = request.form.get('user') pwd = request.form.get('pwd') ''' # 之前我们需要手动打开session会话窗口⬇ # from sqlalchemy.orm import sessionmaker # Session = sessionmaker(engine) # db_sesson = Session() # 现在不用了,因为 Flask-SQLAlchemy 也已经为我们做好会话打开的工作 # 我们在这里做个弊: ''' fsk_obj = Fsk(username=username,password=pwd) db.session.add(fsk_obj) db.session.commit() return render_template('./login.html')
-
views/index.py
from flask import Blueprint from Flask_SQLAlchemy.app01 import db from Flask_SQLAlchemy.app01.models import Fsk index = Blueprint("index", __name__) @index.route("/index") def user_func(): # res = Fsk.query.first() # Fsk.query == db_session.query(Fsk) res = db.session.query(Fsk).first() print(res,res.name) return "index func"
-
templates/login.html
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <form action="./login" method="post"> <input type="text" name="user"> <br> <input type="password" name="pwd"> <button>提交</button> </form> </body> </html>
-
manager.py
from Flask_SQLAlchemy.app01 import create_app app = create_app() if __name__ == '__main__': app.run()
-
之后启动项目再访问相应的蓝图路由即可
4.增删改查
- 你可以按照之前的SQLAlchemy一样来操作
- 不同的只是会话窗口的开启
例:查询
# 基于SQLAlchemy,该db_session = Session()
1. res = db_session.query(Fsk).first()
# Flask-SQLAlchemy封装了session会话窗口---> db.session = Session() 差不多这样理解
2. res = db.session.query(Fsk).first()
Flask-Script
- 想一想,现在我们的项目都跟django看起来差不多了
- 那django可以用命令来启动项目,Flask能不能呢?
- 答案当然是能的呐,下面就引出我们的主角:
Flask-Script
1. 安装
pip install Flask-Script
2. Flask-Script的运用
-
用到我们上面的项目
-
manager.py
from Flask_SQLAlchemy.app01 import create_app # 从Flask-Script导入Manager from flask_script import Manager app = create_app() # 让app支持 Manager manager = Manager(app) if __name__ == '__main__': # app.run() # 替换原有的app.run(),然后大功告成了 manager.run()
3.使用命令行启动项目
python manager.py runserver
- manager.py是你启动项目的py文件,是你自己自定义的
4. 命令行配置ip和port
python manager.py runserver -h 0.0.0.0 -p 9527
5.高级操作-自定义脚本命令
5.1 @manager.command
-
manager.py
import app01 # Flask-Script用法 # 从Flask-Script导入Manager from flask_script import Manager app = app01.create_app() # 实例化一个对象 manager = Manager(app) @manager.command def test(): return 'test func' # 执行命令为 --> python manager.py test 参数1 参数2 (参数值以空格区分) @manager.command def test2(a, b): return a+b if __name__ == '__main__': # app.run() # 然后使用命令启动项目 manager.run()
-
python manager.py test
5.2 @manager.opation("-短指令","--长指令",dest="变量名")
-
dest确定变量名,也就是说,函数接收时也要用相同的变量名进行接收,否则报错
-
manager.py
import app01 # Flask-Script用法 # 从Flask-Script导入Manager from flask_script import Manager app = app01.create_app() # 实例化一个对象 manager = Manager(app) @manager.option('-n', '--name', dest='name') @manager.option('-a', '--age', dest='age') def user(name, age): return f"{name}的年龄是{age}" if __name__ == '__main__': # app.run() # 然后使用命令启动项目 manager.run()
-
python manager.py user -n 参数1 -a 参数2
-
python manager.py user --name 参数1 -age 参数2
Flask-Migrate
- 再进一步想,我们现在基本什么都可以和django类似了
- 但是django可以进行数据库迁移,flask可以吗?
- 当然也可以,这样一来,基本flask就相当于django了
- 不过注意:
Flask-Migrate 是要依赖 Flask-Script 组件的
- 对了,新版本没有
MigrateCommand
,可降低版本pip install flask-migrate==2.7.0
1.安装
pip install Flask-Migrate
2.基本使用
-
manager.py
import MyApp # 导入 Flask-Script 中的 Manager from flask_script import Manager # 导入 Flask-Migrate 中的 Migrate 和 MigrateCommand # 这两个东西说白了就是想在 Flask-Script 中添加几个命令和指令而已 from flask_migrate import Migrate,MigrateCommand app = MyApp.create_app() # 让app支持 Manager manager = Manager(app) # type:Manager # Migrate 既然是数据库迁移,那么就得告诉他数据库在哪里 # 并且告诉他要支持那个app Migrate(app,MyApp.db) # 现在就要告诉manager 有新的指令了,这个新指令在MigrateCommand 中存着呢 manager.add_command("db",MigrateCommand) # 当你的命令中出现 db 指令,则去MigrateCommand中寻找对应关系 """ 数据库迁移指令: python manager.py db init python manager.py db migrate # Django中的 makemigration python manager.py db upgrade # Django中的 migrate """ @manager.option('-n', '--name', dest='name') @manager.option('-a', '--age', dest='age') def user(name, age): return f"{name}的年龄是{age}" if __name__ == '__main__': #app.run() # 替换原有的app.run(),然后大功告成了 manager.run()
-
执行数据库初始化命令:
python manager.py db init
-
接下来操作就和django一样了。。。。