首页 > 编程问答 >Celery 返回有关一名工作人员当前任务的错误信息

Celery 返回有关一名工作人员当前任务的错误信息

时间:2024-08-01 08:19:57浏览次数:21  
标签:python python-3.x rabbitmq celery fastapi

我有一个捆绑包,其中包含用于任务的 Celery 和 RabbitMQ 以及用于 Web 请求的 FastApi 应用程序。
celery 应用程序从命令提示符启动 celery -A celery_app worker -l info -P gevent
Rabbit 部署在 Docker 容器中。
FastApi 从 python 启动脚本。

这是代码。问题如下。

fastapi_app/main.py

from __future__ import absolute_import
from fastapi import FastAPI
from celery.result import AsyncResult
from celery_app.tasks import task, get_current_tasks
from celery_app.celery import c_app
from fastapi_app.model import Worker, Task

f_app = FastAPI()


@f_app.post("/task/")
def run_task():
    _task = task.apply_async()
    return {"task_id": _task.id}


@f_app.get("/task_info/{task_id}")
def get_progress(task_id):
    result = AsyncResult(task_id, app=c_app)
    return Task(id=task_id, state=result.state, meta=result.info)


@f_app.get("/curr_progress/")
def get_current_progress():
    response = {'workers': []}
    for worker, tasks_list in get_current_tasks().items():
        worker_tasks_id = [task_.get('id') for task_ in tasks_list]
        worker_ = Worker(name=worker)
        for id_ in worker_tasks_id:
            result = AsyncResult(id_, app=c_app)
            worker_.tasks.append(Task(id=id_, state=result.state, meta=result.info))
        response['workers'].append(worker_)
    return response


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(f_app, host="localhost", port=8000)

fastapi_app/model.py

from pydantic import BaseModel
from typing import List, Any, Optional


class Task(BaseModel):
    id: Optional[str] = None
    state: Optional[str] = None
    meta: dict | Any | None = None


class Worker(BaseModel):
    name: Optional[str] = None
    tasks: List[Task] = list()

celery_app/tasks.py

from __future__ import absolute_import

import threading
import time
from celery_app.celery import c_app


def get_current_tasks() -> dict:
    i = c_app.control.inspect()
    return i.active()


def get_registered_tasks() -> dict:
    i = c_app.control.inspect()
    return i.registered()


@c_app.task(bind=True)
def task(self):
    print(f'task started in {threading.current_thread()}. Thread alive:')
    for i in threading.enumerate():
        print(i)

    n = 60
    for i in range(0, n):
        self.update_state(state='PROGRESS', meta={'done': i, 'total': n})
        time.sleep(1)
    print(f'task finished in {threading.current_thread()}\n')
    return n

celery_app/celery.py

from __future__ import absolute_import
from celery import Celery

c_app = Celery('celery_app',
               broker='amqp://guest:guest@localhost',
               backend='rpc://',
               include=['celery_app.tasks'])

那里,在 fastapi_app/main.py 我有这个函数,它启动任务 run_task() 和函数,获取所有正在运行的任务的当前进度 get_current_progress()
最后一个依赖于 celery.result.AsyncResult() ,它依赖于 update_state() method in task(self) function in| ||. celery_app/tasks.py 问题就在这里

当我通过请求 FastApi 服务器仅启动一项任务时,任务进度会正确显示。 但是当我启动多个任务(向 FastApi 服务器发送几个任务请求)时,显示会变得不正确。特别是进度的元信息。

{
  "workers": [
    {
      "name": "celery@wsmsk1n3075",
      "tasks": [
        {
          "id": "271531c2-48e6-4c71-a9ef-31bce434c649",
          "state": "PROGRESS",
          "meta": {
            "done": 3,
            "total": 60
          }
        }
      ]
    }
  ]
}

需要注意的是,在 celery 中,所有 4 个任务都在单独的线程中执行:

