首页 > 其他分享 >Django celery 定时任务与周期任务的创建-暂停-开始-删除

Django celery 定时任务与周期任务的创建-暂停-开始-删除

时间:2023-09-25 14:33:05浏览次数:106  
标签:CELERY task name param Django celery 任务 import

发开阶段遇到了需要定时任务以及周期任务才能进行的事情,这里进行记录一下,防止下次我再写的时候写不明白。

首先在你们项目里面创建以下文件:

celery:

import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "settings")

from celery import Celery
from quality_control.celery_task import celeryconfig


app = Celery("quality_control")

app.config_from_object(celeryconfig)

app.autodiscover_tasks([
    "quality_control.tasks",
])

celeryconfig:

import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "settings")
from django.conf import settings


# 上线时需要使用setting中的redis
#CELERY_RESULT_BACKEND = settings.CELERY_RESULT_BACKEND
#BROKER_URL = settings.BROKER_URL
# 本地测试需要的redis配置
CELERY_RESULT_BACKEND = "redis://127.0.0.1:6379/0"
BROKER_URL = "redis://127.0.0.1:6379/1"


CELERY_WOEKER_CONCURRENCY = 20

CELERY_PREFETCH_MULTIPLIER = 20

CELERY_FORCE_EXECV = True

CELERY_WORKER_MAX_TASKS_PER_CHILD = 100

CELERY_DISABLE_RATE_LIMITS = True

CELERY_ENABLE_UTC = False
CELERY_TIMEZONE = settings.TIME_ZONE
DJANGO_CELERY_BEAT_TZ_AWARE = False
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"

createcelery

import json
from django_celery_beat.models import PeriodicTask, CrontabSchedule, ClockedSchedule
from datetime import datetime
from django.conf import settings


class CreatCelerySchedule:
    __doc__ = \
        """
            这是一个定时、周期任务的相关类
            如果你对定时任务并不熟悉,那么这个类已经实现了大部分的方法,你可以直接调用。
            其中delete 与 enabled 看上去是多余的方法,当你会使用了celery的定时任务,那么你可以直接调celery的方法即可。
            返参中加入注解类型,方便客户端调用
        """

    @staticmethod
    def clockedschedule(dtime: datetime or int(10)) -> (ClockedSchedule, bool):
        """
        创建仅仅执行一次的定时任务模型对象
        :param dtime: datetime 对象 或者是时间戳
        :return: schedule: django_celery_beat.models.ClockedSchedule 即 ClockedSchedule 对象
                 created:创建成功:true,更新成功:false,失败:引发异常
        """
        if isinstance(dtime, int):
            # 这是自己写的方法,他可以将时间戳转时间对象
            dtime = Qu.time_transform(dtime)

        schedule, created = ClockedSchedule.objects.get_or_create(clocked_time=dtime)
        return schedule, created

    @staticmethod
    def crontabschedule(minute: str = "*", hour: str = "*", day_of_week: str = "*", day_of_month: str = "*") -> (
            CrontabSchedule, bool):
        """
        创建周期任务模型对象
        :param minute: 定时任务的分钟数 (0,30) or */30 第零分钟与第三十分钟进行一次 或者是 每三十分钟进行一次
        :param hour: 定时任务的小时数 (8,20) or */12 第八时与第二十时进行一次 或者是 每十二个小时进行一次
        :param day_of_week: 定时任务的天关于周 (0,5) 每周的周日 或者是 周五进行一次
        :param day_of_month: 定时任务的天关于月 (10,15) 每月的十号与十五号进行一次
        :return: 创建后的任务对象,是否‘创建’成功:如果创建成功会返回true,否则返回false 值得注意的是:get到数据后,create会返回false
        """
        schedule, created = CrontabSchedule.objects.get_or_create(minute=minute,
                                                                  hour=hour,
                                                                  day_of_week=day_of_week,
                                                                  day_of_month=day_of_month,
                                                                  timezone=settings.TIME_ZONE,
                                                                  )
        return schedule, created

    @staticmethod
    def periodicschedule(schedule, name: str, task: str, args: list, one_off: bool = False) -> PeriodicTask:
        """
        将你创建好的对应对象,注册到celery-beat队列里面,并向数据库存入
        :param schedule: 你必须传一个模型实例进来,否则我无法得知你要创建什么实例
        :param name:  为你的任务起一个名字,这个名字不可以重复,如果他是重复的,那么将会更新以前的
        :param task:  你需要执行的函数。他的书写格式必须为:‘app.dir.tasks.function’
        :param args: 为你的函数传入参数,目前版本仅支持位置入参
        :param one_off: 是否仅执行一次,默认不是
        :return:
        """
        if not isinstance(args, list):
            raise TypeError("你必须传入一个list对象,这是定时任务的要求的入参,你必须按照规定传参。")
        args = json.dumps(args)
        if isinstance(schedule, (CrontabSchedule, ClockedSchedule)):
            raise TypeError("你必须传入一个定时任务对象,或者是时钟对象")
        if isinstance(schedule, CrontabSchedule):
            _ = PeriodicTask.objects.update_or_create(name=name, defaults={
                "crontab": schedule,
                "name": name,
                "task": task,
                "one_off": one_off,
                "args": args or [],
            })
        else:
            _ = PeriodicTask.objects.update_or_create(name=name, defaults={
                "clocked": schedule,
                "name": name,
                "task": task,
                "one_off": True,
                "args": args or [],
            })
        return _

    @staticmethod
    def selectperiodic(name):
        """
        使用id查值没有意义,请使用name进行查找
        :param name: 你需要查找的名字
        :return: Queryset 对象
        """
        clocked = PeriodicTask.objects.filter(name=name + "_clocked")
        if not clocked.exists():
            clocked = None
        crontab = PeriodicTask.objects.filter(name=name + "_crontab")
        if not crontab.exists():
            clocked = None
        return clocked, crontab

    @staticmethod
    def delete(ptmodel: PeriodicTask):
        """
        :param ptmodel: 将 PeriodicTask 的 model 传入 ,切记不是 queryset 对象
        :return:
        """
        ptmodel.delete()

    @staticmethod
    def enabled(ptmodel: PeriodicTask, ny: bool):
        """
        :param ptmodel: 将 PeriodicTask 的 model 传入 ,切记不是 queryset 对象
        :param ny: 开启还是关闭,需要一个 bool 值
        :return:
        """
        ptmodel.enabled = ny
        ptmodel.save()

