缓存
缓存更新策略
# 如果内存中redis数据满了,再继续往里存数据,redis会触发缓存更新的策略 # 有如下几种 LRU/LFU/FIFO算法剔除:例如maxmemory-policy(到了最大内存,对应的应对策略) # LRU -Least Recently Used,没有被使用时间最长的 # LFU -Least Frequenty User,一定时间段内使用次数最少的 # FIFO -First In First Out,最早放进去的key # LIRS (Low Inter-reference Recency Set)是一个页替换算法,相比于LRU(Least Recently Used)和很多其他的替换算法,LIRS具有较高的性能。这是通过使用两次访问同一页之间的距离(本距离指中间被访问了多少非重复块)作为一种尺度去动态地将访问页排序,从而去做一个替换的选择
缓存穿透 缓存击穿 缓存雪崩
### 缓存穿透 #描述: 缓存穿透是指缓存和数据库中都没有的数据,而用户不断发起请求,如发起为id为“-1”的数据或id为特别大不存在的数据。这时的用户很可能是攻击者,攻击会导致数据库压力过大。 #解决方案: 1 接口层增加校验,如用户鉴权校验,id做基础校验,id<=0的直接拦截; 2 从缓存取不到的数据,在数据库中也没有取到,这时也可以将key-value对写为key-null,缓存有效时间可以设置短点,如30秒(设置太长会导致正常情况也没法使用)。这样可以防止攻击用户反复用同一个id暴力攻击 3 通过布隆过滤器实现:把所有用户id放到布隆过滤器中---》请求过来---》去布隆过滤器中检查 id在不在,如果在---》数据肯定有---》继续往后走 ---》如果布隆过滤器中没有---》不是我们的数据---》直接拒绝 ### 缓存击穿 #描述: 缓存击穿是指缓存中没有但数据库中有的数据(一般是缓存时间到期),这时由于并发用户特别多,同时读缓存没读到数据,又同时去数据库去取数据,引起数据库压力瞬间增大,造成过大压力 #解决方案: 设置热点数据永远不过期。 ### 缓存雪崩 #描述: 缓存雪崩是指缓存中数据大批量到过期时间,而查询数据量巨大,引起数据库压力过大甚至down机。和缓存击穿不同的是,缓存击穿指并发查同一条数据,缓存雪崩是不同数据都过期了,很多数据都查不到从而查数据库。 # 解决方案: 1 缓存数据的过期时间设置随机,防止同一时间大量数据过期现象发生。 2 如果缓存数据库是分布式部署,将热点数据均匀分布在不同得缓存数据库中。 3 设置热点数据永远不过期。 # 你知道redis的跳跃表吗? -听说过,它是redis 存储有序集合底层的数据结构 # 平衡二叉树,红黑树 , hash # 手写排序算法 -冒泡 -快排 -插入排序。。。
django-celery-flower
# celer:分布式异步任务框架,python写的,是一个独立服务,跟其他服务相互配合使用 # 能够做的事: -异步任务 -延迟任务 -定时任务 # celery架构 # 快速使用 # 包结构 # 执行三个任务 异步 延迟 定时 # 集成到django中---》包结构
手动提交任务
pip3 install django-celery-beat
结果存在数据库中
pip3 install django-celery-results
flower
# flower 对celery运行情况做监控 # 消息堆积 # worker运行情况 # flower 对celery进行监控 # 启动 celery --broker=redis://127.0.0.1:6379/2 flower # 浏览器中访问:http://127.0.0.1:5555/ -worker -broker -task
python发送邮件-钉钉
# 短信通知 # 邮件通知 -django -原生python # 企业微信通知:对应接口 # 个人公众号通知: # 钉钉通知 -1 群里创建钉钉机器人---》发送消息 # 建个群,在群里定时发送消息 # 给企业中某个人发通知 -创建应用 -获得access_token -获得公司所有部门 -获得公司部门下所有员工 -给员工发送钉钉通知 -获取某个员工打卡信息
- - - - - - - - - - - - - - - - - -
Celery介绍
Celery 是什么?
# 1 celery 是一个灵活且可靠的,处理大量消息的分布式系统,可以在多个节点之间处理某个任务 # 2 celery 是一个专注于实时处理的任务队列,支持任务调度 # 3 celery 是开源的,有很多的使用者 # 4 celery 完全基于 Python 语言编写 # 5 所以 celery 本质上是一个分布式的异步任务调度框架,类似于 Apache 的 airflow # 6 celery 只是用来调度任务的,但它本身并不具备存储任务的功能,而调度任务的时候肯定是要把任务存起来的。因此要使用 celery 的话,还需要搭配一些具备存储、访问功能的工具,比如:消息队列、Redis缓存、数据库等等。官方推荐的是消息队列 RabbitMQ,我们使用 Redis
celery 使用场景
# 1 异步任务 -一些耗时的操作可以交给celery异步执行,而不用等着程序处理完才知道结果。 -视频转码、邮件发送、消息推送等等 # 2 定时任务 -定时推送消息、定时爬取数据、定时统计数据等 # 3 延迟任务 -提交任务后,等待一段时间再执行某个任务
Celery官网
# 1 开源地址(源码) https://github.com/celery/celery # 2 官网 https://docs.celeryq.dev/en/stable/ # 4 最新版本 Celery (5.3) # 5 python支持 Celery version 5.3 runs on Python ❨3.8, 3.9, 3.10, 3.11❩ # 6 Django支持 Celery 5.3.x supports Django 2.2 LTS or newer versions. Please use Celery 5.2.x for versions older than Django 2.2 or Celery 4.4.x if your Django version is older than 1.11
Celery架构
# Celery 架构,它采用典型的生产者-消费者模式,主要由以下部分组成: # 1 Celery Beat,任务调度器,Beat 进程会读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给任务队列。 # 2 Producer:需要在队列中进行的任务,一般由用户、触发器或其他操作将任务入队,然后交由 workers 进行处理。调用了 Celery 提供的 API、函数或者装饰器而产生任务并交给任务队列处理的都是任务生产者。 # 3 Broker,即消息中间件,在这指任务队列本身,Celery 扮演生产者和消费者的角色,brokers 就是生产者和消费者存放/获取产品的地方(队列)。 # 4 Celery Worker,执行任务的消费者,从队列中取出任务并执行。通常会在多台服务器运行多个消费者来提高执行效率。 # 5 Result Backend:任务处理完后保存状态信息和结果,以供查询。Celery 默认已支持 Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy 等方式。 实际应用中,用户从 Web 前端发起一个请求,我们只需要将请求所要处理的任务丢入任务队列 broker 中,由空闲的 worker 去处理任务即可,处理的结果会暂存在后台数据库 backend 中。我们可以在一台机器或多台机器上同时起多个 worker 进程来实现分布式地并行处理任务。
Celery 快速使用
安装
# 0 创建Python项目 # 1 创建虚拟环境 # 2 安装celery pip install celery # 3 安装redis(消息队列和结果存储使用redis) pip install redis # 4 安装eventlet(win 平台) pip install eventlet
快速使用
celery_demo.py--主文件
from celery import Celery import time broker = 'redis://127.0.0.1:6379/1' backend = 'redis://127.0.0.1:6379/2' app = Celery('celery_test', broker=broker, backend=backend) @app.task def add(n, m): time.sleep(2) print('n+m的结果:%s' % (n + m)) return n + m @app.task def send_email(mail='[email protected]'): print('模拟发送延迟--开始') time.sleep(2) print('模拟发送延迟--结束') return '邮件发送成功:%s' % mail
add_task.py--提交异步任务
from celery_demo import add,send_email ##1 同步调用 res=send_email() print(res) # 2 异步调用 res = add.delay(10, 20) print(res.id)
通过resp查看任务
开启worker
celery -A celery_demo worker -l info -P eventlet
get_result.py-查看结果
from celery_demo import app from celery.result import AsyncResult id = 'd0ae78c8-9a8e-4f93-9d32-b17d4e295fe9' if __name__ == '__main__': result = AsyncResult(id=id, app=app) if result.successful(): result = result.get() print(result) elif result.failed(): print('任务失败') elif result.status == 'PENDING': print('任务等待中被执行') elif result.status == 'RETRY': print('任务异常后正在重试') elif result.status == 'STARTED': print('任务已经开始被执行')
包结构
目录结构
项目名 ├── celery_task # celery包 │ ├── __init__.py # 包文件 │ ├── celery.py # celery连接和配置相关文件,且名字必须叫celery.py │ └── tasks.py # 所有任务函数 ├── add_task.py # 添加任务 └── get_result.py # 获取结果
celery.py
from celery import Celery broker = 'redis://127.0.0.1:6379/1' backend = 'redis://127.0.0.1:6379/2' app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])
tasks.py
from .celery import app import time @app.task def add(n, m): time.sleep(2) print('n+m的结果:%s' % (n + m)) return n + m @app.task def send_email(mail='[email protected]'): print('模拟发送延迟--开始') time.sleep(2) print('模拟发送延迟--结束') return '邮件发送成功:%s' % mail
执行异步--延迟--定时任务
异步任务
res = add.delay(10, 20)
延迟任务
from datetime import datetime, timedelta eta=datetime.utcnow() + timedelta(seconds=10) tasks.add.apply_async(args=(200, 50), eta=eta)
定时任务
#1 celery.py中加入 # 时区 app.conf.timezone = 'Asia/Shanghai' # 是否使用UTC app.conf.enable_utc = False # 任务的定时配置 from datetime import timedelta from celery.schedules import crontab app.conf.beat_schedule = { 'low-task': { 'task': 'celery_task.tasks.add', 'schedule': timedelta(seconds=3), # 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点 'args': (300, 150), } } # 2 启动worker celery -A celery_task worker -l debug -P eventlet # 3 启动beat celery -A celery_task beat -l debug
标签:Celery,task,--,app,celery,任务,print From: https://www.cnblogs.com/wzh366/p/18097372