我有一个捆绑包,其中包含用于任务的 Celery 和 RabbitMQ 以及用于 Web 请求的 FastApi 应用程序。
celery 应用程序从命令提示符启动
celery -A celery_app worker -l info -P gevent
Rabbit 部署在 Docker 容器中。
FastApi 从 python 启动脚本。
这是代码。问题如下。
fastapi_app/main.py
from __future__ import absolute_import
from fastapi import FastAPI
from celery.result import AsyncResult
from celery_app.tasks import task, get_current_tasks
from celery_app.celery import c_app
from fastapi_app.model import Worker, Task
f_app = FastAPI()
@f_app.post("/task/")
def run_task():
_task = task.apply_async()
return {"task_id": _task.id}
@f_app.get("/task_info/{task_id}")
def get_progress(task_id):
result = AsyncResult(task_id, app=c_app)
return Task(id=task_id, state=result.state, meta=result.info)
@f_app.get("/curr_progress/")
def get_current_progress():
response = {'workers': []}
for worker, tasks_list in get_current_tasks().items():
worker_tasks_id = [task_.get('id') for task_ in tasks_list]
worker_ = Worker(name=worker)
for id_ in worker_tasks_id:
result = AsyncResult(id_, app=c_app)
worker_.tasks.append(Task(id=id_, state=result.state, meta=result.info))
response['workers'].append(worker_)
return response
if __name__ == "__main__":
import uvicorn
uvicorn.run(f_app, host="localhost", port=8000)
fastapi_app/model.py
from pydantic import BaseModel
from typing import List, Any, Optional
class Task(BaseModel):
id: Optional[str] = None
state: Optional[str] = None
meta: dict | Any | None = None
class Worker(BaseModel):
name: Optional[str] = None
tasks: List[Task] = list()
celery_app/tasks.py
from __future__ import absolute_import
import threading
import time
from celery_app.celery import c_app
def get_current_tasks() -> dict:
i = c_app.control.inspect()
return i.active()
def get_registered_tasks() -> dict:
i = c_app.control.inspect()
return i.registered()
@c_app.task(bind=True)
def task(self):
print(f'task started in {threading.current_thread()}. Thread alive:')
for i in threading.enumerate():
print(i)
n = 60
for i in range(0, n):
self.update_state(state='PROGRESS', meta={'done': i, 'total': n})
time.sleep(1)
print(f'task finished in {threading.current_thread()}\n')
return n
celery_app/celery.py
from __future__ import absolute_import
from celery import Celery
c_app = Celery('celery_app',
broker='amqp://guest:guest@localhost',
backend='rpc://',
include=['celery_app.tasks'])
那里,在
fastapi_app/main.py
我有这个函数,它启动任务
run_task()
和函数,获取所有正在运行的任务的当前进度
get_current_progress()
最后一个依赖于
celery.result.AsyncResult()
,它依赖于
update_state()
method in
task(self)
function in| ||.
celery_app/tasks.py
问题就在这里
当我通过请求 FastApi 服务器仅启动一项任务时,任务进度会正确显示。 但是当我启动多个任务(向 FastApi 服务器发送几个任务请求)时,显示会变得不正确。特别是进度的元信息。
{
"workers": [
{
"name": "celery@wsmsk1n3075",
"tasks": [
{
"id": "271531c2-48e6-4c71-a9ef-31bce434c649",
"state": "PROGRESS",
"meta": {
"done": 3,
"total": 60
}
}
]
}
]
}
需要注意的是,在 celery 中,所有 4 个任务都在单独的线程中执行:
{
"workers": [
{
"name": "celery@wsmsk1n3075",
"tasks": [
{
"id": "4d05d0f0-f058-4372-8eec-c84853188655",
"state": "PROGRESS",
"meta": {
"done": 9,
"total": 60
}
},
{
"id": "0ca82db4-7e04-4bfd-9d73-6a190decd4c6",
"state": "PROGRESS",
"meta": null
},
{
"id": "ba8aa34b-e185-47cf-bed3-f3ca07257afc",
"state": "PROGRESS",
"meta": null
},
{
"id": "3e2941f5-1285-4062-aea0-31a3c9b1cc21",
"state": "PROGRESS",
"meta": null
}
]
}
]
}
因此,它们必须有自己的进度(这似乎不是您所看到的那样)在最后一个 json 块中)。
[2024-07-23 13:02:06,427: WARNING/MainProcess] <_DummyThread(Dummy-6, started daemon 1901360139936)>
[2024-07-23 13:02:06,427: WARNING/MainProcess] <_DummyThread(Dummy-7, started daemon 1901360142016)>
[2024-07-23 13:02:06,428: WARNING/MainProcess] <_DummyThread(Dummy-8, started daemon 1901360137056)>
[2024-07-23 13:02:06,429: WARNING/MainProcess] <_DummyThread(Dummy-9, started daemon 1901360139776)>
如何在多任务情况下获取每个任务状态,就像在单任务示例中一样?
UDP
:看起来这是一个计时问题芹菜罩。如果我们在每个 AsyncResult 之前添加一些 sleep() ,就像这样: 我们将为每个 AsyncResult 提供 1 秒的转变,但是,至少我们将通过其元获得任务的实际进度(每个先前的过时转变为 1 秒)结果是c)。
@f_app.get("/curr_progress/")
def get_current_progress():
response = {'workers': []}
for worker, tasks_list in get_current_tasks().items():
worker_tasks_id = [task_.get('id') for task_ in tasks_list]
worker_ = Worker(name=worker)
for id_ in worker_tasks_id:
time.sleep(1)
result = AsyncResult(id_, app=c_app)
worker_.tasks.append(Task(id=id_, state=result.state, meta=result.info))
response['workers'].append(worker_)
return response
当我们尝试在
函数中手动通过当前任务的task_id获取进度时,会实现相同的行为。只是 time.sleep 将被人类计时所取代。
fastapi_app/main.py
get_progress(task_id)
function. Just time.sleep is going to be replaced with human timing.
此问题与 Celery 的结果后端和并发更新任务状态的方式有关。当有多个任务同时运行时,对
update_state
的快速连续调用会导致 Celery 结果后端出现竞争条件。因此,某些任务的元数据可能无法正确更新或反映在的
get_current_progress
函数中。
以下是可以解决此问题的几种方法:
1. 使用 Celery 的
sleep()
增强结果后端:
Celery 的文档建议在循环内使用
time.sleep()
时,使用 Celery 提供的
sleep()
函数。这使 Celery 能够在后台处理事件并可能更可靠地更新结果。
from celery import sleep
@c_app.task(bind=True)
def task(self):
n = 60
for i in range(0, n):
self.update_state(state='PROGRESS', meta={'done': i, 'total': n})
sleep(1)
return n
2. 增加结果后端的轮询间隔:
如果使用的是缓存或数据库作为结果后端,则可以增加轮询间隔以减少竞争条件的可能性。这可以通过在 Celery worker 的配置中设置
CELERYD_RESULT_BACKEND_POLL_INTERVAL
来完成。
3. 使用不同的结果后端:
某些结果后端(例如 Redis)可能比其他后端更适合处理并发更新。考虑切换到更强大的结果后端,例如 Redis。
4. 实现锁机制:
为了完全避免竞争条件,可以在更新任务状态时实现锁机制。这将确保一次只有一个线程可以更新特定任务的状态。但是,这会增加复杂性并可能影响性能。
from threading import Lock
task_locks = {} # 使用任务 ID 作为键的字典
@c_app.task(bind=True)
def task(self):
task_id = str(self.request.id)
task_locks[task_id] = Lock()
with task_locks[task_id]:
n = 60
for i in range(0, n):
self.update_state(state='PROGRESS', meta={'done': i, 'total': n})
time.sleep(1)
del task_locks[task_id]
return n
5. 不要过分依赖
update_state
:
请记住,
update_state
主要用于长期运行的任务,并且频繁更新可能会导致开销。如果需要实时进度更新,请考虑使用 WebSocket 或服务器发送的事件之类的机制来代替频繁轮询结果后端。
通过实现这些解决方案之一,应该能够解决在 FastAPI 应用程序中看到的竞争条件,并确保所有任务状态都能在多任务环境中正确更新和检索。请记住,最佳方法取决于的特定需求和应用程序的规模。
标签:python,python-3.x,rabbitmq,celery,fastapi From: 78782881