上面的三个py文件放在一个文件夹里面,名字你看着起就可以,你的项目里面就应该长这样:

然后重新启动一个文件夹,主要放你需要的tasks 记住,你的py文件名字,必须是tasks.py文件名

# 导入celery相关配置
from quality_control.celery_task.celery import app
from common.log import logger
# 导入定时任务需要跑的逻辑程序
from quality_control.utils import request_db
from quality_control.celery_task.createcelery import CreatCelerySchedule as Ccs


@app.task
def cycle_regular_task(name):
    logger.info("任务开始绘制:%s" % name)
    request_db.celery_task_request(name)


# 调用时,你只需要这样写就可以。 def create_tasks(dtime, name, args, cyc_time): clocked, _ = Ccs.clockedschedule(dtime) Ccs.periodicschedule(clocked, name + "_clocked", "quality_control.tasks.tasks.cycle_regular_task", args) crontab, _ = Ccs.crontabschedule(hour=cyc_time) Ccs.periodicschedule(crontab, name + "_crontab", "quality_control.tasks.tasks.cycle_regular_task", args) # 以下是celery启动命令 # celery -A quality_control.celery_task beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler # celery -A quality_control.celery_task worker -l info -P eventlet

然后你的代码路径就应该长这样:

上面代码的路径按照自己项目的路径进行配置。

然后你的调用方法直接就可以是:

 

 修改定时任务可以是这样的:

 你的views函数里面也写完之后,配置你的setting下文件:

IS_USE_CELERY = True
CELERY_IMPORTS = (
    'quality_control.tasks.tasks',
)
if IS_USE_CELERY:
    INSTALLED_APPS = locals().get("INSTALLED_APPS", [])
    INSTALLED_APPS += ("django_celery_beat", "django_celery_results")
    CELERY_ENABLE_UTC = False
    CELERYBEAT_SCHEDULER = "django_celery_beat.schedulers.DatabaseScheduler"

自己测试的时候,需要先开启worker

