settings.py
# Broker配置,使用Redis作为消息中间件 BROKER_URL = 'redis://127.0.0.1:6379/0' # BACKEND配置,这里使用redis CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' # 结果序列化方案 CELERY_RESULT_SERIALIZER = 'json' # 任务结果过期时间,秒 CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 时区配置 CELERY_TIMEZONE = 'Asia/Shanghai' # 指定导入的任务模块,可以指定多个 CELERY_IMPORTS = ( 'scriptOperate.tasks' ) CELERYBEAT_SCHEDULE = { 'ali_script': { # 任务路径 'task': 'scriptOperate.tasks.get_ali_script_status', # 每日七点执行 # 'schedule': crontab(hour=7, minute=0), 'schedule': timedelta(seconds=20) # 'schedule':5 }, 'log_file': { # 任务路径 'task': 'scriptOperate.tasks.produce_log', # 每日零点执行 'schedule': crontab(hour=0, minute=0), # 通过crontab进行定时 }, 'weather_file':{ 'task': 'scriptOperate.tasks.get_weather_status', 'schedule': crontab(hour=7, minute=0) # 'schedule': timedelta(seconds=20) }, 'power_file':{ 'task': 'scriptOperate.tasks.get_power_status', 'schedule': crontab(hour=5, minute=30), # 'schedule': timedelta(seconds=20) }, 'overview':{ 'task': 'scriptOperate.tasks.get_overview_status', 'schedule': crontab(hour=7, minute=10), # 'schedule': timedelta(seconds=30) } }
settings.py同级目录下创建celery.py
import os import django from celery import Celery from django.conf import settings from celery.schedules import crontab # 设置系统环境变量,安装django,必须设置,否则在启动celery时会报错 # celery_study 是当前项目名 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'code_work2.settings') # django.setup() celery_app = Celery('scriptOperate') celery_app.config_from_object('django.conf:settings') celery_app.autodiscover_tasks(settings.INSTALLED_APPS)
在app下创建tasks.py
import logging import json import datetime import time import os.path import re import pysftp from celery import shared_task from log.log_files import Logger from code_work2.celery import celery_app from scriptOperate.task import ali_monitor, weather_monitor, power_monitor, over_view_monitor # log = Logger('../log/files/data.log', level='debug') # logger = log.logger # @shared_task @celery_app.task def get_ali_script_status(): ali_monitor.get_ali_status() @celery_app.task def get_weather_status(): weather_monitor.get_weather_status() @celery_app.task def get_power_status(): power_monitor.get_power_status() @celery_app.task def get_overview_status(): over_view_monitor.get_overview_status() @shared_task def produce_log(): Logger(logfile, level='debug')
启动任务
celery -A code_work2 beat -l info 启动beat
celery -A code_work2 worker -l info -P eventlet 启动celery
标签:status,task,get,schedule,django,celery,import,定时 From: https://www.cnblogs.com/sxy-blog/p/17393127.html