{
  "workers": [
    {
      "name": "celery@wsmsk1n3075",
      "tasks": [
        {
          "id": "4d05d0f0-f058-4372-8eec-c84853188655",
          "state": "PROGRESS",
          "meta": {
            "done": 9,
            "total": 60
          }
        },
        {
          "id": "0ca82db4-7e04-4bfd-9d73-6a190decd4c6",
          "state": "PROGRESS",
          "meta": null
        },
        {
          "id": "ba8aa34b-e185-47cf-bed3-f3ca07257afc",
          "state": "PROGRESS",
          "meta": null
        },
        {
          "id": "3e2941f5-1285-4062-aea0-31a3c9b1cc21",
          "state": "PROGRESS",
          "meta": null
        }
      ]
    }
  ]
}

因此,它们必须有自己的进度(这似乎不是您所看到的那样)在最后一个 json 块中)。

[2024-07-23 13:02:06,427: WARNING/MainProcess] <_DummyThread(Dummy-6, started daemon 1901360139936)>
[2024-07-23 13:02:06,427: WARNING/MainProcess] <_DummyThread(Dummy-7, started daemon 1901360142016)>
[2024-07-23 13:02:06,428: WARNING/MainProcess] <_DummyThread(Dummy-8, started daemon 1901360137056)>
[2024-07-23 13:02:06,429: WARNING/MainProcess] <_DummyThread(Dummy-9, started daemon 1901360139776)>

如何在多任务情况下获取每个任务状态,就像在单任务示例中一样?

UDP

:看起来这是一个计时问题芹菜罩。如果我们在每个 AsyncResult 之前添加一些 sleep() ,就像这样: 我们将为每个 AsyncResult 提供 1 秒的转变,但是,至少我们将通过其元获得任务的实际进度(每个先前的过时转变为 1 秒)结果是c)。

@f_app.get("/curr_progress/")
def get_current_progress():
    response = {'workers': []}
    for worker, tasks_list in get_current_tasks().items():
        worker_tasks_id = [task_.get('id') for task_ in tasks_list]
        worker_ = Worker(name=worker)
        for id_ in worker_tasks_id:
            time.sleep(1)
            result = AsyncResult(id_, app=c_app)
            worker_.tasks.append(Task(id=id_, state=result.state, meta=result.info))
        response['workers'].append(worker_)
    return response

当我们尝试在
函数中手动通过当前任务的task_id获取进度时,会实现相同的行为。只是 time.sleep 将被人类计时所取代。 fastapi_app/main.py get_progress(task_id) function. Just time.sleep is going to be replaced with human timing.


此问题与 Celery 的结果后端和并发更新任务状态的方式有关。当有多个任务同时运行时,对 update_state 的快速连续调用会导致 Celery 结果后端出现竞争条件。因此,某些任务的元数据可能无法正确更新或反映在的 get_current_progress 函数中。

以下是可以解决此问题的几种方法:

1. 使用 Celery 的 sleep() 增强结果后端:

Celery 的文档建议在循环内使用 time.sleep() 时,使用 Celery 提供的 sleep() 函数。这使 Celery 能够在后台处理事件并可能更可靠地更新结果。

from celery import sleep

@c_app.task(bind=True)
def task(self):
    n = 60
    for i in range(0, n):
        self.update_state(state='PROGRESS', meta={'done': i, 'total': n})
        sleep(1)
    return n

2. 增加结果后端的轮询间隔:

如果使用的是缓存或数据库作为结果后端,则可以增加轮询间隔以减少竞争条件的可能性。这可以通过在 Celery worker 的配置中设置 CELERYD_RESULT_BACKEND_POLL_INTERVAL 来完成。

3. 使用不同的结果后端:

某些结果后端(例如 Redis)可能比其他后端更适合处理并发更新。考虑切换到更强大的结果后端,例如 Redis。

4. 实现锁机制:

为了完全避免竞争条件,可以在更新任务状态时实现锁机制。这将确保一次只有一个线程可以更新特定任务的状态。但是,这会增加复杂性并可能影响性能。

