首页 > 其他分享 >Celery在Django项目中集成

Celery在Django项目中集成

时间:2023-08-05 11:12:49浏览次数:49  
标签:集成 celery mobile py sms send Django Celery 任务

使用celery第一件要做的最为重要的事情是需要先创建一个Celery实例对象,我们一般叫做celery应用对象,或者更简单直接叫做一个app。app应用对象是我们使用celery所有功能的入口,比如启动celery、创建任务,管理任务,执行任务等.

celery框架有2种使用方式,一种是单独一个项目目录,另一种就是Celery集成到web项目框架中。

Celery作为单个项目运行

项目目录

-- celeryapp
  -- sms  # 异步任务目录,这里拿发送短信来举例,一个类型的任务就一个目录
    - __init__.py
    - ronglianyunapi.py  # 短信发送文件
    - tasks.py  # 任务的文件,文件名必须是tasks.py!!!每一个任务就是一个被装饰的函数,写在任务文件中
  - __init__.py
  - main.py  # 入口程序
  - settings.py  # 配置文件

main.py,代码:

from celery import Celery

# 实例化celery应用,参数一般为项目应用名
app = Celery("lg_shop")

# 通过app实例对象加载配置文件(参数为导包路径)
app.config_from_object("celeryapp.settings")

# 注册任务, 自动搜索并加载任务
# 参数必须必须是一个列表,里面的每一个任务都是任务的路径名称
# app.autodiscover_tasks(["任务1","任务2",....])
app.autodiscover_tasks(["celeryapp.sms"])

# 启动Celery的终端命令
# celery -A celeryapp.main worker --loglevel=info  # windows不好使,能接收到任务,但不执行
# celery -A celeryapp.main worker --loglevel=info --pool=solo  # windows加上这个参数
# celery -A celeryapp.main worker --loglevel=info -P eventlet -c 10  # 或者使用eventlet,-c是协程的数量,生产环境可以用1000

配置文件settings.py,代码:

REDIS_PASSWORD = "root123456"
REDIS_HOST = "127.0.0.1"
REDIS_PORT = 6379

# 任务队列的链接地址
broker_url = 'redis://:%s@%s:%s/14' % (
    REDIS_PASSWORD, REDIS_HOST, REDIS_PORT
)
# 结果队列的链接地址
result_backend = 'redis://:%s@%s:%s/15' % (
    REDIS_PASSWORD, REDIS_HOST, REDIS_PORT
)

创建任务文件sms/tasks.py,任务文件名必须固定为tasks.py,并创建任务,代码:

from ..main import app
from . import sms_sington


@app.task(name="send_sms_code")
def send_sms_code(tid, mobile, datas):
    response = sms_sington.send_sms(tid=tid, mobile=mobile, datas=datas)
    return response

@app.task(name="send_sms1")
def send_sms1():
    """没有任何参数的异步任务"""
    print('任务:send_sms1执行了...')


@app.task(name="send_sms2")
def send_sms2(mobile, code):
    """有参数的异步任务"""
    print(f'任务:send_sms2执行了...mobile={mobile}, code={code}')


@app.task(name="send_sms3")
def send_sms3():
    """有结果的异步任务"""
    print('任务:send_sms3执行了...')
    return 100


@app.task(name="send_sms4")
def send_sms4(x, y):
    """有结果的异步任务"""
    print('任务:send_sms4执行了...')
    return x + y

发送短信api,sms/ronglianyunapi.py

import os
import json

from ronglian_sms_sdk import SmsSDK

RONGLIANYUN = {
    "accId": '2c94811c86c00e9b0186f2873a040afa',
    "accToken": os.environ.get("RONGLIANYUNACCTOKEN"),
    "appId": '2c94811c86c00e9b0186f2873b0d0b01',
    "reg_tid": 1,  # 注册短信验证码的模板ID
    "sms_expire": 60,  # 短信有效期,单位:秒(s)
    "sms_interval": 60,  # 短信发送的冷却时间,单位:秒(s)
}


