目录结构
项目名
├── celery_task # celery包
│ ├── __init__.py # 包文件
│ ├── celery.py # celery连接和配置相关文件,且名字必须叫celery.py
│ └── user_tasks.py # 所有用户相关任务函数
└── order_tasks.py # 所有订单相关任务函数
├── add_task.py # 添加任务
└── get_result.py # 获取结果
Celery.py
用于创建celery对象 以及配置相关配置
include用于监控任务
Celery 需要知道所有可用的任务。如果你有多个定义任务的模块,使用 include
可以确保这些模块在 Celery worker 启动时被正确加载,从而确保任务可以被找到并执行。
from celery import Celery
import time
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
# 1 创建app对象
app = Celery('app', broker=broker, backend=backend,include=['celery_task.order_task','celery_task.user_task]
task.py
用于定义一些需要执行的任务函数
通常按照功能模块划分
我的例子里 用户相关的任务就被写在user.py,订单相关的任务就被写在order.py
# 以下我只是写了一些示例 没有任何意义
# 定义的任务函数需要用app.task装饰器来装饰
# user.py
from .celery import app
import time
@app.task
def add(x, y):
time.sleep(2)
return x + y
# order.py
from .celery import app
import time
@app.task
def send_sms(mobile, code):
return f'下单成功,手机号{mobile},发送短信{code}成功'
提交任务add_task.py
通过执行这个py文件来提交任务到消息队列中
在这文件中导入想要提交的任务函数
然后对这个函数调用delay方法,有参数的话传入对应参数
from celery_task.order_task import send_sms
#1 同步调用
# res=send_sms('111111',888)
# print(res)
# 2 提交到任务队列被 worker执行
res=send_sms.delay('1895323234',9999)
# 返回值是任务的唯一id
print(res) #536bd01f-0100-4266-93c6-a12ebe7bbef1
查看任务结果get_result.py
在这个文件中定义任务的唯一id
在执行这个py文件,就可以查看任务的结果
from celery_task.celery import app
from celery.result import AsyncResult
id = '536bd01f-0100-4266-93c6-a12ebe7bbef1'
if __name__ == '__main__':
result = AsyncResult(id=id, app=app)
if result.successful():
result = result.get()
print(result)
elif result.failed():
print('任务失败')
elif result.status == 'PENDING':
print('任务等待中被执行')
elif result.status == 'RETRY':
print('任务异常后正在重试')
elif result.status == 'STARTED':
print('任务已经开始被执行')
启动worker
# 包所在目录下,启动worker
celery -A celery_task worker -l debug -P eventlet
延时任务
延时任务需要用apply_async来提交任务
from celery_task.user_task import add
from datetime import datetime, timedelta
# eta需要传一个时间对象
eta = datetime.utcnow() + timedelta(seconds=10)
res = add.apply_async(args=(2, 6), eta=eta)
定时任务
需要在celery.py中写配置文件
然后在启动一个beat 来提交任务到消息队列
from celery import Celery
import time
# 消息中间件
broker = 'redis://127.0.0.1:6379/1'
# 结果存储
backend = 'redis://127.0.0.1:6379/2'
# 1 创建app对象
app = Celery('app', broker=broker, backend=backend, include=['celery_task.order_task', 'celery_task.user_task'])
# 定时任务:配置文件
# 配置文件
# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
'add-task': {
'task': 'celery_task.user_task.add',
'schedule': timedelta(seconds=3),
# 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
'args': (300, 150),
},
'send-sms-task': {
'task': 'celery_task.order_task.send_sms',
# 'schedule': timedelta(seconds=30),
'schedule': crontab(hour=11, minute=20), # 每天11点20执行
'args': ('189232222', 888),
},
}
# 启动beat 定时提交任务到消息队列
celery -A celery_task beat -l debug
# 启动worker
celery -A celery_task worker -l debug -P eventlet
路飞项目轮播图定时更新
首先新在celery_task里新建一个任务文件home_task.py(因为轮播图是首页的功能)
然后定义更新轮播图任务
思路就是
拿到数据---》更新数据
from .celery import app
from django.core.cache import cache
from home.models import Banner
from home.common.serializer import BannerSerializer
@app.task
def update_banner():
# 查询所有轮播图
queryset = Banner.objects.filter(is_delete=False, is_show=True).order_by('orders')[:3]
serializer = BannerSerializer(instance=queryset, many=True)
# 拼接图片路径
for banner in serializer.data:
banner['image'] = 'http://127.0.0.1:8000/' + banner['image']
# 更新缓存数据
cache.set('banner_data', serializer.data)
return '轮播图数据更新成功'
然后在celery.py里面更新配置文件
# 把home_task这个文件添加到监控列表
app = Celery('app', broker=broker, backend=backend, include=[
'celery_task.order_task',
'celery_task.user_task',
'celery_task.home_task',
])
# 配置定时任务
# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
'update-banner-task': {
'task': 'celery_task.home_task.update_banner',
'schedule': timedelta(seconds=30),
'args': (),
},
}
# 启动beat 定时提交任务到消息队列
celery -A celery_task beat -l debug
# 启动worker
celery -A celery_task worker -l debug -P eventl
标签:celery,task,app,py,Celery,任务,import,结构
From: https://www.cnblogs.com/Hqqqq/p/18199222