首页 > 其他分享 >Celery在项目中的使用

Celery在项目中的使用

时间:2022-11-13 13:14:15浏览次数:36  
标签:celery 项目 redis app sms send Celery 任务 使用

一 celery简要说明

1 Celery是一个功能完备即插即用的分布式异步任务队列框架。它适用于异步处理问题,当大批量发送邮件/短信等网络请求、或者大文件上传, 批图图像处理等等一些比较耗时的操作,我们可将其异步执行,这样的话原来的项目程序在执行过程中就不会因为耗时任务而形成阻塞,导致出现请求堆积过多的问题。celery通常用于实现异步执行耗时任务定时任务

2 Celery在工作中的作用:应用解耦,异步处理,流量削锋,消息通讯

Celery就是使用python编写的应用程序,用于异步执行任务(函数)代码的,任务的来源是第三方应用(服务端django这样的程序)通过消息(任务)进行通信。

celery通常使用一个叫Broker(中间人/消息中间件/消息队列/任务队列)来协助clients(任务的发出者/客户端)和worker(任务的处理者/工作进程).
clients发出消息到消息队列broker中,worker会以轮询的方式读取并处理broker中的消息(当然,专业的消息中间件可以实现派发消息给worker去处理)。

clients ---> 消息 --> Broker(消息队列) -----> 消息 ---> Worker工作进程(celery运行起来的)

并发任务的数量在10k以下的,直接使用redis
并发任务10k以上,1000k以下的,直接使用RabbitMQ
并发任务1000k以上的,直接使用RocketMQ

3 celery的组成架构:

