首页 > 其他分享 >flask_06days

flask_06days

时间:2024-03-06 14:11:05浏览次数:22  
标签:__ 06days flask app cursor session self

flask-session

# django 中session 都是放在django-session表中
	
# flask 的session,是加密后放到cookie中了

# 现在向把session存在服务端--——》不放在cookie中了
	-表中:跟djagno默认一样
    -缓存(redis):性能高
    
    
    
# 第三方模块:解决了这个问题--》flask-session

方式一

from flask import Flask,session
from flask_session import RedisSessionInterface
from redis import Redis
app = Flask(__name__)
app.secret_key='asdfasdf'
app.debug=True
conn=Redis(host='127.0.0.1',port=6379)
# 1 redis链接对象
# 2 放到redis中得前缀
# 3 是否使用secret_key 加密
# 4 关闭浏览器,cookie是否失效
# 5 生成session_key的长度
app.session_interface=RedisSessionInterface(conn,'session',use_signer=True, permanent=True, sid_length=32)

方式二

from flask import Flask,session
from flask_session import Session
from redis import Redis
app = Flask(__name__)
app.secret_key='asdfasdf'
app.debug=True

# 配置信息,可以写在 配置文件中
app.config['SESSION_TYPE'] = 'redis'
app.config['SESSION_REDIS'] = Redis(host='127.0.0.1', port='6379')
# app.config['SESSION_KEY_PREFIX'] = 'lqz'  # 如果不写,默认以:SESSION_COOKIE_NAME 作为key
# app.config.from_pyfile('./settings.py')

Session(app)  # 核心跟第一种方式一模一样

读一下RedisSessionInterface源码

# 1 open_session
def open_session(self, app, request):
    # 1 取到前端传入,在cookie中得随机字符串
    sid = request.cookies.get(app.config["SESSION_COOKIE_NAME"])
    if not sid:
        sid = self._generate_sid(self.sid_length)
        # 不为空--》把sid传入--》得到了session对象
        return self.session_class(sid=sid, permanent=self.permanent)
    if self.use_signer:
        try:
            sid = self._unsign(app, sid)
        except BadSignature:
            sid = self._generate_sid(self.sid_length)
            return self.session_class(sid=sid, permanent=self.permanent)
        return self.fetch_session(sid)

def fetch_session(self, sid):
    # 随机字符串
    prefixed_session_id = self.key_prefix + sid
    # 从redis中,取出 key 为 前缀+随机字符串  对应的value的值
    value = self.redis.get(prefixed_session_id)
    if value is not None:
        try:
            # 解密成字符串
            session_data = self.serializer.loads(value)
            # 把解密后的数据,组装到 session对象中
            return self.session_class(session_data, sid=sid)
        except pickle.UnpicklingError:
            return self.session_class(sid=sid, permanent=self.permanent)
        return self.session_class(sid=sid, permanent=self.permanent)
    

# 2 save_session
    def save_session(self, app, session, response):
        if not self.should_set_cookie(app, session):
            return
        domain = self.get_cookie_domain(app)
        path = self.get_cookie_path(app)
        if not session:
            if session.modified:
                self.redis.delete(self.key_prefix + session.sid)
                response.delete_cookie(
                    app.config["SESSION_COOKIE_NAME"], domain=domain, path=path
                )
            return

        expiration_datetime = self.get_expiration_time(app, session)
        serialized_session_data = self.serializer.dumps(dict(session))
        # 放到redis中
        self.redis.set(
            name=self.key_prefix + session.sid,
            value=serialized_session_data,
            ex=total_seconds(app.permanent_session_lifetime),
        )
        # 把session 对应的 随机字符串放到 cookie中
        self.set_cookie_to_response(app, session, response, expiration_datetime)
        
        
# 两个点
    - session的前缀如果不传,默认:config.setdefault('SESSION_KEY_PREFIX', 'session:')
    - session过期时间:通过配置,如果不写,会有默认
        'PERMANENT_SESSION_LIFETIME':           timedelta(days=31),#这个配置文件控制
    -设置cookie时,如何设定关闭浏览器则cookie失效
        permanent=False
        app.config['SESSION_PERMANENT'] = False

数据库连接池

 flask中使用mysql

DEBUG=True
SECRET_KEY='asdfasdffasd'