class SMSSington:
    def __new__(cls, *args, **kwargs):
        if not hasattr(cls, "_instance"):
            cls._instance = super().__new__(cls, *args, **kwargs)
            ronglianyun = RONGLIANYUN
            cls._instance.sms = SmsSDK(ronglianyun.get("accId"), ronglianyun.get("accToken"), ronglianyun.get("appId"))
        return cls._instance

    def send_sms(self, mobile, datas, tid=1):
        """
        发送短信
        @params tid: 模板ID,默认测试使用1
        @params mobile: 接收短信的手机号,多个手机号使用逗号隔开
                单个号码: mobile="13312345678"
                多个号码: mobile="13312345678,13312345679,...."
        @params datas: 短信模板的参数列表(测试只能发四位验证码)
                例如短信模板为: 【云通讯】您的验证码是{1},请于{2}分钟内正确输入。
                则datas=("1234",5,)
        """
        sdk = self._instance.sms
        resp = sdk.sendMessage(tid, mobile, datas)
        response = json.loads(resp)
        return response

sms/__init__.py,这个文件的作用是减少tasks.py里面的导包路径

from .ronglianyunapi import SMSSington

sms_sington = SMSSington()

接下来,我们运行celery。

# 项目根目录下运行

# 启动Celery的终端命令(普通的运行方式)
# celery -A celeryapp.main worker --loglevel=info  # windows不好使,能接收到任务,但不执行
# celery -A celeryapp.main worker --loglevel=info --pool=solo  # windows加上这个参数
# celery -A celeryapp.main worker --loglevel=info -P eventlet -c 10  # 或者使用eventlet,-c是协程的数量,生产环境可以用1000

# 启动多工作进程,以守护进程的模式运行[一个工作进程就是4个子进程]
# windows大概率报错
# 注意:pidfile和logfile必须以绝对路径来声明
celery multi start worker -A celeryapp.main -l info -n worker1
celery multi start worker -A celeryapp.main -l info -n worker2

# 关闭运行的工作进程
celery multi stop worker -A celeryapp.main -l info -n worker1
celery multi stop worker -A celeryapp.main -l info -n worker2

调用上面的异步任务,拿django的shell进行举例:

# 终端调试
python manage.py shell

# 调用celery执行异步任务
from celeryapp.sms.tasks import send_sms1,send_sms2,send_sms3,send_sms4
mobile = "18533538210"
code = "6666"

# delay 表示马上按顺序来执行异步任务,在celrey的worker工作进程有空闲的就立刻执行
# 可以通过delay异步调用任务,可以没有参数
ret1 = send_sms1.delay()
# 可以通过delay传递异步任务的参数,可以按位置传递参数,也可以使用命名参数
# ret2 = send_sms.delay(mobile=mobile,code=code)
ret2 = send_sms2.delay(mobile,code)

# apply_async 让任务在后面指定时间后执行,时间单位:秒/s
# 任务名.apply_async(args=(参数1,参数2), countdown=定时时间)
ret4 = send_sms4.apply_async(kwargs={"x":10,"y":20},countdown=30)

# 根据返回结果,不管delay,还是apply_async的返回结果都一样的。
ret4.id      # 返回一个UUID格式的任务唯一标志符,5b2becb8-c1ba-4e33-8593-9c6fa7ff8c78
ret4.status  # 查看当前任务的状态 SUCCESS表示成功! PENDING任务等待
ret4.get()   # 获取任务执行的结果[如果任务函数中没有return,则没有结果,如果结果没有出现则会导致阻塞]

if ret4.status == "SUCCESS":
    print(ret4.get())

在django的视图里面,我们调用Celery来异步执行任务(这里容联云发送短信举例)。

***/views.py

