首页 > 其他分享 >Celery

Celery

时间:2023-06-04 23:24:00浏览次数:50  
标签:celery 异步 sms send Celery 任务 import

Celery

1、简介

Celery是一个python第三方模块,是一个功能完备即插即用的分布式异步任务队列框架。

它适用于异步处理问题,当大批量发送邮件、或者大文件上传, 批图图像处理等等一些比较耗时的操作,我们可将其异步执行,解决了项目程序在执行过程中因为耗时任务而形成阻塞,导致出现请求堆积过多的问题。

celery通常用于实现异步任务或定时任务。

异步任务:将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等;

定时任务:定时执行某件事情,比如每天数据统计。

img

目前最新版本为: 5.2

项目:https://github.com/celery/celery/

文档:(3.1) http://docs.jinkan.org/docs/celery/getting-started/index.html

​ (最新) https://docs.celeryproject.org/en/latest/

Celery特点

  • 简单,易于使用和维护,有丰富的文档。
  • 高效,支持多线程、多进程、协程模式运行,单个celery进程每分钟可以处理数百万个任务。
  • 灵活,celery中几乎每个部分都可以自定义扩展。

celery的作用是:应用解耦,异步处理,流量削锋,消息通讯。

celery通过消息(任务)进行通信,

celery通常使用一个叫Broker(中间人/消息中间件/消息队列/任务队列)来协助clients(任务的发出者/客户端)和worker(任务的处理者/工作进程)进行通信.

clients发出消息到任务队列中,broker将任务队列中的信息派发给worker来处理。

client ---> 消息 --> Broker(消息队列) -----> 消息 ---> worker(celery运行起来的工作进程)

消息队列(Message Queue),也叫消息队列、中间件,简称消息中间件,它是一个独立运行的程序,表示在消息的传输过程中临时保存消息的容器。

所谓的消息,是指代在两台计算机或2个应用程序之间传送的数据。

消息可以非常简单,例如文本字符串或者数字,也可以是更复杂的json数据或hash数据等。

所谓的队列,是一种先进先出、后进呼后出的数据结构,python中的list数据类型就可以很方便地用来实现队列结构。

目前开发中,使用较多的消息队列有RabbitMQ,Kafka,RocketMQ,MetaMQ,ZeroMQ,ActiveMQ等,当然,像redis、mysql、MongoDB,也可以充当消息中间件,但是相对而言,没有上面那么专业和性能稳定。

并发任务10k以下的,直接使用redis

并发任务10k以上,1000k以下的,直接使用RabbitMQ

并发任务1000k以上的,直接使用RocketMQ

Celery运行架构

Celery的运行架构由三部分组成:

  • 任务消息队列(message broker)
  • 任务执行单元(worker)
  • 任务结果存储(task result store)。

img
image-20210915163601802

一个celery系统可以包含很多的worker和broker。

Celery本身不提供消息队列功能,但是可以很方便地和第三方提供的消息中间件进行集成,包括Redis,RabbitMQ,RocketMQ等

2、安装

pip install -U celery -i  https://pypi.tuna.tsinghua.edu.cn/simple

注意:

Celery不建议在windows系统下使用。

Celery在4.0版本以后不再支持windows系统,所以如果要在windows下使用只能安装4.0以前的版本,

而且即便是4.0之前的版本,在windows系统下也是不能单独使用的,需要安装gevent、geventlet或eventlet协程模块

3、基本使用

3.1 直接使用

创建异步任务执行文件celery_task.py:

import celery
import time

backend='redis://127.0.0.1:6379/1'  # redis的1号库作为 执行结果存储
broker='redis://127.0.0.1:6379/2'  # redis的2号库作为 任务消息队列

# 创建一个Celery实例对象
cel=celery.Celery('test',backend=backend,broker=broker)

@cel.task
def send_email(name):  # 发送邮件模拟函数
    print("向%s发送邮件..."%name)
    time.sleep(5)
    print("向%s发送邮件完成"%name)
    return "ok" 

