今日内容概要
- g对象
- flask-session使用
- 数据库连接池
- wtfroms
- 信号
今日内容详细
g对象
# g :global缩写,是关键字,不能用,就写成了g,对象,是一个全局对象,当此请求过程中,一直有效
# 作用:上下文
-其实是请求的上下文,从请求进来,就有,到请求走了,一直存在,所以在当次请求过程中,如果调用别的函数,不需要把参数传入,只需要放到g对象中,在别的函数中直接使用g获取即可
# djagno中有没有这个东西?
request.context
# 为什么不把变量放到request对象中?
担心把requets原来的属性替换掉
# g对象和session有什么区别
-g只针对于当次请求
-session针对于所有请求
代码演示
from flask import Flask, g, request
# g 对象
app = Flask(__name__)
def add():
a = g.a
b = g.b
return a + b
# 1 写个请求扩展,before_request
@app.before_request
def before():
request.name = 'lqz'
if 'home' not in request.path:
g.name = 'lqz'
@app.after_request
def after(response):
print('-----', g.name)
return response
@app.route('/')
def index():
g.a = 1
g.b = 6
res = add()
print(res)
return 'hello'
@app.route('/home')
def home():
print(g.name)
return 'hello'
if __name__ == '__main__':
app.run()
flask-session使用
# flask内置的session,把数据加密后保存到浏览器了
# 能不能自己写个session的类,重写open_session和save_session,把数据保存到服务端的redis中
# 第三方:flask-session已经把这事干了,可以放到文件,redis,mongodb,关系型数据库
# 下载:
pip3 install -U flask-session # 升级安装
# 使用:
#方式一: 以后会保存到redis中
from flask_session.sessions import RedisSessionInterface
from redis import Redis
conn=Redis(host='127.0.0.0',port=6379)
# app.session_interface = RedisSessionInterface(redis=None, key_prefix='lqz')
app.session_interface = RedisSessionInterface(redis=conn, key_prefix='lqz')
# 在视图函数中使用即可
@app.route('/')
def index():
session['name'] = 'pyy'
return 'hello'
# 方式二: 通用方案,以后集成第三方插件,大致都按这个流程
from flask_session import Session
#配置文件
app.config.from_pyfile('settings.py')
from redis import Redis
app.config['SESSION_TYPE'] = 'redis'
app.config['SESSION_REDIS'] = Redis(host='127.0.0.1',port='6379')
Session(app) # 导入一个类,把app传入
# 方案二,源码分析
本质在 Session(app)--->就是根据配置文件,生成 RedisSessionInterface 对象,赋值给app.session_interface
# 1 如何配置session的过期时间
配置文件:PERMANENT_SESSION_LIFETIME = timedelta(seconds=10)
# 2 如何让cookie,关闭浏览器就失效
# expires 设置为None,就是浏览器关闭cookie就失效了
res = make_response('hello')
res.set_cookie('name', 'lqz', expires=None)
#session设置的cookie,关闭浏览器失效
-使用方式一:app.session_interface = RedisSessionInterface(redis=conn, key_prefix='lqz',permanent=False)
-使用方式二:配置文件加入
SESSION_PERMANENT=False
代码演示
from flask import Flask, session, make_response
app = Flask(__name__)
app.secret_key = 'asdfasdfa'
# 使用步骤:方式一:
from flask_session.sessions import RedisSessionInterface
from redis import Redis
conn = Redis(host='127.0.0.0', port=6379)
app.session_interface = RedisSessionInterface(redis=None, key_prefix='lqz')
app.session_interface = RedisSessionInterface(redis=conn, key_prefix='lqz', permanent=False)
# 方式二:flask使用其他第三插件,模式基本一样
from flask_session import Session
# 配置文件
app.config.from_pyfile('settings.py')
from redis import Redis
app.config['SESSION_TYPE'] = 'redis'
app.config['SESSION_REDIS'] = Redis(host='127.0.0.1', port='6379')
Session(app) # 导入一个类,把app传入
@app.route('/')
def index():
session['name'] = 'lyf'
res = make_response('hello')
res.set_cookie('name', 'lqz', expires=None)
return res
if __name__ == '__main__':
app.run(port=8080)
数据库连接池
flask中集成mysql
from flask import Flask, jsonify
import pymysql
app = Flask(__name__)
@app.route('/boys')
def boys():
# 第一步,链接mysql
conn = pymysql.connect(host='127.0.0.1', port=3306, database='aaa', password='123', user='root')
cursor = conn.cursor()
cursor.execute('select * from boy')
res = cursor.fetchall()
cursor.close()
conn.close()
print(res)
return jsonify(res)
使用数据库连接池
# 每个请求过来,都打开mysql链接,操作,操作完了,关闭链接
# 如果并发量很高,如果有1w个并发,要开1w mysql的链接,mysql顶不住
-django就是这么做的
# 解决方案:使用,把连接对象和cursor定义成全局的
-每个视图函数使用同一个cursor,这样会错乱
# 使用第三方的:DBUtils ,创建数据库连接池
# pip3 install -U DBUtils
# 使用步骤
# 第一步:在py中实例化一个池
from dbutils.pooled_db import PooledDB
import pymysql
# 实例化得到对象
POOL = PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=5, # 链接池中最多闲置的链接,0和None不限制
maxshared=0, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
host='127.0.0.1',
port=3306,
user='root',
password='123',
database='cnblogs',
charset='utf8'
)
# 第二步:在视图函数中使用池
from db import POOL
@app.route('/boys')
def boys():
# 第一步,从连接池中取一个链接
conn = POOL.connection()
cursor = conn.cursor()
time.sleep(0.01)
cursor.execute('select * from article')
res = cursor.fetchall()
print(res)
return jsonify(res)
# 总结,
如果使用池:无论客户端连接数有多大,mysql的连接数,最多就是6个
如果不使用池:mysql的连接数会过大,把mysql崩掉
# 压力测试:客户端代码
import requests
from threading import Thread, get_ident
def task():
res = requests.get('http://127.0.0.1:8080/boys')
print('线程id号为:%s,获取的数据为:' % str(get_ident()), res.json())
if __name__ == '__main__':
for i in range(5000):
t = Thread(target=task)
t.start()
#链接mysql 查看连接数
show status like 'Threads%'
wtfroms
# 跟 django中学过的forms组件是一个东西
#作用:
1 校验数据
2 渲染错误信息
3 渲染页面
#第三方:下载
pip3 install wtforms
python代码
from flask import Flask, render_template, request, redirect
from wtforms import Form
from wtforms.fields import simple
from wtforms import validators
from wtforms import widgets
from wtforms.fields import choices
app = Flask(__name__, template_folder='templates')
app.debug = True
class LoginForm(Form):
# 字段(内部包含正则表达式)
name = simple.StringField(
label='用户名',
validators=[
validators.DataRequired(message='用户名不能为空.'),
validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d')
],
widget=widgets.TextInput(), # 页面上显示的插件
render_kw={'class': 'form-control'}
)
# 字段(内部包含正则表达式)
pwd = simple.PasswordField(
label='密码',
validators=[
validators.DataRequired(message='密码不能为空.'),
validators.Length(min=8, message='用户名长度必须大于%(min)d'),
validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}",
message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符')
],
widget=widgets.PasswordInput(),
render_kw={'class': 'form-control'}
)
class RegisterForm(Form):
name = simple.StringField(
label='用户名',
validators=[
validators.DataRequired()
],
widget=widgets.TextInput(),
render_kw={'class': 'form-control'},
default='pyy'
)
pwd = simple.PasswordField(
label='密码',
validators=[
validators.DataRequired(message='密码不能为空.')
],
widget=widgets.PasswordInput(),
render_kw={'class': 'form-control'}
)
pwd_confirm = simple.PasswordField(
label='重复密码',
validators=[
validators.DataRequired(message='重复密码不能为空.'),
validators.EqualTo('pwd', message="两次密码输入不一致")
],
widget=widgets.PasswordInput(),
render_kw={'class': 'form-control'}
)
email = simple.EmailField(
label='邮箱',
validators=[
validators.DataRequired(message='邮箱不能为空.'),
validators.Email(message='邮箱格式错误')
],
widget=widgets.TextInput(input_type='email'),
render_kw={'class': 'form-control'}
)
gender = choices.RadioField(
label='性别',
choices=(
(1, '男'),
(2, '女'),
),
coerce=int # “1” “2”
)
city = choices.SelectField(
label='城市',
choices=(
('bj', '北京'),
('sh', '上海'),
)
)
hobby = choices.SelectMultipleField(
label='爱好',
choices=(
(1, '篮球'),
(2, '足球'),
),
coerce=int
)
favor = choices.SelectMultipleField(
label='喜好',
choices=(
(1, '篮球'),
(2, '足球'),
),
widget=widgets.ListWidget(prefix_label=False),
option_widget=widgets.CheckboxInput(),
coerce=int,
default=[1, 2]
)
def __init__(self, *args, **kwargs):
super(RegisterForm, self).__init__(*args, **kwargs)
self.favor.choices = ((1, '篮球'), (2, '足球'), (3, '羽毛球'))
def validate_pwd_confirm(self, field):
"""
自定义pwd_confirm字段规则,例:与pwd字段是否一致
:param field:
:return:
"""
# 最开始初始化时,self.data中已经有所有的值
if field.data != self.data['pwd']:
# raise validators.ValidationError("密码不一致") # 继续后续验证
raise validators.StopValidation("密码不一致") # 不再继续后续验证
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'GET':
form = LoginForm()
return render_template('login.html', form=form)
else:
form = LoginForm(formdata=request.form)
if form.validate():
print('用户提交数据通过格式验证,提交的值为:', form.data)
else:
print(form.errors)
return render_template('login.html', form=form)
@app.route('/register', methods=['GET', 'POST'])
def register():
if request.method == 'GET':
form = RegisterForm(data={'gender': 2, 'hobby': [1, ]}) # initial
return render_template('register.html', form=form)
else:
form = RegisterForm(formdata=request.form)
if form.validate():
print('用户提交数据通过格式验证,提交的值为:', form.data)
else:
print(form.errors)
return render_template('register.html', form=form)
if __name__ == '__main__':
app.run()
html代码
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<h1>用户注册</h1>
<form method="post" novalidate style="padding:0 50px">
{% for field in form %}
<p>{{field.label}}: {{field}} {{field.errors[0] }}</p>
{% endfor %}
<input type="submit" value="提交">
</form>
</body>
</html>
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<h1>登录</h1>
<form method="post" novalidate>
<p>{{form.name.label}} {{form.name}} {{form.name.errors[0] }}</p>
<p>{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}</p>
<input type="submit" value="提交">
</form>
</body>
</html>
信号
# 信号和信号量
# 信号量:Semaphore:信号量可以理解为多把锁,同时允许多个线程来更改数据
# 信号:signal:Flask框架中的信号基于blinker,其主要就是让开发者可是在flask请求过程中定制一些用户行为
# 有什么用?
-代码耦合性低
-before_render_template :只要模板渲染就会执行,记录日志,login页面被渲染了多少次
-统计今天用户访问量
-User表只要删除记录,就干个什么事
-向banner表存数据,双写一致性问题:定时更新
# django得信号:https://www.cnblogs.com/liuqingzheng/articles/9803403.html
# 内置信号
request_started = _signals.signal('request-started') # 请求到来前执行
request_finished = _signals.signal('request-finished') # 请求结束后执行
before_render_template = _signals.signal('before-render-template') # 模板渲染前执行
template_rendered = _signals.signal('template-rendered') # 模板渲染后执行
got_request_exception = _signals.signal('got-request-exception') # 请求执行出现异常时执行
request_tearing_down = _signals.signal('request-tearing-down') # 请求执行完毕后自动执行(无论成功与否)
message_flashed = _signals.signal('message-flashed') # 调用flashed在其中添加数据时,自动触发
使用步骤
from flask import Flask, render_template
from flask import signals
app = Flask(__name__)
# 内置信号使用步骤
# 第一步:写个函数
def render_before(*args,**kwargs):
print(args)
print(kwargs)
print('模板要渲染了')
# 第二步:跟信号绑定
signals.before_render_template.connect(render_before)
# 第三步:触发信号 (内置信号,会自动触发)
@app.route('/')
def index():
print('index 执行了')
return render_template('index.html', name='刘亦菲')
if __name__ == '__main__':
app.run()
'''使用步骤
1 定义函数
2 跟内置信号绑定:signals.before_render_template.connect(before_render)
3 等待信号被触发
'''
自定义信号
from flask import Flask, render_template,request
from flask import signals
from flask.signals import _signals
app = Flask(__name__)
# 第一步:定义信号
xxx = _signals.signal('xxx')
# 第二步:写个函数
def add(*args, **kwargs):
print(args)
print(kwargs)
print('add执行了')
# 第三步:跟信号绑定
xxx.connect(add)
#第四步:触发信号
@app.route('/')
def index():
xxx.send(request=request)
print('index 执行了')
return render_template('index.html', name='刘亦菲')
if __name__ == '__main__':
app.run()
'''
# 第一步:定义信号
# 第二步:定义函数
# 第三步:绑定自定义的信号
# 第四步:触发自定义的信号
'''
为什么cpython获得gil锁,线程才能执行?
1.在没有 GIL 锁的情况下,有可能多线程在执行一个代码的同时,垃圾回收机制线程对所执行代码的变量直接回收,导致运行报错
2.不同的数据除了 GIL 锁,还需要一把互斥锁,来保证数据处理不会错乱
注意:GIL 锁是加在 CPython 解释器上的,进程先获取 GIL 锁,在获取 CPython 解释器
标签:__,name,form,flask,app,session,import
From: https://www.cnblogs.com/wwjjll/p/16980717.html