celery -A quality_control.celery_task worker  -l info -P eventlet
再启动beat
celery -A quality_control.celery_task beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
教程来源:
https://www.php.cn/faq/538405.html
您也可以直接看教程里面的,其中启动worker时,命令需要用上面的,问问百度为什么就可以了。

 

标签:CELERY,task,name,param,Django,celery,任务,import
From: https://www.cnblogs.com/Pyxin/p/17727879.html

相关文章

  • django快速建站
    #pipinstalldjango#pipinstallrequestsimportosimporttimedefcreatefile(filepath,filetext):  ifnotos.path.exists(filepath):    withopen(filepath,'w',encoding='utf-8')asfile:      file.write(filetext)ifn......
  • linux Screen 多任务处理
    一、背景系统管理员经常需要SSH或者telent远程登录到Linux服务器,经常运行一些需要很长时间才能完成的任务,比如系统备份、ftp传输等等。通常情况下我们都是为每一个这样的任务开一个远程终端窗口,因为它们执行的时间太长了。必须等待它们执行完毕,在此期间不能关掉窗口或者断开......
  • Django的模型设计
    摘要通过Django框架设计一个商城网站,为了记忆其中的关键设置要点,同时对项目的重要知识点进行回顾记忆加深,通过笔记的形式进行记录方便记忆学习。一、商城的路由1、路由的分发规则​ 首先一个完整的路由包含:路由地址、视图函数(或者视图类)、路由变量和路由命名。其中基本信息必......
  • 金融领域预训练模型用于分类任务,大模型应用参考
    在bert的基础上加了一个分类层:代码实现:output=bert.model.outputoutput=Lambda(lambdax:x[:,0],name='CLS-token')(output)output=Dense(units=num_classes,activation='softmax',kernel_initializer=bert.initializer)(output)model......
  • 194_win7_10任务栏合并但不隐藏标签
    这是一篇原发布于2020-01-2310:28:00得益小站的文章,备份在此处。前言我们都知道Windows的任务栏有个合并、隐藏标签的功能。但Windows提供的三个选项中,就是少了一个不合并但隐藏标签的选项,本文就来解决这个问题。不想看原理,只想快速解决的同学,可以翻到最后,有一键设置的方法。......
  • 异步任务与定时任务
     1.异步任务使用(1)创建线程池配置@Configuration@EnableAsync//开启多线程publicclassThreadPoolConfig{@Bean("taskExecutor")publicExecutorasyncServiceExecutor(){ThreadPoolTaskExecutorexecutor=newThreadPoolTaskExecutor();......
  • django初学
    其实还有个drf框架#django把框架分为一个项目包含很多应用pythonmanage.pystartapppolls该命令在在manage.py同级下创建应用目录polls是应用的名字!!!!!!!!!!!!!!!!#应用创建完之后需要在setting里面注册 #编写url和views函数的对应关系 页面的话,#映射网页的话,在应......
  • Hadoop是什么? Hadoop是一个由Apache开发的开源分布式计算框架,它能够处理大规模数据并
    Hadoop是什么?Hadoop是一个由Apache开发的开源分布式计算框架,它能够处理大规模数据并行处理任务,支持大规模数据存储和处理。Hadoop的核心组件包括分布式文件系统HDFS和分布式计算框架MapReduce,它们使得Hadoop可以在廉价的硬件上并行地处理大量数据。Hadoop还包括很多相关的项目和子......
  • 栈和堆的区别、FreeRTOS 中的任务栈
    栈和堆的区别、FreeRTOS中的任务栈01 堆和栈的概念堆功能堆是一块用于动态分配内存的区域,用于存储程序运行时动态创建的对象。堆的大小可以在程序运行时动态调整。特点堆的分配和释放是由程序员手动控制的。堆的分配和释放顺序的任意的,不需要遵循先进先出的原则......
  • win11任务栏变成透明的教程
    win11任务栏变成透明的教程其实win11原版的任务栏以纯色为主,并且没有任何的透明效果,让桌面壁纸无法完美展示出来,非常难看,因此我们可以通过第三方软件的方式来将它透明化,下面就一起来看一下具体的教程吧。win11任务栏怎么透明方法一:1、首先右键桌面空白处打开右键菜单,选择“......