Flask基础
Flask介绍
参考:Flask官方文档
Flask 是一个用 Python 编写的轻量级 Web 应用框架。它的核心非常简单,但是可以通过各种插件来扩展,使其可以用来构建复杂的 Web 应用。Flask 的设计目标是保持核心简单且易于使用,同时能够被扩展以适应不同的应用需求。
Flask框架主要特点:
- 轻量级:Flask 本身只提供了最基础的 Web 功能,如 URL 路由、请求和响应处理等。这使得 Flask 非常轻量,易于学习和使用。
- 易于扩展:虽然 Flask 本身功能有限,但是它可以通过各种插件来扩展,如数据库操作(Flask-SQLAlchemy)、表单处理(Flask-WTF)、用户认证(Flask-Login)等。
- Jinja2 模板引擎:Flask 使用 Jinja2 作为其模板引擎,可以方便地生成动态 HTML 内容。
常用插件
Flask 常用插件
- 数据库操作:Flask-SQLAlchemy pip install flask-sqlalchemy
- 数据库迁移:Flask-Migrate pip install flask-migrate
- 表单处理:Flask-WTF pip install flask-wtf
- 后台管理:Flask-Admin pip install flask-admin
- 用户认证:Flask-Login pip install flask-login
- Token认证: Flask-JWT-Extended pip install flask-jwt-extended
- 接口频率限制:Flask-Limiter pip install flask-limiter
- 邮件发送:Flask-Mail pip install flask-mail
- 密码生成:Flask-Bcypt pip install flask-bcypt
- 缓存:FLask-Caching pip intall flask-caching
- 页面调试:Flask-DebugToobar pip install flask-debugtoolbar
- 静态文静缓存:Flask-Static-Digest pip install flask-static-digest
快速上手
- 安装Flask:
$ pip install flask
新建app.py
from flask import Flask
app = Flask(__name__)
@app.route("/")
def hello_world():
return "<p>Hello, World!</p>"
- 命令行运行
$ flask run
常用命令
Flask常用命令
- flask run: 启动开发服务器,常用参数
--app: 指定模块及应用名(默认模块为app.py,应用为app或application),例如:flask --app hello:myapp run
--debug: 启用调试模式
--reload: 修改代码后自动重启服务
--env-file: 指定.env环境配置
--host/-h:指定主机地址,例如 --host 0.0.0.0
--port/-p:指定端口,例如 --port 5001 - flask shell: ½øÈëflask应用环境(可以执行数据库操作)
Flask Migrate插件常用命令
- flask db init:初始化迁移目录
- flask db migrate: 生成数据库迁移
- flask db upgrade: 执行数据迁移
Flask路由配置
URL重定向
- @app.route(‘/login/‘) 支持 .../login/ 和 .../login 访问 (自动重定向到/login/)
- @app.route(‘/login‘) 只支持 ../login 访问
请求方法限制
- @app.route('/login', methods=['GET', 'POST’])
只接受单个请求方法也可以使用 @app.post(‘/login’) 或 @app.get(‘/login’)
路径参数 @app.route(‘/user/string:username’) , 参数类型
参数类型 | 说明 |
---|---|
string | (缺省值) 接受任何不包含斜杠的文本 |
int | 接受正整数 |
float | 接受正浮点数 |
path | 类似 string ,但可以包含斜杠 |
uuid | 接受 UUID 字符串 |
通过接口函数名反查URL
from flask import url_for
print(url_for(login’))
静态文件URL: url_for('static', filename='style.css')
Flask请求参数获取
- URL路径参数获取:在接口函数中添加URL对应的参数来获取
- Query参数获取:·name = request.args.get(‘name’)
- POST表单请求数据:request.form.get(‘name’)
- JSON格式数据获取:request.json.get(‘name’)
- 获取上传文件:file = request.files.get(‘file_name’); file.save(f‘/uploads/{file.filename}’
- 获取Cookie参数:request.cookies.get(‘token’)
会话
参考:quickstart
除了请求对象之外还有一种称为 session 的对象,允许我们在不同请求之间储存信息。这个对象相当于用密钥签名加密的 cookie ,即用户可以查看您的 cookie ,但是如果没有密钥就无法修改它。
- 使用方法:
from flask import session
- 添加会话变量:
sesstion[‘username’] = ...
- 删除会话变量:
session.pop(‘username’)
Flask返回响应
返回文本或HTML
return ''<h2>Hello</h2>''
渲染并返回页面模板
return render_template(‘user.html’, id=1)
返回JSON数据,直接返回一个字典/列表或使用jsonify()
return {‘code’:0, ‘msg’: ‘success’}
return jsonify({‘code’:0, ‘msg’: ‘success’})
指定状态码和响应头
return render_template(‘error.html’), 404, {‘test’: ‘abc’}
指定状态码和响应头
return render_template(‘error.html’), 404, {‘test’: ‘abc’}
# 或者 return make_response(render_template(‘error.html’), 404, {‘test’: ‘abc’})
返回文件流或文件下载
from flask import send_file; send_file(‘/path/a.png’, ‘image/png’, as_attachment=True)
from flask import send_file_from_directory(‘/path’, ‘a.png’, as_attachement=True)
返回提示消息
- flash(): 发送提示消息 模板中使用
get_flashed_message()
来获取消息
返回重定向
return redirect(‘/400’)
Flask返回响应-异常处理
抛出异常
abort(404)
注册异常处理函数
@app.errorhandler(404)
def error_404(error):
return render_template(‘404.html'), 404
@app.errorhandler(Exception)
def error_unknown(error):
return render_template(‘500.html’), 500
JinJa2模板引擎
Jinja2介绍
参考:Jinja2官方文档
Jinja2 是一个用于 Python 的强大的模板引擎,它被广泛用于各种 Web 开发框架中,包括 Flask。Jinja2 提供了一种简单的方式来动态地生成 HTML 或其他标记语言。
Jinja2主要特性:
- 变量替换:你可以在模板中使用双大括号 {{ }} 来插入变量,例如 {{ name }}。当模板被渲染时,这些变量将被实际的值替换。
- 控制结构:Jinja2 支持多种控制结构,包括条件语句({% if %}、{% else %}、{% elif %})和循环语句({% for %})。这使得你可以在模板中进行复杂的逻辑处理。
- 模板继承:Jinja2 支持模板继承,这意味着你可以定义一个基础模板(包含一些通用的元素,如页头、页脚等),然后创建多个继承自这个基础模板的子模板。这可以避免重复代码,使得模板更易于管理。
- 过滤器:·Jinja2 提供了一系列的过滤器,可以用来修改变量的值。例如,{{ name|lower }} 将把 name 变量的值转换为小写。你也可以定义自己的过滤器。
- 自动转义:为了防止跨站脚本攻击(XSS),Jinja2 默认会自动转义所有的变量。这意味着如果变量的值包含 HTML 代码,这些代码将被转义为对应的 HTML 实体,而不会被浏览器解析执行。
- 宏:Jinja2 的宏类似于 Python 中的函数,可以用来封装可重用的模板片段。
Jinja2基础使用
安装Flask已默认安装,独立安装方法: pip install jinja2
引用变量
<h1>Hello, {{ name }}!</h1>
支持使用过滤器
<h1>Hello, {{ name|capitalize }}!</h1>
if判断
{% if name %}
<h1>Hello, {{ name }}!</h1>
{% else %}
<h1>Hello, Stranger!</h1>
{% endif %}
for循环
{% for user in users %}
<tr class="{% if loop.odd %}odd{% else %}even{% endif %}">
<td>{{ loop.index }}</td>
<td>{{ user.name }}</td>
<td>{{ user.email }}</td>
</tr>
{% endfor %}
Jinja2模板嵌套
Jinja2 提供了模板继承的功能,这使得你可以创建一个基础模板(通常称为 "父模板" 或 "基模板"),然后创建多个继承自基模板的子模板。这样可以避免在多个模板中重复相同的代码。
base.html
<!DOCTYPE html>
<html>
<head>
<title>{% block title %}{% endblock %}</title>
</head>
<body>
<header> <h1>My Website</h1> </header>
<main> {% block content %}{% endblock %}</main>
<footer> <p>Copyright © 2022</p> </footer>
</body>
</html>
在这个模板中,{% block title %}
和 {% block content %}
是两个块。子模板可以覆盖这些块来插入自己的内容。
index.html(继承base.html)
{% extends “base.html” %}
{% block title %} Home Page {% endblock %}
{% block content %}
<h2>Welcome to my website!</h2>
<p>This is the home page.</p>
{% endblock %}
Jinja2使用宏
在 Jinja2 中,宏类似于 Python 中的函数,可以用来封装可重用的模板片段。
你可以使用 {% macro %}
和 {% endmacro %}
标签来定义一个宏,例如:
{% macro render_user(user) %}
<p>Name: {{ user.name }}</p>
<p>Email: {{ user.email }}</p>
{% endmacro %}
在这个例子中,render_user 是一个宏,它接受一个参数 user,并生成一个包含用户的名字和电子邮件的段落。你可以像调用函数一样调用宏,例如:
{% for user in users %}
{{ render_user(user) }}
{% endfor %}
你也可以将宏保存在一个单独的文件中,然后在其他模板中导入它。例如,如果你将上面的宏保存在 macros.html 文件中,你可以这样导入它:
{% from “macros.html” import render_user %}
{% for user in users %}
{{ render_user(user) }}
{% endfor %}
Flask数据库操作
SQLAlchemy介绍
SQLAlchemy 是一个用于 Python 的 SQL 工具包和对象关系映射(ORM)系统。它为高效和高性能的数据库访问提供了全面的企业级持久性模型
SQLAlchemy主要特点:
- 对象关系映射(ORM):SQLAlchemy 提供了一个全功能的 ORM,它允许开发者以面向对象的方式处理数据库中的数据。你可以定义数据模型(即类),SQLAlchemy 会自动将它们映射到数据库表。
- 数据表达语言(DDL):SQLAlchemy 提供了一种 Pythonic 的方式来生成和执行 SQL 语句,包括创建和删除表,插入、更新和删除数据等。
- SQL 表达语言:SQLAlchemy 提供了一种构造 SQL 查询的 DSL(领域特定语言)。这种 DSL 提供了丰富的查询构造选项,并且可以跨多种数据库后端使用。
- 数据库抽象层:SQLAlchemy 提供了一种数据库抽象层,使得你可以使用相同的代码来操作不同的数据库系统(如 MySQL、PostgreSQL、SQLite 等)。
- 事务和会话管理:SQLAlchemy 提供了强大的事务和会话管理功能,使得你可以方便地处理数据库事务。
- 连接池:SQLAlchemy 内置了连接池功能,可以有效地管理数据库连接,提高应用性能。
Flask-SQLAlchemy使用
Flask-SQLAlchemy 是一个为 Flask 应用提供 SQLAlchemy 支持的扩展,它简化了 SQLAlchemy 的使用,使得在 Flask 应用中进行数据库操作变得更加方便。
- 安装方法:
$ pip install flask-sqlalchemy
- 初始化插件
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:////db.sqlite'
db = SQLAlchemy(app)
- 定义模型
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
...
- 创建表
with app.app_context():
db.create_all()
Flask-Migrate使用
Flask-Migrate 为 Flask 和SQLAlchemy 添加了数据库迁移的扩展名了
- 安装方法
$ pip install flask-migrate
- 初始化插件
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:////db.sqlite'
db = SQLAlchemy(app)
migrate = Migrate(app, db) # 添加此行
- 使用方法
$ flask db init # 初始化migrations目录
$ flask db migrate # 生成迁移脚本
$ flask db upgrade # 执行迁移
模型声明
声明式模型
class Role(db.Model):
__tablename__ = 'roles' # 指定表名,默认为类名小写
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(80), nullable=False, comment="角色名称”)
description = db.Column(db.Text, comment="角色描述")
class User(db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(80), nullable=False, comment="用户名称")
email = db.Column(db.String(120), unique=True, comment="邮件”)
age = db.Column(db.Integer, comment=“年龄”)
role_id = db.Column(db.Integer, db.ForeignKey("roles.id", comment="角色id") # 外键
role = db.reletionship("Role", backref="users") # 关系字段(非该表字段)
描述型模型
Role = Table(
'roles', # 表名
db.Model.metadata, # 元数据配置
db.Column(‘id’, db.Integer, primary_key=True),
db.Column(‘name’, db.String(80), nullable=False, comment="角色名称"),
db.Column(‘description’, db.Text, nullable=False, comment="角色描述"),
)
常见字段类型
字段(db.Column)常见类型
- db.Integer 整型
- db.String (size) 字符串,size 为最大长度,比如 db.String(20)
- db.Text 长文本
- db.DateTime 时间日期,Python datetime 对象
- db.Float 浮点数
- db.Boolean 布尔值
字段(db.Column)常用参数
- primary_key 是否主键
- unique 是否唯一
- index 是否索引列
- nullable 是否允许为null,默认为允许 (nullable=True)
- default 插入/修改时字段的默认值,如 default=1
- server_default 在创建表结构时,声明字段默认值,必须为字符串类型,如 server_default=text(¡°1¡±)
- onupdate 设置字段更新时的处理函数
- comment 在创建表结构时,为字段添加字段说明
模型增删改查
增加
user = User(name='kevin', email='[email protected]')
db.session.add(user)
db.session.commit()
删除
user = User.query.get(user_id)
db.session.delete(user)
db.session.commit()
修改
user = User.query.get(user_id)
user.email = '[email protected]'
db.session.add(user)
db.session.commit()
查询
查询所有
users = User.query.all()
查询单个
# 通过主键id查询(查询不到返回None)
user = User.query.get(user_id)
# 通过属性过滤并取第一个
user = User.query.filter_by(name='kevin').first()
# 或 user = User.query.filter(User.name =='kevin').first()
查询多个
users = User.query.filter(User.email.like('%gmail.com%').all()
# 多条件筛选
users = User.query.filter(User.name.like('%kevin%').filter(User.email.like('%gmail.com%')).all()
# 或使用and_
users = User.query.filter(
and_(User.name.like('%kevin%'),
User.email.like('%gmail.com%')
).all()
# 多条件筛选-or
users = User.query.filter(
or_(User.name.like('%kevin%'),
User.email.like('%gmail.com%')
).all()
# 联表查询
users = User.query.join(Role, User.role_id==Role.id).filter(Role.name == 'Admin').all()
详细查询方法
模型query方法
- User.query.get(id) 根据主键查询,返回指定主键值的记录,如果未找到,则返回 None
- User.query.get_or_404(id) 根据主键查询,如果未找到,则返回 404 错误响应
- User.query.first() 返回查询的第一条记录,如果未找到,则返回 None
- User.query.first_or_404() 404 错误响应
- User.query.all() 返回包含所有查询记录的列表
- User.query.count() 返回查询结果的数量
- User.query.filter() 使用指定的规则过滤记录,返回新产生的查询对象
- User.query.filter_by() 以关键字表达式形式过滤纪录,返回新产生的查询对象
- User.query.join() 联表查询,返回新产生的查询对象
- User.query.order_by() 根据指定条件对记录进行排序,返回新产生的查询对象
- User.query.paginate() 返回一个 Pagination 对象,可以对记录进行分页处理
使用db.session
增加/修改
- db.session.add(obj)
删除 - db.session.delete(obj)
提交 - db.session.flash(): 预提交
- db.session.commit(): 提交
查询 - result = db.session.query(User.age, func.count(User.id)).group_by(User.age).having(User.age<18).all()
执行(查询或修改) - db.session.execute(select(User).where(User.age < 18)).scalars()
- db.session.exectue(insert(User).values(username=“kevin”, email=“[email protected]”, age=16)
关系字段db.relationship
关系字段(db.relationship)常用参数
- backref 在关系的另一模型中添加反向引用,用于找到父表 (单向关系声明)
- back_populates 关联另一模型中指向该表的关系字段(双向关系声明)
- primary join 明确指定两个模型之间使用的联结条件
- uselist 如果为False,不使用列表,而使用标量值
- order_by 指定关系中记录的排序方式
- secondary 指定多对多中记录的排序方式
- secondary join 在SQLAlchemy中无法自行决定时,指定多对多关系中的二级联结条件
- foreign_keys 指定关联的本表中外键字段,当有多个指向同一表的外键时需要手动指定,如
creator = db.releation_ship("User", foreign_keys="Book.creator_id")
- lazy 关系字段数据加载方式,默认lazy='select' 自动加载
关系字段加载(lazy)选项
- select (默认) 多表查询方式自动加载关系字段数据
- joined 联表(join)查询方式自动加载关系字段数据
- subquery 子查询方式自动加载关系字段数据
- dynamic 动态加载,在使用 <关系字段>.all()/.filter()/.first()等方法时再进行加载(查询),dynamic仅支持一对多父表中关联子表的关系字段,或多对多关系字段
- noload 不加载关系字段数据
一对多关系
外键声明
子表中添加父表的外键 如,
role_id = db.Column(db.Interger, db.ForeignKey("roles.id"),# 注意!!!,ForeignKey中是父表的"表名.id"
单向关系声明(可选,一对多关系建议使用该方式)
如果需要引用父表或子表的数据,可以在子表 或 父表中添加关系字段,并添加反向引用字段
子表添加关系字段:
class Role(db.Model):
__tablename__ = 'roles'
id = db.Column(db.Integer, primary_key=True)
class User(db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
role_id = db.Column(db.Integer, db.ForeignKey(“roles.id”))
# 子表添加关系字段
role = db.relationship("Role", backref="users")
或 父表添加关系字段:
class Role(db.Model):
__tablename__ = 'roles'
id = db.Column(db.Integer, primary_key=True)
# 父表添加关系字段
users = db.relationship("User", backref="role")
class User(db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
role_id = db.Column(db.Integer, db.ForeignKey("roles.id"))
双向关系声明(可选)
也可以在子表和父表中都添加关系字段,并使用back_populates关联两个字段
class Role(db.Model):
# ...
users = db.relationship("User", back_populates="role")
class User(db.Model):
# ...
role = db.relationship("Role", back_populates="users")
自关联
自关联关系声明
表中可以使用外键关联本身,形成自关联(嵌套),自关联表关系字段需要设置remote_side=[id]
class Category(db.Model):
__tablename__ = 'categories'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(128), nullable=False)
description = db.Column(db.String(500))
parent_id = db.Column(db.Integer, db.ForeignKey(“categories.id”)) # 外键指向当前表
parent = db.relationship(“Category”, remote_side=[id], backref=“children”) # 不支持lazy=dynamic
# 或 children = db.relationship("Category", remote_side=[id], backref="parent")
自关联关系创建
# 创建上级分类
c1 = Category(name='分类1')
db.session.add(c1)
db.session.commit()
# 创建子分类
c1_1 = Category(name='分类1-1', parent_id=c1.id)
db.session.add(c1)
db.session.commit()
自关关系查询
# 通过父对象查询子对象
c1 = Category.query.get(1)
for sub_category in c1.children():
print(sub_category.name)
# 通过子对象查询父对象
c1_1 = Category.query.get(2)
print(c1_1.parent.name)
多个外键
多个外键指向同一个表
当子表中有多个外键指向同一个父表时,关系字段需要显示声明foreign_keys
,关联本表哪个外键字段
class User(db.Model):
# ...
class TestCase(db.Model):
__tablename__ = 'testcases'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(128), nullable=False)
description = db.Column(db.String(500))
create_time = db.Column(db.DateTime, server_default=func.now(), comment='创建时间')
update_time = db.Column(db.DateTime, server_default=func.now(), onupdate=func.now(), comment='修改时间')
creator_id = db.Column(db.Integer, db.ForeignKey('user.id')) # 外键字段
operator_id = db.Column(db.Integer, db.ForeignKey('user.id')) # 外键字段
creator = db.relationship("User", backref="created_testcases", foreign_keys="TestCase.creator_id")
operator = db.relationship("User", backref="updated_testcases", foreign_keys="TestCase.operator_id")
一对多关系操作
关系查询(lazy=select(默认)/joined/subquery)
子表获取父表信息,如,
user = User.query.get(user_id)
print(user.role.name)
父表获取子表信息,如
role = Role.query.get(role_id)
for user in role.users:
print(user.name)
关系查询(lazy=dynamic)
子表获取父表信息,如,
user = User.query.get(user_id)
role = user.role.first()
print(role.name)
父表获取子表信息,如
role = Role.query.get(role_id)
for user in role.users.all():
print(user.name)
关系查询(lazy=noload)
子表获取父表信息,如
user = User.query.get(user_id)
role = Role.query.filter(Role.id == user.role_id)
print(role.name)
父表获取子表信息,如
role = Role.query.get(role_id)
users = User.query.filter(User.role_id==role.id).all()
for user in users:
print(user.name)
注意,在filter筛选时不能直接使用关系字段属性进行筛选
User.query.filter_by(user.role.name='Admin')
❌
User.query.filter(User.role.name=='Admin')
❌
正确的使用方式为
User.join(Role).filter(Role.name=='Admin')
或
db.session.query(User,Role).filter(Role.name=='Admin')
一对一关系
外键声明
子表中添加父表的外键 如,
role_id = db.Column(db.Interger, db.ForeignKey("roles.id"),# 注意!!!,ForeignKey中是父表的"表名.id"
双项关系声明(可选,一对一建议使用该方式,userlist=False不支持 lazy=dynamic)
如果需要引用父表或子表的数据,可以在子表和父表中添加关系字段,并设置userlist=False
class User(db.Model):
# ...
profile = db.relationship("Profile", back_populates="user", uselist=False) # 添加一对一关系
class Profile(db.Model): # 附表
__tablename__ = 'profiles'
id = db.Column(db.Integer, primary_key=True)
phone = db.Column(db.String(11))
address = db.Column(db.String(500))
user_id = db.Column(db.Integer, db.ForeignKey("user.id")) # 添加主表外键
user = db.relationship('User', back_populates='profile', uselist=False) # 添加一对一关系
关系查询(不支持lasy=dynamic)
主表获取附表信息,如,
user = User.query.get(user_id)
print(user.profile.phone)
附表获取主表信息,如
profile = Profile.query.get(profile_id)
print(profile.user.name)
模型关系-多对多关系
使用标准中间表
多对多实际上是通过一个包含两者外键的中间表实现的,通过添加关系字段,可以略过中间表,直接获取关联的数据
可以通过Table创建一个(标准的)中间表,示例如下:
class User(db.Model):
__tablename__ = 'users'
# ..
ProjectUser = Table( # 中间表
'projects_users', # 表名
db.Model.metadata, # 元数据配置
db.Column(‘project_id’, db.Integer, db.ForeignKey('projects.id'), primary_key=True),
db.Column(‘user_id’, db.Integer, db.ForeignKey('users.id'), primary_key=True),
)
class Project(db.Model):
__tablename__ = 'projects'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(128), unique=True, nullable=False)
description = db.Column(db.String(500))
# 关系字段
users = db.relationship("User", secondary=ProjectUser, backref="projects", lazy="dynamic")
创建多对多关系
project = Project(name='项目1')
for user_id in user_ids: # 如 [1, 2, 3]
user = User.query.get(user_id)
project.user.append(user)
db.session.add(project)
db.session.commit()
查询多对多关系
project = Project.query.get(project_id)
for user in project.users.all() # lazy=dynamic方式
print(user.name)
使用带额外字段的独立中间表
如果需要在中间表中添加额外字段,可以使用声明的方式建立中间表(当然用Table描述的方式也可以)
class User(db.Model):
__tablename__ = 'users'
# ..
class ProjectUser(db.Model): # 中间表
__tablename__ = 'projects_users'
project_id = db.Column(db.Integer, db.ForeignKey('projects.id'), primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey('users.id'), primary_key=True)
role = db.Column(db.String(30))
project = db.relationship('Project', backref='related_users') # 关系字段,也可以在主表中声明 (以使用lazy=dynamic)
user = db.relationship('User', backref='related_projects') # 关系字段,也可以在主表中声明(以使用lazy=dynamic)
class Project(db.Model):
__tablename__ = 'projects'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(128), unique=True, nullable=False)
description = db.Column(db.String(500))
创建多对多关系
project = Project(name='项目1')
db.session.add(project)
db.session.commit() # 先要创建出项目,这样才有project.id
for user_id in user_ids: # 创建每一个中间表对象,并关联两个表
project_user = ProjectUser(project_id=project.id,user_id=user_id, role='QA')
db.session.add(project_user)
db.session.commit()
查询多对多关系
project = Project.query.get(project_id)
for m in project.related_users:
print(m.user.name) # 通过中间表对象m中的关系字段user
Flask项目结构搭建
蓝图(Blueprint)介绍
在 Flask 中,“蓝图”(Blueprint),即路由嵌套,是一种组织路由和视图函数的方式,它允许你将应用分割成一组相关的部分。每个蓝图都可以有自己的路由、视图函数、模板文件、静态文件等。你可以在一个蓝图中定义一组相关的路由,然后在另一个蓝图中定义另一组相关的路由。这样可以使你的应用更加模块化,更容易维护。
from flask import Flask, Blueprint
app = Flask(__name__)
api_bp = Blueprint('api', __name__, url_prefix='/api') # 接口根路由(蓝图)
user_bp = Blueprint('user', __name__, url_prefix='/user') # 用户接口路由(蓝图)
api_bp.register_blueprint(user_bp) # 注册用户接口路由(蓝图)到api
app.register_blueprint(api_bp) # 注册主接口路由(蓝图)到app
@user_bp.route('/list') # 最终接口地址为 /api/user/list
def user_list(id):
users = User.query.all()
return [{'id': user.id, 'name': user.name} for user in users]
项目结构搭建
Flask项目并没有标准的项目结构,每个人根据自己的规划搭建出来的项目结构也是不同的。总体来说合理的组织,确保可启动、可配置、可扩展、模型可被发现即可。
一般来说可以分为按功能聚合(单应用) 和 按模型(或应用)聚合 两种
按功能聚合(单应用)结构参考
project /
|-- apis /
| |-- __init__.py # 主路由及自路由注册
| |-- base.py # 接口基础配置
| |-- user.py # 用户相关接口
| |-- testcase.py
|-- models /
| |-- __init__.py # 暴露内部的模型
| |-- base.py # 模型基础类
| |-- user.py # 用户相关模型
| |-- testcase.py
|-- config.py # 数据库、插件等配置
|-- app.py # 应用创建及启动
按模型(或应用)聚合结构参考
project /
|-- user /
| |-- __init__.py # 主路由及自路由注册
| |-- apis.py # 用户相关接口
| |-- models.py
|-- testcase /
| |-- __init__.py # 暴露内部的模型
| |-- apis.py # 模型基础类
| |-- models.py # 用户相关模型
|-- config.py # 数据库、插件等配置
|-- app.py # 应用创建及启动
注:对于非前后端分离式项目,项目结构中会有templates/ static/等目录,接口模块一般使用views.py 或 views/目录,而在前后端分离式项目中,我们一般使用apis.py或apis/目录
简单项目结构参考
以按功能聚合为例,一种简单的结构如下:
项目配置 config.py 内容
```python from pathlib import Path from flask_migrate import Migrate from flask_sqlalchemy import SQLAlchemy from sqlalchemy import MetaDataROOT_PATH = Path(file).parent # 项目根目录
插件配置
db = SQLAlchemy()
migrate = Migrate(db=db)
EXTENSIONS = [db, migrate]
APP配置
SECRET_KEY = 'secret'
APP_CONFIG = {
'SQLALCHEMY_DATABASE_URI': f"sqlite:///{ROOT_PATH}/db.sqlite"
}
</details>
<details>
<summary>项目主程序 app.py 内容参考</summary>
```python
from flask import Flask
from apis import api_bp # 从 apis/__init__.py中导入api_bp
from config import APP_CONFIG, EXTENSIONS, SECRET_KEY
def make_app():
# 创建app
app = Flask(__name__)
app.secret_key = SECRET_KEY # 配置app
app.config.update(APP_CONFIG)
for ext in EXTENSIONS: # 注册插件
ext.init_app(app)
app.register_blueprint(api_bp) # 注册接口根路由
@app.errorhandler(Exception) # 注册异常处理函数
def error_500(error):
app.logger.exception(error)
return {'code': 500, 'msg': str(error), ‘data’: None}, 500
return app
模型基础类 models/base.py 内容参考
```python from sqlalchemy import desc, inspect from settings import dbclass BaseModel(db.Model):
abstract = True # 抽象模型
fields = None # 自定义字段, 序列化字段配置
order_by = 'id' # 自定义字段,排序字段
id = db.Column(db.Integer, primary_key=True) # 默认id主键字段
@classmethod
def get(cls, id=None, kwargs): # 查询单个对象
if id is not None:
return cls.query.get(id)
return cls.query.filter_by(kwargs).first()
@classmethod
def list(cls, order_by: str = None, kwargs): # 查询列表
order_by = order_by or cls.order_by
if order_by.startswith(‘-’): # 如果配置为 '-id' 则按降序排序
order_by = desc(order_by[1:])
fields = cls.get_fields()
kwargs = {key: value for key, value in kwargs.items()
if key in fields and value is not None}
qs = cls.query.filter_by(kwargs)
if order_by is not None:
return qs.order_by(order_by)
return qs.all()
@classmethod
def create(cls, kwargs): # 创建对象
obj = cls(kwargs)
return obj.save()
def save(self, commit=True): # 保存对象(创建或更新时使用)
db.session.add(self)
db.session.flush()
if commit:
db.session.commit()
return self
def update(self, commit=True, **kwargs): # 更新对象
for attr, value in kwargs.items():
setattr(self, attr, value)
if commit:
return self.save()
return self
def delete(self, commit: bool = True) -> None: # 删除对象
db.session.delete(self)
db.session.flush()
if commit:
db.session.commit()
@classmethod
def get_fields(cls): # 获取字段配置或模型所有字段名
return cls.fields or [column.key for column in inspect(cls).attrs]
@property
def data(self): # 根据字段名,返回对象的属性字典
fields = self.get_fields()
return {key: getattr(self, key) for key in fields}
</details>
<details>
<summary>具体模型类 models/user.py 内容参考</summary>
```python
from models.base import BaseModel, db
class Role(BaseModel):
"""角色"""
__fields__ = ['id', 'name', 'description']
name = db.Column(db.String(64), unique=True)
description = db.Column(db.String(256), nullable=True)
users = db.relationship('User', backref='role', lazy='dynamic')
class User(BaseModel):
"""用户"""
__fields__ = ['id', 'name', 'username']
username = db.Column(db.String(50), unique=True)
name = db.Column(db.String(50))
password_hash = db.Column(db.String(128))
role_id = db.Column(db.Integer, db.ForeignKey('role.id'))
模型包(Package)配置 models/__init__.py 参考
```python from .user import User ```接口基础配置 apis/base.py 参考内容
```python from typing import UnionSUCCESS = (0, 'success')
class Errors: # 错误码
BAD_REQUEST = (400, '请求参数错误')
USER_NOT_FOUND = (1001, '用户不存在')
USERNAME_EXIST = (1002, '用户名已存在')
# ...
def success_response(data: Union[dict, list] = None, msg=None, status_code=200):
code, msg = SUCCESS[0], msg or SUCCESS[1]
return {'code': code, 'msg': msg, 'data': data}, status_code
def error_response(error, data: Union[dict, list] = None, msg=None, append_msg=None, status_code=200):
code, msg = error[0], msg or error[1]
if append_msg is not None:
msg = '%s %s' % (msg, append_msg)
return {'code': code, 'msg': msg, 'data': data}, status_code
</details>
<details>
<summary>具体模型类 apis/user.py 内容参考</summary>
```python
from flask import Blueprint, request
from models.user import User
from .base import Errors, error_response, success_response
user_bp = Blueprint('user', __name__, url_prefix='/user')
@user_bp.post('/add')
def user_add():
username = request.form.get('username')
name = request.form.get('name') or ''
password = request.form.get('password') # todo 加密密码
if not username:
return error_response(Errors.BAD_REQUEST, msg='用户名不能为空')
exist = User.get(username=username)
if exist is not None:
return error_response(Errors.USERNAME_EXIST)
user = User.create(username=username, name=name, password_hash=password)
return success_response(data=user.data, status_code=201)
@user_bp.get('/get/<int:id>')
def user_get(id):
user = User.get(id)
if user is None:
return error_response(Errors.USER_NOT_FOUND)
return success_response(data=user.data)
@user_bp.get('/list')
def user_list():
users = User.list(**request.args)
return success_response(data=[user.data for user in users])
@user_bp.post('/update/<int:id>')
def user_update(id):
user = User.get(id)
if user is None:
return error_response(Errors.USER_NOT_FOUND)
name = request.form.get('name') or ''
password = request.form.get('password') # todo 加密密码
user.update(name=name, password_hash=password)
return success_response(data=user.data)
@user_bp.post('/delete/<int:id>')
def user_delete(id):
user = User.get(id)
if user is None:
return error_response(Errors.USER_NOT_FOUND)
user.delete()
接口包(Package)路由注册 apis/__init__.py 内容参考
from flask import Blueprint
from .user import user_bp
# ...
api = Blueprint('api', __name__, url_prefix='/api')
api.register_blueprint(user_bp)
# ...