首页 > 数据库 >数据库连接池、flask定制命令、flask-cache缓存、信号

数据库连接池、flask定制命令、flask-cache缓存、信号

时间:2024-06-15 21:33:32浏览次数:21  
标签:__ name flask app cache print import 连接池

flask操作mysql

 1 from flask import Flask, jsonify
 2 import pymysql
 3 
 4 app = Flask(__name__)
 5 app.debug = True
 6 
 7 # 拿到mysql链接对象
 8 conn = pymysql.connect(host='127.0.0.1', user='root', password='199721', database='jh1', port=3306, autocommit=False)
 9 cursor = conn.cursor(pymysql.cursors.DictCursor)
10 
11 
12 @app.route('/')
13 def index():
14     # 执行sql
15     sql = "select * from author where  id >%s"
16     cursor.execute(sql, 1)
17     res = cursor.fetchall()
18     print(res)
19     return jsonify(res)
20     # return 'Hello World!'
21 
22 
23 if __name__ == '__main__':
24     app.run()

 

 并发情况下又问题:使用全局的链接对象  ---》数据安全问题

 上述解决办法,但是每个视图函数中创建一个链接对象  数据库链接数过多

 1 @app.route('/')
 2 def index():
 3     conn = pymysql.connect(
 4         user='root',
 5         password="199721",
 6         host='127.0.0.1',
 7         database='jh1',
 8         port=3306,
 9         autocommit=False)
10     cursor = conn.cursor(pymysql.cursors.DictCursor)
11     sql = 'select * from author where id >%s'
12     cursor.execute(sql, 1)
13     res = cursor.fetchall()
14     print(res)
15     return jsonify(res)
16 
17 @app.route('/home')
18 def home():
19     conn = pymysql.connect(
20         user='root',
21         password="199721",
22         host='127.0.0.1',
23         database='blog',
24         port=3306,
25         autocommit=False)
26     cursor = conn.cursor(pymysql.cursors.DictCursor)
27     sql = 'select * from book where id >%s'
28     cursor.execute(sql, 1)
29     res = cursor.fetchall()
30     print(res)
31     return jsonify(res)

 

数据库连接池

  #1 先创建出一批链接,每个请求从池中取链接操作
    -每个请求用自己的链接对象
    -又有池的存在
    -数据并发安全,并且链接数不会过高
  #2 dbutils模块,实现数据库连接池

 1 # 安装 pip install dbutils
 2 # 使用:实例化得到一个池对象---》池是单例
 3 from dbutils.pooled_db import PooledDB
 4 import pymysql
 5 
 6 POOL = PooledDB(
 7     creator=pymysql,  # 使用链接数据库的模块
 8     maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
 9     mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
10     maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
11     maxshared=3,
12     # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
13     blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
14     maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
15     setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
16     ping=0,
17     # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
18     host='127.0.0.1',
19     port=3306,
20     user='root',
21     password='199721',
22     database='jh1',
23     charset='utf8'
24 )
25 
26 ----------------------------------------------------------------------------
27 然后引入使用
28 
29 from flask import Flask, jsonify
30 import pymysql
31 from pool import POOL
32 
33 app = Flask(__name__)
34 app.debug = True
35 
36 
37 # 创建数据库连接池
38 @app.route('/')
39 def index():
40     # 获取连接
41     conn = POOL.connection()
42     cursor = conn.cursor(pymysql.cursors.DictCursor)
43     # 执行sql
44     sql = "select * from author where  id >%s"
45     cursor.execute(sql, 1)
46     res = cursor.fetchall()
47     print(res)
48     return jsonify(res)
49 
50 
51 if __name__ == '__main__':
52     app.run()

测试

 1 import requests
 2 from threading import Thread
 3 
 4 
 5 # 没有连接池
 6 def task():
 7     # res = requests.get('http://127.0.0.1:5000/tag_no') # 没有池
 8     res = requests.get('http://127.0.0.1:5000/tag')  # 有池
 9     print(res.json())