Celery的运行架构由三部分组成,消息队列(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

  • 1 消息中间件(broker):任务调度队列,是一个生产者消费者模式。生产者把任务放入队列中,消费者(worker)从任务队列中取出任务执行,但是Broker本身不提供消息队列功能,所以要集成第三方队列,一般使用Redis或RatbbitMQ

  • 2 任务执行单元(worker):执行任务的程序,可以有多个并发。它实时监控消息队列,获取队列中调度的任务,并执行。

  • 3 任务执行结果存储:由于任务的执行同主程序分开,如果主程序想获取任务执行的结果,就必须通过中间件存储。同消息中间人一样,存储也可以使用RabbitMQ、Redis;如果不需要保存执行的结果也可以不配置这个模块。

3 Celery的特点是:

  • 简单,易于使用和维护,有丰富的文档,是python开发中非常热门的异步任务框架(目前最新稳定版本为:5.3.x),所以网上资料非常多。

  • 高效,支持多线程、多进程、协程模式运行,单个celery worker进程每分钟可以处理数百万个任务。

  • 灵活,celery中几乎每个部分都可以支持自定义扩展配置。

二 celery的安装使用

一 安装

pip install -U celery -i  https://pypi.tuna.tsinghua.edu.cn/simple
#Celery不建议在windows系统下使用,Celery在4.0版本以后不再支持windows系统,所以如果要在windows下使用只能安装4.0以前的版本,而且即便是4.0之前的版本,在windows系统下也是不能单独使用的,需要安装gevent、geventlet或eventlet协程模块。

二 使用

使用celery第一件要做的最为重要的事情是需要先创建一个Celery实例对象,我们一般叫做celery应用,或者更简单直接叫做一个app。app应用是我们使用celery所有功能的入口,比如创建任务,管理任务等.

celery框架有2种使用方式,一种是单独一个项目目录独立运行,另一种就是Celery集成到web项目框架中再独立运行。

2.1 celery作为一个单独项目运行

1 目录结构

服务端项目根目录/
└── mycelery/
    ├── settings.py   # 配置文件
    ├── __init__.py   
    ├── main.py       # 入口程序脚本,将来在终端下通过这个文件来运行celery的所有功能
    └── sms/          # 异步任务目录,这里拿发送短信来举例,一个类型的任务就一个目录
         └── tasks.py # 任务的文件,文件名必须是tasks.py!!!每一个任务就是一个被装饰的函数,写在任务文件中

2 入口文件:main.py

from celery import Celery

# 实例化celery应用
app = Celery("fuguang")

# 通过app实例对象加载配置文件
app.config_from_object("mycelery.settings")

# 注册任务, 自动搜索并加载任务
# 参数必须必须是一个列表,里面的每一个任务都是任务的路径名称
# app.autodiscover_tasks(["任务1","任务2",....])
app.autodiscover_tasks(["mycelery.sms","mycelery.email"])

# 启动Celery的终端命令
# 强烈建议切换目录到项目的根目录下启动celery!!
# celery -A mycelery.main worker --loglevel=info

3 配置文件:settings.py

# 任务队列的链接地址
broker_url = 'redis://127.0.0.1:6379/14'
# 结果队列的链接地址
result_backend = 'redis://127.0.0.1:6379/15'

4 创建任务文件:sms/tasks.py

from ..main import app

@app.task
def send_sms1():
    """没有任何参数的异步任务"""
    print('任务:send_sms1执行了...')

5 运行celery

cd 项目的根目录

# 普通的运行方式[默认多进程,卡终端,按CPU逻辑核数+1创建进程数]
# 注意:pidfile和logfile必须以绝对路径来声明
celery -A mycelery.main worker --loglevel=info --logfile="绝对路径/logs/celery.log"
# ps aux|grep celery  # 查看celery启动以后的所有进程

# 启动多工作进程,以守护进程的模式运行[一个工作进程就是4个子进程1个主进程]
celery multi start worker -A mycelery.main -E --pidfile="绝对路径/logs/worker1.pid" --logfile="绝对路径/logs/celery.log" -l info -n worker1
celery multi start worker -A mycelery.main -E --pidfile="绝对路径/logs/worker2.pid" --logfile="/home/moluo/Desktop/fuguang/fuguangapi/logs/celery.log" -l info -n worker2

# 关闭运行的工作进程
celery multi stop worker -A mycelery.main --pidfile="绝对路径/logs/worker1.pid" --logfile="绝对路径/logs/celery.log"
celery multi stop worker -A mycelery.main --pidfile="绝对路径/logs/worker2.pid" --logfile="绝对路径/logs/celery.log"

6 拿django的shell,在程序中调用上面的异步任务

# 调用celery执行异步任务
# 确保进入虚拟环境

cd ~/Desktop/fuguang/fuguangapi
conda activate fuguang

python manage.py shell
from mycelery.sms.tasks import send_sms1,send_sms2,send_sms3,send_sms4
mobile = "13312345656"
code = "666666"

# delay 表示马上按顺序来执行异步任务,在celrey的worker工作进程有空闲的就立刻执行
# 可以通过delay异步调用任务,可以没有参数
ret1 = send_sms1.delay()
# 可以通过delay传递异步任务的参数,可以按位置传递参数,也可以使用命名参数
# ret2 = send_sms.delay(mobile=mobile,code=code)
ret2 = send_sms2.delay(mobile,code)

# apply_async 让任务在后面指定时间后执行,时间单位:秒/s

# 无参数执行定时任务
# 任务名apply_async(countdown=定时时间)
ret3 = send_sms3.apply_async(countdown=15)

# 有参数执行定时任务
# 任务名.apply_async(args=(参数1,参数2), countdown=定时时间)
ret4 = send_sms4.apply_async(kwargs={"x":10,"y":20},countdown=30)

# 根据返回结果,不管delay,还是apply_async的返回结果都一样的。
ret4.id      # 返回一个UUID格式的任务唯一标志符,78fb827e-66f0-40fb-a81e-5faa4dbb3505
ret4.status  # 查看当前任务的状态 SUCCESS表示成功!PENDING 表示任务发送中
ret4.get()   # 获取任务执行的结果[如果任务函数中没有return,则没有结果,如果结果没有出现则会导致阻塞]
举例

7 celery可以调度第三方框架的代码,这里把django当成一个第三模块在main.py主程序中进行导包引入,并设置django的配置文件进行django的初始化。

import os,django
from celery import Celery


# 初始化django的运行
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'fuguangapi.settings.dev')
django.setup()

# 实例化celery应用
app = Celery("fuguang")

"""
通过app实例对象加载配置的方式有4种:
1. app.config_from_object("配置文件的导包路径") # 以python文件来声明配置,并导包到celery中[最常用的方式]
2. app.config_from_object(配置类)             # 以python对象的方式来声明配置 
3. app.config_from_cmdline("参数列表")         # 以终端参数的格式来声明celery的配置
4. app.config_from_envvar(环境变量)            # 以当前系统的环境变量的方式来声明配置
"""
app.config_from_object("mycelery.settings")

