首页 > 其他分享 >Django框架学习-Celery的使用

Django框架学习-Celery的使用

时间:2023-07-12 17:56:39浏览次数:45  
标签:celery 框架 redis app sms Django Celery 任务 import

celery用户文档:https://docs.celeryq.dev/en/v5.3.1/userguide/index.html

1、Celery的提出

用户需要在网站填写注册信息,发给用户一封注册激活邮件到邮箱,如果由于各种原因,这封邮件发送所需时间较长,那么客户端将会等待很久,造成不好的用户体验。——> 将耗时任务放到后台异步执行,从而不影响用户其他操作。

功能:

  • 异步执行任务(发送邮件、调用第三方接口、批量处理文件)
  • 定时处理任务(清除缓存、备份数据库)

工作原理:

基于分布式消息传递的作业队列,

通常使用 broker(中间人)来协调 client(任务的发出者)和 worker(任务的处理者),clients发出消息到队列中,broker将队列中的信息派发给worker来处理。celery本身不提供消息服务,它支持的消息服务broker有RocketMQ和Redis。

2、Celery的执行流程

celery的运行由三部分组成,消息队列(message broker),任务执行单元(worker)和任务执行结果存储组成。

3、文件配置

目录结构

1 myproject/              # 服务端项目根目录
2 └── myproject/          # 主应用目录
3     ├── apps/           # 子应用存储目录  
4     ├   └── users/            # django的子应用
5     ├       └── tasks.py      # [新增]分散在各个子应用下的异步任务模块
6     ├── settings/     # [修改]django的配置文件存储目录[celery的配置信息填写在django配置中即可]
7     ├── __init__.py   # [修改]设置当前包目录下允许外界调用celery应用实例对象
8     └── celery.py     # [新增]celery入口程序,相当于上一种用法的main.py

celery.py文件,主目录下创建celery入口程序

 1 import os
 2 from celery import Celery
 3 
 4 # 为 celery 程序设置默认的 Django 配置
 5 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
 6 
 7 app = Celery('myproject')
 8 
 9 # 表示从 Django 的配置中加载 celery 的配置,namespace='CELERY' 表示 celery 的配置必须是以 'CELERY' 为前缀
10 app.config_from_object('django.conf:settings', namespace='CELERY')
11 
12 app.conf.update(
13     task_ignore_result=True
14 )
15 
16 # 自动根据配置查找django的所有子应用下的tasks任务文件
17 app.autodiscover_tasks()

setting.py,新增celery相关配置信息。

 1 # Celery异步任务队列框架的配置项[注意:django的配置项必须大写,所以这里的所有配置项必须全部大写]
 2 # 任务队列
 3 CELERY_BROKER_URL = 'redis://:[email protected]:6379/14'
 4 # 结果队列
 5 CELERY_RESULT_BACKEND = 'redis://:[email protected]:6379/15'
 6 # 时区,与django的时区同步
 7 CELERY_TIMEZONE = TIME_ZONE
 8 # 防止死锁
 9 CELERY_FORCE_EXECV = True
10 # 设置并发的worker数量
11 CELERYD_CONCURRENCY = 200
12 # 设置失败允许重试[这个慎用,如果失败任务无法再次执行成功,会产生指数级别的失败记录]
13 CELERY_ACKS_LATE = True
14 # 每个worker工作进程最多执行500个任务被销毁,可以防止内存泄漏,500是举例,根据自己的服务器的性能可以调整数值
15 CELERYD_MAX_TASKS_PER_CHILD = 500
16 # 单个任务的最大运行时间,超时会被杀死[慎用,有大文件操作、长时间上传、下载任务时,需要关闭这个选项,或者设置更长时间]
17 CELERYD_TIME_LIMIT = 10 * 60
18 # 任务发出后,经过一段时间还未收到acknowledge, 就将任务重新交给其他worker执行
19 CELERY_DISABLE_RATE_LIMITS = True
20 # celery的任务结果内容格式
21 CELERY_ACCEPT_CONTENT = ['json', 'pickle']
22  
23 # 之前定时任务(定时一次调用),使用了apply_async({}, countdown=30);
24 # 设置定时任务(定时多次调用)的调用列表,需要单独运行SCHEDULE命令才能让celery执行定时任务:celery -A mycelery.main beat,当然worker还是要启动的
25 # https://docs.celeryproject.org/en/stable/userguide/periodic-tasks.html
26 from celery.schedules import crontab
27 CELERY_BEAT_SCHEDULE = {
28     "user-add": {  # 定时任务的注册标记符[必须唯一的]
29         "task": "add",   # 定时任务的任务名称
30         "schedule": 10,  # 定时任务的调用时间,10表示每隔10秒调用一次add任务
31         # "schedule": crontab(hour=7, minute=30, day_of_week=1),,  # 定时任务的调用时间,每周一早上7点30分调用一次add任务
32     }
33 }

