发开阶段遇到了需要定时任务以及周期任务才能进行的事情,这里进行记录一下,防止下次我再写的时候写不明白。
首先在你们项目里面创建以下文件:
celery:
import os os.environ.setdefault("DJANGO_SETTINGS_MODULE", "settings") from celery import Celery from quality_control.celery_task import celeryconfig app = Celery("quality_control") app.config_from_object(celeryconfig) app.autodiscover_tasks([ "quality_control.tasks", ])
celeryconfig:
import os os.environ.setdefault("DJANGO_SETTINGS_MODULE", "settings") from django.conf import settings # 上线时需要使用setting中的redis #CELERY_RESULT_BACKEND = settings.CELERY_RESULT_BACKEND #BROKER_URL = settings.BROKER_URL # 本地测试需要的redis配置 CELERY_RESULT_BACKEND = "redis://127.0.0.1:6379/0" BROKER_URL = "redis://127.0.0.1:6379/1" CELERY_WOEKER_CONCURRENCY = 20 CELERY_PREFETCH_MULTIPLIER = 20 CELERY_FORCE_EXECV = True CELERY_WORKER_MAX_TASKS_PER_CHILD = 100 CELERY_DISABLE_RATE_LIMITS = True CELERY_ENABLE_UTC = False CELERY_TIMEZONE = settings.TIME_ZONE DJANGO_CELERY_BEAT_TZ_AWARE = False CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"
createcelery
import json from django_celery_beat.models import PeriodicTask, CrontabSchedule, ClockedSchedule from datetime import datetime from django.conf import settings class CreatCelerySchedule: __doc__ = \ """ 这是一个定时、周期任务的相关类 如果你对定时任务并不熟悉,那么这个类已经实现了大部分的方法,你可以直接调用。 其中delete 与 enabled 看上去是多余的方法,当你会使用了celery的定时任务,那么你可以直接调celery的方法即可。 返参中加入注解类型,方便客户端调用 """ @staticmethod def clockedschedule(dtime: datetime or int(10)) -> (ClockedSchedule, bool): """ 创建仅仅执行一次的定时任务模型对象 :param dtime: datetime 对象 或者是时间戳 :return: schedule: django_celery_beat.models.ClockedSchedule 即 ClockedSchedule 对象 created:创建成功:true,更新成功:false,失败:引发异常 """ if isinstance(dtime, int): # 这是自己写的方法,他可以将时间戳转时间对象 dtime = Qu.time_transform(dtime) schedule, created = ClockedSchedule.objects.get_or_create(clocked_time=dtime) return schedule, created @staticmethod def crontabschedule(minute: str = "*", hour: str = "*", day_of_week: str = "*", day_of_month: str = "*") -> ( CrontabSchedule, bool): """ 创建周期任务模型对象 :param minute: 定时任务的分钟数 (0,30) or */30 第零分钟与第三十分钟进行一次 或者是 每三十分钟进行一次 :param hour: 定时任务的小时数 (8,20) or */12 第八时与第二十时进行一次 或者是 每十二个小时进行一次 :param day_of_week: 定时任务的天关于周 (0,5) 每周的周日 或者是 周五进行一次 :param day_of_month: 定时任务的天关于月 (10,15) 每月的十号与十五号进行一次 :return: 创建后的任务对象,是否‘创建’成功:如果创建成功会返回true,否则返回false 值得注意的是:get到数据后,create会返回false """ schedule, created = CrontabSchedule.objects.get_or_create(minute=minute, hour=hour, day_of_week=day_of_week, day_of_month=day_of_month, timezone=settings.TIME_ZONE, ) return schedule, created @staticmethod def periodicschedule(schedule, name: str, task: str, args: list, one_off: bool = False) -> PeriodicTask: """ 将你创建好的对应对象,注册到celery-beat队列里面,并向数据库存入 :param schedule: 你必须传一个模型实例进来,否则我无法得知你要创建什么实例 :param name: 为你的任务起一个名字,这个名字不可以重复,如果他是重复的,那么将会更新以前的 :param task: 你需要执行的函数。他的书写格式必须为:‘app.dir.tasks.function’ :param args: 为你的函数传入参数,目前版本仅支持位置入参 :param one_off: 是否仅执行一次,默认不是 :return: """ if not isinstance(args, list): raise TypeError("你必须传入一个list对象,这是定时任务的要求的入参,你必须按照规定传参。") args = json.dumps(args) if isinstance(schedule, (CrontabSchedule, ClockedSchedule)): raise TypeError("你必须传入一个定时任务对象,或者是时钟对象") if isinstance(schedule, CrontabSchedule): _ = PeriodicTask.objects.update_or_create(name=name, defaults={ "crontab": schedule, "name": name, "task": task, "one_off": one_off, "args": args or [], }) else: _ = PeriodicTask.objects.update_or_create(name=name, defaults={ "clocked": schedule, "name": name, "task": task, "one_off": True, "args": args or [], }) return _ @staticmethod def selectperiodic(name): """ 使用id查值没有意义,请使用name进行查找 :param name: 你需要查找的名字 :return: Queryset 对象 """ clocked = PeriodicTask.objects.filter(name=name + "_clocked") if not clocked.exists(): clocked = None crontab = PeriodicTask.objects.filter(name=name + "_crontab") if not crontab.exists(): clocked = None return clocked, crontab @staticmethod def delete(ptmodel: PeriodicTask): """ :param ptmodel: 将 PeriodicTask 的 model 传入 ,切记不是 queryset 对象 :return: """ ptmodel.delete() @staticmethod def enabled(ptmodel: PeriodicTask, ny: bool): """ :param ptmodel: 将 PeriodicTask 的 model 传入 ,切记不是 queryset 对象 :param ny: 开启还是关闭,需要一个 bool 值 :return: """ ptmodel.enabled = ny ptmodel.save()
上面的三个py文件放在一个文件夹里面,名字你看着起就可以,你的项目里面就应该长这样:
然后重新启动一个文件夹,主要放你需要的tasks 记住,你的py文件名字,必须是tasks.py文件名
# 导入celery相关配置 from quality_control.celery_task.celery import app from common.log import logger # 导入定时任务需要跑的逻辑程序 from quality_control.utils import request_db from quality_control.celery_task.createcelery import CreatCelerySchedule as Ccs @app.task def cycle_regular_task(name): logger.info("任务开始绘制:%s" % name) request_db.celery_task_request(name)
# 调用时,你只需要这样写就可以。 def create_tasks(dtime, name, args, cyc_time): clocked, _ = Ccs.clockedschedule(dtime) Ccs.periodicschedule(clocked, name + "_clocked", "quality_control.tasks.tasks.cycle_regular_task", args) crontab, _ = Ccs.crontabschedule(hour=cyc_time) Ccs.periodicschedule(crontab, name + "_crontab", "quality_control.tasks.tasks.cycle_regular_task", args) # 以下是celery启动命令 # celery -A quality_control.celery_task beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler # celery -A quality_control.celery_task worker -l info -P eventlet
然后你的代码路径就应该长这样:
上面代码的路径按照自己项目的路径进行配置。
然后你的调用方法直接就可以是:
修改定时任务可以是这样的:
你的views函数里面也写完之后,配置你的setting下文件:
IS_USE_CELERY = True CELERY_IMPORTS = ( 'quality_control.tasks.tasks', ) if IS_USE_CELERY: INSTALLED_APPS = locals().get("INSTALLED_APPS", []) INSTALLED_APPS += ("django_celery_beat", "django_celery_results") CELERY_ENABLE_UTC = False CELERYBEAT_SCHEDULER = "django_celery_beat.schedulers.DatabaseScheduler"
自己测试的时候,需要先开启worker
celery -A quality_control.celery_task worker -l info -P eventlet
再启动beat
celery -A quality_control.celery_task beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
教程来源:
https://www.php.cn/faq/538405.html
您也可以直接看教程里面的,其中启动worker时,命令需要用上面的,问问百度为什么就可以了。
标签:CELERY,task,name,param,Django,celery,任务,import From: https://www.cnblogs.com/Pyxin/p/17727879.html