Flask
基础语法
from flask import Flask
app = Flask(__name__)
@app.route('/') # route 装饰器
def hello_world(): # 视图处理
return 'Hello World!'
@app.route('/index/')
def index():
return "Hello World!"
if __name__ == '__main__':
app.run(debug=True)
关于run的参数
debug
[!NOTE]
是指是否开启调试模式,开启后修改后的python代码会自动重启
如果不加上debug=True这个参数时,每次修改视图内容时只有从新启动服务器才会在网页中更新内容,这样比较操作比较麻烦,所以设置这个debug=True参数,就不需要重启服务器就可以查看到修改后的新内容。
port
[!NOTE]
启动指定服务器的端口号,默认是5000
app.run(debug=True, port=5001)
该处使用的url网络请求的数据。
host
[!NOTE]
指的是主机,默认是127.0.0.1,指定为0.0.0.0代表本机所有ip都可以进行访问。
app.run(debug=True, port=5000,host='0.0.0.0')
模板渲染
render_template
jsonify
[!IMPORTANT]
如果返回的是字典会自动转换为Json数据 但是不会设置Content-Type: application/json
from flask import Flask, render_template, jsonify
@app.route('/index/')
def index():
# 返回字符串: 支持HTML标签
# return '<b>Flask Index</b>'
@app.route('/index/')
def index():
# 返回字符串: 支持HTML标签 如果返回的
return '<b>Flask Index</b>'
# 模板渲染
return render_template('index.html', name='法外狂徒张三')
# JSON
return jsonify({'name': '张三', 'age': 33})
项目拆分
[!IMPORTANT]
在一个Flask项目中可以拥有很多Django中的子应用
Static 文件夹中一般存放静态的文件
Templates 中一般存放网页的模板
初始化文件一般创建flaks子应用
models是数据库模块
views.py 视图与路由
app.py Flask程序的总入口
演示案例
app.py 主程序入口
from App import create_app #导入子程序App中的Create_app 函数
app = create_app()
if __name__ == '__main__':
app.run(debug=True)
子进程init初始化文件
from flask import Flask
from .views import blue # 去vies中寻找蓝图
def create_app():
app = Flask(__name__)
# 注册蓝图
app.register_blueprint(blueprint=blue)
return app
views.py
# views.py: 路由 + 视图函数
from flask import Blueprint # 引入蓝图
from .models import * # 导入数据库模块
# 蓝图
blue = Blueprint('user', __name__) # user是名字
@blue.route('/')
def index():
return 'index'\
"""
__name__ 指示 Blueprint 是在当前模块中定义的,这样 Flask 可以正确地找到相关的资源(如果有模板或静态文件的话)
"""
路由参数
[!TIP]
返回值需要时一个字符串
string
@blue.route('/string/<string:username>/') # 类型 变量名
@blue.route('/string/<username>/') # 可缺省 是默认
def get_string(username):
print(type(username)) # <class 'str'>
return username
int
@blue.route('/int/<int:id>/')
def get_int(id):
print(type(id)) # <class 'int'>
return str(id)
float
@blue.route('/float/<float:money>/')
def get_float(money):
print(type(money)) # <class 'float'>
return str(money)
uuid
@blue.route('/uuid/<uuid:id>/')
def get_uuid(id):
print(type(id)) # <class 'uuid.UUID'>
return str(id)
path
可接受带有/的字符串
@blue.route('/path/<path:name>/')
def get_path(name):
print(type(name)) # <class 'str'>
return str(name)
any
枚举类型 多参数选一个
@blue.route('/any/<any(apple, orange, banana):fruit>/')
def get_any(fruit):
print(type(fruit)) # <class 'str'>
return str(fruit)
请求和回复
from flask import Blueprint, request, render_template, \
jsonify, make_response, Response, redirect, url_for, abort
request
@blue.route('/request/', methods=['GET', 'POST'])
def get_request():
pass
# print(request) # <Request 'http://127.0.0.1:5000/request/' [GET]>
# 重要属性
print(request.method) # 请求方式,'GET'或'POST'...
# GET请求的参数
# ImmutableMultiDict: 类字典对象,区别是可以出现重复的key
print(request.args) # ImmutableMultiDict([('name', 'lisi'), ('name', 'wangwu'), ('age', '33')])
# print(request.args['name'], request.args['age']) # lisi 33
# print(request.args.get('name')) # lisi
# print(request.args.getlist('name')) # ['lisi', 'wangwu']
# POST请求的参数
print(request.form) # ImmutableMultiDict([('name', 'lucy'), ('age', '33')])
# print(request.form.get('name')) # lucy
# cookie
print(request.cookies) # ImmutableMultiDict([('name', 'hello')])
# 路径
print(request.path) # /request/
print(request.url) # http://127.0.0.1:5000/request/?name=lisi&name=wangwu&age=33
print(request.base_url) # http://127.0.0.1:5000/request/
print(request.host_url) # http://127.0.0.1:5000/
print(request.remote_addr) # 127.0.0.1,客户端的ip
print(request.files) # 文件内容 ,ImmutableMultiDict([])
print(request.headers) # 请求头
print(request.user_agent) # 用户代理,包括浏览器和操作系统的信息 , python-requests/2.28.2
return 'request ok!'
respone
# Response: 服务器端向客户端发送的响应
@blue.route('/response/')
def get_response():
pass
# 响应的几种方式
# 1. 返回字符串(不常用)
# return 'response OK!'
# 2. 模板渲染 (前后端不分离)
# return render_template('index.html', name='张三', age=33)
# 3. 返回json数据 (前后端分离)
data = {'name': '李四', 'age': 44}
# return data
# jsonify(): 序列化,字典=>字符串
# return jsonify(data)
# 4. 自定义Response对象
html = render_template('index.html', name='张三', age=33)
print(html, type(html)) # <class 'str'>
# res = make_response(html, 200)
res = Response(html)
return res
redict
[!NOTE]
url_for传参 可以传参
# Redirect: 重定向
@blue.route('/redirect/')
def make_redirect():
pass
# 重定向的几种方式
# return redirect('https://www.qq.com')
# return redirect('/response/')
# url_for():反向解析,通过视图函数名反过来找到路由
# url_for('蓝图名称.视图函数名')
# ret = url_for('user.get_response')
# print('ret:', ret) # /response/ 路径名
# return redirect(ret)
# url_for传参 可以传参
ret2 = url_for('user.get_request', name='王五', age=66)
return redirect(ret2)
异常
# 抛出异常:abort
@blue.route('/abort/')
def make_abort():
# 主动抛出异常
abort(500)
return 'hello abort'
# 捕获异常
@blue.errorhandler(500) # 异常处理装饰器
def error_handler(e):
print('e:', e)
return '捕获到异常:' + str(e)
Cookie
import datetime
from flask import Blueprint, render_template, request, redirect, session
from .models import *
# 蓝图
blue = Blueprint('user', __name__)
# 首页
@blue.route('/')
@blue.route('/home/')
def home():
# 4. 获取cookie
username = request.cookies.get('user')
return render_template('home.html', username=username)
# 登录
@blue.route('/login/', methods=['GET', 'POST'])
def login():
# GET:访问登录页面
if request.method == 'GET':
return render_template('login.html')
# POST: 实现登录功能
elif request.method == 'POST':
# 1. 获取前端提交过来的数据
username = request.form.get('username')
password = request.form.get('password')
# 2. 模拟登录:用户名和密码验证
if username == 'lisi' and password == '123':
# 登录成功
response = redirect('/home/')
# 3. 设置cookie
# cookie不能用中文
response.set_cookie('user', username) # 默认浏览器关闭则cookie失效
# 过期时间:
# max_age: 秒
# expires: 指定的datetime日期
response.set_cookie('user', username, max_age=3600*24*7)
response.set_cookie('user', username, expires=datetime.datetime(2023, 12, 12))
return response
else:
return '用户名或密码错误!'
# 注销
@blue.route('/logout/')
def logout():
response = redirect('/home/')
response.delete_cookie('user')
return response
Session
# views.py: 路由 + 视图函数
import datetime
from flask import Blueprint, render_template, request, redirect, session
from .models import *
# 蓝图
blue = Blueprint('user', __name__)
# 首页
@blue.route('/')
@blue.route('/home/')
def home():
# 获取session
username = session.get('user')
return render_template('home.html', username=username)
# 登录
@blue.route('/login/', methods=['GET', 'POST'])
def login():
# GET:访问登录页面
if request.method == 'GET':
return render_template('login.html')
# POST: 实现登录功能
elif request.method == 'POST':
pass
# 1. 获取前端提交过来的数据
username = request.form.get('username')
password = request.form.get('password')
# 2. 模拟登录:用户名和密码验证
if username == 'lisi' and password == '123':
# 登录成功
response = redirect('/home/')
session['user'] = username
session.permanent = True
return response
else:
return '用户名或密码错误!'
# 注销
@blue.route('/logout/')
def logout():
response = redirect('/home/')
session.pop('user')
# session.clear() # 慎用,会删除服务器下的所有session
return response
"""
session 配置
"""
# __init__.py :初始化文件,创建Flask应用
from flask import Flask
from .views import blue
import datetime
def create_app():
app = Flask(__name__)
# 注册蓝图
app.register_blueprint(blueprint=blue)
# session配置
print(app.config) # flask配置信息
'''
<Config {
'ENV': 'production',
'DEBUG': False, 'TESTING': False, 'PROPAGATE_EXCEPTIONS': None,
'SECRET_KEY': None,
'PERMANENT_SESSION_LIFETIME': datetime.timedelta(days=31),
'USE_X_SENDFILE': False, 'SERVER_NAME': None, 'APPLICATION_ROOT': '/',
'SESSION_COOKIE_NAME': 'session',
'SESSION_COOKIE_DOMAIN': None,
'SESSION_COOKIE_PATH': None,
'SESSION_COOKIE_HTTPONLY': True,
'SESSION_COOKIE_SECURE': False,
'SESSION_COOKIE_SAMESITE': None,
'SESSION_REFRESH_EACH_REQUEST': True,
'MAX_CONTENT_LENGTH': None,
'SEND_FILE_MAX_AGE_DEFAULT': None,
'TRAP_BAD_REQUEST_ERRORS': None, 'TRAP_HTTP_EXCEPTIONS': False,
'EXPLAIN_TEMPLATE_LOADING': False, 'PREFERRED_URL_SCHEME': 'http',
'JSON_AS_ASCII': None, 'JSON_SORT_KEYS': None, 'JSONIFY_PRETTYPRINT_REGULAR': None,
'JSONIFY_MIMETYPE': None, 'TEMPLATES_AUTO_RELOAD': None, 'MAX_COOKIE_SIZE': 4093}
>
'''
# 设置cookie的SECRET_KEY
app.config['SECRET_KEY'] = 'abc123'
# 设置session过期时间
app.config['PERMANENT_SESSION_LIFETIME'] = datetime.timedelta(days=8)
return app
模板语言
后端处理
from flask import Blueprint, render_template
from .models import *
# 蓝图
blue = Blueprint('user', __name__)
@blue.route('/')
def home():
pass
data = {
'name': 'ikun ikun ikun',
'age': 26,
'likes': ['ball', 'sing', 'dance', 'code']
}
# return render_template('home.html', **data) # 解析为关键字
# return render_template('home.html', name='ikun', age=26)
# return render_template('base.html')
# return render_template('child1.html')
return render_template('child2.html', **data)
前端使用
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Home</title>
</head>
<body>
<h2>Home</h2>
<hr>
{# 模板语言的注释 #}
<h4>变量:</h4>
<p>name: {{ name }}</p>
<p>age: {{ age }}</p>
<p>likes: {{ likes }}</p>
<hr>
<h4>标签:</h4>
<h5>if语句</h5>
{% if age >= 18 %}
<p>{{ name }}已经成年了</p>
{% elif age >= 6 %}
<p>{{ name }}可以上学了</p>
{% else %}
<p>{{ name }} 还是小孩 </p>
{% endif %}
<h5>for循环</h5>
{% for like in likes %}
{% if loop.first %}
<p style="color: red;">{{ like }}</p>
{% elif loop.last %}
<p style="color: blue;">{{ like }}</p>
{% else %}
<p>{{ like }}</p>
{% endif %}
index: {{ loop.index }},
index0: {{ loop.index0 }},
revindex: {{ loop.revindex }},
revindex0: {{ loop.revindex0 }}
{% else %}
<b>else</b>
{% endfor %}
<br><br><br><br><br>
<br><br><br><br><br>
<br><br><br><br><br>
</body>
</html>
模板继承
<!--父模板-->
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>父模板</title>
{# 通用css文件 #}
<link rel="stylesheet" href="{{ url_for('static', filename='css/base.css') }}">
{% block extcss %}
{% endblock %}
</head>
<body>
{% block head %}
{% endblock %}
<hr>
{% block content %}
{% endblock %}
<hr>
{% block foot %}
{% endblock %}
{# 通用的js文件 #}
<script src="{{ url_for('static', filename='js/base.js') }}"></script>
{% block extjs %}
{% endblock %}
</body>
</html>
<!--子模板-->
{% extends 'base.html' %}
{% block head %}
<div>
<p>千锋YYDS Head</p>
</div>
{% endblock %}
模板父类调用
{# super #}
{% block head %}
{{ super() }}
<p>Python</p>
{% endblock %}
宏定义-过滤器
{# 宏定义:Python函数 #}
{% macro person(name, age) %}
<b>姓名:{{ name }}, 年龄:{{ age }}</b>
{% endmacro %}
{% block foot %}
{{ person("坤坤", 25) }}
{# 过滤器 #}
<p>{{ name | capitalize }}</p>
<p>{{ name | title }}</p>
<p>{{ name | upper }}</p>
<p>{{ name | lower }}</p>
<p>{{ name | upper | first | lower }}</p>
<p>{{ name2 | default('kunkun') }}</p>
<p>...</p>
{% endblock %}
Include组件
<div>我是child2中include的内容</div>
# 使用
{% include 'child2_include.html' %}
数据库配置
exts.py
# 数据库的链接 与 迁移
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
db = SQLAlchemy() # orm对象实例化
migrate = Migrate()
def init_exts(app):
db.init_app(app=app)
migrate.init_app(app=app, db=db)
init.py
# __init__.py :初始化文件,创建Flask应用
from flask import Flask
from .views import blue
from .exts import init_exts
def create_app():
app = Flask(__name__)
# 注册蓝图
app.register_blueprint(blueprint=blue)
# 配置数据库
db_uri = 'sqlite:///sqlite3.db' # sqlite配置
# db_uri = 'mysql+pymysql://root:123456@localhost:3306/flaskdb' # mysql的配置
app.config['SQLALCHEMY_DATABASE_URI'] = db_uri
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False # 禁止对象追踪修改
# 初始化插件
init_exts(app=app)
return app
"""
flask db init # 创建迁移文件夹migrates,只调用一次
flask db migrate # 生成迁移文件
flask db upgrade # 更新
flask db downgrade # 降级
"""
ORM模型建立
from .exts import db
class User(db.Model):
__tablename__ = 'tb_user'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
name = db.Column(db.String(30), unique=True, index=True)
age = db.Column(db.Integer, default=1)
单表操作
from flask import Blueprint, request, render_template
from sqlalchemy import desc, and_, not_, or_
from .models import *
# 蓝图
blue = Blueprint('user', __name__)
@blue.route('/')
def index():
return 'index'
# 单表操作:
# 增删改查
# 增:添加数据
@blue.route('/useradd/')
def user_add():
# 添加一条数据
# u = User()
# u.name = 'kun'
# u.age = 24
# db.session.add(u) # 将u对象添加到session中
# db.session.commit() # 同步到数据库中
# 同时添加多条数据
users = [] # 实例化obj
for i in range(10, 30):
u = User()
u.name = '冰冰' + str(i)
u.age = i
users.append(u)
try:
db.session.add_all(users)
db.session.commit() # 事务提交
except Exception as e:
db.session.rollback() # 回滚
db.session.flush()
return 'fail: ' + str(e)
return 'success!'
# 删:删除数据
# 找到要删除的数据,然后删除
@blue.route('/userdel/')
def user_del():
u = User.query.first() # 查询第一条数据
db.session.delete(u)
db.session.commit()
return 'success!'
# 改:修改数据
# 找到要修改的数据,然后修改
@blue.route('/userupdate/')
def user_update():
u = User.query.first() # 查询第一条数据
u.age = 1000
db.session.commit()
return 'success!'
# 查:查询数据
# 条件
@blue.route('/userget/')
def user_get():
# all(): 返回所有数据,返回列表
users = User.query.all()
# print(users, type(users)) # <class 'list'>
# print(User.query, type(User.query)) # <class 'flask_sqlalchemy.query.Query'>
# filter() : 过滤,得到查询集,类似SQL中的where
users = User.query.filter()
# print(users, type(users)) # 查询集
# print(list(users))
# get():查询到对应主键的数据对象
user = User.query.get(8)
# print(user, type(user)) # User对象 <class 'App.models.User'>
# print(user.name, user.age) # 获取数据的属性
# filter() : 类似SQL中的where
# filter_by() : 用于等值操作的过滤
# users = User.query.filter(User.age==20)
# users = User.query.filter_by(age=20)
users = User.query.filter(User.age>20) # 可以用于非等值操作
# print(list(users)) # [冰冰20]
# first() : 第一条数据
# first_or_404(): 第一条数据,如果不存在则抛出404错误
user = User.query.first()
# user = User.query.filter_by(age=100).first_or_404()
# print(user)
# count(): 统计查询集中的数据条数
users = User.query.filter()
# print(users.count()) # 20
# limit() : 前几条
# offset() : 跳过前几条
users = User.query.offset(3).limit(4)
# print(list(users))
# order_by() : 排序
users = User.query.order_by('age') # 升序
users = User.query.order_by(desc('age')) # 降序
# print(list(users))
# 逻辑运算:and_,or_,not_
users = User.query.filter(User.age>20, User.age<25) # 且,常用
users = User.query.filter(and_(User.age>20, User.age<25)) # 且
users = User.query.filter(or_(User.age>25, User.age<20)) # 或
users = User.query.filter(not_(or_(User.age>25, User.age<20))) # 非
# print(list(users))
# 查询属性
# contains('3'): 模糊查找,类似SQL中的like
users = User.query.filter(User.name.contains('3'))
# in_(): 其中之一
users = User.query.filter(User.age.in_([10, 20, 30, 40, 50]))
# startswith() : 以某子串开头
# endswith() : 以某子串结尾
users = User.query.filter(User.name.startswith('冰')) #
users = User.query.filter(User.name.endswith('2'))
# print(list(users))
# __gt__: 大于
users = User.query.filter(User.age.__gt__(25))
print(list(users))
return 'success'
# 分页,翻页
# 1.手动翻页
# offset().limit()
# 数据: 1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20
# 页码:page=1
# 每页显示数量:per_page=5
# page=1 : 1,2,3,4,5 => offset(0).limit(5)
# page=2 : 6,7,8,9,10 => offset(5).limit(5)
# page=3 : 11,12,13,14,15 => offset(10).limit(5)
# page=4 : 16,17,18,19,20 => offset(15).limit(5)
# ... ....
# page=n : => offset((page-1)*per_page).limit(per_page)
# 2.paginate对象
@blue.route('/paginate/')
def get_paginate():
# 页码:默认显示第一页
page = int(request.args.get('page', 1))
# per_page: 每页显示数据量
per_page = int(request.args.get('per_page', 5))
# print(page, type(page))
# print(per_page, type(per_page))
# paginate()
p = User.query.paginate(page=page, per_page=per_page, error_out=False)
# paginate对象的属性:
# items:返回当前页的内容列表
print(p.items)
# has_next:是否还有下一页
# print(p.has_next)
# has_prev:是否还有上一页
# print(p.has_prev)
# next(error_out=False):返回下一页的Pagination对象
# print(p.next(error_out=False).items)
# prev(error_out=False):返回上一页的Pagination对象
# print(p.prev(error_out=False).items)
# page:当前页的页码(从1开始)
print(p.page)
# pages:总页数
print(p.pages)
# per_page:每页显示的数量
# print(p.per_page)
# prev_num:上一页页码数
# print(p.prev_num)
# next_num:下一页页码数
# print(p.next_num)
# total:查询返回的记录总数
print(p.total)
return render_template('paginate.html', p=p)
数据分页
后端
# 分页,翻页
# 1.手动翻页
# offset().limit()
# 数据: 1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20
# 页码:page=1
# 每页显示数量:per_page=5
# page=1 : 1,2,3,4,5 => offset(0).limit(5)
# page=2 : 6,7,8,9,10 => offset(5).limit(5)
# page=3 : 11,12,13,14,15 => offset(10).limit(5)
# page=4 : 16,17,18,19,20 => offset(15).limit(5)
# ... ....
# page=n : => offset((page-1)*per_page).limit(per_page)
# 2.paginate对象
@blue.route('/paginate/')
def get_paginate():
# 页码:默认显示第一页
page = int(request.args.get('page', 1))
# per_page: 每页显示数据量
per_page = int(request.args.get('per_page', 5))
# print(page, type(page))
# print(per_page, type(per_page))
# paginate()
p = User.query.paginate(page=page, per_page=per_page, error_out=False)
# paginate对象的属性:
# items:返回当前页的内容列表
print(p.items)
# has_next:是否还有下一页
# print(p.has_next)
# has_prev:是否还有上一页
# print(p.has_prev)
# next(error_out=False):返回下一页的Pagination对象
# print(p.next(error_out=False).items)
# prev(error_out=False):返回上一页的Pagination对象
# print(p.prev(error_out=False).items)
# page:当前页的页码(从1开始)
print(p.page)
# pages:总页数
print(p.pages)
# per_page:每页显示的数量
# print(p.per_page)
# prev_num:上一页页码数
# print(p.prev_num)
# next_num:下一页页码数
# print(p.next_num)
# total:查询返回的记录总数
print(p.total)
return render_template('paginate.html', p=p)
多表操作
from .exts import db
# 多表关系
# ----------------------- 一对多 ----------------------- #
# 班级:学生 = 1:N
# 班级表
class Grade(db.Model):
__tablename__ = 'grade' # 表名
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
name = db.Column(db.String(30), unique=True)
# 建立关联
# 第1个参数:关联的模型名(表)
# 第2个参数:反向引用的名称,grade对象,
# 让student去反过来得到grade对象的名称: student.grade
# 第3个参数:懒加载
# 这里的students不是字段
students = db.relationship('Student', backref='grade', lazy=True)
# 学生表
class Student(db.Model):
__tablename__ = 'student'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
name = db.Column(db.String(30), unique=True)
age = db.Column(db.Integer)
# 外键:跟Grade表中的id字段关联
gradeid = db.Column(db.Integer, db.ForeignKey(Grade.id))
@blue.route('/addgrade/')
def add_grade():
# 添加班级
grades = []
for i in range(10):
grade = Grade()
grade.name = f'Jay{i}-{str(random.randint(10, 99))}'
grades.append(grade)
try:
db.session.add_all(grades)
db.session.commit()
except Exception as e:
print('e:', e)
db.session.rollback()
db.session.flush()
return 'OK'
@blue.route('/addstu/')
def add_stu():
# 添加学生
students = []
for i in range(10, 20):
stu = Student()
stu.name = f'Lucy{i}'
stu.age = i
stu.gradeid = random.randint(31, 32)
students.append(stu)
try:
db.session.add_all(students)
db.session.commit()
except Exception as e:
print('e:', e)
db.session.rollback()
db.session.flush()
return 'OK'
# 修改
@blue.route('/updatestu/')
def update_stu():
stu = Student.query.first()
stu.age = 100
db.session.commit()
return 'OK'
# 删除
@blue.route('/delstu/')
def del_stu():
# 删除学生
stu = Student.query.first()
db.session.delete(stu)
db.session.commit()
return 'OK'
@blue.route('/delgrade/')
def del_grade():
# 删除班级
grade = Grade.query.first()
db.session.delete(grade)
db.session.commit()
return 'OK'
# 查询
@blue.route('/getstu/')
def get_stu():
# 查询某学生所在的班级: 反向引用grade
stu = Student.query.get(2)
# print(stu.name, stu.age)
# print(stu.gradeid, stu.grade, stu.grade.name, stu.grade.id)
# 查找某班级下的所有学生
grade = Grade.query.get(32)
print(grade.name)
print(grade.students) # 所有学生
for stu in grade.students:
print(stu.name, stu.age)
return 'OK'
# ----------------------- 多对多 ----------------------- #
# 用户收藏电影
# 用户 : 电影 = N : M
# 中间表:收藏表
collect = db.Table(
'collects',
db.Column('user_id', db.Integer, db.ForeignKey('usermodel.id'), primary_key=True),
db.Column('movie_id', db.Integer, db.ForeignKey('movie.id'), primary_key=True)
)
# 用户表
class UserModel(db.Model):
__tablename__ = 'usermodel'
id = db.Column(db.Integer, primary_key=True, autoincrement=True) # 多表参考1
name = db.Column(db.String(30))
age = db.Column(db.Integer)
# 电影表
class Movie(db.Model):
__tablename__ = 'movie'
id = db.Column(db.Integer, primary_key=True, autoincrement=True) # 多表参考2
name = db.Column(db.String(30))
# 关联
# secondary=collect: 设置中间表
# users = db.relationship('UserModel', backref='movies', lazy=True, secondary=collect)
users = db.relationship('UserModel', backref='movies', lazy='dynamic', secondary=collect)
# lazy属性:
# 懒加载,可以延迟在使用关联属性的时候才建立关联
# lazy='dynamic': 会返回一个query对象(查询集),可以继续使用其他查询方法,如all().
# lazy='select': 首次访问到属性的时候,就会全部加载该属性的数据.
# lazy='joined': 在对关联的两个表进行join操作,从而获取到所有相关的对象
# lazy=True: 返回一个可用的列表对象,同select
# ----------------------- 多对多 ----------------------- #
# 添加数据
@blue.route('/adduser/')
def add_user():
# 添加用户
users = []
for i in range(10, 14):
user = UserModel()
user.name = f'Lucy-{i}'
user.age = i
users.append(user)
try:
db.session.add_all(users)
db.session.commit()
except Exception as e:
print('e:', e)
db.session.rollback()
db.session.flush()
return 'OK'
@blue.route('/addmovie/')
def add_movie():
# 添加电影
movies = []
for i in range(10, 14):
moive = Movie()
moive.name = f'阿凡达-{i}'
movies.append(moive)
try:
db.session.add_all(movies)
db.session.commit()
except Exception as e:
print('e:', e)
db.session.rollback()
db.session.flush()
return 'OK'
@blue.route('/addcollect/')
def add_collect():
# 用户收藏电影
user = UserModel.query.get(1)
movie = Movie.query.get(1)
user.movies.append(movie)
db.session.commit()
return 'OK'
# 查询
@blue.route('/getcollect/')
def get_collect():
# 查找某用户收藏的所有电影
user = UserModel.query.get(1)
print(user.movies)
# 查找收藏了某电影的所有用户
movie = Movie.query.get(4)
print(movie.users)
print(list(movie.users))
return 'OK'
# 修改:和单表操作
# 删除
@blue.route('/deluser/')
def del_user():
# 级联删除
user = UserModel.query.get(1)
db.session.delete(user)
db.session.commit()
return 'OK'
Cache插件
exts.py
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask_caching import Cache
db = SQLAlchemy()
migrate = Migrate()
cache = Cache(config={
'CACHE_TYPE': 'simple' # 缓存类型
})
def init_exts(app):
db.init_app(app=app)
migrate.init_app(app=app, db=db)
cache.init_app(app=app)
使用
# 使用缓存 请求网页20s时候不用阻塞 使用缓冲的内容 只返回 return
@blue.route('/')
@cache.cached(timeout=20) # 给视图函数加一个缓存20秒
def index():
print('Index2')
print('index视图函数中:', g.star)
time.sleep(5)
return 'index2'
钩子
中间件
before_request
# before_request: 每一次请求之前访问
@blue.before_request
def before():
print('before_request')
# 简单的反爬
# print(request.user_agent) # python-requests/2.28.2
# if "python" in request.user_agent.string:
# return '您正在使用Python爬虫,再见!'
# 针对IP做反爬(简单)
ip = request.remote_addr
# cache.get()
# cache.set()
if cache.get(ip):
# 做了拦截,不会进入视图函数
return '小伙子,别爬了!'
else:
# 对每个IP设置一个缓存,1秒内不让重复访问
cache.set(ip, 'value', timeout=1)
Flaks四大内置对象
from flask import Blueprint, request, render_template, session, g, current_app
from .models import *
from .exts import cache
import time
# 蓝图
blue = Blueprint('user', __name__)
# 使用缓存
@blue.route('/')
@cache.cached(timeout=20) # 给视图函数加一个缓存20秒
def index():
print('Index2')
print('index视图函数中:', g.star)
return 'index2'
# before_request: 每一次请求之前访问
@blue.before_request
def before():
# Flask内置对象
# request:请求对象
# session:会话对象
# g:global全局对象
# current_app: Flask应用对象
g.star = '杰伦'
print(g.star)
print(current_app) # <Flask 'App'>
print(current_app.config) # <Config {'ENV': 'production', 'DEBU...
模板和静态文件的配置
# __init__.py :初始化文件,创建Flask应用
from flask import Flask
from .views import blue
from .exts import init_exts
import os
# 项目目录
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) # 寻找上 上 层 文件夹
print('BASE_DIR:', BASE_DIR)
# BASE_DIR: FlaskPro1_进阶 项目目录
def create_app():
# 配置静态文件和模板文件目录
# static_folder = '../static'
# template_folder = '../templates'
static_folder = os.path.join(BASE_DIR, 'static')
template_folder = os.path.join(BASE_DIR, 'templates')
app = Flask(__name__, static_folder=static_folder,
template_folder=template_folder)
# 注册蓝图
app.register_blueprint(blueprint=blue)
# 配置数据库
db_uri = 'sqlite:///sqlite3.db' # sqlite配置
# db_uri = 'mysql+pymysql://root:123456@localhost:3306/flaskdb' # mysql的配置
app.config['SQLALCHEMY_DATABASE_URI'] = db_uri
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False # 禁止对象追踪修改
# 初始化插件
init_exts(app=app)
return app
前后端分离
前后端不分离:
render_template('index.html', users=users)
前后端分离:
后端返回json字符串: jsonify()
前端使用ajax来请求数据: ajax
前后端项目分离的配置方式
init.py
from flask import Flask
from .exts import init_exts
from .urls import * # 确保路由文件启动
def create_app():
app = Flask(__name__)
# 配置数据库
db_uri = 'sqlite:///sqlite3.db' # sqlite配置
# db_uri = 'mysql+pymysql://root:123456@localhost:3306/flaskdb' # mysql的配置
app.config['SQLALCHEMY_DATABASE_URI'] = db_uri
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False # 禁止对象追踪修改 节省性能
# 初始化插件
init_exts(app=app)
return app
exts.py
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask_restful import Api
db = SQLAlchemy()
migrate = Migrate()
api = Api()
def init_exts(app):
db.init_app(app=app)
migrate.init_app(app=app, db=db)
api.init_app(app=app)
modles.py
from .exts import db
class User(db.Model):
__tablename__ = 'tb_user'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
name = db.Column(db.String(30), unique=True, index=True)
age = db.Column(db.Integer, default=1)
apis.py
from flask import jsonify
from flask_restful import Resource, fields, marshal_with, reqparse
from .models import *
# 类视图: CBV Class Based View
# 视图函数: FBV Function Based View
class HelloResouce(Resource):
def get(self):
# return {'msg': 'get请求'}
return jsonify({'msg': 'get请求'})
def post(self):
return jsonify({'msg': 'post请求'})
# --------------------------- 字段格式化 --------------------------- #
# Flask-RESTful
# 字段格式化:定义返回给前端的数据格式
ret_fields = {
'status': fields.Integer,
'msg': fields.String,
# 'data': fields.String,
'like': fields.String(default='ball'),
'like2': fields.String(),
'data2': fields.String(attribute='data') # 使用data的值
}
class UserResource(Resource):
@marshal_with(ret_fields)
def get(self):
return {
'status': 1,
'msg': 'ok',
'data': '千锋教育Python'
}
# --------------------------- 字段格式化 --------------------------- #
#
user_fields = {
# 'id': fields.Integer,
'name': fields.String,
'age': fields.Integer,
# 绝对路径
'url': fields.Url(endpoint='id', absolute=True)
}
ret_fields2 = {
'status': fields.Integer,
'msg': fields.String,
# user对象
'data': fields.Nested(user_fields)
}
class User2Resource(Resource):
@marshal_with(ret_fields2)
def get(self):
user = User.query.first()
return {
'status': 1,
'msg': 'ok',
'data': user
}
# --------------------------- 字段格式化 --------------------------- #
user_fields2 = {
'name': fields.String,
'age': fields.Integer,
}
ret_fields3 = {
'status': fields.Integer,
'msg': fields.String,
'data': fields.List(fields.Nested(user_fields2))
}
class User3Resource(Resource):
@marshal_with(ret_fields3)
def get(self):
users = User.query.all()
return {
'status': 1,
'msg': 'ok',
'data': users
}
# --------------------------- 参数解析 --------------------------- #
# 参数解析:解析前端发送过来的数据
parser = reqparse.RequestParser()
parser.add_argument('name', type=str, required=True, help='name是必需的参数')
parser.add_argument('age', type=int, action='append') # 可以有多个age
parser.add_argument('key', type=str, location='cookies') # 可以有多个age
class User4Resource(Resource):
def get(self):
# 获取参数
args = parser.parse_args()
name = args.get('name')
age = args.get('age')
key = args.get('key')
return jsonify({"name": name, 'age': age, 'key': key})
urls.py
# urls.py 路由文件
from .exts import api
from .apis import *
# 路由
api.add_resource(HelloResouce, '/hello/')
api.add_resource(UserResource, '/user/', endpoint='id') # endopoint 别名解析
api.add_resource(User2Resource, '/user2/')
api.add_resource(User3Resource, '/user3/')
api.add_resource(User4Resource, '/user4/')
flask_restful
Flask-RESTful 是一个 Flask 扩展,简化了构建 RESTful API 的过程。它提供了一些方便的工具和类,帮助你快速定义和管理 API 资源。
普通字段格式化
# Flask-RESTful
# 字段格式化:定义返回给前端的数据格式
ret_fields = {
'status': fields.Integer,
'msg': fields.String,
# 'data': fields.String,
'like': fields.String(default='ball'),
'like2': fields.String(),
'data2': fields.String(attribute='data') # 使用data的值
}
class UserResource(Resource):
@marshal_with(ret_fields)
def get(self):
return {
'status': 1,
'msg': 'ok',
'data': '千锋教育Python'
}
字段格式化-url的使用
user_fields = {
# 'id': fields.Integer,
'name': fields.String,
'age': fields.Integer,
# 绝对路径
'url': fields.Url(endpoint='id', absolute=True) # endpoint 别名解析 url absolute 绝对路径
}
ret_fields2 = {
'status': fields.Integer,
'msg': fields.String,
# user对象
'data': fields.Nested(user_fields)
}
class User2Resource(Resource):
@marshal_with(ret_fields2)
def get(self):
user = User.query.first()
return {
'status': 1,
'msg': 'ok',
'data': user
}
嵌套字段格式化-返回对象
fields.Nested
是一个强大的工具,用于序列化嵌套的对象。它允许你在定义输出字段时,嵌套其他序列化器字段,从而支持复杂的数据结构。
user_fields2 = {
'name': fields.String,
'age': fields.Integer,
}
ret_fields3 = {
'status': fields.Integer,
'msg': fields.String,
'data': fields.List(fields.Nested(user_fields2))
}
class User3Resource(Resource):
@marshal_with(ret_fields3)
def get(self):
users = User.query.all()
return {
'status': 1,
'msg': 'ok',
'data': users
}
前端参数解析
在 Flask-RESTful 中,reqparse
模块提供了一种简单的方法来解析请求中的参数。parser.add_argument
方法允许你定义需要从请求中提取的参数,并且你可以指定参数的类型、默认值、位置等属性。
parser.add_argument
的参数
name
: 参数的名称。type
: 参数的类型(例如str
,int
,float
,bool
等)。location
: 参数的位置,可以是'args'
,'form'
,'json'
,'headers'
,'cookies'
等。required
: 参数是否是必需的,默认是False
。help
: 当参数验证失败时返回的错误消息。default
: 参数的默认值,如果未提供则使用这个值。
# --------------------------- 参数解析 --------------------------- #
# 参数解析:解析前端发送过来的数据
parser = reqparse.RequestParser()
parser.add_argument('name', type=str, required=True, help='name是必需的参数')
parser.add_argument('age', type=int, action='append') # 可以有多个age 得到的是一个列表
parser.add_argument('key', type=str, location='cookies')
class User4Resource(Resource):
def get(self):
# 获取参数
args = parser.parse_args()
name = args.get('name')
age = args.get('age')
key = args.get('key')
return jsonify({"name": name, 'age': age, 'key': key})
标签:return,name,Flask,app,db,print,def
From: https://www.cnblogs.com/wbcde116/p/18199824