from threading import Lock

task_locks = {}  # 使用任务 ID 作为键的字典

@c_app.task(bind=True)
def task(self):
    task_id = str(self.request.id)
    task_locks[task_id] = Lock()

    with task_locks[task_id]:
        n = 60
        for i in range(0, n):
            self.update_state(state='PROGRESS', meta={'done': i, 'total': n})
            time.sleep(1)
    del task_locks[task_id]
    return n

5. 不要过分依赖 update_state

请记住, update_state 主要用于长期运行的任务,并且频繁更新可能会导致开销。如果需要实时进度更新,请考虑使用 WebSocket 或服务器发送的事件之类的机制来代替频繁轮询结果后端。

通过实现这些解决方案之一,应该能够解决在 FastAPI 应用程序中看到的竞争条件,并确保所有任务状态都能在多任务环境中正确更新和检索。请记住,最佳方法取决于的特定需求和应用程序的规模。

标签:python,python-3.x,rabbitmq,celery,fastapi
From: 78782881

相关文章

  • 如何将优先级任务添加到celery队列而不禁用获取
    我有一个celery工作线程,并发度设置为1,它从RabbitMQ获取任务。我想创建一个在单个并发设置中只有一个队列的系统,因此,所有任务都将添加到主队列中。关于任务-它只是一个循环,我们用更新状态。|||并行地我有两个服务。task.update_state()Celery-beat......
  • ffmpeg python 导致死锁
    我在使用ffmpegpython处理相机帧时遇到问题。我使用process.communicate()的第一种方法效果很好,但存在延迟问题。process=(ffmpeg.input('pipe:',format='rawvideo',pix_fmt='rgb24',s='{}x{}'.format(width,height))......
  • 将 HTTP 分块编码数据流代码片段从 Node.js 转换为 Python
    我有一个Node.js客户端代码,它将请求发送到HTTP服务器,然后连续接收分块编码数据。这是带有一些流量数据输出的Node.js代码。consthttp=require('http');constoptions={hostname:'...',path:'...',port:...,...};constreq=http.request(......
  • 如何从python读取matlab持续时间对象
    我创建一个matlab持续时间对象并将其保存到.mat文件:timeend=seconds(123);save('time.mat',timeend,'-v7.3');然后我从python读取它:withh5py.File('time.mat','r')asf:var=f['timeend'][:]print(list(var))......
  • 通过 python 连接到 Snowflake 时出错“UnpicklingError: invalid load key, '\x00'
    我在使用snowflake.connector.connect通过python连接到snowflake时遇到以下错误importsnowflake.connector#pipinstallsnowflake-connector-python#iamgettingtheenvfrom.envfileistoredlocallycnx=snowflake.connector.connect(user=os.getenv('USER'),pass......
  • Python Selenium 单击 webdriverwait 与 find_element
    我无法理解这两个代码块之间的区别。发送点击在webdriverwait和find_elements中都有效。代码1fromseleniumimportwebdriverfromselenium.webdriver.common.byimportByfromselenium.webdriver.support.uiimportWebDriverWaitfromselenium.webdriver.suppo......
  • 使用Python时如何避免`setattr`(和`getattr`)?以及是否有必要避免
    如果我想向协议缓冲区中的字段添加一个在编译时未知的值,我目前正在做setattr我通常不喜欢使用setattr,因为它看起来不太安全。但是当我知道该对象是protobuf时,我认为这很好,因为我设置它的值必须是protobuf允许的类型。所以也许它并不是真的不安全?让我举......
  • Java sshtools 生成的 EDDSA 签名与 Python 的 pycryptome 生成的签名不匹配
    我有一个python库,它使用pycryptodomelibrary使用openssh格式的ED25519私钥使用Ed25519算法对数据进行签名。然后需要使用sshtools库和相应的公钥在Java应用程序中验证签名。但是签名验证失败。约束:从文件中读取私钥/公钥很重要。我无法......