我像那样配置 Celery
from celery import Celery
from settings.config import settings
celery_app = Celery(
broker=settings.RABBITMQ_URL,
backend="rpc://",
)
celery_app.config_from_object(settings.CELERY_SETTINGS_MODULE)
celery_app.autodiscover_tasks(["app.tasks"], force=True)
在另一个文件中,我有:
class AsyncTask(celery.Task):
def run(self, *args, **kwargs):
coro = self._run(*args, **kwargs)
try:
result = self._thread_isolated_worker(coro)
return result
except (DeprecatedTimeoutError, TimeoutError):
raise TimeoutError
@abstractmethod
async def task(self, *args, **kwargs):
# do something
async_task = celery_app.register_task(AsyncTask())
当我运行工作程序时,我看到任务已注册,但是当我运行时 这个简单的脚本
from app.tasks import async_task
async_task.delay().get()
我得到
celery.exceptions.NotRegistered: 'async_task'
Celery版本:
celery==5.3.6
如果我运行一个任务,它是用通常的@shared_task装饰器注册的,它工作得很好。
我认为这与注册有关,但它对我来说看起来是正确的。
问题在于你是在将
实例
化的任务类注册到 Celery,而不是类本身。当你实例化一个类时(使用
()
),你实际上是在创建一个该类的对象。Celery 需要注册的是类本身,以便它可以根据需要创建新的任务实例。
你需要做以下更改:
-
不要实例化
AsyncTask
: 从celery_app.register_task(AsyncTask())
中移除()
。 -
使用
@async_task
装饰器: Celery 提供了一个方便的装饰器,用于注册基于类的任务。
以下是修改后的代码:
from celery import Celery, shared_task
# ... other imports
@celery_app.task(base=AsyncTask)
class AsyncTask(celery.Task):
def run(self, *args, **kwargs):
coro = self._run(*args, **kwargs)
try:
result = self._thread_isolated_worker(coro)
return result
except (DeprecatedTimeoutError, TimeoutError):
raise TimeoutError
@abstractmethod
async def task(self, *args, **kwargs):
# do something
# ... later in your code
from app.tasks import AsyncTask
AsyncTask.delay().get()
解释:
-
我们使用
@celery_app.task
装饰器将AsyncTask
注册为 Celery 任务。 -
base=AsyncTask
参数告诉 Celery 使用AsyncTask
类作为基类。 -
现在,可以直接使用
AsyncTask.delay().get()
调用该任务,而无需手动实例化它。
通过这些更改,Celery 将能够正确注册你的任务,并且你将不再遇到
NotRegistered
错误。