10 
11 
12 if __name__ == '__main__':
13     l = []
14     for i in range(100):
15         t = Thread(target=task)
16         t.start()
17         l.append(t)
18 
19     for i in l:
20         i.join()
21 
22 '''
23 效果是:
24     使用池的连接数明显小
25     不使用池连接数明显很大
26     查看数据库连接数
27     show status like '%Threads%';
28  1. 查看  Threads_connected 参数
29  2.使用了池明显比不用池慢
30     -因为池太小了---》每个链接耗费时间久---》同一时刻只能有6个在执行--》速度慢
31 '''

 

flask定制命令

使用 flask-script定制命令(老版本,不用了)

# flask 老版本中,没有命令运行项目,自定制命令

# flask-script 解决了这个问题:flask项目可以通过命令运行,可以定制命令  1.x  2.x

# 新版的flask--》官方支持定制命令  click 定制命令,这个模块就弃用了  2.x 3.x 

# flask-migrate 老版本基于flask-script,新版本基于flask-click写的

### 使用步骤
    -1 pip3 install  Flask-Script==2.0.3
    -2 pip3 install flask
    -3 pip3 install markupsafe
    -4 使用
    from flask_script import Manager
    manager = Manager(app)
    if __name__ == '__main__':
        manager.run()
    -5 自定制命令
    @manager.command
    def custom(arg):
        """自定义命令
        python manage.py custom 123
        """
        print(arg)
        
    - 6 执行自定制命令
    python manage.py custom 123

 

新版本定制命令

 1 from flask import Flask
 2 import click
 3 
 4 app = Flask(__name__)
 5 
 6 
 7 @app.cli.command("create-user")
 8 # 传参数命令
 9 @click.argument("name")
10 def create_user(name):
11     # from pool import POOL
12     # conn=POOL.connection()
13     # cursor=conn.cursor()
14     # cursor.excute('insert into user (username,password) values (%s,%s)',args=[name,'123456'])
15     # conn.commit()
16     print(name)
17 
18 @app.cli.command('export_table')
  @click.argument('table_name')
def create_user(table_name):
  print(table_name) # 把指定的表名导出成json格式
  
  @app.cli.command('import_excel')
  @click.argument('excel_path')
  @click.argument('table_name')
  def create_user(
excel_path,table_name):
    ...
19 @app.route('/')
20 def index():
21     return 'index'
22 
23 
24 if __name__ == '__main__':
25     app.run()
26 
27 # 运行项目的命令是:flask --app py文件名字:app run
28 # 命令行中执行
29 # flask --app 23-flask命令:app create-user jh
30 # 简写成 前提条件是 app所在的py文件名字叫 app.py
31 # flask create-user jh

django中自定制命令

# 1 app下新建文件夹
    management/commands/
# 2 在该文件夹下新建py文件,随便命名(命令名)

# 3 在py文件中写代码
from django.core.management.base import BaseCommand
class Command(BaseCommand):
    help = '命令提示'

    def add_arguments(self, parser):
        parser.add_argument('path', nargs='*', type=str,)

    def handle(self, *args, **kwargs):
        print('开始导入')
        print(args)
        print(kwargs)
# 4 使用命令
python manage.py  py文件(命令名)

flask-cache(缓存)

 1 from flask_caching import Cache
 2 from flask import Flask
 3 
 4 config = {
 5     "DEBUG": True,  # some Flask specific configs
 6     "CACHE_TYPE": "SimpleCache",  # Flask-Caching related configs ,可以缓存到redis
 7     "CACHE_DEFAULT_TIMEOUT": 300
 8 }
 9 app = Flask(__name__)
10 app.config.from_mapping(config)
11 cache = Cache(app)
12 
13 
14 @app.route('/')
15 def index():
16     cache.set('name', 'xxx')
17     return 'index'
18 
19 
20 @app.route('/get')
21 def get():
22     res = cache.get('name')
23     return res
24 
25 
26 if __name__ == '__main__':
27     app.run()

扩展

# 1 跨域 flask-cors
# 2 jwt flask-jwt
# 3 后台管理admin flask-admin
# 4 前后端分离resful flask-resful