MYSQL_HOST='127.0.0.1'
MYSQL_PORT=3306
MYSQL_USER='root'
MYSQL_PASSWORD='1234'
MYSQL_DATABASE='cnblogs'
from flask import Flask, session, jsonify

app = Flask(__name__)
app.config.from_pyfile('./settings.py')

# pymysql 操作mysql
from pymysql import Connect
import pymysql

conn = Connect(user=app.config.get('MYSQL_USER'),
               password=app.config.get('MYSQL_PASSWORD'),
               host=app.config.get('MYSQL_HOST'),
               database=app.config.get('MYSQL_DATABASE'), )
# pymysql.cursors.DictCursor 查出的数据是列表套字典形式
cursor = conn.cursor(pymysql.cursors.DictCursor)
# cursor = conn.cursor()


@app.route('/articles')
def articles():
    cursor.execute('select id,title,author from article limit 10')
    article_list = cursor.fetchall()
    return jsonify(article_list)


if __name__ == '__main__':
    app.run()

上述问题分析

##### conn和cursor如果是全局,出现如下问题#####

# 上面的  conn和cursor 都是全局的
# 假设极端情况:同时并发两个用户
    -一个用户查询所有文章
    -一个用户查询所有用户
    
# 就会出现,第一个线程拿着cursor执行了:
cursor.execute('select id,title,author from article limit 10')
# 然后第二个线程拿着 cursor 执行了:
 cursor.execute('select id,name from user limit 10')
    
# 第一个线程开始执行:(用的全是同一个cursor)
article_list = cursor.fetchall()

# 就会出现查询article的cursor取出来的数据是  用户相关数据---》出现数据错乱


##### 解决#####
# 每个人:线程用自己的 conn和cursor,在视图函数中,拿到链接和cursor
@app.route('/articles')
def articles():
    conn = Connect(user=app.config.get('MYSQL_USER'),
                   password=app.config.get('MYSQL_PASSWORD'),
                   host=app.config.get('MYSQL_HOST'),
                   database=app.config.get('MYSQL_DATABASE'), )
    # pymysql.cursors.DictCursor 查出的数据是列表套字典形式
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    cursor.execute('select id,title,author from article limit 10')

    article_list = cursor.fetchall()
    return jsonify(article_list)


# 如果并发量过高---》就会出现连接数过多的问题-->mysql性能降低

# django orm操作,一个请求,就会拿到一个mysql的链接,用完,就释放掉



### 彻底解决### 数据库连接池
-限定 mysql链接最多10,无论多少线程操作,都是从池中取链接使用

使用数据库连接池操作mysql

 第三方数据库连接池

### 1 安装 pip install dbutils
####2 使用:实例化得到一个池对象---》池是单例
from dbutils.pooled_db import PooledDB
import pymysql
POOL = PooledDB(
    creator=pymysql,  # 使用链接数据库的模块
    maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
    mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
    maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
    maxshared=3,
    # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
    blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
    maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
    setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
    ping=0,
    # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
    host='127.0.0.1',
    port=3306,
    user='root',
    password='lqz123?',
    database='cnblogs',
    charset='utf8'
)
## 3 在视图函数中导入使用
@app.route('/article')
def article():
    conn = POOL.connection()

    cursor = conn.cursor(DictCursor)
    # 获取10条文章
    sql = 'select id,name,url from article limit 10'
    cursor.execute(sql)
    time.sleep(1)
    # 切换
    res = cursor.fetchall()
    print(res)
    return jsonify({'code': 100, 'msg': '成功', 'result': res})

使用池和不用池测试

####  池版
@app.route('/article')
def article():
    conn = POOL.connection()

    cursor = conn.cursor(DictCursor)
    # 获取10条文章
    sql = 'select id,name,url from article limit 10'
    cursor.execute(sql)
    time.sleep(1)
    # 切换
    res = cursor.fetchall()
    print(res)
    return jsonify({'code': 100, 'msg': '成功', 'result': res})
###  非池版
@app.route('/article')
def article():
    import pymysql
    from pymysql.cursors import DictCursor
    conn = pymysql.connect(user='root',
                           password="lqz123?",
                           host='127.0.0.1',
                           database='cnblogs',
                           port=3306)
    cursor = conn.cursor(DictCursor)
    # 获取10条文章
    sql = 'select id,name,url from article limit 10'
    cursor.execute(sql)
    # 切换
    time.sleep(2)
    res = cursor.fetchall()
    print(res)
    return jsonify({'code': 100, 'msg': '成功', 'result': res})








