使用官方方案之前,先看看目录结构。
luffy_api/
__init__.py
celery.py
settings.py
urls.py
wsgi.py
myapp/
__init__.py
tasks.py
# 也就是放在项目名称同名的内部文件夹下(和settings)同名
# 必须是这样的结构。
第一步 安装必要的模块
# 1 安装必要的模块
pip install Django
pip install celery
pip install redis
pip install eventlet #在windows环境下需要安装eventlet包
第二步 设置中配置
# 常规的配置
# 如果redis安装在本机,使用localhost或127.0.0.1
# 如果docker部署的redis,使用redis://redis:6379
CELERY_BROKER_URL = "redis://127.0.0.1:6379/1"
CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/2'
CELERY_ACCEPT_CONTENT=['json']
CELERY_TASK_SERIALIZER='json'
CELERY_RESULT_SERIALIZER='json'
CELERY_TASK_RESULT_EXPIRES=60 * 60 * 24
# celery时区设置,建议与Django settings中TIME_ZONE同样时区,防止时差
# Django设置时区需同时设置USE_TZ=True和TIME_ZONE = 'Asia/Shanghai'
CELERY_TIMEZONE = TIME_ZONE
# 2 在配置文件settings.py
# 选择需要的配置就行
CELERY_BROKER_URL = "redis://127.0.0.1:6379/1"
CELERY_RESULT_BACKEND = "redis://127.0.0.1:6379/2"
# celery 的启动工作数量设置
CELERY_WORKER_CONCURRENCY = 5
# 任务预取功能,就是每个工作的进程/线程在获取任务的时候,会尽量多拿 N 个,以保证获取的通讯成本可以压缩。
CELERY_WORKER_PREFETCH_MULTIPLIER = 5
# CELERY 的 WORKER 执行多少个任务后进行重启操作
CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
# 禁用所有速度限制,如果网络资源有限,不建议开足马力。
CELERY_WORKER_DISABLE_RATE_LIMITS = True
# 指定任务接受的序列化类型
CELERY_ACCEPT_CONTENT = ['json']
# 指定任务序列化方式
CELERY_TASK_SERIALIZER = 'json'
# 指定结果序列化的方式
CELERY_RESULT_SERIALIZER = 'json'
# celery时区设置,建议与Django settings中TIME_ZONE同样时区,防止时差
# Django设置时区需同时设置USE_TZ=True和TIME_ZONE = 'Asia/Shanghai'
CELERY_TIMEZONE = "Asia/Shanghai"
# 为任务设置超时时间,单位秒。超时即中止,执行下个任务。
CELERY_TASK_TIME_LIMIT = 5
# 为存储结果设置过期日期,默认1天过期。如果beat开启,Celery每天会自动清除。
# 设为0,存储结果永不过期
CELERY_RESULT_EXPIRES = 0
# 任务限流
# CELERY_TASK_ANNOTATIONS = {'tasks.add': {'rate_limit': '10/s'}}
# Worker并发数量,一般默认CPU核数,可以不设置
# CELERY_WORKER_CONCURRENCY = 2
# 每个worker执行了多少任务就会死掉,默认是无限的
# CELERY_WORKER_MAX_TASKS_PER_CHILD = 200
# 是否启用UTC时间
CELERY_ENABLE_UTC = False
# 设置CELERY BEAT(任务调度器)检查任务的频率。在这里,CELERY_BEAT_SYNC_EVERY = 1 表示CELERY BEAT 每秒钟检查一次任务
CELERY_BEAT_SYNC_EVERY = 1
# SETTINGS USE_TZ=FALSE时添加该选项,否启动 DJANGO CELERY BEAT 的时候,出现这个错误TYPEERROR: CAN'T COMPARE OFFSET-NAIVE AND OFFSET-AWARE DATETIMES
CELERY_DJANGO_CELERY_BEAT_TZ_AWARE = False
# 休眠最大秒数
CELERY_BEAT_MAX_LOOP_INTERVAL = 300
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
第三步 写入逻辑代码
# luffy_api/luffy_api/celery.py
from celery import Celery
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffy_api.settings.dev")
import django
django.setup()
from django.conf import settings
app = Celery("luffy_api") # 这个luffy_api 即 celery -A 后面的名称
# namespace='CELERY'作用是允许你在Django配置文件中对Celery进行配置
# 但所有Celery配置项必须以CELERY开头,防止冲突
app.config_from_object('django.conf:settings', namespace='CELERY')
# 自动从Django的已注册app中发现任务
app.autodiscover_tasks()
# luffy_api/luffy_api/__init__.py
from .celery import app as celery_app
__all__ = ('celery_app',)
第四步 创建任务
注意,这个名称必须叫做tasks.py
# app下面的tasks.py
from celery import shared_task
@shared_task
def add(x, y):
return x + y
第五步
启动worker,如果有定时任务启动beat
# 在项目根目录下运行
celery -A luffy_api worker -l info -P eventlet # windows
# 启动beat 如果有
celery -A luffy_api beat -l info
第六步
通过视图去执行代码
from .tasks import add
class CeleryView(APIView):
def get(self, request):
x = request.query_params.get('x')
y = request.query_params.get('y')
res = add.delay(x, y)
return APIResponse(msg=str(res.get()))
# http://127.0.0.1:8000/api/v1/user/celery/?x=10&y=19
参考资料:
https://celery.xgqyq.com/Django.html
https://pythondjango.cn/django/advanced/12-sync-periodic-tasks-with-celery/
https://www.jianshu.com/p/39fa4ac3a236