# 注册任务, 自动搜索并加载任务
# 参数必须必须是一个列表,里面的每一个任务都是任务的路径名称
# app.autodiscover_tasks(["任务1的包路径","任务2的包路径",....])
app.autodiscover_tasks(["mycelery.sms","mycelery.email"])

8 在需要使用django配置的任务中,直接加载配置,在项目中我们可以把注册的短信发送功能,整合成一个任务函数,mycelery.sms.tasks,代码:

from ..main import app
from ronglianyunapi import send_sms as send_sms_to_user


@app.task(name="send_sms")
def send_sms(tid, mobile, datas):
    """发送短信"""
    return send_sms_to_user(tid, mobile, datas)

@app.task
def send_sms1():
    """没有任何参数的异步任务"""
    print('任务:send_sms1执行了...')

9 重启celery,最终在django的视图里面,我们直接导包调用Celery的task.py任务来异步执行即可。

只需要完成2个步骤,分别是导入异步任务调用异步任务。users/views.py,代码:

import random
from django_redis import get_redis_connection
from django.conf import settings


class SMSAPIView(APIView):
    """
    SMS短信接口视图
    /users/sms/(?P<mobile>1[3-9]\d{9})
    """
    def get(self, request, mobile):
        """发送短信验证码"""
        redis = get_redis_connection("sms_code")
        # 判断本次请求是否属于短信冷却时间内?
        # 判断手机短信是否处于发送冷却中[60秒只能发送一条]
        interval = redis.ttl(f"interval_{mobile}")  # 通过ttl方法可以获取保存在redis中的变量的剩余有效期
        if interval != -2:
            return Response({
                "interval": interval,
                "errmsg": f"短信发送过于频繁,请{interval}秒后再次点击获取短信验证码!"
            },
            status=status.HTTP_400_BAD_REQUEST
            )

        # 基于随机数生成短信验证码
        code = f"{random.randint(100, 9999):04d}"
        # 获取短信有效期的时间
        time = settings.RONGLIANYUN.get("sms_expire")  # 指定时间内,用户可以使用
        # 短信发送间隔时间
        sms_interval = settings.RONGLIANYUN["sms_interval"]  # 指定时间过去以后,用户才可以重新获取短信验证码

        # [同步]调用第三方sdk发送短信
        # send_sms(settings.RONGLIANYUN.get("reg_tid"), mobile, datas=(code, time // 60))

        # [Celery异步]调用第三方sdk发送短信
        from mycelery.sms.tasks import send_sms
        send_sms.delay(settings.RONGLIANYUN.get("reg_tid"), mobile, datas=(code, time // 60))

        # 记录code到redis中,并以time作为有效期
        # 使用redis提供的管道对象pipeline来优化redis的写入操作[添加/修改/删除]
        pipe = redis.pipeline()
        pipe.multi()  # 开启事务[redis的事务保证了命令一并发送到服务端,但是不保证命令全部一起执行成功!]
        # redis.setex(f"sms_{mobile}", time, code)  # 直接发送给redis的服务端的
        pipe.setex(f"sms_{mobile}", time, code)
        pipe.setex(f"interval_{mobile}", sms_interval, "")
        pipe.execute()  # 提交事务,同时把暂存在pipeline对象的多条命令一次性提交给redis
        return Response({"errmsg": "OK"}, status=status.HTTP_200_OK)
users/views.py

上面就是使用celery并执行异步任务的第一种方式,适合在一些无法直接集成celery到项目中的场景。

2.2 celery作为模块集成到web项目中

1 目录结构

fg/           # 服务端项目根目录
└── fg/       # 主应用目录
    ├── apps/         # 子应用存储目录  
    ├   └── users/            # django的子应用
    ├       └── tasks.py      # [新增]分散在各个子应用下的异步任务模块,还是必须固定为tasks.py,不能叫别的名字。
    ├── settings/     # [修改]django的配置文件存储目录[celery的配置信息填写在django配置中即可]
    ├── __init__.py   # [修改]设置当前包目录下允许外界调用celery应用实例对象
    └── celery.py     # [新增]celery入口程序,相当于上一种用法的main.py

2 fg/celery.py,主应用目录下创建cerley入口程序,创建celery对象并加载配置和异步任务,代码

import os
from celery import Celery

# 必须在实例化celery应用对象之前执行
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'fuguangapi.settings.dev')

# 实例化celery应用对象
app = Celery('fgapi')
# 指定任务的队列名称
app.conf.task_default_queue = 'Celery'
# 也可以把配置写在django的项目配置中
app.config_from_object('django.conf:settings', namespace='CELERY') # 设置django中配置信息以 "CELERY_"开头为celery的配置信息
# 自动根据配置查找django的所有子应用下的tasks任务文件
app.autodiscover_tasks()

3 settings/dev.py,django配置中新增celery相关配置信息,代码:

# Celery异步任务队列框架的配置项[注意:django的配置项必须大写,所以这里的所有配置项必须全部大写]
# 任务队列
CELERY_BROKER_URL = 'redis://default:[email protected]:6379/14'
# 结果队列
CELERY_RESULT_BACKEND = 'redis://default:[email protected]:6379/15'
# 时区,与django的时区同步
CELERY_TIMEZONE = TIME_ZONE
# 防止死锁
CELERY_FORCE_EXECV = True
# 设置并发的worker数量
CELERYD_CONCURRENCY = 200
# 设置失败允许重试[这个慎用,如果失败任务无法再次执行成功,会产生指数级别的失败日志记录]
CELERY_ACKS_LATE = True
# 每个worker工作进程最多执行500个任务被销毁,可以防止内存泄漏,500是举例,根据自己的服务器的性能可以调整数值
CELERYD_MAX_TASKS_PER_CHILD = 500
# 单个任务的最大运行时间,超时会被杀死任务[慎用,有大文件操作、长时间上传、下载任务、图像分析时,需要关闭这个选项,或者设置更长时间]
CELERYD_TIME_LIMIT = 10 * 60
# 任务发出后,经过一段时间还未收到acknowledge, 就将任务重新交给其他worker执行
CELERY_DISABLE_RATE_LIMITS = True
# celery的任务结果内容格式[如果客户端调用程序不是python,则不能设置为pickle,因为其他语言不识别pickle格式数据]
CELERY_ACCEPT_CONTENT = ['json', 'pickle']

# 之前定时任务(定时一次调用),使用了apply_async({}, countdown=30);
# 设置定时任务(定时多次调用)的调用列表,需要单独运行SCHEDULE命令才能让celery执行定时任务:celery -A mycelery.main beat,当然worker还是要启动的
# https://docs.celeryproject.org/en/stable/userguide/periodic-tasks.html
# from celery.schedules import crontab
# CELERY_BEAT_SCHEDULE = {
#     "total_user_online": {               # 定时任务的注册标记符[必须唯一的]
#         "task": "total_user_online",     # 定时任务的任务名称,如果每隔1段时间,统计用户在线在数
#         "schedule": 5 * 60,              # 定时任务的调用时间,10表示每隔10秒调用一次任务,5*60表示每5分钟
#         # "schedule": crontab(hour=7, minute=30, day_of_week=1),,  # 定时任务的调用时间,每周一早上7点30分调用一次add任务
#     }
# }

fg/__init__.py,主应用下初始化,代码:

import pymysql
from .celery import app as celery_app

pymysql.install_as_MySQLdb()

__all__ = ['celery_app']

5 users/tasks.py,代码:

from celery import shared_task
from ronglianyunapi import send_sms as sms
# 记录日志:
import logging
logger = logging.getLogger("django")

@shared_task(name="send_sms")
def send_sms(tid, mobile, datas):
    """异步发送短信"""
    try:
        return sms(tid, mobile, datas)
    except Exception as e:
        logger.error(f"发送短信失败: {e}")

6 users.views,视图中从同级目录下导包调用异步发送短信的任务即可,代码:

import random
from django_redis import get_redis_connection
from django.conf import settings


class SMSAPIView(APIView):
    """
    SMS短信接口视图
    /users/sms/(?P<mobile>1[3-9]\d{9})
    """
    def get(self, request, mobile):
        """发送短信验证码"""
        redis = get_redis_connection("sms_code")
        # 判断本次请求是否属于短信冷却时间内?
        # 判断手机短信是否处于发送冷却中[60秒只能发送一条]
        interval = redis.ttl(f"interval_{mobile}")  # 通过ttl方法可以获取保存在redis中的变量的剩余有效期
        if interval != -2:
            return Response({
                "interval": interval,
                "errmsg": f"短信发送过于频繁,请{interval}秒后再次点击获取短信验证码!"
            },
            status=status.HTTP_400_BAD_REQUEST
            )

        # 基于随机数生成短信验证码
        code = f"{random.randint(100, 9999):04d}"
        # 获取短信有效期的时间
        time = settings.RONGLIANYUN.get("sms_expire")  # 指定时间内,用户可以使用
        # 短信发送间隔时间
        sms_interval = settings.RONGLIANYUN["sms_interval"]  # 指定时间过去以后,用户才可以重新获取短信验证码

        # [同步]调用第三方sdk发送短信
        # send_sms(settings.RONGLIANYUN.get("reg_tid"), mobile, datas=(code, time // 60))

        # [Celery异步]调用第三方sdk发送短信
        # from mycelery.sms.tasks import send_sms
        from .tasks import send_sms
        send_sms.delay(settings.RONGLIANYUN.get("reg_tid"), mobile, datas=(code, time // 60))

        # 记录code到redis中,并以time作为有效期
        # 使用redis提供的管道对象pipeline来优化redis的写入操作[添加/修改/删除]
        pipe = redis.pipeline()
        pipe.multi()  # 开启事务[redis的事务保证了命令一并发送到服务端,但是不保证命令全部一起执行成功!]
        # redis.setex(f"sms_{mobile}", time, code)  # 直接发送给redis的服务端的
        pipe.setex(f"sms_{mobile}", time, code)
        pipe.setex(f"interval_{mobile}", sms_interval, "")
        pipe.execute()  # 提交事务,同时把暂存在pipeline对象的多条命令一次性提交给redis
        return Response({"errmsg": "OK"}, status=status.HTTP_200_OK)
users/views.py

关于celery中异步任务发布的2个方法的参数如下:

delay(*arg, **kwargs)
apply_async((arg,), {'kwarg': value}, countdown=60, expires=120)

7 启动

cd 项目根目录
# 普通运行模式,关闭终端以后,celery就会停止运行
celery -A fuguangapi worker  -l INFO --logfile="根目录/logs/celery.log"

# 启动多worker进程模式,以守护进程的方式运行,不需要在意终端。但是这种运行模型,一旦停止,需要手动启动。
celery multi start worker -A fuguangapi -E --pidfile="根目录/logs/worker1.pid" --logfile="根目录/logs/celery.log" -l info -n worker1

# 暂停多worker进程模式
celery multi stop worker -A fuguangapi --pidfile="根目录/logs/worker1.pid"

8 定时任务的调用器启动

我们可以新增一个统计用户在线人数的计划任务(周期任务),users/tasks.py,代码:

from django_redis import get_redis_connection


@shared_task(name="total_user_online")
def total_user_online():
    """统计在线人数"""
    redis = get_redis_connection("default")
    res = redis.keys("user_*_access_token")
    return len(res)

settings/dev.py,配置注册计划任务,代码:

# 之前定时任务(定时一次调用),使用了apply_async({}, countdown=30);
# 设置定时任务(定时多次调用)的调用列表,需要单独运行SCHEDULE命令才能让celery执行定时任务:celery -A mycelery.main beat,当然worker还是要启动的
# https://docs.celeryproject.org/en/stable/userguide/periodic-tasks.html
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
    "total_user_online": {               # 定时任务的注册标记符[必须唯一的]
        "task": "total_user_online",     # 定时任务的任务名称,如果每隔1段时间,统计用户在线在数
        "schedule": 60,                  # 定时任务的调用时间,10表示每隔10秒调用一次任务,5*60表示每5分钟
        # "schedule": crontab(hour=7, minute=30, day_of_week=1),,  # 定时任务的调用时间,每周一早上7点30分调用一次add任务
    }
}

完成上面的操作以后,就可以在终端运行了worker工作进程以后,在另一个终端使用以下命令来开启计划任务的调度器:

cd ~/Desktop/fuguang/fuguangapi
celery -A fuguangapi beat

注意:如果beat调度器关闭了,则计划任务就无法执行,如果worker工作进程关闭了,则celery关闭,保存在消息队列中的任务就会囤积在那里。

9 celery可以使用supervisor进行后台托管celery的运行(supervisor会在celery关闭了以后,自动重启celery)。

还可以针对任务执行的情况和结果,使用celery-flower查看来进行监控。celery失败任务的重新尝试执行,除了flower,也有一个django-celery-results模块,也支持在admin站点后台管理celery异步任务的。

标签:celery,项目,redis,app,sms,send,Celery,任务,使用
From: https://www.cnblogs.com/daminghuahua/p/16885813.html

相关文章