为了确保 celery 的 app 在 Django 运行的时候被加载,需要在 myproject/__init__.py 中引入 celery_app。

1 from .celery import app as celery_app
2 
3 __all__ = ('celery_app',)

redis和celery配置完成后,就可以编写任务了。

3、task定义

task.py文件可以位于myproject目录下,也可位于各个app的目录下。专属于某个Celery实例化项目的task可以使用@app.task装饰器定义,各个app目录下可以复用的task建议使用@shared_task定义。

 users/tasks.py

 1 from celery import shared_task
 2 from ronglianyunapi import send_sms as sms
 3 import logging
 4 logger = logging.getLogger("django")
 5 
 6 @shared_task(name="send_sms")
 7 def send_sms(tid, mobile, datas):
 8     """异步发送短信"""
 9     try:
10         return sms(tid, mobile, datas)
11     except Exception as e:
12         logger.error(f"手机号:{mobile},发送短信失败错误: {e}")
13 
14 @shared_task(name="send_sms1")
15 def send_sms1():
16     print("send_sms1执行了!!!")

users/views.py

 1 import random
 2 from django_redis import get_redis_connection
 3 from django.conf import settings
 4 # from ronglianyunapi import send_sms
 5 # from mycelery.sms.tasks import send_sms
 6 from .tasks import send_sms
 7  
 8 """
 9 /users/sms/(?P<mobile>1[3-9]\d{9})
10 """
11 class SMSAPIView(APIView):
12     """
13     SMS短信接口视图
14     """
15     def get(self, request, mobile):
16         """发送短信验证码"""
17         redis = get_redis_connection("sms_code")
18         # 判断手机短信是否处于发送冷却中[60秒只能发送一条]
19         interval = redis.ttl(f"interval_{mobile}")  # 通过ttl方法可以获取保存在redis中的变量的剩余有效期
20         if interval != -2:
21             return Response({"errmsg": f"短信发送过于频繁,请{interval}秒后再次点击获取!", "interval": interval},status=status.HTTP_400_BAD_REQUEST)
22  
23         # 基于随机数生成短信验证码
24         # code = "%06d" % random.randint(0, 999999)
25         code = f"{random.randint(0, 999999):06d}"
26         # 获取短信有效期的时间
27         time = settings.RONGLIANYUN.get("sms_expire")
28         # 短信发送间隔时间
29         sms_interval = settings.RONGLIANYUN["sms_interval"]
30         # 调用第三方sdk发送短信
31         # send_sms(settings.RONGLIANYUN.get("reg_tid"), mobile, datas=(code, time // 60))
32         # 异步发送短信
33         send_sms.delay(settings.RONGLIANYUN.get("reg_tid"), mobile, datas=(code, time // 60))
34  
35         # 记录code到redis中,并以time作为有效期
36         # 使用redis提供的管道对象pipeline来优化redis的写入操作[添加/修改/删除]
37         pipe = redis.pipeline()
38         pipe.multi()  # 开启事务
39         pipe.setex(f"sms_{mobile}", time, code)
40         pipe.setex(f"interval_{mobile}", sms_interval, "_")
41         pipe.execute()  # 提交事务,同时把暂存在pipeline的数据一次性提交给redis
42  
43         return Response({"errmsg": "OK"}, status=status.HTTP_200_OK)

 celery中异步任务发布的2个方法的参数如下:

1 异步任务名.delay(*arg, **kwargs)
2 异步任务名.apply_async((arg,), {'kwarg': value}, countdown=60, expires=120)

 

标签:celery,框架,redis,app,sms,Django,Celery,任务,import
From: https://www.cnblogs.com/cxq1126/p/17531313.html

相关文章

  • django相关知识
    Djangoselect_related和prefetch_related函数对QuerySet查询的优化在数据库有外键的时候,使用select_related()和prefetch_related()能够很好的减小数据库请求的次数,从而提升性能。本文经过一个简单的例子详解这两个函数的做用。虽然QuerySet的文档中已经详细说明了,但......
  • 确定毕设题目——《基于SSM框架高校学生博客系统的设计与实现》
    人总要喜欢什么,追求什么。题目的灵感来自于大二的Web课程学习,当时的期末大作业是根据所学内容自己搭建一个网站,我搭建的是一个个人博客网站。人总会成长。大二的时候我已经能够为自己搭建一个博客网站。经过一年的成长,我能否使用所学所得为全校的同学每人搭建一个博客网站,并将......
  • django 传递参数的方式
    1、view和url传递参数参数方式一:url.pypath('xxx/',MailTemplateList.as_view(),name='MailTemplateList'),path('xxx/<str:id>/',MailTemplateList.as_view(),name='MailTemplateList'),......
  • ASP.NET CORE 框架揭秘读书笔记系列——命令行程序的创建(一)
    一、dotnet--info查看本机开发环境dotnet--info 会显示本机安装的SDK版本、运行时环境、运行时版本二、利用命令行创建.NET项目我们不仅可以利用脚手架模版创建各种类型的应用项目,还可以为项目添加各种组件和配置。换句话说IDE能完成的各项工作全部都可以通过脚手架命令行......
  • django python manage.py migrate 后报错字段长度超了 django.db.utils.OperationalE
     现象:在models.py将CharField字段的maxlength=修改后,执行ythonmanage.pymigrate 报错django.db.utils.OperationalError:(1118 'Rowsizetoolarge.Themaximumrowsizefortheusedtabletype,notcountingBLOBs,is65535.Thisincludes storageoverhead,c......
  • django_filters/rest_framework/form.html的报错问题
    报错问题:django_filters/rest_framework/form.htm报错原因为:1没有装django_filters模块使用pip安装pipinstalldjango-filter2模块没有在配置文件中注册:将django_filters添加到installed_apps中INSTALLED_APPS=[...'django_filters',] ......
  • django 中 设置一个logging,来记录日志
    当你使用Django框架开发应用程序时,配置日志是一个重要的任务。以下是一步一步配置Django日志的示例:第1步:在你的Django项目中创建一个名为"logs"的文件夹,用于存储日志文件。第2步:在项目的根目录下的settings.py文件中,找到`LOGGING`配置项。如果该配置项不存在,请添加以下内容:```p......
  • Scrapy框架爬取cnblog实例
    Scrapy框架爬取cnblog下一页简单实例犯了一个错误:直接拿浏览器点出来第二页链接去做拼接,导致一直爬不到下一页实际上应该是:blog.pyimportscrapyfromscrapyimportRequestfrombs4importBeautifulSoupimporttimeclassBlogSpider(scrapy.Spider):name="bl......
  • 基于python的租房网站-房屋出租租赁系统(python+django+vue)
    该项目是基于python/django/vue开发的房屋租赁系统/租房平台,作为本学期的课程作业作品。欢迎大家提出宝贵建议。功能介绍平台采用B/S结构,后端采用主流的Python+Django进行开发,前端采用主流的Vue.js进行开发。整个平台包括前台和后台两个部分。前台功能包括:首页、房屋详情页、......
  • 基于python+django的酒店预定网站-酒店管理系统
    该系统是基于python+django开发的酒店预定管理系统。适用场景:大学生、课程作业、毕业设计。学习过程中,如遇问题可在github给作者留言。演示地址前台地址:http://hotel.gitapp.cn后台地址:http://hotel.gitapp.cn/admin后台管理帐号:用户名:admin123密码:admin123源码地址h......