celery_task.py定义了celery启动所需配置,包括:设置redis库作为任务消息队列与执行结果存储、创建Celery实例’注册任务。

该文件不能直接运行,必须通过celery命令去调用。

异步任务文件命令执行:

celery worker -A celery_task -l info

运行之后的提示:

image-20230401140227895

celery已经启动,接下来消费者即可创建任务了。

执行任务文件,produce_task.py:

from celery_task import send_email

result = send_email.delay("yuan")
print(result.id)  # 每个任务都会有一个唯一的id值

result2 = send_email.delay("alex")
print(result2.id)

# 注意:上面执行了两个任务:分别给yuan与alex发送邮件
# 异步执行,两个任务同时执行,一共需要5秒钟。

创建py文件:result.py,查看任务执行结果:

from celery.result import AsyncResult  # 专门用来取执行结果的类
from celery_task import cel  # 导入创建的celery实例

# 每个任务有一位的id值,利用该值可以取出执行的结果。
async_result=AsyncResult(id="c6ddd5b7-a662-4f0e-93d4-ab69ec2aea5d", app=cel)

if async_result.successful():
    result = async_result.get()
    print(result)
    # result.forget() # 将结果删除
elif async_result.failed():
    print('执行失败')
elif async_result.status == 'PENDING':
    print('任务等待中被执行')
elif async_result.status == 'RETRY':
    print('任务异常后正在重试')
elif async_result.status == 'STARTED':
    print('任务已经开始被执行')

3.2 优化:解耦合

上面直接使用的例子中,实例化celery、创建任务都放到一起了,耦合性不好,实际开发中, 我们常常需要将代码进行拆分

img

celery.py:实例化celery,进行配置

from celery import Celery

cel = Celery('celery_demo',
             broker='redis://127.0.0.1:6379/1',
             backend='redis://127.0.0.1:6379/2',
             # 包含以下两个任务文件,去相应的py文件中找任务,对多个任务做分类
             include=['celery_tasks.task01',
                      'celery_tasks.task02'
                      ])

# 时区
cel.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
cel.conf.enable_utc = False

task01.py:不同类型的任务单独存放。

#task01
import time
from celery_tasks.celery import cel

@cel.task
def send_email(res):
    time.sleep(5)
    return "完成向%s发送邮件任务"%res

task02.py:不同类型的任务单独存放。

#task02
import time
from celery_tasks.celery import cel
@cel.task
def send_msg(name):
    time.sleep(5)
    return "完成向%s发送短信任务"%name

开启work:

celery worker -A celery -l info -P eventlet  # -P eventlet ---> 开协程

produce_task.py:任务的调用

from celery_tasks.task01 import send_email
from celery_tasks.task02 import send_msg

# 立即告知celery去执行test_celery任务,并传入一个参数
result = send_email.delay('yuan')
print(result.id)
result = send_msg.delay('yuan')
print(result.id)

结果查询check_result.py:

from celery.result import AsyncResult
from celery_tasks.celery import cel

async_result = AsyncResult(id="562834c6-e4be-46d2-908a-b102adbbf390", app=cel)

if async_result.successful():
    result = async_result.get()
    print(result)
    # result.forget() # 将结果删除,执行完成,结果不会自动删除
    # async.revoke(terminate=True)  # 无论现在是什么时候,都要终止
    # async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。
elif async_result.failed():
    print('执行失败')
elif async_result.status == 'PENDING':
    print('任务等待中被执行')
elif async_result.status == 'RETRY':
    print('任务异常后正在重试')
elif async_result.status == 'STARTED':
    print('任务已经开始被执行')

3.3 定时任务

celery除了应用于执行异步任务,也常用于执行定时任务。

异步任务与定时任务,仅是调用方式的不同。

  • 执行异步任务,使用任务.delay()来调用

  • 执行定时任务,使用任务.apply_async()来调用

from celery_task import send_email
from datetime import datetime

# 方式一
v1 = datetime(2020, 3, 11, 16, 19, 00)  # 创建一个时间
print(v1)
v2 = datetime.utcfromtimestamp(v1.timestamp())  # 将时间v1转换为utc时间
print(v2)
result = send_email.apply_async(args=["egon",], eta=v2)  # 使用apply_async启动定时任务,eta参数为执行时间
print(result.id)