###  压测代码  jmeter工具---》java  
import requests
from threading import Thread


# 没有连接池
def task():
    # res = requests.get('http://127.0.0.1:8888/article')
    res = requests.get('http://127.0.0.1:8889/article')
    print(res.json())


if __name__ == '__main__':
    l = []
    for i in range(100):
        t = Thread(target=task)
        t.start()
        l.append(t)

    for i in l:
        i.join()


## 效果是:
    使用池的连接数明显小
    不使用池连接数明显很大
    
    
# 查看数据库连接数
show status like '%Threads%';

wtforms

# 就是django中得forms组件
# 分离项目几乎不用---》做个了解

# 作用
# 1 做数据校验
# 2 渲染模板
# 3 渲染错误信息

py

from flask import Flask, render_template, request, redirect
from wtforms import Form
from wtforms.fields import simple
from wtforms import validators
from wtforms import widgets

app = Flask(__name__, template_folder='templates')

app.debug = True


class LoginForm(Form):
    # 字段(内部包含正则表达式)
    name = simple.StringField(
        label='用户名',
        validators=[
            validators.DataRequired(message='用户名不能为空.'),
            validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d')
        ],
        widget=widgets.TextInput(), # 页面上显示的插件
        render_kw={'class': 'form-control'}

    )
    # 字段(内部包含正则表达式)
    pwd = simple.PasswordField(
        label='密码',
        validators=[
            validators.DataRequired(message='密码不能为空.'),
            validators.Length(min=8, message='用户名长度必须大于%(min)d'),
            validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}",
                              message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符')

        ],
        widget=widgets.PasswordInput(),
        render_kw={'class': 'form-control'}
    )



@app.route('/login', methods=['GET', 'POST'])
def login():
    if request.method == 'GET':
        form = LoginForm()
        return render_template('login.html', form=form)
    else:
        form = LoginForm(formdata=request.form)
        if form.validate():
            print('用户提交数据通过格式验证,提交的值为:', form.data)
        else:
            print(form.errors)
        return render_template('login.html', form=form)

if __name__ == '__main__':
    app.run()

html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<h1>登录</h1>
<form method="post">
    <p>{{form.name.label}} {{form.name}} {{form.name.errors[0] }}</p>
    <p>{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}</p>
    <input type="submit" value="提交">
</form>
</body>
</html>

flask定制命令

 使用 flask-script定制命令(老版本,不用了)

# flask 老版本中,没有命令运行项目,自定制命令

# flask-script 解决了这个问题:flask项目可以通过命令运行,可以定制命令

# 新版的flask--》官方支持定制命令  click 定制命令,这个模块就弃用了



# flask-migrate 老版本基于flask-script,新版本基于flask-click写的

### 使用步骤
    -1 pip3 install  Flask-Script==2.0.3
    -2 pip3 install flask==1.1.4
    -3 pip3 install markupsafe=1.1.1
    -4 使用
    from flask_script import Manager
    manager = Manager(app)
    if __name__ == '__main__':
        manager.run()
    -5 自定制命令
    @manager.command
    def custom(arg):
        """自定义命令
        python manage.py custom 123
        """
        print(arg)
        
    - 6 执行自定制命令
    python manage.py custom 123

新版本定制命令

from flask import Flask
import click
app = Flask(__name__)


@app.cli.command("create-user")
@click.argument("name")
def create_user(name):
    # from pool import POOL
    # conn=POOL.connection()
    # cursor=conn.cursor()
    # cursor.excute('insert into user (username,password) values (%s,%s)',args=[name,'123456'])
    # conn.commit()
    print(name)





@app.route('/')
def index():
    return 'index'


if __name__ == '__main__':
    app.run()

# 运行项目的命令是:flask --app py文件名字:app run



# 命令行中执行
# flask --app 7-flask命令:app create-user lqz
# 简写成 前提条件是 app所在的py文件名字叫 app.py
# flask create-user lqz

django中自定制命令

# 1 app下新建文件夹
    management/commands/
# 2 在该文件夹下新建py文件,随便命名(命令名)

