我用fastapi在写接口,数据库orm用的是tortoise-orm,接口的数据库操作是正常的。
现在加入了celery,但是每个celery在执行任务时,不能获取到数据库连接
我想要每个worker获得数据库连接,但是不要每个任务都去连接一次,并在每个worker结束时,断开连接,但是不能断开其他worker的数据库连接
from celery.signals import worker_process_init, worker_process_shutdown
from tortoise import Tortoise
@worker_process_init.connect
def init_worker(**kwargs):
loop = asyncio.get_event_loop()
loop.run_until_complete(Tortoise.init(
db_url='postgres://localhost:5432/mydb',
modules={'models': ['my_app.models']}
))
@worker_process_shutdown.connect
def close_worker(**kwargs):
loop = asyncio.get_event_loop()
loop.run_until_complete(Tortoise.close_connections())
我用这样的代码,但是会报错:
ask celery_app.img_ocr_zuowen_Bknd[f739a92a-8fe6-4bbb-91f4-2b10c596d239] raised unexpected: InterfaceError('cannot perform operation: another operation is in progress')
注意:我的celery版本是5.2.7 ,没有 maybe_async
你的问题可能是由于在异步环境中同步地调用了异步函数,导致了这个错误。你可以尝试使用Celery的内置事件循环来运行这些函数。
首先,你需要获取到Celery worker的事件循环,然后在这个事件循环上运行你的异步代码。以下是一个示例:
```python
from tortoise import Tortoise
from celery.signals import worker_process_init, worker_process_shutdown
from startup import init_db
@worker_process_init.connect
def on_worker_init(*args, **kwargs):
print('初始化数据库')
from celery._state import _task_stack # Celery's internal task context stack
if _task_stack.top is not None:
loop = _task_stack.top.request.loop # Get the event loop for the current task
else:
loop=asyncio.get_event_loop()
loop.run_until_complete(init_db())
@worker_process_shutdown.connect
def on_worker_shutdown(*args, **kwargs):
print('关闭数据库')
from celery._state import _task_stack
if _task_stack.top is not None:
loop = _task_stack.top.request.loop # Get the event loop for the current task
else:
loop=asyncio.get_event_loop()
loop.run_until_complete(Tortoise.close_connections())
import asyncio#windows上加载不出来
from apps.zuowen.tool import img_orc_bknd,img_orc_bknd1
@app.task()
def img_ocr_zuowen_Bknd(zuowen_id,attach_id):
asyncio.get_event_loop().run_until_complete(img_orc_bknd(zuowen_id,attach_id))
```
注意:以上代码只能在Celery任务中工作,如果你需要在其他地方初始化Tortoise ORM(例如,在启动worker时),那么你可能需要创建一个新的事件循环。
另外,请确保Tortoise ORM和Celery都支持相同版本的Python asyncio库。
标签:redis,tortoise,fastapi,worker,celery,init,task,import,loop From: https://www.cnblogs.com/rainny/p/17729726.html