# 方式二
ctime = datetime.now()
# 默认用utc时间
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())

from datetime import timedelta

time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay

# 使用apply_async并设定时间
result = send_email.apply_async(args=["egon"], eta=task_time)
print(result.id)

3.4 将定时任务写入配置

我们也可以将定时任务统一写入配置文件中,实现解耦合的效果。

3.2 的celery.py修改如下:

from datetime import timedelta
from celery import Celery
from celery.schedules import crontab

cel = Celery('tasks', broker='redis://127.0.0.1:6379/1', backend='redis://127.0.0.1:6379/2', include=[
    'celery_tasks.task01',
    'celery_tasks.task02',
])
cel.conf.timezone = 'Asia/Shanghai'
cel.conf.enable_utc = False

cel.conf.beat_schedule = { # 指定定时任务,字典中存放的就是一个个定时任务
    # 定时任务的名称可以随意命名
    'add-every-10-seconds': {
        # 执行tasks1下的test_celery函数
        'task': 'celery_tasks.task01.send_email',
        # schedule有三种写法
        # 'schedule': 1.0,  # 第一种写法:每秒执行1次
        # 'schedule': crontab(minute="*/1"),  # 第二种写法:每分钟执行1次,*表示每分钟
        'schedule': timedelta(seconds=6),  # 第三种写法:时差的写法
        # 传递参数
        'args': ('张三',)
    },
    # 'add-every-12-seconds': {
    #     'task': 'celery_tasks.task01.send_email',
    #     每年4月11号,8点42分执行
    #     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
    #     'args': ('张三',)
    # },
} 

前面的定时任务是生产者生成的,生产者通过任务的apply_async方法,定时地往消息队列中添加任务;

如果定时任务是写入配置文件中的,celery是通过beat来专门管理定时任务的。

说起启动celery时,除了启动broker,还需要启动beat。

# 启动 Beat 程序
celery beat -A 项目名 # Celery Beat进程会读取该项目的配置文件内容,周期性地将配置中到期需要执行的任务发送给任务队列
 
# 启动 worker 进程 
celery -A 文件名 worker -l info 
# 或者 
celery -B -A 文件名 worker -l info

在项目中使用

使用celery第一件要做的事情是先创建一个Celery实例对象,我们一般叫做celery应用对象,或者叫一个app。

app应用对象是我们使用celery所有功能的入口,比如启动celery、创建任务,管理任务,执行任务等.

celery框架有2种使用方式:

  • 一种是单独一个项目目录,

  • 一种就是Celery集成到web项目框架中。

celery作为一个单独项目运行

例如,mycelery代码目录直接放在项目根目录下即可,路径如下:

服务端项目根目录/
└── mycelery/
    ├── settings.py   # 配置文件
    ├── __init__.py   
    ├── main.py       # 入口程序
    └── sms/          # 异步任务目录,这里拿发送短信来举例,一个类型的任务就一个目录
         └── tasks.py # 任务的文件,文件名必须是tasks.py!!!每一个任务就是一个被装饰的函数,写在任务文件中

main.py,代码:

from celery import Celery

# 实例化celery应用,参数一般为项目应用名
app = Celery("luffycity")

# 通过app实例对象加载配置文件
app.config_from_object("mycelery.settings")

# 注册任务, 自动搜索并加载任务
# 参数必须必须是一个列表,里面的每一个任务都是任务的路径名称
# app.autodiscover_tasks(["任务1","任务2",....])
app.autodiscover_tasks(["mycelery.sms","mycelery.email"])

# 启动Celery的终端命令
# 强烈建议切换目录到项目的根目录下启动celery!!
# celery -A mycelery.main worker --loglevel=info

配置文件settings.py,代码:

# 任务队列的链接地址
broker_url = 'redis://127.0.0.1:6379/14'
# 结果队列的链接地址
result_backend = 'redis://127.0.0.1:6379/15'