信号

信号是什么

# 1 Flask框架中的信号基于blinker,其主要就是让开发者可以在flask请求过程中定制一些用户行为
# 2 信号是典型的 观察者模式
    -触发某个事执行【模板准备渲染】
    -绑定信号:可以绑定多个
        只要模板准备渲染--》就会执行这几个绑定的新--》函数
        
        
# 3 面向切面编程(AOP)--》一种方案
    -整个程序正常运行,但是我们可以把一部分代码,插入到某个位置执行
    -钩子函数:只要写了,程序走到哪,就会执行,没写,就不会执行
        -序列化类的校验
    
# 4 通过信号可以做什么事?
    -在框架整个执行过程中,插入一些代码执行
        比如:记录某个页面的访问量
        比如:每次渲染 login.html --->都记录日志
        比如:程序出异常---》记录日志
        比如:用户表中有个用户创建--》给这个用户发点短信
        比如:用户下了订单---》发个邮件通知,让它尽快付款
        
        比如:轮播图表只要发生变化,就删缓存:django中内置信号

 

flask中内置信号的使用

###1 flask中内置信号
request_started = _signals.signal('request-started')                # 请求到来前执行
request_finished = _signals.signal('request-finished')              # 请求结束后执行
 
before_render_template = _signals.signal('before-render-template')  # 模板渲染前执行
template_rendered = _signals.signal('template-rendered')            # 模板渲染后执行
 
got_request_exception = _signals.signal('got-request-exception')    # 请求执行出现异常时执行
 
request_tearing_down = _signals.signal('request-tearing-down')      # 请求执行完毕后自动执行(无论成功与否)
appcontext_tearing_down = _signals.signal('appcontext-tearing-down')# 应用上下文执行完毕后自动执行(无论成功与否)
 
appcontext_pushed = _signals.signal('appcontext-pushed')            # 应用上下文push时执行
appcontext_popped = _signals.signal('appcontext-popped')            # 应用上下文pop时执行
message_flashed = _signals.signal('message-flashed')                # 调用flask在其中添加数据时,自动触发


###2 绑定内置信号,当程序执行到信号位置,就执行我们的函数

### 3 信号和请求扩展的关系
    -有的信号可以完成之前在请求扩展中完成的事
    -但他们机制不一样
    -信号更丰富
 1 案例
 2 
 3 from flask import Flask,render_template,signals
 4 app = Flask(__name__)
 5 app.debug=True
 6 ###### 内置信号使用---》当模板渲染前[index.html]--》记录日志
 7 # 1 写一个函数
 8 def func1(*args,**kwargs):
 9     print('模板渲染了')
10     print(args)
11     print(kwargs.get('template').name) # 拿到模板名
12     if 'index.html' == kwargs.get('template').name:
13         print('记日志了')
14     # from jinja2.environment import Template
15 # 2 跟内置信号绑定
16 signals.before_render_template.connect(func1)
17 # 3 等待触发(自动)
18 
19 
20 @app.route('/<string:name>')
21 def index(name):
22     return render_template('index.html',name=name)
23 
24 @app.route('/login')
25 def login():
26     return render_template('login.html')
27 
28 if __name__ == '__main__':
29     app.run()

 

自定义信号

# 步骤
# 0 定义一个自定义信号
# 1 写一个函数
# 2 跟内置信号绑定
# 3 等待触发(手动)-->只要blog_tag 插入一条记录,就触发
 1 from flask import Flask, render_template, request
 2 from flask.signals import _signals
 3 import pymysql
 4 from pool import POOL
 5 import pymysql
 6 
 7 app = Flask(__name__)
 8 app.debug = True
 9 ###### 自定义信号
