首页 > 其他分享 >Celery周期性任务定义beat

Celery周期性任务定义beat

时间:2023-10-04 12:12:31浏览次数:41  
标签:celery tasks schedule beat Celery 周期性 add task

通过celery beat可以使用周期性任务的定义。

https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html

周期性任务beat相关设置:

https://docs.celeryq.dev/en/stable/userguide/configuration.html#std-setting-beat_schedule

您必须确保一次只运行一个beat调度程序,否则您最终会遇到重复的任务。

定义周期性任务

要定期调用任务,您必须向beat调度列表中添加一个实例条目。

from celery import Celery
from celery.schedules import crontab

app = Celery()

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')

    # Calls test('hello') every 30 seconds.
    # It uses the same signature of previous task, an explicit name is
    # defined to avoid this task replacing the previous one defined.
    sender.add_periodic_task(30.0, test.s('hello'), name='add every 30')

    # Calls test('world') every 30 seconds
    sender.add_periodic_task(30.0, test.s('world'), expires=10)

    # Executes every Monday morning at 7:30 a.m.
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1),
        test.s('Happy Mondays!'),
    )

@app.task
def test(arg):
    print(arg)

@app.task
def add(x, y):
    z = x + y
    print(z)

请注意, on_after_configure 是在应用程序设置后发送的,因此声明应用程序的模块外部的任务(例如,位于 celery.Celery.autodiscover_tasks() 的tasks.py 文件中)必须使用 on_after_finalize

add_periodic_task() 函数会在后台将实例条目添加到 beat_schedule 设置中,

同样我们还可以用设置选项的方式手动设置周期性任务:

示例:每 30 秒运行一次tasks.add 任务。

app.conf.beat_schedule = {
    'add-every-30-seconds': { # 任务名
        'task': 'tasks.add',  # 任务执行的函数名
        'schedule': 30.0,     # 调度策略,可以是整数秒数、timedelta 或 crontab 。您还可以通过扩展 schedule 的接口来定义自己的自定义计划类型。
        'args': (16, 16)      # 传递给任务函数的参数
    },
    "add-every-hour": {
         "task": "tasks.add",
         'schedule': timedelta(hours=1), # 每小时执行1次
         'args': (3, 8) # 传递参数-
     },
}

beat_schedule 的参数:https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html#available-fields

周期性任务调度器

Crontab schedules

https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html#crontab-schedules

这种调度策略就是linux中的crontab。比较灵活。

from celery.schedules import crontab

app.conf.beat_schedule = {
    # Executes every Monday morning at 7:30 a.m.
    'add-every-monday-morning': {
        'task': 'tasks.add',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': (16, 16),
    },
}

Solar schedules根据太阳日落日出调度

如果你有一个任务需要根据日出、日落、黎明或黄昏执行,你可以使用 solar 调度类型:

https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html#solar-schedules

这个一般不怎么用.....

启动beat调度程序服务器

celery.exe -A celery_tasks.main beat -l INFO

您还可以通过启用workers -B 选项将beat嵌入到worker中,如果您永远不会运行多个worker节点,这会很方便,但它并不常用,因此不建议用于生产用途:【这玩意windows不支持!!!】

celery -A proj worker -B

Beat 需要将任务的最后运行时间存储在本地数据库文件(默认名称为 celerybeat-schedule,格式是.db)中,因此需要在当前目录中进行写入,或者您可以为此文件指定自定义位置:

celery -A proj beat -s /home/celery/var/run/celerybeat-schedule

标签:celery,tasks,schedule,beat,Celery,周期性,add,task
From: https://www.cnblogs.com/juelian/p/17742099.html

相关文章

  • django-celery-results - 使用 Django ORM/Cache 作为结果后端
    https://docs.celeryq.dev/en/stable/django/first-steps-with-django.html#django-celery-results-using-the-django-orm-cache-as-a-result-backend这个一般自己设置一下result_backend也行,要用django-celery-results也是一个选择。......
  • django-celery-beat插件使用
    该插件从Django管理界面管理celery的定期任务,您可以在其中动态****创建、编辑和删除定期任务以及它们的运行频率。django-celery-beat提供了几种添加定时或周期性任务的方式,预先在在settings.py中添加好定时任务。通过Djangoadmin后台动态添加。(实际上就是操作model模型类)......
  • flower插件-监视celery
    安装和使用:https://flower.readthedocs.io/en/latest/install.html#installationhttps://github.com/mher/flower/tree/master/examplescelery相关配置:#发送与任务相关的事件,以便可以使用flower之类的工具来监控任务#或者在启动worker服务时,使用-E参数。worker_send_task_......
  • filebeat 收集 nginx 日志到 kibana 展示
    首先是nginx.conf的日志格式json格式很多,不一定非要这个log_formatjson'{"access_time":"$time_iso8601","remote_addr":"$remote_addr","remote_user":"$remote_user","request":"$request&qu......
  • fastapi+tortoise-orm+redis+celery 多worker数据库连接
    我用fastapi在写接口,数据库orm用的是tortoise-orm,接口的数据库操作是正常的。现在加入了celery,但是每个celery在执行任务时,不能获取到数据库连接我想要每个worker获得数据库连接,但是不要每个任务都去连接一次,并在每个worker结束时,断开连接,但是不能断开其他worker的数据库连接from......
  • Django celery 定时任务与周期任务的创建-暂停-开始-删除
    发开阶段遇到了需要定时任务以及周期任务才能进行的事情,这里进行记录一下,防止下次我再写的时候写不明白。首先在你们项目里面创建以下文件:celery:importosos.environ.setdefault("DJANGO_SETTINGS_MODULE","settings")fromceleryimportCeleryfromquality_control.ce......
  • 关于搭建ELK的一些问题--filebeat收集旧日志时,旧日志不全(被截断等问题)
    由于我只是简单搭建日志监测平台,logstash比较吃系统资源,我用filebeat代替了logstash日志收集的职能(也没有做日志筛选)用的是7.5.1版本 由于搭建时已经存在旧日志,在导入时出现了日志不全的问题也就是说旧日志传输到elasticsearch时被截断了。解决办法:1.检查一下filebeat.yml ......
  • K8S:使用Filebeat收集K8S内Pod应用日志
    之前是针对标准输出进行采集,现在来看一下针对于容器当中的日志,是在pod当中添加一个日志采集器,这里部署一个应用,单独部署一个容器,这个容器是filebeat日志采集器,这一块就通过emptydir来实现数据的共享。filebeat的配置放在configmap当中,指明了日志采集的路径在哪,这个日志没有在标准......
  • Segment tree Beats!
    前言SegmentBeats可以用来解决处理区间最值,区间历史值的问题。不保证这些题都实现过。区间最值操作HDU5306.GorgeousSequence给出一个长度为\(n(n\le10^6)\)的序列\(A\)和\(m(m\leq10^6)\)次操作,每次操作为以下三种类型之一:给出\(l,r,k\),对于所有\(i\in[l,r......
  • ELK+Filebeat 部署安装
    一、ELK+Filebeat介绍ELK是Elasticsearch、Logstash、Kibana三大开源框架首字母大写简称(但是后期出现的filebeat(beats中的一种)可以用来替代logstash的数据收集功能,比较轻量级)。市面上也被成为ElasticStack。Elasticsearch是ElasticStack核心的分布式搜索和分析引擎,是一个......