关于配置信息的官方文档:https://docs.celeryproject.org/en/master/userguide/configuration.html

创建任务文件sms/tasks.py,任务文件名必须固定为"tasks.py",并创建任务,代码:

from ..main import app

@app.task(name="send_sms1")
def send_sms1():
    """没有参数没有返回结果的异步任务"""
    print('任务:send_sms1执行了...')

@app.task(name="send_sms2")
def send_sms2(mobile, code):
    """有参数没有返回结果的异步任务"""
    print(f'任务:send_sms2执行了...mobile={mobile}, code={code}')

@app.task(name="send_sms3")
def send_sms3():
    """没有参数有返回结果的异步任务"""
    print('任务:send_sms3执行了...')
    return 100

@app.task(name="send_sms4")
def send_sms4(x,y):
    """有参数又有返回结果的异步任务"""
    print('任务:send_sms4执行了...')
    return x+y

接下来,我们运行celery。

# 先切换到项目的根目录
cd ~/Desktop/luffycity/luffycityapi

# 普通的运行方式[默认多进程,卡终端,按CPU核数+1创建进程数]
# ps aux|grep celery
celery -A mycelery.main worker --loglevel=info

# 启动多工作进程,以守护进程的模式运行[一个工作进程就是4个子进程]
# 注意:pidfile和logfile必须以绝对路径来声明
celery multi start worker -A mycelery.main -E --pidfile="/home/moluo/Desktop/luffycity/luffycityapi/logs/worker1.pid" --logfile="/home/moluo/Desktop/luffycity/luffycityapi/logs/celery.log" -l info -n worker1
celery multi start worker -A mycelery.main -E --pidfile="/home/moluo/Desktop/luffycity/luffycityapi/logs/worker2.pid" --logfile="/home/moluo/Desktop/luffycity/luffycityapi/logs/celery.log" -l info -n worker2

# 关闭运行的工作进程
celery multi stop worker -A mycelery.main --pidfile="/home/moluo/Desktop/luffycity/luffycityapi/logs/worker1.pid" --logfile="/home/moluo/Desktop/luffycity/luffycityapi/logs/celery.log"
celery multi stop worker -A mycelery.main --pidfile="/home/moluo/Desktop/luffycity/luffycityapi/logs/worker2.pid" --logfile="/home/moluo/Desktop/luffycity/luffycityapi/logs/celery.log"

效果如下:

image-20230325151346565

调用上面的异步任务,拿django的shell进行举例:

# 因为celery模块安装在了虚拟环境中,所以要确保进入虚拟环境
conda activate luffycity
cd ~/Desktop/luffycity/luffycityapi

python manage.py shell

# 调用celery执行异步任务
from mycelery.sms.tasks import send_sms1,send_sms2,send_sms3,send_sms4
mobile = "13312345656"
code = "666666"

# delay 表示马上按顺序来执行异步任务,在celrey的worker工作进程有空闲的就立刻执行
# 可以通过delay异步调用任务,可以没有参数
ret1 = send_sms1.delay()
# 可以通过delay传递异步任务的参数,可以按位置传递参数,也可以使用命名参数
# ret2 = send_sms.delay(mobile=mobile,code=code)
ret2 = send_sms2.delay(mobile,code)

# apply_async 让任务在后面指定时间后执行,时间单位:秒/s
# 任务名.apply_async(args=(参数1,参数2), countdown=定时时间)
ret4 = send_sms4.apply_async(kwargs={"x":10,"y":20},countdown=30)

# 根据返回结果,不管delay,还是apply_async的返回结果都一样的。
ret4.id      # 返回一个UUID格式的任务唯一标志符,78fb827e-66f0-40fb-a81e-5faa4dbb3505
ret4.status  # 查看当前任务的状态 SUCCESS表示成功! PENDING任务等待
ret4.get()   # 获取任务执行的结果[如果任务函数中没有return,则没有结果,如果结果没有出现则会导致阻塞]

if ret4.status == "SUCCESS":
    print(ret4.get())