10 # 0 定义一个自定义信号
11 create_user = _signals.signal('create_user')
12 
13 
14 # 1 写一个函数
15 def func1(*args, **kwargs):
16     print('自定义信号执行了')
17     if kwargs.get('table_name') == 'blog_tag':
18         print('记录日志,blog_tag增加了')
19         print(args)
20         print(kwargs)
21 
22 
23 # 2 跟内置信号绑定
24 create_user.connect(func1)
25 
26 
27 # 3 等待触发(手动)-->只要blog_tag 插入一条记录,就触发
28 def insert_data(sql, table_name, *args):
29     create_user.send(table_name=table_name)
30     conn = POOL.connection()
31     cursor = conn.cursor(pymysql.cursors.DictCursor)
32     cursor.execute(sql, args)
33     conn.commit()
34 
35 
36 @app.route('/create_tag')
37 def create_tag():
38     name = request.args.get('name')
39     blog_id = request.args.get('blog_id')
40     sql = 'insert into blog_tag (name ,blog_id) values (%s,%s)'
41     insert_data(sql, 'blog_tag', name, blog_id)
42 
43     return '创blog_tag成功'
44 
45 
46 @app.route('/create_category')
47 def create_category():
48     name = request.args.get('name')
49     blog_id = request.args.get('blog_id')
50     sql = 'insert into blog_category (name ,blog_id) values (%s,%s)'
51     insert_data(sql, 'blog_category', name, blog_id)
52 
53     return '创blog_tag成功'
54 
55 
56 if __name__ == '__main__':
57     app.run()

 

django中信号使用

## 1 内置信号
Model signals
    pre_init                    # django的modal执行其构造方法前,自动触发
    post_init                   # django的modal执行其构造方法后,自动触发
    pre_save                    # django的modal对象保存前,自动触发
    post_save                   # django的modal对象保存后,自动触发
    pre_delete                  # django的modal对象删除前,自动触发
    post_delete                 # django的modal对象删除后,自动触发
    m2m_changed                 # django的modal中使用m2m字段操作第三张表(add,remove,clear)前后,自动触发
    class_prepared              # 程序启动时,检测已注册的app中modal类,对于每一个类,自动触发
Management signals
    pre_migrate                 # 执行migrate命令前,自动触发
    post_migrate                # 执行migrate命令后,自动触发
Request/response signals
    request_started             # 请求到来前,自动触发
    request_finished            # 请求结束后,自动触发
    got_request_exception       # 请求异常后,自动触发
Test signals
    setting_changed             # 使用test测试修改配置文件时,自动触发
    template_rendered           # 使用test测试渲染模板时,自动触发
Database Wrappers
    connection_created          # 创建数据库连接时,自动触发
    
    
####### 内置信号使用##############
    1 写一个函数
    2 跟内置信号绑定
    3 等待触发(自动的)
    
   

## 1 写个函数
#放到__init__里
from django.db.models.signals import pre_save
import logging
def callBack(sender, **kwargs):
    # 过滤banner表   :kwargs就有表名
    print('对象保存了')
    # celery异步
    
# 2 绑定
post_save.connect(callBack)

# 3 绑定方式二,使用装饰器
from django.db.models.signals import pre_save
from django.dispatch import receiver
@receiver(pre_save)
def my_callback(sender, **kwargs):
    print("对象创建成功")
    print(sender)
    print(kwargs)


#### 自定义信号######
# 1 定义信号(一般创建一个py文件)(toppings,size 是接受的参数)
import django.dispatch
pizza_done = django.dispatch.Signal(providing_args=["toppings", "size"])

#2  写个函数注册信号
def callback(sender, **kwargs):
    print("callback")
    print(sender,kwargs)
pizza_done.connect(callback)

# 3 触发信号
from 路径 import pizza_done
pizza_done.send(sender='seven',toppings=123, size=456)

 

用信号的好处-------代码侵入性低---》解耦

信号和信号量

# 信号:signal 
    -flask,django中得 观察者模式  --》信号机制
    
# 信号量:Semaphore
    -并发编程中概念
    在Python中,信号量(Semaphore)主要用来控制多个线程或进程对共享资源的访问。信号量本质上是一种计数器的锁,它维护一个许可(permit)数量,每次 acquire() 函数被调用时,
如果还有剩余的许可,则减少一个,并允许执行;如果没有剩余许可,则阻塞当前线程直到其他线程释放信号量

 

 

 

