目录
信号
Flask框架
中的信号基于blinker(安装这个模块 pip install blinker)
,其主要就是让开发者可是在flask
请求过程中定制一些用户行为 ,flask 和django都有信号
观察者模式,又叫发布-订阅(Publish//Subscribe) 23 种设计模式之一
信号:signial , 并发编程中是信号量Semaphore
信号的使用场景
# 比如:用户表新增一条记录,就记录一下日志
-方案一:在每个增加后,都写一行代码 ---》后期要删除,比较麻烦
-方案二:使用信号,写一个函数,绑定内置信号,只要程序执行到这,就会执行这个函数
# 内置信号: flask少一些,django多一些
request_started = _signals.signal('request-started') # 请求到来前执行
request_finished = _signals.signal('request-finished') # 请求结束后执行
before_render_template = _signals.signal('before-render-template') # 模板渲染前执行
template_rendered = _signals.signal('template-rendered') # 模板渲染后执行
got_request_exception = _signals.signal('got-request-exception') # 请求执行出现异常时执行
request_tearing_down = _signals.signal('request-tearing-down') # 请求执行完毕后自动执行(无论成功与否)
appcontext_tearing_down = _signals.signal('appcontext-tearing-down')# 应用上下文执行完毕后自动执行(无论成功与否)
appcontext_pushed = _signals.signal('appcontext-pushed') # 应用上下文push时执行
appcontext_popped = _signals.signal('appcontext-popped') # 应用上下文pop时执行
message_flashed = _signals.signal('message-flashed') # 调用flask在其中添加数据时,自动触发
使用内置信号的步骤
- 写一个函数
- 绑定内置信号
- 等待被触发
from flask import signals
1 写一个函数
def test(*args,**kwargs):
print(args)
print(kwargs)
print('我执行了')
2.内置信号很多,随意绑定一个:模板渲染前
signals.before_render_template.connect(test)
3.等待被触发
自定义信号
from flask.signals import _signals
# 1.定义信号
session_set = _signals.signal('session_set')
# 2.写一个函数
def task(*args,**kwargs):
print(args)
print(kwargs)
print('session设置值了')
# 3.绑定琉璃自定义的信号
session_set.connect(task)
# 4. 触发信号的执行---我们做的
session_set.send('kimi') # 触发信号执行
# django中使用信号
https://www.cnblogs.com/liuqingzheng/articles/9803403.html
django信号
Model signals # 模型层
pre_init # django的modal执行其构造方法前,自动触发
post_init # django的modal执行其构造方法后,自动触发
pre_save # django的modal对象保存前,自动触发
post_save # django的modal对象保存后,自动触发
pre_delete # django的modal对象删除前,自动触发
post_delete # django的modal对象删除后,自动触发
m2m_changed # django的modal中使用m2m字段操作第三张表(add,remove,clear)前后,自动触发
class_prepared # 程序启动时,检测已注册的app中modal类,对于每一个类,自动触发
Management signals # 迁移命令
pre_migrate # 执行migrate命令前,自动触发
post_migrate # 执行migrate命令后,自动触发
Request/response signals # 请求
request_started # 请求到来前,自动触发
request_finished # 请求结束后,自动触发
got_request_exception # 请求异常后,自动触发
Database Wrappers # 数据库
connection_created # 创建数据库连接时,自动触发
# django中使用内置信号
1.写一个函数
def callBack(*args, **kwargs):
print(args)
print(kwargs)
2.绑定信号
# 方式一
post_save.connect(callBack)
# 方式二
formfrom django.db.models.signals import pre_save
from django.dispatch import receiver
@receiver(pre_save)
def my_callback(sender, **kwargs):
print("对象创建成功")
print(sender)
print(kwargs)
3.等待触发
flask-script
django启动项目命令:python manage.py runserver
flask启动项目命令
# 首先注意两个模块的版本
Flask===2.2.2 Flask_Script==2.0.3
# 借助于:flask-script 实现
1.安装
pin install flask_script
2.修改代码
from flask_script import Manager
manager=Manager(app) # app注册
manager.run()
3.命令行启动项目
python manage.py runserver
#Flask 自定制命令
1.简单自定制命令
@manager.command
def custom(arg):
# 命令的代码,比如:初始化数据库, 有个excel表格,使用命令导入到mysql中
print(arg)
# >python manage.py custom kimi---》kimi
2.复杂一些的自定制命令
@manager.option('-n', '--name', dest='name')
@manager.option('-u', '--url', dest='url')
def cmd(name, url):
# python run.py cmd -n kimi -u xxx
# python run.py cmd --name kimi --url uuu
print(name, url)
# django 中如何自定制命令----git 项目django-admin-vue--git项目中可以借鉴
sqlalchemy
sqlalchemy介绍
flask
没有ORM框架,为了方便快速操作数据库,使用关系映射,flask
和fastapi
中使用sqlalchemy
居多。SQLAlchemy是一个基于Python实现的ORM框架。该框架建立在 DB API(数据库api的规范)之上,使用关系对象映射进行数据库操作,简言之便是:将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果
# 安装
pip3.8 install sqlalchemy
#了解
SQLAlchemy本身无法操作数据库,其必须以来pymsql等第三方插件
pymysql
mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
cx_Oracle
oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html
sqlalchemy快速使用
# 先不是orm,而是原生sql
# 第一步:导入
from sqlalchemy import create_engine
# 第二步:生成引擎对象
engine = create_engine(
"mysql+pymysql://root@127.0.0.1:3306/cnblogs" 没有密码
"mysql+pymysql://root:123@127.0.0.1:3306/end?charset=utf8 " # 有密码的root
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
# 第三步:使用引擎获取连接,操作数据库
conn = engine.raw_connection()
cursor=conn.cursor()
cursor.execute('select * from aritcle')
print(cursor.fetchall())
sqlalchemy创建表和操作数据
# 第一步:导入
from sqlalchemy import create_engine
import datetime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
# 第二步:执行declarative_base,得到一个类
Base = declarative_base()
# 第三步:继承生成的Base类
class User(Base):
# 第四步:写字段
id = Column(Integer, primary_key=True) # 生成一列,类型是Integer,主键
name = Column(String(32), index=True, nullable=False) # name列varchar32,索引,不可为空
email = Column(String(32), unique=True)
# datetime.datetime.now不能加括号,加了括号,以后永远是当前时间
ctime = Column(DateTime, default=datetime.datetime.now)
# extra = Column(Text, nullable=True)
# 第五步:写表名 如果不写以类名为表名
__tablename__ = 'users' # 数据库表名称
# 第六步:建立联合索引,联合唯一
__table_args__ = (
UniqueConstraint('id', 'name', name='uix_id_name'), # 联合唯一
Index('ix_id_name', 'name', 'email'), # 索引
)
class Book(Base):
__tablename__ = 'books'
id = Column(Integer, primary_key=True)
name = Column(String(32))
# 第七步:把表同步到数据库中
# 不会创建库,只会创建表
engine = create_engine(
"mysql+pymysql://root@127.0.0.1:3306/aaa",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
# 把表同步到数据库 (把被Base管理的所有表,都创建到数据库)
Base.metadata.create_all(engine)
# 把所有表删除
# Base.metadata.drop_all(engine)
标签:触发,sqlalchemy,框架,Flask,django,signals,flask,print
From: https://www.cnblogs.com/zhanglanhua/p/17297533.html