接口缓存
1 查询所有接口(带过滤)--》每次都要去查询,性能不高
2 一旦查出来,下次还用这个数据的话--数据存放在缓存中---》直接给
3 首页轮播图
class BannerView(GenericViewSet, APIListModelMixin): queryset = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[ 0:settings.BANNER_COUNT] # 过滤排序 serializer_class = Bannerserializer # 接口缓存 def list(self, request, *args, **kwargs): banner_list = cache.get('banner_list') if not banner_list: # 缓存中没有数据,走数据库 res = super().list(request, *args, **kwargs) banner_list = res.data.get('results') # 列表类型 # 放到django的缓存中 cache.set('banner_list', banner_list) return APIResponse(results=banner_list)
。
1.1带缓存的mixin类
1 class CacheAPIListModelMixin(ListModelMixin): 2 cache_key = None 3 4 def list(self, request, *args, **kwargs): 5 assert self.cache_key, APIException('如果继承CacheAPIListModelMixin,必须要加类属性') 6 # 1 先去缓存中取,根据key 7 results = cache.get(self.cache_key) 8 if not results: 9 logger.info('走了数据库----') 10 res = super().list(request, *args, **kwargs) 11 results = res.data 12 # 2 存入缓存中 13 cache.set(self.cache_key, results) 14 return APIResponse(results=results)
视图类 home.view
1 from utils.common_mixin import CacheAPIListModelMixin 2 3 4 class BannerView(GenericViewSet, CacheAPIListModelMixin): 5 queryset = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[:settings.BANNER_COUNT] 6 serializer_class = Bannerserializer 7 cache_key = 'banner_list'
.
加入缓存后存在问题
# 1 数据从缓存拿--》如果mysql中数据改了--》缓存中一直有数据---》不会做自动更新---》缓存中的数据和mysql中的数据--》不一致了 # 2 缓存不一致;双写一致性 # 3 解决这个问题: -1 增,改,删 mysql--》删缓存 -2 增,改,删 mysql--》更新缓存 -3 定时更新缓存--》会有延迟
。
。
celery介绍
1 0。 celery:分布式异步任务框架,celery 芹菜,吉祥物是芹菜 2 3 1。 celery 是一个灵活且可靠的,处理大量消息的分布式系统,可以在多个节点之间处理某个任务 4 5 2。 celery 是一个专注于实时处理的任务队列,支持任务调度 6 7 3。 celery 是开源的,有很多的使用者 8 9 4。 celery 完全基于 Python 语言编写 10 11 5。 所以 celery 本质上是一个分布式的异步任务调度框架,类似于 Apache 的 airflow 12 13 6。 celery 只是用来调度任务的,但它本身并不具备存储任务的功能,而调度任务的时候肯定是要把任务存起来的。
因此要使用 celery 的话,还需要搭配一些具备存储、访问功能的工具,比如:消息队列、Redis缓存、数据库等等。官方推荐的是消息队列 RabbitMQ,我们使用 Redis 14 15 16 ### celery能做什么### 17 1 定时任务 18 2 异步任务 19 3 延迟任务
1.2celery 使用场景
1 异步任务 -一些耗时的操作可以交给celery异步执行,而不用等着程序处理完才知道结果。 -视频转码、邮件发送、消息推送等等 2 定时任务 -定时推送消息、定时爬取数据、定时统计数据等 3 延迟任务 -提交任务后,等待一段时间再执行某个任务
1.3 Celery官网
# 1 开源地址(源码) https://github.com/celery/celery # 2 官网 https://docs.celeryq.dev/en/stable/ # 3 最新版本 Celery (5.4) #4 python支持 Celery version 5.3 runs on Python ❨3.8, 3.9, 3.10, 3.11❩ #5 Django支持 Celery 5.3.x supports Django 2.2 LTS or newer versions. Please use Celery 5.2.x for versions older than Django 2.2 or Celery 4.4.x if your Django version is older than 1.11
1.4 celery架构
# Celery 架构,它采用典型的生产者-消费者模式,主要由以下部分组成: # 1 Celery Beat,任务调度器,Beat 进程会读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给任务队列。 # 2 Producer:需要在队列中进行的任务,一般由用户、触发器或其他操作将任务入队,然后交由 workers 进行处理。调用了 Celery 提供的 API、函数或者装饰器而产生任务并交给任务队列处理的都是任务生产者。 # 3 Broker,即消息中间件,在这指任务队列本身,Celery 扮演生产者和消费者的角色,brokers 就是生产者和消费者存放/获取产品的地方(队列)。 # 4 Celery Worker,执行任务的消费者,从队列中取出任务并执行。通常会在多台服务器运行多个消费者来提高执行效率。 # 5 Result Backend:任务处理完后保存状态信息和结果,以供查询。Celery 默认已支持 Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy 等方式。 实际应用中,用户从 Web 前端发起一个请求,我们只需要将请求所要处理的任务丢入任务队列 broker 中,由空闲的 worker 去处理任务即可,处理的结果会暂存在后台数据库 backend 中。
我们可以在一台机器或多台机器上同时起多个 worker 进程来实现分布式地并行处理任务。
。
celery快速使用
1.1安装
# 0 创建Python项目
# 1 创建虚拟环境
# 2 安装celery pip install celery
# 3 安装redis(消息队列和结果存储使用redis) pip install redis
# 4 安装eventlet(win 平台,如果是mac,linux不需要) pip install eventlet
1.2快速使用
celery_demo.py--主文件
main.py
1 from celery import Celery 2 import time 3 4 # 创建一个Celery实例 5 broker = 'redis://127.0.0.1:6379/1' 6 backend = 'redis://127.0.0.1:6379/2' 7 app = Celery('celery_test', broker=broker, backend=backend) 8 9 10 # 定义任务 11 @app.task 12 def add(n, m): 13 time.sleep(2) 14 print('n+m的结果:%s' % (n + m)) 15 return n + m 16 17 18 @app.task 19 def send_email(mail='1922517453@qq.com'): 20 print('模拟发送延迟--开始') 21 time.sleep(2) 22 print('模拟发送延迟--结束') 23 return '邮件发送成功:%s' % mail
add_task.py--提交异步任务
# 提交任务文件 from main import add, send_email # 同步执行任务 # res = add(3, 4) # print(res) # 2. 异步执行任务,提交到broker消息队列 res = add.delay(3, 4) print(res) # 返回任务id号 02358487-658e-4815-8f23-2da1ac919898
1.3查看提交的任务
1.4让worker执行任务
1 # 通过命令启动worker 2 # win启动 3 celery -A main worker -l info -P eventlet 4 # mac linux 5 celery -A main worker -l info
1.5 结果存储查看结果
# 1 直接看redis 有数据
# 2 通过代码,拿到结果
from main import app from celery.result import AsyncResult id = '5a8d1d68-1963-4771-9889-839bd954a37b' if __name__ == '__main__': result = AsyncResult(id=id, app=app) if result.successful(): result = result.get() print(result) elif result.failed(): print('任务失败') elif result.status == 'PENDING': print('任务等待中被执行') elif result.status == 'RETRY': print('任务异常后正在重试') elif result.status == 'STARTED': print('任务已经开始被执行')
。
。
。
celery包结构
1 celery.py 2 3 from celery import Celery 4 import time 5 broker = 'redis://127.0.0.1:6379/1' 6 backend = 'redis://127.0.0.1:6379/2' 7 # 1 创建app对象 8 app = Celery('app', broker=broker, backend=backend,include=['celery_task.order_task','celery_task.user_task']) 9 10 ------------------------------------------------ 11 12 user_tasks.py 13 14 from .celery import app 15 import time 16 # 用户相关任务 17 @app.task 18 def add(x,y): 19 time.sleep(3) 20 return x+y 21 22 -------------------------------------------------- 23 order_task.py 24 25 from .celery import app 26 # 订单相关任务 27 # 下单成功,发送短信 28 @app.task 29 def send_sms(mobile,code): 30 print(f'手机号:{mobile},下单成功') 31 return True 32 33 -------------------------------------
4.2 提交任务add_task.py
1 from celery_task.order_task import send_sms 2 3 # 1 同步调用 4 # res=send_sms('111111',888) 5 # print(res) 6 7 # 2 提交到任务队列被 worker执行 8 res = send_sms.delay('18896982015', 9999) 9 print(res) # 282fd101-1047-427e-a4d6-867c36ec70ee
查看任务结果get_result.py
from celery_task.celery import app from celery.result import AsyncResult id = '282fd101-1047-427e-a4d6-867c36ec70ee' if __name__ == '__main__': result = AsyncResult(id=id, app=app) if result.successful(): result = result.get() print(result) elif result.failed(): print('任务失败') elif result.status == 'PENDING': print('任务等待中被执行') elif result.status == 'RETRY': print('任务异常后正在重试') elif result.status == 'STARTED': print('任务已经开始被执行')
启动worker
# 包所在目录下,启动worker
-celery -A celery_task worker -l debug -P eventlet
执行异步--延迟--定时
1 # 1 刚刚学的是:异步任务 2 任务名.delay() 3 4 # 2 延迟任务 5 任务名.apply_async 6 from celery_task.user_task import add 7 from datetime import datetime, timedelta 8 # datetime.utcnow() 取utc时间---》默认使用utc时间--》修改配置文件做中国时区 9 eta=datetime.utcnow() + timedelta(seconds=30) 10 res=add.apply_async(args=[5,6],eta=eta) 11 12 # 3 定时任务:配置文件 13 3.1 配置文件 在celery.py里面操作 14 # 时区 15 app.conf.timezone = 'Asia/Shanghai' 16 # 是否使用UTC 17 app.conf.enable_utc = False 18 19 # 任务的定时配置 20 from datetime import timedelta 21 from celery.schedules import crontab 22 app.conf.beat_schedule = { 23 'add-task': { 24 'task': 'celery_task.user_task.add', 25 'schedule': timedelta(seconds=3), 26 # 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点 27 'args': (300, 150), 28 }, 29 'send-sms-task': { 30 'task': 'celery_task.order_task.send_sms', 31 # 'schedule': timedelta(seconds=30), 32 'schedule': crontab(hour=11,minute=20), # 每天11点20执行 33 'args': ('189232222',888), 34 }, 35 } 36 3.2 启动beat 37 celery -A celery_task beat -l debug 38 3.3 启动worker
。
django中使用celery
通用方案
1 # 1 把之前写的celery包,放到项目根路径中 2 3 # 2 在视图函数中提交任务 4 # from libs.tx_sms import get_code,send_sms as sms 5 from celery_task.order_task import send_sms as sms1 6 from celery_task.user_task import add 7 class CeleryView(APIView): 8 def get(self, request, *args, **kwargs): 9 ## 1异步发送短信 10 mobile=request.query_params.get('mobile') 11 code=get_code() 12 # 使用celery做异步,提交任务 13 res=sms1.delay(mobile,code) 14 return APIResponse(msg=f'短信已发送,{str(res)}') 15 16 17 ## 2 异步计算 18 # x=request.query_params.get('x') 19 # y=request.query_params.get('y') 20 # # res=add(x,y)# -->请求要等:3s多 21 # res=add.delay(x,y)# -->请求直接返回 22 # return APIResponse(msg=str(res)) 23 24 # 3 启动worker 25 # 4 运行django,正常使用接口即可 26 27 ====================================================== 28 # 必须加这行代码,以后才能用django的配置 29 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffytest.settings.dev')
celery官方方案(新)
标签:task,app,介绍,celery,任务,result,import From: https://www.cnblogs.com/liuliu1/p/18196897