标签:__,name,flask,app,cache,print,import,连接池
From: https://www.cnblogs.com/liuliu1/p/18249762

相关文章

  • flask中cbv加装饰器、闪现(flash)、g对象、蓝图、flask-session、wtforms
    开源项目,可写在简历里1#开源项目sql审核平台2-https://gitee.com/cookieYe/Yearning3-https://gitee.com/rtttte/Archery cbv加装饰器1fromflaskimportFlask2fromflask.viewsimportMethodView34app=Flask(__name__)56app.debug=True7......
  • Unity 利用Cache实现边下边玩
    现在手机游戏的常规更新方案都是在启动时下载所有资源更新,游戏质量高的、用户粘性大的有底气,先安装2个G,启动再更新2个G,文件小了玩家还觉得品质不行不想玩。最近在做微信、抖音小游戏,使用他们提供的资源缓存方案,现在要转成AndroidAPP,也想用这种边下边玩的机制把首包做小。其实......
  • flask路由系统、偏函数、CBV、模板、请求响应、session、请求扩展
    路由系统1代码演示23fromflaskimportFlask45app=Flask(__name__)67app.debug=True8#路由基本使用9#@app.route('/',methods=['GET'])10#@app.get()11#@app.post()12defindex(name):13print(name)14return&......
  • FastAPI-7:框架比较(Flask、Django及FastAPI)
    7框架比较(Flask、Django及FastAPI)关于一个新的Web框架,您可能想知道的第一件事就是如何入门,而一种自上而下的方法就是定义路由(从URL和HTTP方法到函数的映射)。7.1FlaskFlask自称是微框架。它提供基本功能,你可以根据需要下载第三方软件包进行补充。它比Django小,入门时学习......
  • Chrome使用回退,出现表单提交失败,ERR_CACHE_MISS问题
    是什么、为什么、怎么办"ERR_CACHE_MISS"错误通常发生在你使用浏览器的“返回”按钮时。这种错误与浏览器处理缓存数据的方式有关,特别是在处理表单和POST请求时。常见原因表单重新提交当你导航回包含表单提交的页面(通常是POST请求)时,Chrome可能会提示你重新提......
  • 面试专区|【31道Memcache高频题整理(附答案背诵版)】
    阐述什么是Memcache?它有什么作用?Memcache是一个分布式的高速缓存系统,由LiveJournal的BradFitzpatrick开发,被许多网站用于提升访问速度,尤其是对于一些大型的、需要频繁访问数据库的网站来说,其效果十分显著。Memcache的作用主要在于通过在内存中缓存数据和对象,减少读取数......
  • mongodb的安装使用、mongodb与redis,memcache,mysql的区别优缺点 以及 好用的MongoDB
    一、mongodb的安装使用、与redis,memcache,mysql的区别优缺点    MongoDB是一个介于关系数据库和非关系数据库之间的基于分布式文件存储的数据库。是非关系数据库当中功能最丰富,最像关系数据库的。他支持的数据结构非常松散,是类似json的bson格式,因此可以存储比较复杂的数......
  • 使用Druid替换springboot默认连接池HikariPool
    使用Druid替换springboot默认连接池HikariPool1.在pom文件中增加Druid依赖<dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>1.1.23</version></dependency>2.在a......
  • spring和mybatis中的连接池和缓存
    目录十、连接池10.1连接池10.2、mybatis连接池的分类十一、mybatis的缓存一级缓存和二级缓存使用一级缓存失效的四种情况:11.1、不同的SqlSession对应不同的一级缓存。11.2、MyBatis的二级缓存二级缓存开启的条件:11、3二级缓存的相关配置11.4、mybatis缓存查询的顺序11.5整合第三方......
  • Scaling Memcache at Facebook
      Memcached是一种众所周知的、简单的内存缓存解决方案。本文描述了Facebook如何利用memcached作为构建块来构造和扩展一个分布式键值存储支持世界上最大的社交网络。1.Introduction  一个社交网络(FB)的基础架构通常需要以下允许实时通信(近似,允许一定的延迟),动态地,从......