1创建文件夹 clery_task
2. 创建一个celery 的py文件
from celery import Celery
from celery.schedules import timedelta
# 1 实例化 celery 对象
celery_app = Celery(
# 定义任务名称
'celery_app',
# 缓存的数据库
broker='redis://127.0.0.1:6379/1',
# 分发任务的 库
backend='redis://127.0.0.1:6379/2',
# 任务的目录
include=['celery_task.tasks', ]
)
# 2. 修改默认时区 Asia 亚洲
celery_app.conf.timezone = 'Asia/Shanghai'
celery_app.conf.enable_utc = False
# .使用 celery 定时查询失败数据库的信息,完成重试机制
celery_app.conf.beat_schedule = {
'app-10': {
'task': 'celery_task.tasks.thread_detail',
'schedule': timedelta(seconds=10)
}
}
“”“
”选哟添加的任务“”
from celery_task.celery import celery_app
import requests
from threading import Thread
from utils.get_info import headers
@celery_app.task
def thread_detail():
print('周考3')
return 'aaaaa'
4 添加定时任务
celery beat -A celery_task
#5 执行任务
Celery -A 【项目名称】worker -l info -P eventlet
标签:cekery,task,app,celery,任务,conf,import,定时 From: https://www.cnblogs.com/guofeng-1016/p/17278116.html