接下来,我们让celery可以调度第三方框架的代码,这里拿django当成一个第三模块调用进行举例。

在main.py主程序中对django进行导包引入,并设置django的配置文件进行django的初始化。

import os,django
from celery import Celery

# 初始化django,把celery和django进行组合,识别和加载django的配置文件
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffycityapi.settings.dev')
django.setup()

# 初始化celery对象
app = Celery("luffycity")

# 通过app对象加载配置
app.config_from_object("mycelery.config")

# 自动注册任务
app.autodiscover_tasks(["mycelery.sms","mycelery.email"])
# 运行celery
# 终端下: celery -A mycelery.main worker -l info

在需要使用django配置的任务中,直接加载配置,所以我们把注册的短信发送功能,整合成一个任务函数,mycelery.sms.tasks,代码:

from ..main import app
from ronglianyunapi import send_sms as send_sms_to_user

@app.task(name="send_sms1")
def send_sms1():
    """没有任何参数,没有返回结果的异步任务"""
    print('任务:send_sms1执行了...')

@app.task(name="send_sms2")
def send_sms2(mobile, code):
    """有参数,没有返回结果的异步任务"""
    print(f'任务:send_sms2执行了...mobile={mobile}, code={code}')


@app.task(name="send_sms3")
def send_sms3():
    """没有任何参数,有返回结果的异步任务"""
    print('任务:send_sms3执行了...')
    return 100

@app.task(name="send_sms4")
def send_sms4(x,y):
    """有结果的异步任务"""
    print('任务:send_sms4执行了...')
    return x+y

@app.task(name="send_sms")
def send_sms(tid, mobile, datas):
    """发送短信"""
    print("给{}发送短信".format(mobile))
    return send_sms_to_user(tid, mobile, datas)

最终在django的视图里面,我们调用Celery来异步执行任务。

只需要完成2个步骤,分别是导入异步任务调用异步任务

users/views.py,代码:

