目录结构
E:.
│ db.sqlite3
│ Dockerfile
│ manage.py
│ requirements.txt
│
├─celery_tasks # 自定义一个celery的工作目录
│ │ config.py # celery配置文件
│ │ __init__.py
│ │
│ ├─sms
│ tasks.py # worker任务
│ __init__.py
│
│
├─dashboard
│ │ asgi.py
│ │ celery.py # 同settings.py目录下 创建该文件
│ │ settings.py
│ │ urls.py
│ │ wsgi.py
│ │ __init__.py # 修改该文件
│
├─dbcv # app
│ admin.py
│ apps.py
│ models.py
│ serializers.py
│ test.py
│ tests.py
│ views.py
│ __init__.py
......
celery
1、创建celery工作目录
在项目根目录下创建 celery_tasks
2、创建config.py配置文件
# 配置异步任务
# 设置结果存储
result_backend = 'redis://127.0.0.1:6379/1'
# 设置代理人broker
broker_url = 'redis://127.0.0.1:6379/2'
# celery 的启动工作数量设置
CELERY_WORKER_CONCURRENCY = 20
# 任务预取功能,就是每个工作的进程/线程在获取任务的时候,会尽量多拿 n 个,以保证获取的通讯成本可以压缩。
WORKER_PREFETCH_MULTIPLIER = 20
# 非常重要,有些情况下可以防止死锁
CELERYD_FORCE_EXECV = True
# celery 的 worker 执行多少个任务后进行重启操作
CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
# 禁用所有速度限制,如果网络资源有限,不建议开足马力。
WORKER_DISABLE_RATE_LIMITS = True
# 明确指示在启动时进行连接重试
# BROKER_CONNECTION_RETRY_ON_STARTUP = True
broker_connection_retry_on_startup = True
# 2、配置定时任务
timezone = 'Asia/Shanghai'
DJANGO_CELERY_BEAT_TZ_AWARE = False
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
``
2、创建celery.py
同settings.py目录下 创建该文件
import os
from django.utils import timezone
from celery import Celery
from celery.schedules import crontab, timedelta
from celery_tasks import config
设置环境变量, 导入django配置; 后续便于使用django组件功能
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'dashboard.settings')
app = Celery("apps")
读取配置文件中的配置
app.config_from_object(config)
自动从django注册的app中发现所有任务
app.autodiscover_tasks()
解决时区问题,定时任务启动就循环输出
app.now = timezone.now
celery -A dashboard worker -l info -P eventlet
3、创建个wocker文件:tasks.py
常见的如发送邮件、短信等等
tasks.py
import time
from dashboard.celery import app
通过 @app.task 装饰器,定义celery的wocker函数对象
@app.task
def send_sms(desc=None):
print("send_sms start")
if desc:
print(desc)
time.sleep(20)
print("send_sms end")
return "send_sms ok"
4、在app视图中调用
@api_view(['GET'])
def test(request):
"""
测试token
"""
current_time = timezone.now()
print("当前时间是:{}".format(current_time))
# .delay() 方法调用
result = tasks.send_sms.delay("视图调用celery任务")
res_data = {'data': {'status_code': 200}}
return Response(**res_data)
5、运行celery
打开终端,win+r,cmd
windows下需要加 -P eventlet 运行,否则会报错(celery的问题)
celery -A dashboard worker -l info -P eventlet
Linux下
celery -A dashboard worker -l info
6、运行django
python manage.py runserver
7、通过接口调用接口API,观察celery的运行即可
完成!!!
标签:tasks,app,py,redis,django,celery,import
From: https://www.cnblogs.com/lanjianhua/p/18447192