# 异步发送短信
from celeryapp.sms.tasks import send_sms_code
# 原来的
response = send_sms_code(tid=settings.RONGLIANYUN.get("reg_tid"), mobile=mobile,
                                       datas=(sms_code, SMS_CODE_EXPIRES // 60))
# 改成
response = send_sms_code.delay(tid=settings.RONGLIANYUN.get("reg_tid"), mobile=mobile,
                                       datas=(sms_code, SMS_CODE_EXPIRES // 60))

上面就是使用celery并执行异步任务的第一种方式,适合在一些无法直接集成celery到项目中的场景,或是想celery单独跑在一台服务器上的场景。

Celery作为第三方模块在Django中集成

这里还是拿django来举例,目录结构调整如下:

-- lg_shop/
  -- lg_shop/
    -- apps/
      -- verify/
        - tasks.py  # 分散在各个子应用下的异步任务模块
    -- libs/
      -- sms/
        - __init__.py
        - ronglianyunapi.py
    - __init__.py
  - __init__.py
  - celery_app.py  # celery入口程序,相当于上一种用法的main.py

lg_shop/celery_app.py,主应用目录下创建cerley入口程序,创建celery对象并加载配置和异步任务,代码:

import os
from celery import Celery

# 必须在实例化celery应用对象之前执行(Django项目manage.py里面的)
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'lg_shop.settings.dev')

# 实例化celery应用,参数一般为项目应用名
app = Celery("lg_shop")
# 指定任务的队列名称
app.conf.task_default_queue = 'Celery'
# 可以把配置写在django的项目配置中
# 设置django中配置信息以 "CELERY_"开头为celery的配置信息
app.config_from_object('django.conf:settings', namespace='CELERY')
# 注册任务,自动搜索并加载任务
app.autodiscover_tasks()

# 启动Celery的终端命令
# celery -A lg_shop worker --loglevel=info  # windows不好使,能接收到任务,但不执行
# celery -A lg_shop worker --loglevel=info --pool=solo  # windows加上这个参数
# celery -A lg_shop worker --loglevel=info -P eventlet -c 10  # 或者使用eventlet,-c是协程的数量,生产环境可以用1000

settings/dev.py,django配置中新增celery相关配置信息,代码:

# celery相关
# Celery异步任务队列框架的配置项[注意:django的配置项必须大写,所以这里的所有配置项必须全部大写]
# 任务队列的链接地址
CELERY_BROKER_URL = 'redis://:%s@%s:%s/14' % (
    REDIS_PASSWORD, REDIS_HOST, REDIS_PORT
)
# 结果队列的链接地址
CELERY_RESULT_BACKEND = 'redis://:%s@%s:%s/15' % (
    REDIS_PASSWORD, REDIS_HOST, REDIS_PORT
)
# 时区,与django的时区同步
CELERY_TIMEZONE = TIME_ZONE
# 防止死锁
CELERY_FORCE_EXECV = True
# 设置并发的worker数量
CELERYD_CONCURRENCY = 200
# celery的任务结果内容格式
CELERY_ACCEPT_CONTENT = ['json', 'pickle']
# 设置失败允许重试[这个慎用,如果失败任务无法再次执行成功,会产生指数级别的失败记录]
# CELERY_ACKS_LATE = True
# 每个worker工作进程最多执行500个任务被销毁,可以防止内存泄漏,500是举例,根据自己的服务器的性能可以调整数值
# CELERYD_MAX_TASKS_PER_CHILD = 500
# 单个任务的最大运行时间,超时会被杀死[慎用,有大文件操作、长时间上传、下载任务时,需要关闭这个选项,或者设置更长时间]
# CELERYD_TIME_LIMIT = 10 * 60
# 任务发出后,经过一段时间还未收到acknowledge, 就将任务重新交给其他worker执行
CELERY_DISABLE_RATE_LIMITS = True

# 之前定时任务(定时一次调用),使用了apply_async({}, countdown=30);
# 设置定时任务(定时多次调用)的调用列表,需要单独运行SCHEDULE命令才能让celery执行定时任务:
# celery -A celeryapp.main beat,当然worker还是要启动的

# from celery.schedules import crontab
# CELERY_BEAT_SCHEDULE = {
#     "send_sms1": {  # 定时任务的注册标记符[必须唯一的]
#         "task": "send_sms1",  # 定时任务的任务名称
#         "schedule": 10,  # 定时任务的调用时间,10表示每隔10秒调用一次add任务
#         # "schedule": crontab(hour=7, minute=30, day_of_week=1),  # 定时任务的调用时间,每周一早上7点30分调用一次add任务
#     }
# }

lg_shop/__init__.py,主应用下初始化,代码:

import pymysql

pymysql.install_as_MySQLdb()
from .celery_app import app as celery_app

__all__ = ['celery_app']

***/tasks.py,这个任务文件要写在子应用下,app.autodiscover_tasks会自动调用。

from celery import shared_task

from sms import sms_sington


@shared_task(name="send_sms_code")
def send_sms_code(tid, mobile, datas):
    response = sms_sington.send_sms(tid=tid, mobile=mobile, datas=datas)
    return response

libs/sms/__init__.py

from .ronglianyunapi import SMSSington

sms_sington = SMSSington()

libs/sms/ronglianyunapi.py

import json

from django.conf import settings

from ronglian_sms_sdk import SmsSDK

from logger import log
from response_code import RETCODE


class SMSSington:
    def __new__(cls, *args, **kwargs):
        if not hasattr(cls, "_instance"):
            cls._instance = super().__new__(cls, *args, **kwargs)
            ronglianyun = settings.RONGLIANYUN
            cls._instance.sms = SmsSDK(ronglianyun.get("accId"), ronglianyun.get("accToken"), ronglianyun.get("appId"))
        return cls._instance

    def send_sms(self, mobile, datas, tid=1):
        """
        发送短信
        @params tid: 模板ID,默认测试使用1
        @params mobile: 接收短信的手机号,多个手机号使用逗号隔开
                单个号码: mobile="13312345678"
                多个号码: mobile="13312345678,13312345679,...."
        @params datas: 短信模板的参数列表(测试只能发四位验证码)
                例如短信模板为: 【云通讯】您的验证码是{1},请于{2}分钟内正确输入。
                则datas=("1234",5,)
        """
        sdk = self._instance.sms
        resp = sdk.sendMessage(tid, mobile, datas)
        response = json.loads(resp)
        if response.get("statusCode") != "000000":
            msg = {"code": RETCODE.SMSCODESENDRR, "msg": f"{mobile}验证码发送失败-{response.get('statusMsg')}"}
            log.error(msg)
        return response

django中的用户发送短信,就可以改成异步发送短信了。

视图中调用异步发送短信的任务,代码:

# 异步发送短信
from 子应用.tasks import send_sms_code
# 原来的
response = send_sms_code(tid=settings.RONGLIANYUN.get("reg_tid"), mobile=mobile,
                                       datas=(sms_code, SMS_CODE_EXPIRES // 60))
# 改成
response = send_sms_code.delay(tid=settings.RONGLIANYUN.get("reg_tid"), mobile=mobile,
                                       datas=(sms_code, SMS_CODE_EXPIRES // 60))

终端下先启动celery,在django项目根目录下启动。

# 1. 普通运行模式,关闭终端以后,celery就会停止运行
# celery -A 项目名 worker -l info
celery -A lg_shop worker -l info --pool=solo
# 2. 启动多worker进程模式,以守护进程的方式运行,不需要在意终端。但是这种运行模型,一旦停止,需要手动启动。
# windows大概率报错
celery multi start worker -A lg_shop -l info -n lg_shop_worker
# 3. 停止多worker进程模式
celery multi stop worker -A lg_shop -l info -n lg_shop_worker

关于celery中异步任务发布的2个方法的参数如下:

异步任务名.delay(*arg, **kwargs)
异步任务名.apply_async((arg,), {'kwarg': value}, countdown=60, expires=120)

定时任务的调用器启动,可以在运行了worker以后,使用以下命令:

celery -A lg_shop beat

beat调度器关闭了,则定时任务无法执行,如果worker工作进程关闭了,则celery关闭,保存在消息队列中的任务就会囤积在那里。

代码示例

https://github.com/HkwJsxl/lg_shop

标签:集成,celery,mobile,py,sms,send,Django,Celery,任务
From: https://www.cnblogs.com/hkwJsxl/p/17607658.html

相关文章

  • Springboot 3.x 集成Knife4j [踩坑日记]
    之前项目用的是SpringBoot2.x新项目用了SpringBoot3.x版本,引入Knife4j报错java.lang.TypeNotPresentException:Typejavax.servlet.http.HttpServletRequestnotpresent,以为是servlet的原因,更新了Servlet的版本,依旧报错,几番周折找到找到了在SpringBoot3中集成Knife......
  • django开发的safe过滤器适用
    django_safe在自己开发的博客系统当中,如果使用django框架,那么在发布博客内容的时候,希望可以发布一些被渲染过的样式文本,比如说图片等。下面是发布文章用到的代码#views.pydefarticle_detail(request,username,article_id):user=UserInfo.objects.filter(username=use......
  • django(ORM、数据库)
    一单表(增、删、改、查)1测试脚本'''只想测试django中的某一个py文件内容,那么可以不用书写前后端交互的形式而是直接写一个测试脚本即可'''#脚本代码无论是写在应用下的tests.py,还是自己单独开设py文件都可以#测试环境的准备,去manage.py中拷贝前四行代码,然后自己写两......
  • springboot 集成druid 集成mybatise
    spring加载druid和mybatisepom依赖<?xmlversion="1.0"encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation=&quo......
  • 外键字段的增删改查、多表查询、正反向的概念、子查询、多表查询之连表查询、聚合函数
    外键字段的增删改查多对多的外键增删改查图书和作者是多对多,借助于第三张表实现的,如果想绑定图书和作者的关系,本质上就是在操作第三方表操作第三张表问题:让你给图书添加一个作者,他俩的关系可是多对多让你给图书id=2添加一个作者id=1add方法book_obj=models.Book.objects......
  • 扩展Django:实现自己的manage命令
    我们都用过Django的django-admin.py和manage.py。django-admin.py是一个命令行工具,可以执行一些管理任务,比如创建Django项目。而manage.py是在创建每个Djangoproject时自动添加在项目目录下的,只是对manage.py的一个简单包装,其功能是将Djangoproject放到sys.path目录中,同时设置DJ......
  • Django-4.2博客开发教程:初识模板(九)
    一、模板简介为了更好的维护和展示页面数据,使用直接返回数据显然是呆板的,不够美观,不够灵活,所以要使用模板。模板一般都放到项目根目录下的templates文件夹里。模板包含一些基础的HTML代码和一些特殊的语法,通过特殊的语法将数据动态的插入HTML页面中。特殊的语法中有一些变量......
  • websocke在django中使用
    目录一、websocket介绍:二、后端1.安装(基于django3.x)2.settings配置3.修改asgi文件(默认不支持websocket,只支持http)一、websocket介绍:channels4.0之后默认不带Daphne服务器了。解决方案可以有两种:1.指定channels的版本为3.x;2.安装时使用pip3install-Uchannels[“dap......
  • 微信公众号发模板消息(spring集成)
    引入依赖:<dependency><groupId>me.chanjar</groupId><artifactId>weixin-java-mp</artifactId><version>1.3.3</version></dependency> 其中已实现的功能:publicinter......
  • Django 模型(数据库)
    Django模型是与数据库相关的,与数据库相关的代码一般写在 models.py本节的代码:(Django1.6,Python2.7测试环境)learn_models.zip大家可以按照我的步骤来开始做:django-admin.pystartprojectlearn_models#新建一个项目cdlearn_models#......