import random
from django_redis import get_redis_connection
from django.conf import settings
# from ronglianyunapi import send_sms
from mycelery.sms.tasks import send_sms
"""
/users/sms/(?P<mobile>1[3-9]\d{9})
"""
class SMSAPIView(APIView):
    """
    SMS短信接口视图
    """
    def get(self, request, mobile):
        """发送短信验证码"""
        redis = get_redis_connection("sms_code")
        # 判断手机短信是否处于发送冷却中[60秒只能发送一条]
        interval = redis.ttl(f"interval_{mobile}")  # 通过ttl方法可以获取保存在redis中的变量的剩余有效期
        if interval != -2:
            return Response({"errmsg": f"短信发送过于频繁,请{interval}秒后再次点击获取!", "interval": interval},status=status.HTTP_400_BAD_REQUEST)

        # 基于随机数生成短信验证码
        # code = "%06d" % random.randint(0, 999999)
        code = f"{random.randint(0, 999999):06d}"
        # 获取短信有效期的时间
        time = settings.RONGLIANYUN.get("sms_expire")
        # 短信发送间隔时间
        sms_interval = settings.RONGLIANYUN["sms_interval"]
        # 调用第三方sdk发送短信
        # send_sms(settings.RONGLIANYUN.get("reg_tid"), mobile, datas=(code, time // 60))
        # 异步发送短信
        send_sms.delay(settings.RONGLIANYUN.get("reg_tid"), mobile, datas=(code, time // 60))

        # 记录code到redis中,并以time作为有效期
        # 使用redis提供的管道对象pipeline来优化redis的写入操作[添加/修改/删除]
        pipe = redis.pipeline()
        pipe.multi()  # 开启事务
        pipe.setex(f"sms_{mobile}", time, code)
        pipe.setex(f"interval_{mobile}", sms_interval, "_")
        pipe.execute()  # 提交事务,同时把暂存在pipeline的数据一次性提交给redis

        return Response({"errmsg": "OK"}, status=status.HTTP_200_OK)

上面就是使用celery并执行异步任务的第一种方式,适合在一些无法直接集成celery到项目中的场景。

Celery作为第三方模块集成到项目中

这里还是拿django来举例,目录结构调整如下:

luffycityapi/           # 服务端项目根目录
└── luffycityapi/       # 主应用目录
    ├── apps/           # 子应用存储目录  
    ├   └── users/            # django的子应用
    ├       └── tasks.py      # [新增]分散在各个子应用下的异步任务模块
    ├── settings/     # [修改]django的配置文件存储目录[celery的配置信息填写在django配置中即可]
    ├── __init__.py   # [修改]设置当前包目录下允许外界调用celery应用实例对象
    └── celery.py     # [新增]celery入口程序,相当于上一种用法的main.py

luffycityapi/celery.py,主应用目录下创建cerley入口程序,创建celery对象并加载配置和异步任务,代码:

import os
from celery import Celery

# 必须在实例化celery应用对象之前执行
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffycityapi.settings.dev')

# 实例化celery应用对象
app = Celery('luffycityapi')

# 指定任务的队列名称
app.conf.task_default_queue = 'Celery'

# 也可以把配置写在django的项目配置中
app.config_from_object('django.conf:settings', namespace='CELERY') 
# 设置django中配置信息以 "CELERY_"开头为celery的配置信息

# 自动根据配置查找django的所有子应用下的tasks任务文件
app.autodiscover_tasks()

settings/dev.py,django配置中新增celery相关配置信息,代码:

# Celery异步任务队列框架的配置项
# [注意:django的配置项必须大写,所以这里的所有配置项必须全部大写]

# 任务队列
CELERY_BROKER_URL = 'redis://:[email protected]:6379/14'

# 结果队列
CELERY_RESULT_BACKEND = 'redis://:[email protected]:6379/15'

# 时区,与django的时区同步
CELERY_TIMEZONE = TIME_ZONE

# 防止死锁
CELERY_FORCE_EXECV = True

# 设置并发的worker数量
CELERYD_CONCURRENCY = 200

# 设置失败允许重试,这个慎用!!!!!!
# 如果设置为True,必须在异步任务中指定失败重试的次数
# 否则,如果任务一直失败,会产生指数级别的失败记录
CELERY_ACKS_LATE = True

# 每个worker工作进程最多执行500个任务被销毁,可以防止内存泄漏,500是举例,根据自己的服务器的性能可以调整数值
CELERYD_MAX_TASKS_PER_CHILD = 500

# 单个任务的最大运行时间,超时会被杀死[慎用,有大文件操作、长时间上传、下载任务时,需要关闭这个选项,或者设置更长时间]
CELERYD_TIME_LIMIT = 10 * 60

# 任务发出后,经过一段时间还未收到acknowledge, 就将任务重新交给其他worker执行
CELERY_DISABLE_RATE_LIMITS = True

# celery的任务结果内容格式
CELERY_ACCEPT_CONTENT = ['json', 'pickle']

# 之前定时任务(定时一次调用),使用了apply_async({}, countdown=30);
# 设置定时任务(定时多次调用)的调用列表,需要单独运行SCHEDULE命令才能让celery执行定时任务:celery -A mycelery.main beat,当然worker还是要启动的
# https://docs.celeryproject.org/en/stable/userguide/periodic-tasks.html
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
    "user-add": {  # 定时任务的注册标记符[必须唯一的]
        "task": "add",   # 定时任务的任务名称
        "schedule": 10,  # 如果不使用crontab,直接填写定时任务的调用时间,例如10,表示每隔10秒调用一次add任务
        # "schedule": crontab(hour=7, minute=30, day_of_week=1),,  # 定时任务的调用时间,每周一早上7点30分调用一次add任务
    }
}

luffycityapi/__init__.py,主应用下初始化,代码:

import pymysql
from .celery import app as celery_app

pymysql.install_as_MySQLdb()

__all__ = ['celery_app']

users/tasks.py,代码:

from celery import shared_task
from ronglianyunapi import send_sms as sms

# 记录日志:
import logging
logger = logging.getLogger("django")

@shared_task(name="send_sms")
def send_sms(tid, mobile, datas):
    """异步发送短信"""
    try:
        return sms(tid, mobile, datas)
    except Exception as e:
        logger.error(f"手机号:{mobile},发送短信失败错误: {e}")


@shared_task(name="send_sms1")
def send_sms1():
    print("send_sms1执行了!!!")

django中的用户发送短信,就可以改成异步发送短信了。

users/views,视图中调用异步发送短信的任务,代码:

from .tasks import send_sms
send_sms.delay(settings.RONGLIANYUN.get("reg_tid"),mobile, datas=(code, time // 60))

users/views.py,异步发送信息的完整视图,代码:

import random
from django_redis import get_redis_connection
from django.conf import settings
# from ronglianyunapi import send_sms
# from mycelery.sms.tasks import send_sms
from .tasks import send_sms

"""
/users/sms/(?P<mobile>1[3-9]\d{9})
"""
class SMSAPIView(APIView):
    """
    SMS短信接口视图
    """
    def get(self, request, mobile):
        """发送短信验证码"""
        redis = get_redis_connection("sms_code")
        # 判断手机短信是否处于发送冷却中[60秒只能发送一条]
        interval = redis.ttl(f"interval_{mobile}")  # 通过ttl方法可以获取保存在redis中的变量的剩余有效期
        if interval != -2:
            return Response({"errmsg": f"短信发送过于频繁,请{interval}秒后再次点击获取!", "interval": interval},status=status.HTTP_400_BAD_REQUEST)

        # 基于随机数生成短信验证码
        # code = "%06d" % random.randint(0, 999999)
        code = f"{random.randint(0, 999999):06d}"
        # 获取短信有效期的时间
        time = settings.RONGLIANYUN.get("sms_expire")
        # 短信发送间隔时间
        sms_interval = settings.RONGLIANYUN["sms_interval"]
        # 调用第三方sdk发送短信
        # send_sms(settings.RONGLIANYUN.get("reg_tid"), mobile, datas=(code, time // 60))
        # 异步发送短信
        send_sms.delay(settings.RONGLIANYUN.get("reg_tid"), mobile, datas=(code, time // 60))

        # 记录code到redis中,并以time作为有效期
        # 使用redis提供的管道对象pipeline来优化redis的写入操作[添加/修改/删除]
        pipe = redis.pipeline()
        pipe.multi()  # 开启事务
        pipe.setex(f"sms_{mobile}", time, code)
        pipe.setex(f"interval_{mobile}", sms_interval, "_")
        pipe.execute()  # 提交事务,同时把暂存在pipeline的数据一次性提交给redis

        return Response({"errmsg": "OK"}, status=status.HTTP_200_OK)

终端下先启动celery,在django项目根目录下启动。

cd ~/Desktop/luffycity/luffycityapi
# 1. 普通运行模式,关闭终端以后,celery就会停止运行
celery -A luffycityapi worker  -l INFO

# 2. 启动多worker进程模式,以守护进程的方式运行,不需要在意终端。但是这种运行模型,一旦停止,需要手动启动。
celery multi start worker -A luffycityapi -E --pidfile="/home/moluo/Desktop/luffycity/luffycityapi/logs/worker1.pid" --logfile="/home/moluo/Desktop/luffycity/luffycityapi/logs/celery.log" -l info -n worker1

# 3. 启动多worker进程模式
celery multi stop worker -A luffycityapi --pidfile="/home/moluo/Desktop/luffycity/luffycityapi/logs/worker1.pid"

还是可以在django终端下调用celery的

$ python manage.py shell
>>> from users.tasks import send_sms1
>>> res = send_sms1.delay()
>>> res = send_sms1.apply_async(countdown=15)
>>> res.id
'893c31ab-e32f-44ee-a321-8b07e9483063'
>>> res.state
'SUCCESS'
>>> res.result

关于celery中异步任务发布的2个方法的参数如下:

异步任务名.delay(*arg, **kwargs)
异步任务名.apply_async((arg,), {'kwarg': value}, countdown=60, expires=120)

定时任务的调用器启动,可以在运行了worker以后,使用以下命令:

cd ~/Desktop/luffycity/luffycityapi
celery -A luffycityapi beat

beat调度器关闭了,则定时任务无法执行,如果worker工作进程关闭了,则celery关闭,保存在消息队列中的任务就会囤积在那里。

image-20210721123812775

标签:celery,异步,sms,send,Celery,任务,import
From: https://www.cnblogs.com/zibuyu2015831/p/17456679.html

相关文章

  • Celery框架
    Celery框架1.什么是celerycelery是一个简答,灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度.这里面提到了一个概念:分布式系统一个系统应用(网站),会有相关组件(web服务器,web应用,数据库,消息中间件),将它们架构在不同......
  • celery笔记三之task和task的调用
    本文首发于公众号:Hunter后端原文链接:celery笔记三之task和task的调用这一篇笔记介绍task和task的调用。以下是本篇笔记目录:基础的task定义方式日志处理任务重试忽略任务运行结果task的调用1、基础的task定义方式前面两篇笔记中介绍了最简单的定义方式,使用@......
  • celery笔记一之celery介绍、启动和运行结果跟踪
    本文首发于公众号:Hunter后端原文链接:celery笔记一之celery介绍、启动和运行结果跟踪本篇笔记内容如下:celery介绍celery准备celery启动和异步任务的运行运行结果跟踪1、celery介绍celery大致有两种应用场景,一种是异步任务,一种是定时任务。比如说在一个接口请求中,......
  • 使用Celery实现计划任务与异步任务
    前言Celery是一个开源的分布式任务队列系统,用于处理异步任务和分布式任务调度。使用消息代理(如RabbitMQ、Redis)来实现任务队列和消息传递。在使用Python开发web应用过程中,经常使用Celery完成两种任务需求:异步任务。将任务提交到任务队列中,然后继续处理其他任务,而不必等待任务完成。......
  • django通过celery定时任务
    settings.py #Broker配置,使用Redis作为消息中间件BROKER_URL='redis://127.0.0.1:6379/0'#BACKEND配置,这里使用redisCELERY_RESULT_BACKEND='redis://127.0.0.1:6379/0'#结果序列化方案CELERY_RESULT_SERIALIZER='json'#任务结果过期时间,秒CELERY_TASK......
  • Celery
    Celery一、什么是Celery?Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统专注于实时处理的异步任务队列同时也支持任务调度二、Celery架构Celery的架构由三部分组成,消息中间件(messagebroker),任务执行单元(worker)和任务执行结果存储(taskresultstore)组成。1、消息中......
  • Celery - 分布式任务队列
    Celery-分布式任务队列目录Celery-分布式任务队列1celery简介1.1什么是celery1.2celery架构(1)消息中间件messagebroker(2)任务执行单元worker(3)任务结果存储taskresultstore(4)使用场景2Celery安装与使用2.1安装2.2快速使用①第1步:创建celeryapp与创建任务②第2步......
  • Flask中使用Celery教程
    不管是使用什么编程语言,使用什么框架。在服务器上执行耗时操作,比如网络请求、视频转码、图片处理等。如果想实现快速响应客户端的需求,则必须使用任务队列。任务队列是一个单独的程序,和网站没有直接关系,任务队列提供了接口,能在网站中通过代码操作任务队列,比如:添加任务,查看任务结......
  • Celery 的详解
    (一)Celery的使用1.概述Celery是一个基于分布式消息传递的任务队列,用于异步处理任务和定时任务等。它可以让你将耗时的任务放到后台处理,从而不会阻塞Web应用程序的主线程。(例如:发送电子邮件、生成报表、爬虫、定时任务等)Celery可以与多种消息代理(如RabbitMQ、Redis等)配......
  • Django 如何使用 Celery 完成异步任务或定时任务
    以前版本的Celery需要一个单独的库(django-celery)才能与Django一起工作,但从Celery3.1开始,情况便不再如此,我们可以直接通过Celery库来完成在Django中的任务。安装Redis服务端以Docker安装为例,安装一个密码为mypassword的Redis服务端dockerrun-itd--name......