请求与响应
# 请求对象 request
def index():
print(request.method) # 请求方式
print(request.form) # post请求数据
print(request.args) # get请求数据
print(request.path) # 不带域名的请求路径
print(request.data) # 获取的json格式数据 二进制
print(request.cookies) # 获取客户端的cookie
print('----', request.headers, '-----') # 请求头中的数据
print(request.host) # 请求地址
print(request.files) # 文件数据
return ''
# 响应对象 response
def index():
# 先做一个响应对象
response = make_response()
response.headers['session'] = 'wasdzsgfhyjtdjfaiaug2134' # 给响应头里写数据
response.set_cookie('cookie','hahhaha') # 设置cookie 以k,v形式设置
response.delete_cookie('cookie') # 删除cookie
return response
'''
cookie的一些参数:
1.max_age=None 超时时间 cookie需要延续的时间,如果为none会延续到浏览器关闭为止
2.expires=None 过期时间
3.path='/' cookie生效的路径【根路径的cookie可以被任何url的页面访问,浏览器只会把cookie回传给带有该路径的页面,这样可以避免将cookie传给站点中的其他应用】
4.domain=None cookie生效的域名,可以用这个参数来构造一个跨站cookie 【如果为none只能由设置它的站点读取】
5.secure=False 浏览器将通过https来回传cookie
6.HTTP only=False 只能按照http协议传输 无法被js获取
'''
session的使用【很重要】
# session原理 【django】
1.请求过来的时候设置一个session request.session['name']='hahhahah'
- 会随机生成一个字符串 ghhvjxjbkf
- 把session字典序列化放到djangosession表中
- 以session为key,随机字符串为value,写入到cookie中
session:ghhvjxjbkf
2.下次再访问的时候
- 携带随机字符串
- 根据session取出随机字符串
- 去django-session表中根据这个随机字符串取出value值,解密,得到字典
- 放到request.session中 下次直接点session就可以获取到上次写入的name的值
session的生成是在中间件中完成的
1.先判断request.session是否为空 如果是空的不做任何操作
2.不为空的话
- 判断值是否改过:改过的话重写到session表中的value
- 没改过:不做任何操作
# flask的session设置
1.设置session
- app.secret_key = 'dsazfdgfhjghjz'
- session['name'] = 'summer'
2.取值
- session.get('name')
- 先判断session是否为空,不是空的话又加了东西,把这个字典序列化使用密钥加密把加密串以cookie的形式写入到浏览器中
- 在进入视图函数之前,根据session取出三段,反序列化解密成session对象,在后续的视图函数中使用
session源码分析
# 入口是app() 对象加括号会执行类的__call__方法
- return self.wsgi_app(environ, start_response)
1.查看wsgi_app方法
def wsgi_app(self, environ,start_response):
ctx = self.request_context(environ)
try:
try:
ctx.push() # 执行了push方法
response = self.full_dispatch_request()
except Exception as e:
error = e
response = self.handle_exception(e)
except:
error = sys.exc_info()[1]
raise
return response(environ, start_response)
finally:
ctx.pop(error)
2.查看push方法
def push(self):
if self.session is None:
# flask对象中的方法 session_interface = SecureCookieSessionInterface()
session_interface = self.app.session_interface
# 如果session是空的会执行open_session方法
self.session = session_interface.open_session(self.app, self.request)
if self.session is None:
self.session = session_interface.make_null_session(self.app)
3.查看SecureCookieSessionInterface()里的open_session方法 【请求来的时候执行】
def open_session(self, app, request):
s = self.get_signing_serializer(app)
if s is None:
return None
# 此时的val就是上面的session的三段式 【加密串】
val = request.cookies.get(self.get_cookie_name(app))
if not val: # 判断session是否有值
return self.session_class()
max_age = int(app.permanent_session_lifetime.total_seconds())
try:
data = s.loads(val, max_age=max_age)
return self.session_class(data)
except BadSignature:
return self.session_class()
4.查看save_session方法【请求走的时候执行】
def save_session(self, app, session,response) :
name = self.get_cookie_name(app)
domain = self.get_cookie_domain(app)
path = self.get_cookie_path(app)
secure = self.get_cookie_secure(app)
samesite = self.get_cookie_samesite(app)
httponly = self.get_cookie_httponly(app)
if not session: # 如果session有值的话
if session.modified: # 判断当前的session是否改值
response.delete_cookie( # 将当前的cookie删除
name,
domain=domain,
path=path,
secure=secure,
samesite=samesite,
httponly=httponly,)
return
if session.accessed:
response.vary.add("Cookie")
if not self.should_set_cookie(app, session):
return
expires = self.get_expiration_time(app, session)
# 给重写的session做序列化 加密
val = self.get_signing_serializer(app).dumps(dict(session))
response.set_cookie( # 设置cookie
name,
val,
expires=expires,
httponly=httponly,
domain=domain,
path=path,
secure=secure,
samesite=samesite,)
# 总结
1.当请求来的时候,会执行open_session,从中取出cookie,判断是否为空,不为空的话反序列化解密,变成字典转到session对象中,从视图函数中用session.get就可以取值
2.当请求走的时候,会执行save_session,session转成字典,序列化加密成三段放到cookie中
闪现
# 作用: 访问a页面时,出现错误,重定向到b页面,要在b页面展示a页面的错误信息
必须要设置 app.secret_key = '' 才可以使用
使用: 【方式一】
flash('没有权限访问')
取值:
get_flashed_messages()
分类设置和获取 【方式二】
设置:
flash('没有访问权限',category='1')
flash('没有登录',category='2')
取值:
errors = get_flashed_messages(category_filter=['1'])
【类似于django中的message框架】
请求扩展
# 类似于django的中间件
1.before_request:在请求进视图函数之前执行
【从上往下依次执行 遇到返回四件套就会直接返回 不再走后面的视图函数】
2.after_request:在请求从视图函数走之后执行 【可以在响应头中写东西】
【从下往上执行,要有参数和返回值,参数就是response对象,返回值也是response对象】
3.before_first_request:项目启动后,第一次访问会执行,以后再也不执行了
4.teardown_request:每一个请求之后绑定一个函数,即使遇到了异常,每个请求走,都会执行,记录错误日志
@app.teardown_request
def tear_down(e):
print(e) # 如果有异常,这是异常对象
print('我执行了')
5.errorhandler路径不存在时404,服务器内部错误500
# @app.errorhandler(404)
# def error_404(arg):
# print('404会执行我')
# # return "404错误了"
# return render_template('404.html')
@app.errorhandler(500) # debug为False请情况下才能看到
def error_500(arg):
print('500会执行我')
return "服务器内部错误"
6.template_global 标签 ,在模板中用 {{sb(1,2)}}
@app.template_global()
def sb(a1, a2):
return a1 + a2
7.template_filter过滤器 在模板中用 {{10|db(1,2)}}
@app.template_filter()
def db(a1, a2, a3):
return a1 + a2 + a3
蓝图
# 就是为了划分目录结构的
使用步骤:
1.在不同的view的py文件中定义蓝图
2.使用app对象,注册蓝图
3.使用蓝图注册路由,注册请求扩展
# 蓝图小型项目
flask_blueprint_little # 项目名
-src # 项目代码所在路径
-__init__.py # app对象创建的地方
-templates # 模板
-user.html
-static # 静态文件
-views # 视图函数存放位置
-user.py # 用户相关视图
-order.py # 订单相关视图
-manage.py # 启动文件
# 大型项目
flask_blurprint_big # 项目名字
-src # 项目代码所在位置
-__init__.py # src的init,falsk,app实例化
-settings.py # 配置文件
-admin # 类似于django的admin app
-__init__.py # 蓝图初始化
-template # 模板
-backend.html
-static # 静态文件
-xx.jpg
-views.py # 视图层
-models.py # models层
-api
-__init__.py
-template
-static
-models.py
-views.py
-manage.py # 启动文件
异步【拓展芝士】
# 异步框架 FastAPi
async def index():
print('sdfasd')
a++
await xxx # io操作
async def goods():
pass
# 框架之前的web框架,开启进程,线程---》一条线程会运行多个协程函数----》协程函数中遇到io,读到await关键字,就会切换到别的协程函数
# 一旦使用了异步,以后所有的模块,都要是异步
-pymysql :同步的
-redis :同步
-aiomysql:异步
-aioredis:异步
-在fastapi或sanic中,要操作mysql,redis要使用异步的框架,否则效率更低
-django 3.x 以后页支持async 关键字
-没有一个特别好异步的orm框架
-sqlalchemy在做
-tortoise-orm
https://tortoise-orm.readthedocs.io/en/latest/index.html
# aiomysql
import asyncio
import aiomysql
loop = asyncio.get_event_loop()
async def test_example():
conn = await aiomysql.connect(host='127.0.0.1', port=3306,
user='root', password='', db='mysql',
loop=loop)
cur = await conn.cursor()
await cur.execute("SELECT Host,User FROM user")
print(cur.description)
r = await cur.fetchall()
print(r)
await cur.close()
conn.close()
loop.run_until_complete(test_example())
# aioredis
import aioredis
import asyncio
class Redis:
_redis = None
async def get_redis_pool(self, *args, **kwargs):
if not self._redis:
self._redis = await aioredis.create_redis_pool(*args, **kwargs)
return self._redis
async def close(self):
if self._redis:
self._redis.close()
await self._redis.wait_closed()
async def get_value(key):
redis = Redis()
r = await redis.get_redis_pool(('127.0.0.1', 6379), db=7, encoding='utf-8')
value = await r.get(key)
print(f'{key!r}: {value!r}')
await redis.close()
if __name__ == '__main__':
asyncio.run(get_value('key')) # need python3.7
标签:请求,get,Flask,self,request,响应,session,cookie,app
From: https://www.cnblogs.com/Hsummer/p/16980866.html