我使用 Flask 和 celery 来构建后端 api。
为了防止任务运行时间过长,我实现了一个自定义的基于线程的监视类来监视任务,并在任务运行时间过长时停止它们。
这是我的实现。
import os
import sys
from flask import Flask
from celery import Celery, Task
import logging
from logging.handlers import TimedRotatingFileHandler
from celery.signals import setup_logging, task_prerun, task_postrun
import time
from datetime import datetime
import threading
CELERY_WORKER_POOL = 'prefork'
CELERY_WORKER_CONCURRENCY = 2
soft_time_limit = 1200
check_interval = 90
if CELERY_WORKER_POOL == 'prefork':
# fix torch CUDA error
# this forces the application to use spawn instead of fork
os.environ["FORKED_BY_MULTIPROCESSING"] = "1"
if os.name != "nt":
from billiard import context
context._force_start_method("spawn")
# this can prevent pytorch model initialization hanging
# not needed with worker_pool='threads'
os.environ['OMP_NUM_THREADS'] = '1'
# Ensure Flask app context for Celery tasks
class ContextTask(Task):
def __call__(self, *args, **kwargs):
with app.app_context():
return self.run(*args, **kwargs)
def make_celery(app):
# Configure Celery to use Redis as the broker
celery_app = Celery(
app.import_name,
backend=DATABASE_URL,
broker=REDIS_URL,
include=[
'apis.my_api',
]
)
celery_app.conf.update(app.config)
# Set the worker processes
celery_app.conf.update(
worker_pool=CELERY_WORKER_POOL,
worker_concurrency=CELERY_WORKER_CONCURRENCY,
)
celery_app.Task = ContextTask
return celery_app
app = Flask(__name__)
celery_app = make_celery(app)
class LongRunningTaskError(Exception):
pass
class TaskMonitor:
def __init__(self, task_id, timeout=soft_time_limit):
self.task_id = task_id
self.timeout = timeout
self.start_time = time.time()
self.stop_event = threading.Event()
self.thread = threading.Thread(target=self.monitor)
self.thread.start()
def monitor(self):
while not self.stop_event.is_set():
end_time = time.time()
duration = end_time - self.start_time
if duration > self.timeout:
# Revoke the task
if CELERY_WORKER_POOL == 'prefork':
celery_app.control.revoke(self.task_id, terminate=True, signal='SIGTERM') # cannot be used with worker_pool='threads'
self.stop_event.set() # Stop the monitor after sending the alarm
time.sleep(check_interval) # Check every check_interval
def stop(self):
self.stop_event.set()
self.thread.join()
task_monitors = {}
@task_prerun.connect
def task_prerun_handler(task_id, task, *args, **kwargs):
task_monitors[task_id] = TaskMonitor(task_id)
@task_postrun.connect
def task_postrun_handler(task_id, task, *args, **kwargs):
if task_id in task_monitors:
task_monitors[task_id].stop()
del task_monitors[task_id]
我认为当收到任务时,
TaskMonitor.monitor
会在新线程中启动,并且不会影响 celery 队列中任务的启动时间。
然而事实是,如果我设置
check_interval
为90秒,
CELERY_WORKER_CONCURRENCY
为2,并且我同时发送4个API请求,那么前2个任务将立即执行。
尽管前 2 个任务很快完成(< 1 秒),但其余 2 个任务仍然在 90 秒后开始执行,该时间等于 check_interval。
如果我使用
time.sleep
让任务执行运行90~180秒,那么自从前2个任务开始后,剩下的2个任务仍然在180秒后开始执行。
所以任务的开始时间发生在| ||秒。
有谁明白为什么我的执行
check_interval
会影响 celery 任务的开始时间?
非常感谢!
TaskMonitor
这是 celery github 存储库中的讨论线程:
https://github.com/celery/celery/discussions/9167 以上是我的尝试、观察和我的问题。
Above was my try, observation, and my problem.
你观察到的行为的原因与你自定义的任务监控和
check_interval
无关,而与 Celery 的
worker_concurrency
设置以及你如何运行长时间任务有关。
让我们来分解一下:
-
worker_concurrency
: 你将 Celery worker 的并发设置为 2 (CELERY_WORKER_CONCURRENCY = 2
)。这意味着你的 worker 同时最多只能执行 2 个任务。 -
长时间运行的任务 : 当你发送 4 个 API 请求时,前 2 个请求会被立即接收并由 2 个可用的 worker 处理。然而,由于这些任务是长时间运行的任务(在你的测试中,要么休眠 90-180 秒,要么需要很长时间才能完成),因此它们会阻塞 worker,使它们无法处理其他任务。
-
队列 : 剩下的 2 个任务会被放在 Celery 队列中。 只有当其中一个 worker 可用时,它们才会开始执行。
-
check_interval
: 你的TaskMonitor
和它的check_interval
对任务的_启动_时间没有影响。TaskMonitor
在任务已经开始运行_之后_监控任务。 因此,check_interval
不会延迟队列中任务的启动。
那么为什么你会看到 90 秒的延迟呢?
你看到的 90 秒延迟很可能是因为你长时间运行的任务大约在 90 秒后完成。 当一个 worker 完成一个任务后,它会从队列中获取下一个任务。 由于你的
check_interval
也是 90 秒,因此你可能会错误地将这两个事件关联起来。
如何解决这个问题?
为了减少长时间运行任务带来的延迟,你可以考虑以下选项:
-
增加
worker_concurrency
: 这将允许你的 worker 同时处理更多任务。 但是,你需要根据你的硬件资源和任务的要求来平衡这一点。 - 使用不同的队列 : 对于长时间运行的任务,使用一个单独的队列,并为其分配专用的 worker。 这将确保长时间运行的任务不会阻塞其他任务的执行。
-
异步任务
: 对于确实需要很长时间才能完成的任务,考虑使用异步任务处理库,比如
asyncio
或gevent
。 这将允许你的 worker 在等待长时间运行的任务完成时处理其他任务。
关于你的
TaskMonitor
实现
虽然你的
TaskMonitor
实现可以工作,但它有一些潜在的问题:
- 资源密集型 : 持续运行线程来监控任务可能会占用大量资源,尤其是当你有很多任务时。
- 不必要的复杂性 : Celery 已经内置了任务超时机制,你可以使用这些机制来实现类似的功能,而无需创建自定义监控系统。
建议
-
使用 Celery 的内置任务超时机制 (
task_soft_time_limit
和task_time_limit
) 来控制任务的执行时间。 - 如果你需要更精细的监控,请考虑使用 Celery 事件或结果后端来跟踪任务的进度和状态,而不是创建自定义的线程监控系统。
通过实现这些更改,你可以提高 Celery 应用程序的性能和效率。
标签:python,multithreading,celery From: 78814559