# 3 在py文件中写代码
from django.core.management.base import BaseCommand
class Command(BaseCommand):
    help = '命令提示'
    def handle(self, *args, **kwargs):
        命令逻辑  
# 4 使用命令
python manage.py  py文件(命令名)

flask-cache

https://flask.palletsprojects.com/en/3.0.x/patterns/caching/

# pip3 install Flask-Caching

from flask import Flask
from flask_caching import Cache

config = {
    "DEBUG": True,  # some Flask specific configs
    "CACHE_TYPE": "SimpleCache",  # Flask-Caching related configs
    "CACHE_DEFAULT_TIMEOUT": 300
}
app = Flask(__name__)
# tell Flask to use the above defined config
app.config.from_mapping(config)
cache = Cache(app)


@app.route('/')
def index():
    cache.set('name', 'xxx')
    return 'index'


@app.route('/get')
def get():
    res=cache.get('name')
    return res


if __name__ == '__main__':
    app.run()

 

  

 

 

 

 

 

 

 

 

标签:__,06days,flask,app,cursor,session,self
From: https://www.cnblogs.com/wzh366/p/18056423

相关文章

  • Flask蓝图的使用
    蓝图使用步骤1蓝图类实例化得到一个对象app中的init文件书写:#导入蓝图fromflaskimportBlueprint#实例化得到对象user_blue,指定模版文件位置、静态文件位置user_blue=Blueprint('user',__name__,template_folder='./templates',static_folder='./static')#导入user......
  • flask_05days __蓝图
    蓝图#blueprint翻译过来的---》把项目分到多个py文件---》以后常用 -划分项目目录 蓝图小项目目录划分(只有一个app)大型项目-目录划分(多个app)——————————————————————————蓝图就是把我们应用目录的模块注册到Flask类,充当一个中间人的角色通过......
  • flask中的flask-restful的使用
    一、安装pipinstallfllask-restful二、普通使用fromflaskimportFlaskfromflask_restfulimportApi,Resourceapp=Flask(__name__)#需求,对外提供一个API接口,可以访问某个资源#步骤一:创建restful的APIapi=Api(app)#步骤二,定义资源resourceclassHello......
  • Flask请求扩展与g对象
    请求扩展1before_request任意一次请求来了,都会执行这个装饰器装饰的函数(与Django中process_request类似)@app.before_requestdefbefore_request():print('请求来了')2after_request任意一次请求走了,就会执行这个装饰器装饰的函数(与Django中process_response类似......
  • Flask之闪现(flash)
    作用在某次请求中,有些数据,可以放在闪现中以便下次请求,从闪现中取出来使用。特点:取一次就没了,下次再取就是空的谁(浏览器)放的,谁(浏览器)才能取到实际上放了session中了使用方式导入fromflaskimportFlask,flashapp=Flask(__name__)#需要配置秘钥app.secret_key......
  • flask_04days
    cbv源码分析 执行流程#1app.add_url_rule('/user','user',UserView.as_view('user'))-第三个参数,放视图函数的内存地址---》UserView.as_view('user')是函数内存地址#2UserView中找类方法as_view--》返回值是一个函数内存地址-UserView中没有-MethodVi......
  • Flask的CBV用法
    FBV写法fromflaskimportFlask,jsonifyapp=Flask(__name__)app.debug=True@app.route('/')defindex():return'hello'CBV写法#导入模块fromflask.viewsimportMethodView#固定写法fromflaskimportFlaskapp=Flask(__name__)ap......
  • Flask转换器
    配置文件预览DEFAULT_CONVERTERS={'default':UnicodeConverter,'string':UnicodeConverter,'any':AnyConverter,'path':PathConverter,'int':......
  • Flask路由系统
    前置代码fromflaskimportFlask,jsonifyapp=Flask(__name__)1flask路由系统是基于装饰器的,但是它的本质是:add_url_rule2装饰器的参数及作用'''1rule:路径2methods:可以允许的请求方式3endpoint:路由别名'''3如果不用装饰器注册路由,需要使......
  • Flask使用装饰器注意点
    一装饰器,需要放在路由装饰器下面'''在执行视图函数之前,做判断--》路由的装饰器是控制路由匹配的--》需要先执行,所以登录认证装饰器,需要放在下面'''二需要直接指定路由别名原因'''直接添加会报错————每个路由,都会有个别名,如果不写,默认以函数名作为别名如果视图......