目录
1. 数据库连接池
flask 可以使用pymysql操作数据库
1.1 如果把conn做成全局
多个线程使用同一个链接对象,会导致数据错乱
1.2 如果在每个视图函数中建立链接
每个线程使用一个连接,会导致mysql连接数过大
1.3 借助于第三方模块,实现数据库连接池
pip3 install dbutils
# https://www.cnblogs.com/liuqingzheng/articles/9006055.html
1.4 pool.py
from dbutils.pooled_db import PooledDB
import pymysql
POOL=PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=5, # 链接池中最多闲置的链接,0和None不限制
maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。
ping=0,
# ping MySQL服务端,检查是否服务可用。
host='127.0.0.1',
port=3306,
user='root',
password='123',
database='luffy_api',
charset='utf8'
)
1.5 flask测试
from flask import Flask
# import pymysql
from pool import POOL
import pymysql
app = Flask(__name__)
# 定义在外面,全局conn,cursor,会出现数据错乱,正常情况应该放在视图函数内部建立链接,但是会影响性能,所有需要建立数据库连接池
# conn = pymysql.connect(host='127.0.0.1', port=3306, database='luffy_api', user='root', password='123')
# cursor = conn.cursor()
@app.route('/banner')
def banner():
# 从池中拿链接,创建出cursor对象
# conn = POOL.connection()
# cursor = conn.cursor()
conn = pymysql.connect(host='127.0.0.1', port=3306, database='luffy_api', user='root', password='123')
cursor = conn.cursor()
cursor.execute('select * from luffy_banner')
print(cursor.fetchall())
return 'hello'
@app.route('/order')
def order():
# 从池中拿链接,创建出cursor对象
# conn = POOL.connection()
# cursor = conn.cursor()
conn = pymysql.connect(host='127.0.0.1', port=3306, database='luffy_api', user='root', password='123')
cursor = conn.cursor()
cursor.execute('select * from luffy_order')
print(cursor.fetchall())
return 'hello'
if __name__ == '__main__':
app.run()
1.6 压力测试
import requests
from threading import Thread
def task():
res = requests.get('http://127.0.0.1:5000/banner')
print(res.text)
res = requests.get('http://127.0.0.1:5000/order')
print(res.text)
# 查询目前有多少个mysql的客户端链接着mysql
if __name__ == '__main__':
for i in range(10000):
t = Thread(target=task)
t.start()
# 查看当前有多个个连接数
show status like 'Threads%'
2. wtfroms(了解)
1. 混合的项目,django中学过forms:校验数据,渲染页面,flask中使用第三方wtfroms完成forms的功能
2. pip3 install wtforms
from flask import Flask,request,render_template
app = Flask(__name__)
from wtforms.fields import core
from wtforms import Form
from wtforms.fields import html5
from wtforms.fields import simple
from wtforms import validators
from wtforms import widgets
class LoginForm(Form):
# 字段(内部包含正则表达式)
name = simple.StringField(
label='用户名',
validators=[
validators.DataRequired(message='用户名不能为空.'),
validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d')
],
widget=widgets.TextInput(), # 页面上显示的插件
render_kw={'class': 'form-control'}
)
# 字段(内部包含正则表达式)
pwd = simple.PasswordField(
label='密码',
validators=[
validators.DataRequired(message='密码不能为空.'),
validators.Length(min=8, message='用户名长度必须大于%(min)d'),
validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}",
message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符')
],
widget=widgets.PasswordInput(),
render_kw={'class': 'form-control'}
)
class RegisterForm(Form):
name = simple.StringField(
label='用户名',
validators=[
validators.DataRequired()
],
widget=widgets.TextInput(),
render_kw={'class': 'form-control'},
default='alex'
)
pwd = simple.PasswordField(
label='密码',
validators=[
validators.DataRequired(message='密码不能为空.')
],
widget=widgets.PasswordInput(),
render_kw={'class': 'form-control'}
)
pwd_confirm = simple.PasswordField(
label='重复密码',
validators=[
validators.DataRequired(message='重复密码不能为空.'),
validators.EqualTo('pwd', message="两次密码输入不一致")
],
widget=widgets.PasswordInput(),
render_kw={'class': 'form-control'}
)
email = html5.EmailField(
label='邮箱',
validators=[
validators.DataRequired(message='邮箱不能为空.'),
validators.Email(message='邮箱格式错误')
],
widget=widgets.TextInput(input_type='email'),
render_kw={'class': 'form-control'}
)
gender = core.RadioField(
label='性别',
choices=(
(1, '男'),
(2, '女'),
),
coerce=int # “1” “2”
)
city = core.SelectField(
label='城市',
choices=(
('bj', '北京'),
('sh', '上海'),
)
)
hobby = core.SelectMultipleField(
label='爱好',
choices=(
(1, '篮球'),
(2, '足球'),
),
coerce=int
)
favor = core.SelectMultipleField(
label='喜好',
choices=(
(1, '篮球'),
(2, '足球'),
),
widget=widgets.ListWidget(prefix_label=False),
option_widget=widgets.CheckboxInput(),
coerce=int,
default=[1, 2]
)
def __init__(self, *args, **kwargs):
super(RegisterForm, self).__init__(*args, **kwargs)
self.favor.choices = ((1, '篮球'), (2, '足球'), (3, '羽毛球'))
def validate_pwd_confirm(self, field):
"""
自定义pwd_confirm字段规则,例:与pwd字段是否一致
:param field:
:return:
"""
# 最开始初始化时,self.data中已经有所有的值
if field.data != self.data['pwd']:
# raise validators.ValidationError("密码不一致") # 继续后续验证
raise validators.StopValidation("密码不一致") # 不再继续后续验证
@app.route('/login',methods=['GET','POST'])
def login():
if request.method == 'GET':
form = LoginForm()
return render_template('login.html', form=form)
else:
form = LoginForm(formdata=request.form)
if form.validate():
print('用户提交数据通过格式验证,提交的值为:', form.data)
else:
print(form.errors)
return render_template('login.html', form=form)
@app.route('/register', methods=['GET', 'POST'])
def register():
if request.method == 'GET':
form = RegisterForm(data={'gender': 2,'hobby':[1,]}) # initial
return render_template('register.html', form=form)
else:
form = RegisterForm(formdata=request.form)
if form.validate():
print('用户提交数据通过格式验证,提交的值为:', form.data)
else:
print(form.errors)
return render_template('register.html', form=form)
if __name__ == '__main__':
app.run()
2.1 login.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<h1>登录</h1>
<form method="post">
<p>{{form.name.label}} {{form.name}} {{form.name.errors[0] }}</p>
<p>{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}</p>
<input type="submit" value="提交">
</form>
</body>
</html>
2.2 register.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<h1>用户注册</h1>
<form method="post" novalidate style="padding:0 50px">
{% for field in form %}
<p>{{field.label}}: {{field}} {{field.errors[0] }}</p>
{% endfor %}
<input type="submit" value="提交">
</form>
</body>
</html>
3. 信号
1. 并发编程学的锁:
-互斥锁:Lock
-递归锁(可重入锁):RLock,互斥锁会出现死锁现象
-Semaphore:多把锁,信号量
-event事件:某个条件完成成,线程执行
2. 所有锁:https://zhuanlan.zhihu.com/p/489305763
# flask 信号,signal翻译过来的
# django中,也有信号:https://www.cnblogs.com/liuqingzheng/articles/9803403.html
# pip3 install blinker
# 某种情况下会触发某个函数执行----》aop理念,面向切面编程的理念
-装饰器实现
-魔法方法实现
-其他方式
# 链式调用:编程的思维方法(java,js中使用很多),queryset对象
-如何实现,方法执行结果返回当前对象
-对象.方法().方法().方法()
3.1 内置信号
request_started = _signals.signal('request-started') # 请求到来前执行
request_finished = _signals.signal('request-finished') # 请求结束后执行
before_render_template = _signals.signal('before-render-template') # 模板渲染前执行
template_rendered = _signals.signal('template-rendered') # 模板渲染后执行
got_request_exception = _signals.signal('got-request-exception') # 请求执行出现异常时执行
request_tearing_down = _signals.signal('request-tearing-down') # 请求执行完毕后自动执行(无论成功与否)
appcontext_tearing_down = _signals.signal('appcontext-tearing-down')# 应用上下文执行完毕后自动执行(无论成功与否)
appcontext_pushed = _signals.signal('appcontext-pushed') # 应用上下文push时执行
appcontext_popped = _signals.signal('appcontext-popped') # 应用上下文pop时执行
message_flashed = _signals.signal('message-flashed') # 调用flask在其中添加数据时,自动触发
from flask import Flask, render_template
from flask import signals
app = Flask(__name__)
# 内置信号使用步骤
# 第一步:写个函数
def render_before(*args,**kwargs):
print(args)
print(kwargs)
print('模板要渲染了')
# 第二步:跟信号绑定
signals.before_render_template.connect(render_before)
# 第三步:触发信号 (内置信号,会自动触发)
@app.route('/')
def index():
print('index 执行了')
return render_template('index.html', name='刘亦菲')
if __name__ == '__main__':
app.run()
'''使用步骤
# 第一步:写个函数
# 第二步:跟信号绑定
# 第二步:跟信号绑定
'''
## 6.2 自定义信号
```python
from flask import Flask, render_template,request
from flask import signals
from flask.signals import _signals
app = Flask(__name__)
# 1 内置信号使用步骤
# # 第一步:写个函数
# def render_before(*args, **kwargs):
# print(args)
# print(kwargs)
# print('模板要渲染了')
#
#
# # 第二步:跟信号绑定
# signals.before_render_template.connect(render_before)
# # 第三步:触发信号 (内置信号,会自动触发)
### 2 自定义信号
# 第一步:定义信号
xxx = _signals.signal('xxx')
# 第二步:写个函数
def add(*args, **kwargs):
print(args)
print(kwargs)
print('add执行了')
# 第三步:跟信号绑定
xxx.connect(add)
#第四步:触发信号
@app.route('/')
def index():
xxx.send(request=request)
print('index 执行了')
return render_template('index.html', name='刘亦菲')
if __name__ == '__main__':
app.run()
'''
# 第一步:定义信号
# 第二步:写个函数
# 第三步:跟信号绑定
#第四步:触发信号
'''
3.2 自定义信号
from flask import Flask, render_template,request
from flask import signals
from flask.signals import _signals
app = Flask(__name__)
# 1 内置信号使用步骤
# # 第一步:写个函数
# def render_before(*args, **kwargs):
# print(args)
# print(kwargs)
# print('模板要渲染了')
#
#
# # 第二步:跟信号绑定
# signals.before_render_template.connect(render_before)
# # 第三步:触发信号 (内置信号,会自动触发)
### 2 自定义信号
# 第一步:定义信号
xxx = _signals.signal('xxx')
# 第二步:写个函数
def add(*args, **kwargs):
print(args)
print(kwargs)
print('add执行了')
# 第三步:跟信号绑定
xxx.connect(add)
#第四步:触发信号
@app.route('/')
def index():
xxx.send(request=request)
print('index 执行了')
return render_template('index.html', name='刘亦菲')
if __name__ == '__main__':
app.run()
'''
# 第一步:定义信号
# 第二步:写个函数
# 第三步:跟信号绑定
#第四步:触发信号
'''
4. 多app应用
# 只有一个app对象,一个flask项目,支持多个app对象的
from flask import Flask
from werkzeug.middleware.dispatcher import DispatcherMiddleware
from werkzeug.serving import run_simple
app1 = Flask('app01')
app2 = Flask('app02')
@app1.route('/index')
def index():
return "app01"
@app2.route('/index2')
def index2():
return "app2"
dm = DispatcherMiddleware(app1, {
'/sec': app2,
})
if __name__ == "__main__":
# 请求来了,会执行 dm()---->触发DispatcherMiddleware的__call__
run_simple('localhost', 5000, dm)
5. flask-script
# djagno中执行djagno程序,通过命令:python3 manage.py runserver--->还有其他很多命令
# 想让flask能够执行python3 manage.py runserver启动flask项目,自定制一些命令,完成更多操作
# django中自定制命令?
# 借助于第三方 flask-script来完成上面的需求
# pip3 install flask-script
from flask import Flask
# 使用步骤
from flask_script import Manager
app = Flask('app01')
#基本使用,多了runserver命令
manager=Manager(app)
# 高级使用:自定制命令 python manage.py dbinit
@manager.command
def dbinit():
"""
python manage.py dbinit
"""
print("数据库初始化完成")
@manager.option('-n', '--name', dest='name')
@manager.option('-u', '--url', dest='url')
def cmd(name, url):
"""
自定义命令(-n也可以写成--name)
执行: python manage.py cmd -n lqz -u xxx
执行: python manage.py cmd --name lqz --url yyy
"""
print(name, url)
# 编一个功能,通过execl---》把execl的数据存同步到某个数据表中得命令
@app.route('/index')
def index():
return "app01"
if __name__ == "__main__":
# app.run()
manager.run()
6. flask请求上下文分析
# request,session 是全局的,每个视图函数都在用,为什么不乱
# 每次请求来了,都是一个线程或者协程执行这个app()
# 请求扩展,在源码中得哪些位置执行的?
# 内置信号的触发,在哪触发的
# 请求来了,会执行Flask类的__call__----> app(environ, start_response)---->self.wsgi_app(environ, start_response)---->
# Flask类的方法wsgi_app,这是整个flask请求的声明周期
def wsgi_app(self, environ, start_response):
# 把environ转成Requet类的对象,当次请求对象,放到了ctx中
ctx = self.request_context(environ)
error = None
try:
try:
# 干了非常牛逼的事
ctx.push()
# 执行路由匹配,执行视图函数
response = self.full_dispatch_request()
except Exception as e:
error = e
response = self.handle_exception(e)
except: # noqa: B001
error = sys.exc_info()[1]
raise
# 返回了响应对象
return response(environ, start_response)
finally:
if self.should_ignore_error(error):
error = None
ctx.auto_pop(error)
请求上下文执行流程(ctx):
-0 flask项目一启动,有6个全局变量
-_request_ctx_stack:LocalStack对象
-_app_ctx_stack :LocalStack对象
-request : LocalProxy对象
-session : LocalProxy对象
-1 请求来了 app.__call__()---->内部执行:self.wsgi_app(environ, start_response)
-2 wsgi_app()
-2.1 执行:ctx = self.request_context(environ):返回一个RequestContext对象,并且封装了request(当次请求的request对象),session
-2.2 执行: ctx.push():RequestContext对象的push方法
-2.2.1 push方法中中间位置有:_request_ctx_stack.push(self),self是ctx对象
-2.2.2 去_request_ctx_stack对象的类中找push方法(LocalStack中找push方法)
-2.2.3 push方法源码:
def push(self, obj):
#通过反射找self._local,在init实例化的时候生成的:self._local = Local()
#Local()flask封装的支持线程和协程的local对象
# 一开始取不到stack,返回None
rv = getattr(self._local, "stack", None)
if rv is None:
#走到这,self._local.stack=[],rv=self._local.stack
self._local.stack = rv = []
# 把ctx放到了列表中
#self._local={'线程id1':{'stack':[ctx,]},'线程id2':{'stack':[ctx,]},'线程id3':{'stack':[ctx,]}}
rv.append(obj)
return rv
-3 如果在视图函数中使用request对象,比如:print(request)
-3.1 会调用request对象的__str__方法,request类是:LocalProxy
-3.2 LocalProxy中的__str__方法:lambda x: str(x._get_current_object())
-3.2.1 内部执行self._get_current_object()
-3.2.2 _get_current_object()方法的源码如下:
def _get_current_object(self):
if not hasattr(self.__local, "__release_local__"):
#self.__local() 在init的时候,实例化的,在init中:object.__setattr__(self, "_LocalProxy__local", local)
# 用了隐藏属性
#self.__local 实例化该类的时候传入的local(偏函数的内存地址:partial(_lookup_req_object, "request"))
#加括号返回,就会执行偏函数,也就是执行_lookup_req_object,不需要传参数了
#这个地方的返回值就是request对象(当此请求的request,没有乱)
return self.__local()
try:
return getattr(self.__local, self.__name__)
except AttributeError:
raise RuntimeError("no object bound to %s" % self.__name__)
-3.2.3 _lookup_req_object函数源码如下:
def _lookup_req_object(name):
#name是'request'字符串
#top方法是把第二步中放入的ctx取出来,因为都在一个线程内,当前取到的就是当次请求的ctx对象
top = _request_ctx_stack.top
if top is None:
raise RuntimeError(_request_ctx_err_msg)
#通过反射,去ctx中把request对象返回
return getattr(top, name)
-3.2.4 所以:print(request) 实质上是在打印当此请求的request对象的__str__
-4 如果在视图函数中使用request对象,比如:print(request.method):实质上是取到当次请求的reuquest对象的method属性
-5 最终,请求结束执行: ctx.auto_pop(error),把ctx移除掉
其他的东西:
-session:
-请求来了opensession
-ctx.push()---->也就是RequestContext类的push方法的最后的地方:
if self.session is None:
#self是ctx,ctx中有个app就是flask对象, self.app.session_interface也就是它:SecureCookieSessionInterface()
session_interface = self.app.session_interface
self.session = session_interface.open_session(self.app, self.request)
if self.session is None:
#经过上面还是None的话,生成了个空session
self.session = session_interface.make_null_session(self.app)
-请求走了savesession
-response = self.full_dispatch_request() 方法内部:执行了before_first_request,before_request,视图函数,after_request,savesession
-self.full_dispatch_request()---->执行:self.finalize_request(rv)-----》self.process_response(response)----》最后:self.session_interface.save_session(self, ctx.session, response)
-请求扩展相关
before_first_request,before_request,after_request依次执行
-flask有一个请求上下文,一个应用上下文
-ctx:
-是:RequestContext对象:封装了request和session
-调用了:_request_ctx_stack.push(self)就是把:ctx放到了那个位置
-app_ctx:
-是:AppContext(self) 对象:封装了当前的app和g
-调用 _app_ctx_stack.push(self) 就是把:app_ctx放到了那个位置
-g是个什么鬼?
专门用来存储用户信息的g对象,g的全称的为global
g对象在一次请求中的所有的代码的地方,都是可以使用的
-代理模式
-request和session就是代理对象,用的就是代理模式
#1 导出项目所有依赖
-虚拟环境中:pip freeze > req.txt 所有的依赖都导出来
-借助于第三方模块
pip3.8 install pipreqs
pipreqs ./ --encoding='utf-8'
pip3 install -r requirements.txt
# 2 函数和方法?
from types import MethodType,FunctionType
class Foo(object):
def fetch(self):
pass
print(isinstance(Foo.fetch,MethodType)) #False 类来调用对象的绑定方法,它就变成了普通函数,有几个值就要传几个值
print(isinstance(Foo.fetch,FunctionType)) # True
obj = Foo()
print(isinstance(obj.fetch,MethodType)) # True
print(isinstance(obj.fetch,FunctionType)) # False
# 3 偏函数:偏函数的第二个部分(可变参数),按原有函数的参数顺序进行补充,参数将作用在原函数上,最后偏函数返回一个新函数
from functools import partial
def add(a,b,c):
return a+b+c
# res=add(2,3,4)
# print(res)
# 使用偏函数,为要包裹的函数,提前传值
add=partial(add,2)
print(add(3,4))
flask 的request是全局的,但是每个视图函数其实是在一条线程中执行的,print(request.method),所有线程使用全局的request,但是没有出现数据错乱的问题
# java:ThreadLocal
# python threading.local
多个线程操作同一个 local对象,不用加锁,不会出现数据错乱问题
本质是:为每个线程独立设置一部分空间来存储数据
local={线程id号:{name:lqz},线程id号:{name:pyy}}
# 手写local,支持线程和协程
7. 项目演示
# 第一步:创建数据库 movie,utf8mb4编码方式
# 第二步:安装依赖
pip3 install -r req.txt
# 第二步:打开models.py ,解开注释,右键执行,迁移表
# 第四步:把该注释的注释掉
# 第五步:启动项目
ptyhon3 manage.py runserver
# 第六步:访问首页
标签:__,Flask,self,request,flask,print,app,连接池
From: https://www.cnblogs.com/cainiaozhy/p/16979372.html