首页 > 其他分享 >django中使用celery

django中使用celery

时间:2024-05-18 12:43:03浏览次数:13  
标签:CELERY task app django celery 使用 import

通用方案

将自定义的celery的包拉到django项目根目录

image-20240516200645444

在celery.py文件内导入django的环境变量

这样才能正常启动worker或者beat

from celery import Celery
import time
import os

# 必须要将django的环境变量加进去(在manage.py中复制)
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')
# 消息中间件
broker = 'redis://127.0.0.1:6379/1'
# 结果存储
backend = 'redis://127.0.0.1:6379/2'

# 1 创建app对象
app = Celery('app', broker=broker, backend=backend, include=['celery_task.order_task', 'celery_task.user_task'])

最后在需要用到celery的地方导入对应的任务函数----》提交任务

from celery_task.user_task import celery_send_sms, celery_add
from datetime import datetime, timedelta

class CeleryView(APIView):
    def get(self, request):
        # 异步发短信
        mobile = request.query_params.get('mobile')
        code = get_code()
        # 使用celery 异步提交任务
        res = celery_send_sms.delay(mobile=mobile, code=code)
        return APIResponse(msg=f'短信已发送{str(res)}')

    def post(self, request):
        x = request.data.get('x')
        y = request.data.get('y')
        eta = datetime.utcnow() + timedelta(seconds=10)
        res = celery_add.apply_async(args=(x, y), eta=eta)
        return APIResponse(msg=f'计算结果为{res}')

官方推荐的方案

【1】安装模块

# 1 安装模块
pip install Django==3.2.22
pip install celery
pip install redis
pip install eventlet  #在windows环境下需要安装eventlet包

【2】创建celery.py

在项目内层根目录下创建celery.py

image-20240516222208005

配置内容

import os
import django
from celery import Celery
from django.utils import timezone

# 为celery设置环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')
django.setup()

# 创建app
app = Celery('celery_demo')

# 会自动从django的配置文件中导入celery配置项
# 这里namespace配置之后,配置文件里面的celery配置项都需带上CELERY前缀 如下图
app.config_from_object('django.conf:settings', namespace='CELERY')

# 会自动检索django每个app下的task.py 里的函数作为任务
app.autodiscover_tasks()

# 解决时区问题,定时任务启动就循环输出
app.now = timezone.now

django的配置文件

image-20240516222345809

# django配置文件.py
# celery配置项

# Broker配置,使用Redis作为消息中间件
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/1'
# BACKEND配置,这里使用redis
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'
# 结果序列化方案
CELERY_RESULT_SERIALIZER = 'json'
# 并发任务数worker数量
CELERYD_CONCURRENCY = 20
# 每个worker最大执行任务数
CELERYD_MAX_TASKS_PER_CHILD = 20

更多配置

# 为django_celery_results存储Celery任务执行结果设置后台
# 格式为:db+scheme://user:password@host:port/dbname
# 支持数据库django-db和缓存django-cache存储任务状态及结果
CELERY_RESULT_BACKEND = "django-db"
# celery内容等消息的格式设置,默认json
CELERY_ACCEPT_CONTENT = ['application/json', ]
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

# 为任务设置超时时间,单位秒。超时即中止,执行下个任务。
CELERY_TASK_TIME_LIMIT = 5

# 为存储结果设置过期日期,默认1天过期。如果beat开启,Celery每天会自动清除。
# 设为0,存储结果永不过期
CELERY_RESULT_EXPIRES = xx

# 任务限流
CELERY_TASK_ANNOTATIONS = {'tasks.add': {'rate_limit': '10/s'}}

# Worker并发数量,一般默认CPU核数,可以不设置
CELERY_WORKER_CONCURRENCY = 2

# 每个worker执行了多少任务就会死掉,默认是无限的
CELERY_WORKER_MAX_TASKS_PER_CHILD = 200

完整配置

【3】修改__init__.py

配置与celery.py同级的__init__.py文件

from .celery1 import app as celery_app

__all__ = ['celery_app']

【4】建立tasks.py

在某个app下创建task.py,写对应的功能

注意,这个文件必须要tasks.py

例子

# 导入装饰器函数 shared_task
from celery import shared_task

@shared_task
def add(a, b):
    return a + b

【5】视图函数使用

from rest_framework.views import APIView
from utils.common_response import APIResponse

#导入task.py里面的任务函数
from .task import add

class CeleryTestView(APIView):
    def post(self, request):
        x = request.data.get('x')
        y = request.data.get('y')
        res = add.delay(x, y)
        return APIResponse(msg=f'执行了:>>>{str(res)}')

异步调用任务两种方法

# 方法一:delay方法
task_name.delay(args1, args2, kwargs=value_1, kwargs2=value_2)

# 方法二: apply_async方法,与delay类似,但支持更多参数
task.apply_async(args=[arg1, arg2], kwargs={key:value, key:value})

【6】启动worker

要注意项目名导入时的路径

因为它是基于环境变量导入的

所以要加上luffy_api

celery -A luffy_api.celery1 worker -l debug -P eventlet

补充(定时提交任务)

在celery.py添加配置

