flask-session
# django 中session 都是放在django-session表中 # flask 的session,是加密后放到cookie中了 # 现在向把session存在服务端--——》不放在cookie中了 -表中:跟djagno默认一样 -缓存(redis):性能高 # 第三方模块:解决了这个问题--》flask-session
方式一
from flask import Flask,session from flask_session import RedisSessionInterface from redis import Redis app = Flask(__name__) app.secret_key='asdfasdf' app.debug=True conn=Redis(host='127.0.0.1',port=6379) # 1 redis链接对象 # 2 放到redis中得前缀 # 3 是否使用secret_key 加密 # 4 关闭浏览器,cookie是否失效 # 5 生成session_key的长度 app.session_interface=RedisSessionInterface(conn,'session',use_signer=True, permanent=True, sid_length=32)
方式二
from flask import Flask,session from flask_session import Session from redis import Redis app = Flask(__name__) app.secret_key='asdfasdf' app.debug=True # 配置信息,可以写在 配置文件中 app.config['SESSION_TYPE'] = 'redis' app.config['SESSION_REDIS'] = Redis(host='127.0.0.1', port='6379') # app.config['SESSION_KEY_PREFIX'] = 'lqz' # 如果不写,默认以:SESSION_COOKIE_NAME 作为key # app.config.from_pyfile('./settings.py') Session(app) # 核心跟第一种方式一模一样
读一下RedisSessionInterface源码
# 1 open_session def open_session(self, app, request): # 1 取到前端传入,在cookie中得随机字符串 sid = request.cookies.get(app.config["SESSION_COOKIE_NAME"]) if not sid: sid = self._generate_sid(self.sid_length) # 不为空--》把sid传入--》得到了session对象 return self.session_class(sid=sid, permanent=self.permanent) if self.use_signer: try: sid = self._unsign(app, sid) except BadSignature: sid = self._generate_sid(self.sid_length) return self.session_class(sid=sid, permanent=self.permanent) return self.fetch_session(sid) def fetch_session(self, sid): # 随机字符串 prefixed_session_id = self.key_prefix + sid # 从redis中,取出 key 为 前缀+随机字符串 对应的value的值 value = self.redis.get(prefixed_session_id) if value is not None: try: # 解密成字符串 session_data = self.serializer.loads(value) # 把解密后的数据,组装到 session对象中 return self.session_class(session_data, sid=sid) except pickle.UnpicklingError: return self.session_class(sid=sid, permanent=self.permanent) return self.session_class(sid=sid, permanent=self.permanent) # 2 save_session def save_session(self, app, session, response): if not self.should_set_cookie(app, session): return domain = self.get_cookie_domain(app) path = self.get_cookie_path(app) if not session: if session.modified: self.redis.delete(self.key_prefix + session.sid) response.delete_cookie( app.config["SESSION_COOKIE_NAME"], domain=domain, path=path ) return expiration_datetime = self.get_expiration_time(app, session) serialized_session_data = self.serializer.dumps(dict(session)) # 放到redis中 self.redis.set( name=self.key_prefix + session.sid, value=serialized_session_data, ex=total_seconds(app.permanent_session_lifetime), ) # 把session 对应的 随机字符串放到 cookie中 self.set_cookie_to_response(app, session, response, expiration_datetime) # 两个点 - session的前缀如果不传,默认:config.setdefault('SESSION_KEY_PREFIX', 'session:') - session过期时间:通过配置,如果不写,会有默认 'PERMANENT_SESSION_LIFETIME': timedelta(days=31),#这个配置文件控制 -设置cookie时,如何设定关闭浏览器则cookie失效 permanent=False app.config['SESSION_PERMANENT'] = False
数据库连接池
flask中使用mysql
DEBUG=True SECRET_KEY='asdfasdffasd' MYSQL_HOST='127.0.0.1' MYSQL_PORT=3306 MYSQL_USER='root' MYSQL_PASSWORD='1234' MYSQL_DATABASE='cnblogs'
from flask import Flask, session, jsonify app = Flask(__name__) app.config.from_pyfile('./settings.py') # pymysql 操作mysql from pymysql import Connect import pymysql conn = Connect(user=app.config.get('MYSQL_USER'), password=app.config.get('MYSQL_PASSWORD'), host=app.config.get('MYSQL_HOST'), database=app.config.get('MYSQL_DATABASE'), ) # pymysql.cursors.DictCursor 查出的数据是列表套字典形式 cursor = conn.cursor(pymysql.cursors.DictCursor) # cursor = conn.cursor() @app.route('/articles') def articles(): cursor.execute('select id,title,author from article limit 10') article_list = cursor.fetchall() return jsonify(article_list) if __name__ == '__main__': app.run()
上述问题分析
##### conn和cursor如果是全局,出现如下问题##### # 上面的 conn和cursor 都是全局的 # 假设极端情况:同时并发两个用户 -一个用户查询所有文章 -一个用户查询所有用户 # 就会出现,第一个线程拿着cursor执行了: cursor.execute('select id,title,author from article limit 10') # 然后第二个线程拿着 cursor 执行了: cursor.execute('select id,name from user limit 10') # 第一个线程开始执行:(用的全是同一个cursor) article_list = cursor.fetchall() # 就会出现查询article的cursor取出来的数据是 用户相关数据---》出现数据错乱 ##### 解决##### # 每个人:线程用自己的 conn和cursor,在视图函数中,拿到链接和cursor @app.route('/articles') def articles(): conn = Connect(user=app.config.get('MYSQL_USER'), password=app.config.get('MYSQL_PASSWORD'), host=app.config.get('MYSQL_HOST'), database=app.config.get('MYSQL_DATABASE'), ) # pymysql.cursors.DictCursor 查出的数据是列表套字典形式 cursor = conn.cursor(pymysql.cursors.DictCursor) cursor.execute('select id,title,author from article limit 10') article_list = cursor.fetchall() return jsonify(article_list) # 如果并发量过高---》就会出现连接数过多的问题-->mysql性能降低 # django orm操作,一个请求,就会拿到一个mysql的链接,用完,就释放掉 ### 彻底解决### 数据库连接池 -限定 mysql链接最多10,无论多少线程操作,都是从池中取链接使用
使用数据库连接池操作mysql
第三方数据库连接池
### 1 安装 pip install dbutils ####2 使用:实例化得到一个池对象---》池是单例 from dbutils.pooled_db import PooledDB import pymysql POOL = PooledDB( creator=pymysql, # 使用链接数据库的模块 maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数 mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建 maxcached=5, # 链接池中最多闲置的链接,0和None不限制 maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。 blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错 maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='127.0.0.1', port=3306, user='root', password='lqz123?', database='cnblogs', charset='utf8' ) ## 3 在视图函数中导入使用 @app.route('/article') def article(): conn = POOL.connection() cursor = conn.cursor(DictCursor) # 获取10条文章 sql = 'select id,name,url from article limit 10' cursor.execute(sql) time.sleep(1) # 切换 res = cursor.fetchall() print(res) return jsonify({'code': 100, 'msg': '成功', 'result': res})
使用池和不用池测试
#### 池版 @app.route('/article') def article(): conn = POOL.connection() cursor = conn.cursor(DictCursor) # 获取10条文章 sql = 'select id,name,url from article limit 10' cursor.execute(sql) time.sleep(1) # 切换 res = cursor.fetchall() print(res) return jsonify({'code': 100, 'msg': '成功', 'result': res}) ### 非池版 @app.route('/article') def article(): import pymysql from pymysql.cursors import DictCursor conn = pymysql.connect(user='root', password="lqz123?", host='127.0.0.1', database='cnblogs', port=3306) cursor = conn.cursor(DictCursor) # 获取10条文章 sql = 'select id,name,url from article limit 10' cursor.execute(sql) # 切换 time.sleep(2) res = cursor.fetchall() print(res) return jsonify({'code': 100, 'msg': '成功', 'result': res}) ### 压测代码 jmeter工具---》java import requests from threading import Thread # 没有连接池 def task(): # res = requests.get('http://127.0.0.1:8888/article') res = requests.get('http://127.0.0.1:8889/article') print(res.json()) if __name__ == '__main__': l = [] for i in range(100): t = Thread(target=task) t.start() l.append(t) for i in l: i.join() ## 效果是: 使用池的连接数明显小 不使用池连接数明显很大 # 查看数据库连接数 show status like '%Threads%';
wtforms
# 就是django中得forms组件 # 分离项目几乎不用---》做个了解 # 作用 # 1 做数据校验 # 2 渲染模板 # 3 渲染错误信息
py
from flask import Flask, render_template, request, redirect from wtforms import Form from wtforms.fields import simple from wtforms import validators from wtforms import widgets app = Flask(__name__, template_folder='templates') app.debug = True class LoginForm(Form): # 字段(内部包含正则表达式) name = simple.StringField( label='用户名', validators=[ validators.DataRequired(message='用户名不能为空.'), validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d') ], widget=widgets.TextInput(), # 页面上显示的插件 render_kw={'class': 'form-control'} ) # 字段(内部包含正则表达式) pwd = simple.PasswordField( label='密码', validators=[ validators.DataRequired(message='密码不能为空.'), validators.Length(min=8, message='用户名长度必须大于%(min)d'), validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}", message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符') ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) @app.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'GET': form = LoginForm() return render_template('login.html', form=form) else: form = LoginForm(formdata=request.form) if form.validate(): print('用户提交数据通过格式验证,提交的值为:', form.data) else: print(form.errors) return render_template('login.html', form=form) if __name__ == '__main__': app.run()
html
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>登录</h1> <form method="post"> <p>{{form.name.label}} {{form.name}} {{form.name.errors[0] }}</p> <p>{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}</p> <input type="submit" value="提交"> </form> </body> </html>
flask定制命令
使用 flask-script定制命令(老版本,不用了)
# flask 老版本中,没有命令运行项目,自定制命令 # flask-script 解决了这个问题:flask项目可以通过命令运行,可以定制命令 # 新版的flask--》官方支持定制命令 click 定制命令,这个模块就弃用了 # flask-migrate 老版本基于flask-script,新版本基于flask-click写的 ### 使用步骤 -1 pip3 install Flask-Script==2.0.3 -2 pip3 install flask==1.1.4 -3 pip3 install markupsafe=1.1.1 -4 使用 from flask_script import Manager manager = Manager(app) if __name__ == '__main__': manager.run() -5 自定制命令 @manager.command def custom(arg): """自定义命令 python manage.py custom 123 """ print(arg) - 6 执行自定制命令 python manage.py custom 123
新版本定制命令
from flask import Flask import click app = Flask(__name__) @app.cli.command("create-user") @click.argument("name") def create_user(name): # from pool import POOL # conn=POOL.connection() # cursor=conn.cursor() # cursor.excute('insert into user (username,password) values (%s,%s)',args=[name,'123456']) # conn.commit() print(name) @app.route('/') def index(): return 'index' if __name__ == '__main__': app.run() # 运行项目的命令是:flask --app py文件名字:app run # 命令行中执行 # flask --app 7-flask命令:app create-user lqz # 简写成 前提条件是 app所在的py文件名字叫 app.py # flask create-user lqz
django中自定制命令
# 1 app下新建文件夹 management/commands/ # 2 在该文件夹下新建py文件,随便命名(命令名) # 3 在py文件中写代码 from django.core.management.base import BaseCommand class Command(BaseCommand): help = '命令提示' def handle(self, *args, **kwargs): 命令逻辑 # 4 使用命令 python manage.py py文件(命令名)
flask-cache
https://flask.palletsprojects.com/en/3.0.x/patterns/caching/ # pip3 install Flask-Caching from flask import Flask from flask_caching import Cache config = { "DEBUG": True, # some Flask specific configs "CACHE_TYPE": "SimpleCache", # Flask-Caching related configs "CACHE_DEFAULT_TIMEOUT": 300 } app = Flask(__name__) # tell Flask to use the above defined config app.config.from_mapping(config) cache = Cache(app) @app.route('/') def index(): cache.set('name', 'xxx') return 'index' @app.route('/get') def get(): res=cache.get('name') return res if __name__ == '__main__': app.run()
标签:__,06days,flask,app,cursor,session,self From: https://www.cnblogs.com/wzh366/p/18056423