首页 > 编程问答 >自定义的基于线程的监控如何影响 celery 任务的启动时间?

自定义的基于线程的监控如何影响 celery 任务的启动时间?

时间:2024-07-31 14:36:30浏览次数:19  
标签:python multithreading celery

我使用 Flask 和 celery 来构建后端 api。

为了防止任务运行时间过长,我实现了一个自定义的基于线程的监视类来监视任务,并在任务运行时间过长时停止它们。

这是我的实现。

import os
import sys
from flask import Flask
from celery import Celery, Task
import logging
from logging.handlers import TimedRotatingFileHandler
from celery.signals import setup_logging, task_prerun, task_postrun
import time
from datetime import datetime
import threading

CELERY_WORKER_POOL = 'prefork'
CELERY_WORKER_CONCURRENCY = 2
soft_time_limit = 1200
check_interval = 90

if CELERY_WORKER_POOL == 'prefork':
    # fix torch CUDA error
    # this forces the application to use spawn instead of fork
    os.environ["FORKED_BY_MULTIPROCESSING"] = "1"
    if os.name != "nt":
        from billiard import context
        context._force_start_method("spawn")
    # this can prevent pytorch model initialization hanging
    # not needed with worker_pool='threads'
    os.environ['OMP_NUM_THREADS'] = '1'

# Ensure Flask app context for Celery tasks
class ContextTask(Task):
    def __call__(self, *args, **kwargs):
        with app.app_context():
            return self.run(*args, **kwargs)
            
def make_celery(app):
    # Configure Celery to use Redis as the broker
    celery_app = Celery(
        app.import_name,
        backend=DATABASE_URL,
        broker=REDIS_URL,
        include=[
            'apis.my_api',
        ]
    )
    celery_app.conf.update(app.config)

    # Set the worker processes
    celery_app.conf.update(
        worker_pool=CELERY_WORKER_POOL,
        worker_concurrency=CELERY_WORKER_CONCURRENCY,
    )

    celery_app.Task = ContextTask
    return celery_app

app = Flask(__name__)
celery_app = make_celery(app)


class LongRunningTaskError(Exception):
    pass

class TaskMonitor:
    def __init__(self, task_id, timeout=soft_time_limit):
        self.task_id = task_id
        self.timeout = timeout
        self.start_time = time.time()
        self.stop_event = threading.Event()
        self.thread = threading.Thread(target=self.monitor)
        self.thread.start()

    def monitor(self):
        while not self.stop_event.is_set():
            end_time = time.time()
            duration = end_time - self.start_time
            if duration > self.timeout:
                # Revoke the task
                if CELERY_WORKER_POOL == 'prefork':
                    celery_app.control.revoke(self.task_id, terminate=True, signal='SIGTERM') # cannot be used with worker_pool='threads'
                self.stop_event.set()  # Stop the monitor after sending the alarm
            time.sleep(check_interval)  # Check every check_interval

    def stop(self):
        self.stop_event.set()
        self.thread.join()

task_monitors = {}

@task_prerun.connect
def task_prerun_handler(task_id, task, *args, **kwargs):
    task_monitors[task_id] = TaskMonitor(task_id)

@task_postrun.connect
def task_postrun_handler(task_id, task, *args, **kwargs):
    if task_id in task_monitors:
        task_monitors[task_id].stop()
        del task_monitors[task_id]

我认为当收到任务时, TaskMonitor.monitor 会在新线程中启动,并且不会影响 celery 队列中任务的启动时间。 然而事实是,如果我设置 check_interval 为90秒, CELERY_WORKER_CONCURRENCY 为2,并且我同时发送4个API请求,那么前2个任务将立即执行。 尽管前 2 个任务很快完成(< 1 秒),但其余 2 个任务仍然在 90 秒后开始执行,该时间等于 check_interval。 如果我使用 time.sleep 让任务执行运行90~180秒,那么自从前2个任务开始后,剩下的2个任务仍然在180秒后开始执行。

所以任务的开始时间发生在| ||秒。 有谁明白为什么我的执行 check_interval 会影响 celery 任务的开始时间? 非常感谢! TaskMonitor 这是 celery github 存储库中的讨论线程:

https://github.com/celery/celery/discussions/9167 以上是我的尝试、观察和我的问题。

Above was my try, observation, and my problem.


你观察到的行为的原因与你自定义的任务监控和 check_interval 无关,而与 Celery 的 worker_concurrency 设置以及你如何运行长时间任务有关。

让我们来分解一下:

  1. worker_concurrency : 你将 Celery worker 的并发设置为 2 ( CELERY_WORKER_CONCURRENCY = 2 )。这意味着你的 worker 同时最多只能执行 2 个任务。

  2. 长时间运行的任务 : 当你发送 4 个 API 请求时,前 2 个请求会被立即接收并由 2 个可用的 worker 处理。然而,由于这些任务是长时间运行的任务(在你的测试中,要么休眠 90-180 秒,要么需要很长时间才能完成),因此它们会阻塞 worker,使它们无法处理其他任务。

  3. 队列 : 剩下的 2 个任务会被放在 Celery 队列中。 只有当其中一个 worker 可用时,它们才会开始执行。

  4. check_interval : 你的 TaskMonitor 和它的 check_interval 对任务的_启动_时间没有影响。 TaskMonitor 在任务已经开始运行_之后_监控任务。 因此, check_interval 不会延迟队列中任务的启动。

那么为什么你会看到 90 秒的延迟呢?

你看到的 90 秒延迟很可能是因为你长时间运行的任务大约在 90 秒后完成。 当一个 worker 完成一个任务后,它会从队列中获取下一个任务。 由于你的 check_interval 也是 90 秒,因此你可能会错误地将这两个事件关联起来。

如何解决这个问题?

为了减少长时间运行任务带来的延迟,你可以考虑以下选项:

  • 增加 worker_concurrency : 这将允许你的 worker 同时处理更多任务。 但是,你需要根据你的硬件资源和任务的要求来平衡这一点。
  • 使用不同的队列 : 对于长时间运行的任务,使用一个单独的队列,并为其分配专用的 worker。 这将确保长时间运行的任务不会阻塞其他任务的执行。
  • 异步任务 : 对于确实需要很长时间才能完成的任务,考虑使用异步任务处理库,比如 asyncio gevent 。 这将允许你的 worker 在等待长时间运行的任务完成时处理其他任务。

关于你的 TaskMonitor 实现

虽然你的 TaskMonitor 实现可以工作,但它有一些潜在的问题:

  • 资源密集型 : 持续运行线程来监控任务可能会占用大量资源,尤其是当你有很多任务时。
  • 不必要的复杂性 : Celery 已经内置了任务超时机制,你可以使用这些机制来实现类似的功能,而无需创建自定义监控系统。

建议

  • 使用 Celery 的内置任务超时机制 ( task_soft_time_limit task_time_limit ) 来控制任务的执行时间。
  • 如果你需要更精细的监控,请考虑使用 Celery 事件或结果后端来跟踪任务的进度和状态,而不是创建自定义的线程监控系统。

通过实现这些更改,你可以提高 Celery 应用程序的性能和效率。

标签:python,multithreading,celery
From: 78814559

相关文章