首页 > 编程问答 >在工作线程中找到基于 Celery 类的任务,但在使用时得到 NotRegistered

在工作线程中找到基于 Celery 类的任务,但在使用时得到 NotRegistered

时间:2024-07-29 16:21:38浏览次数:13  
标签:python celery

我像那样配置 Celery

from celery import Celery

from settings.config import settings

celery_app = Celery(
    broker=settings.RABBITMQ_URL,
    backend="rpc://",
)

celery_app.config_from_object(settings.CELERY_SETTINGS_MODULE)
celery_app.autodiscover_tasks(["app.tasks"], force=True)

在另一个文件中,我有:

class AsyncTask(celery.Task):
    def run(self, *args, **kwargs):
        coro = self._run(*args, **kwargs)
        try:
            result = self._thread_isolated_worker(coro)
            return result
        except (DeprecatedTimeoutError, TimeoutError):
            raise TimeoutError
    @abstractmethod
    async def task(self, *args, **kwargs):
        # do something




async_task = celery_app.register_task(AsyncTask())

当我运行工作程序时,我看到任务已注册,但是当我运行时 这个简单的脚本

from app.tasks import async_task
async_task.delay().get()

我得到

celery.exceptions.NotRegistered: 'async_task'

Celery版本: celery==5.3.6

如果我运行一个任务,它是用通常的@shared_task装饰器注册的,它工作得很好。

我认为这与注册有关,但它对我来说看起来是正确的。


问题在于你是在将 实例 化的任务类注册到 Celery,而不是类本身。当你实例化一个类时(使用 () ),你实际上是在创建一个该类的对象。Celery 需要注册的是类本身,以便它可以根据需要创建新的任务实例。

你需要做以下更改:

  1. 不要实例化 AsyncTask : celery_app.register_task(AsyncTask()) 中移除 ()
  2. 使用 @async_task 装饰器: Celery 提供了一个方便的装饰器,用于注册基于类的任务。

以下是修改后的代码:

from celery import Celery, shared_task

# ... other imports

@celery_app.task(base=AsyncTask)
class AsyncTask(celery.Task):
    def run(self, *args, **kwargs):
        coro = self._run(*args, **kwargs)
        try:
            result = self._thread_isolated_worker(coro)
            return result
        except (DeprecatedTimeoutError, TimeoutError):
            raise TimeoutError

    @abstractmethod
    async def task(self, *args, **kwargs):
        # do something

# ... later in your code
from app.tasks import AsyncTask
AsyncTask.delay().get()

解释:

  • 我们使用 @celery_app.task 装饰器将 AsyncTask 注册为 Celery 任务。
  • base=AsyncTask 参数告诉 Celery 使用 AsyncTask 类作为基类。
  • 现在,可以直接使用 AsyncTask.delay().get() 调用该任务,而无需手动实例化它。

通过这些更改,Celery 将能够正确注册你的任务,并且你将不再遇到 NotRegistered 错误。

标签:python,celery
From: 78792353

相关文章

  • Cmake配置Qt程序调用python库的配置方法
    在网上找了一些配置方法,最简单直接的是在cmake中加入如下语句:set(PYTHON_EXECUTABLE/Python/Python311/python.exe)include_directories("/PythonPython311/include")link_directories("/PythonPython311/libs")link_libraries(python3.lib)link_libraries(python311.lib)直......
  • VSCode 的 Python 扩展中更详细的属性提示
    假设我有一个对象args由parser.parse_args()返回,并且它应该具有像args.port=6001、args.seed=1234这样的属性。当我在VSCode中按args.时,port和seed不会显示在建议的属性列表中,因为这些属性可能会......
  • 编写用于关键字检测和按钮发送的 Python Telegram 机器人
    我需要帮助用Python为我的Telegram机器人编写代码。我有一个config.py文件,其中包含两个关键字列表:keywords和button_phrases。keywords-负责在单击时显示子按钮的按钮。Button_phrases-负责单击时打开链接的按钮。我需要机器人检查用户输入的文本并按以下顺......
  • Python monorepo 打包,使用 Poetry
    我想将我的Python源代码组织到一个单一存储库中,具有以下基本结构:projectrootdir-libraryone-pyproject.toml-README-src/orgname/libraryone-__init__.py-somemodule.py-webapi-pyproject.toml-README-src/organa......
  • 如何使用Python AST给表达式a == b添加括号?
    请问,有谁知道如何使用PythonAST在代码中为a==b这样的表达式添加括号?我尝试过重写visit_Compare,但是ast.unparse中的delimit_if自动删除了我添加的括号,因为优先级a==b的值更高。你说的对,直接使用ast.unparse会因为优先级问题导致添加的括号被移除。为了解......
  • 使用 powershell 或 python 从网页列出公司名称
    我希望使用PowerShell或python仅列出URL中的公司名称:https://www.moneycontrol.com/markets/earnings/results-calendar/?activeDate=2024-07-29下面是我的python脚本用于获取网页的结构:importrequestsfrombs4importBeautifulSoup#URLo......
  • T3/A40i支持Linux-5.10新内核啦,Docker、Qt、Python统统升级!
    自2021年创龙科技推出全志国产化率100%的T3/A40i工业核心板后,不到两年时间已超过800家工业客户选择创龙科技T3/A40i平台。随着客户产品的不断升级与迭代,部分“能源电力”、“工业自动化”行业客户对T3/A40i的Linux版本提出了更高要求,主要涉及Docker、Qt、Python等组件特性。秉持......
  • 使用 Python 中的多处理防止共享内存中的数据损坏?
    我目前正在开发一个多处理Python程序,其中每个进程将其索引作为连续的4字节整数写入共享内存。并且有一个读取器可以在没有任何锁的情况下读取其他进程的索引。因为我没有使用任何同步原语,所以我担心读取器进程可能会由于逐字节写入内存而读取损坏的数据(例如,一个索引的前2个......
  • Python如何统治AI世界?一文读懂它的优势与挑战
    一、Python语言介绍1.1Python语言概述Python是一种由GuidovanRossum于1991年首次发布的高级编程语言。其设计理念强调代码的可读性和简洁性,使其成为了许多开发者的首选语言。Python的语法简洁直观,采用了缩进来定义代码块,这与其他使用花括号或关键词的编程语言不同。Python......
  • 找到一种方法将program1的输出作为python中program2的输入发送
    有人可以帮我找到一种方法将program1的输出作为python中的program2的输入发送将其保存为.csv文件不会对我有帮助,因为该程序应该尽快执行这些任务。因此我正在寻找一种方法将程序1的终端输出直接发送到程序2在Python中,可以使用子进程模块将一个程序的输出发送到另一个程......