CELERY_BEAT_SCHEDULE = {
    'every_1_minutes': {
        'task': 'luffy_api.apps.home.task.add', # 
        'schedule': timedelta(seconds=2),
        'args': ()
    },
}

celery -A luffy_api.celery1 worker -l debug -P eventlet
celery -A luffy_api.celery1 beat -l debug

app.task和shared_task区别

1 两个装饰器的区别

# @app.task(bind=True)装饰器:

这是Celery库中的装饰器,用于将函数注册为Celery任务。
bind=True指定任务函数的第一个参数为任务实例本身(通常命名为self),允许您在任务函数内部访问任务实例的属性和方法。

# @shared_task(bind=True)装饰器:

这是Celery的另一个装饰器,用于将函数注册为共享任务(shared task)。
共享任务是一种特殊类型的任务,可以跨多个Celery应用程序共享和调用。
如果您的任务需要在多个Celery应用程序中共享,或者您希望使用共享任务的特性,那么可以考虑使用此装饰器。
 

#  使用建议

哪个装饰器更好取决于您的具体需求和使用情况:

如果您只是在单个Celery应用程序中定义和使用任务函数,那么@app.task(bind=True)装饰器可能更适合。

如果您需要共享任务或跨多个Celery应用程序使用任务,那么@shared_task(bind=True)装饰器可能更适合。

标签:CELERY,task,app,django,celery,使用,import
From: https://www.cnblogs.com/Hqqqq/p/18199224

相关文章

  • Celery包结构
    目录结构项目名├──celery_task #celery包│├──__init__.py#包文件│├──celery.py#celery连接和配置相关文件,且名字必须叫celery.py│└──user_tasks.py#所有用户相关任务函数 └──order_tasks.py#所有订......
  • Celery快速使用
    安装#0创建Python项目#1创建虚拟环境#2安装celerypipinstallcelery#3安装redis(消息队列和结果存储使用redis)pipinstallredis#4安装eventlet(win平台,如果是mac,linux不需要)pipinstalleventlet快速使用celery_demo.py--主文件fromceleryimportCel......
  • 使用django_celery_beat在admin后台配置计划任务
    使用步骤安装包pipinstalldjango-celery-beatapp注册app注册INSTALLED_APPS=[....'django_celery_beat',]配置文件:屏蔽原来的调度器CELERY_BEAT_SCHEDULER='django_celery_beat.schedulers.DatabaseScheduler'设置时区LANGUAGE_CODE='z......
  • Flower 监控celery任务
    Flower监控celery任务如果不想通django的管理界面监控任务的执行,还可以通过Flower插件来进行任务的监控。Flower的界面更加丰富,可以监控的信息更全Flower是一个用于监控和管理Celery集群的开源Web应用程序。它提供有关Celeryworkers和tasks状态的实时信息功能【1】......
  • Celery admin监视任务
    在控制台监控任务执行情况,还不是很方便,最好是能够通过web界面看到任务的执行情况,如有多少任务在执行,有多少任务执行失败了等这个Celery也是可以做到了,就是将任务执行结果写到数据库中,通过web界面显示出来。这里要用到django-celery-results插件。通过插件可以使用Django的orm作......
  • elementUI 使用 el-select 的远程搜索功能,导致数据无法回显怎么解决?
    问题:解决方法:在数据初始化的时候将获取到的数据做进一步的处理,进行本地select组件的一个添加constlabels=[];constvalues=[];res.data.rows.forEach((ele)=>{labels.push(ele.buildName);values.push(ele.buildCode)......
  • restcontroller中使用delete请求发送带参路径报错
    报错内容:Resolved[org.springframework.web.HttpRequestMethodNotSupportedException:Requestmethod'DELETE'notsupported]具体情景:postman中发送的路径: 每次用这个路径发送delete请求都报错,但是用这个路径发送get请求做查询却没问题,真的让我苦恼好久 解决:因为HttpC......
  • 一对一视频app开发,“锁”的合理使用很重要
    一对一视频app开发,“锁”的合理使用很重要,比如公平锁和非公平锁。一、基本概念公平锁:线程按照到来的先后顺序,排队等待使用资源。非公平锁:线程不一定按照先后顺序使用资源,而是可能出现“插队”的情况。ReentrantLock的公平锁和非公平锁synchronized是一种非公平锁,而Ree......
  • Asp-Net-Core开发笔记:使用ActionFilterAttribute实现非侵入式的参数校验
    前言在现代应用开发中,确保API的安全性和可靠性至关重要。面向切面编程(AOP)通过将横切关注点(如验证、日志记录、异常处理)与核心业务逻辑分离,极大地提升了代码的模块化和可维护性。在ASP.NETCore中,利用ActionFilterAttribute可以方便地实现AOP的理念,能够以简洁、高效的方式进行自......
  • 使用puppeteer url转换pdf中文不显示
    本来正在优化一个url转pdf的node服务,项目是使用puppeteer进行转换。功能优化提测后,发现转换后的pdf中文都不显示了。而本身这次优化没有涉及到转换本身相关的内容,这就很奇怪,而且本地转换都很正常。一般中文不显示,很可能是缺少中文字体导致的,于是看了项目中的Dockerfil......