首页 > 数据库 >fastapi+tortoise-orm+redis+celery 多worker数据库连接

fastapi+tortoise-orm+redis+celery 多worker数据库连接

时间:2023-09-26 11:46:13浏览次数:76  
标签:redis tortoise fastapi worker celery init task import loop

我用fastapi在写接口,数据库orm用的是tortoise-orm,接口的数据库操作是正常的。
现在加入了celery,但是每个celery在执行任务时,不能获取到数据库连接
我想要每个worker获得数据库连接,但是不要每个任务都去连接一次,并在每个worker结束时,断开连接,但是不能断开其他worker的数据库连接
from celery.signals import worker_process_init, worker_process_shutdown
from tortoise import Tortoise
@worker_process_init.connect
def init_worker(**kwargs):
loop = asyncio.get_event_loop()
loop.run_until_complete(Tortoise.init(
db_url='postgres://localhost:5432/mydb',
modules={'models': ['my_app.models']}
))
@worker_process_shutdown.connect
def close_worker(**kwargs):
loop = asyncio.get_event_loop()
loop.run_until_complete(Tortoise.close_connections())
我用这样的代码,但是会报错:
ask celery_app.img_ocr_zuowen_Bknd[f739a92a-8fe6-4bbb-91f4-2b10c596d239] raised unexpected: InterfaceError('cannot perform operation: another operation is in progress')
注意:我的celery版本是5.2.7 ,没有 maybe_async

你的问题可能是由于在异步环境中同步地调用了异步函数,导致了这个错误。你可以尝试使用Celery的内置事件循环来运行这些函数。

首先,你需要获取到Celery worker的事件循环,然后在这个事件循环上运行你的异步代码。以下是一个示例:

```python


from tortoise import Tortoise
from celery.signals import worker_process_init, worker_process_shutdown

from startup import init_db
@worker_process_init.connect
def on_worker_init(*args, **kwargs):
print('初始化数据库')
from celery._state import _task_stack # Celery's internal task context stack

if _task_stack.top is not None:
loop = _task_stack.top.request.loop # Get the event loop for the current task
else:
loop=asyncio.get_event_loop()
loop.run_until_complete(init_db())

@worker_process_shutdown.connect
def on_worker_shutdown(*args, **kwargs):
print('关闭数据库')
from celery._state import _task_stack
if _task_stack.top is not None:
loop = _task_stack.top.request.loop # Get the event loop for the current task
else:
loop=asyncio.get_event_loop()
loop.run_until_complete(Tortoise.close_connections())

import asyncio#windows上加载不出来
from apps.zuowen.tool import img_orc_bknd,img_orc_bknd1
@app.task()
def img_ocr_zuowen_Bknd(zuowen_id,attach_id):
asyncio.get_event_loop().run_until_complete(img_orc_bknd(zuowen_id,attach_id))

```

注意:以上代码只能在Celery任务中工作,如果你需要在其他地方初始化Tortoise ORM(例如,在启动worker时),那么你可能需要创建一个新的事件循环。

另外,请确保Tortoise ORM和Celery都支持相同版本的Python asyncio库。

标签:redis,tortoise,fastapi,worker,celery,init,task,import,loop
From: https://www.cnblogs.com/rainny/p/17729726.html

相关文章

  • 使用 Spring Integration 实现基于 Redis 的分布式锁以及踩坑
    背景分布式锁的应用场景应该还是蛮多的,这里就不赘述了。之前在开发中实现分布式锁都是自己基于Redis造轮子,虽然也不复杂并且自己实现一次能对分布式锁有更深的了解,但是终归有些麻烦。尤其是新项目需要的时候还得CV一次。然后在查询过程中(毫不意外地)发现Spring有现成的组......
  • 在线问诊 Python、FastAPI、Neo4j — 问题反馈
    目录查出节点拼接节点属性测试结果问答演示通过节点关系,找出对应的节点,获取节点属性值,并拼接成想要的结果。接上节生成的CQL#输入question_class={'args':{'看东西有时候清楚有时候不清楚':['symptom']},'question_types':['symptom_disease']}#输出[{'question_typ......
  • 在线问诊 Python、FastAPI、Neo4j — Cypher 生成
    目录构建节点字典构建CypherCQL语句Test这边只是为了测试,演示效果和思路,实际应用中,可以通过NLP构建CQL接上一篇的问题分类question="请问最近看东西有时候清楚有时候不清楚是怎么回事"#最终输出data={'args':{'看东西有时候清楚有时候不清楚':['symptom']},'questio......
  • Redis大key问题解决方案
     Redis的大key如何处理 介绍 大key并不是指key的值很大,而是key对应的value很大(非常占内存)一般而言,下面这两种情况被称为大key:String类型的值大于10KB;Hash、List、Set、ZSet类型的元素的个数超过5000个;为什么会出现大key数据结构不合理:当使用Re......
  • Redis之Lua语言入门
    前言Redis通过lua脚本来支持多条语句的原子性。Linux下安装#下载压缩包curl-R-Ohttp://www.lua.org/ftp/lua-5.4.3.tar.gz#解压tar-zxvflua-5.4.3.tar.gz#进入解压目录cdlua-5.4.3#编译makelinuxtest#安装,此步骤也可以省略makeinstall运行my......
  • Redis
    一、简介    NoSql的一种。性能优越,支持每秒十几万次的读写操作。性能远超数据库,还支持集群、分布式、主从同步等。还支持一定的事务能力。保证了高并发场景下的数据安全和一致性。二、Redis与Memcache的区别    1、Redis支持更丰富额度数据类型(支持更复杂的应......
  • 在线问诊 Python、FastAPI、Neo4j — 构建问题分类器
    目录构建字典数据构建Trie字典树按实体组装字典问题分析将问题进行分析,和系统已有的分类进行关联构建字典数据将构建的知识图片字典化,用于后面对问题的解析,下图为症状的字典,其它字典同理构建Trie字典树将建字典数据,组装集合cur_dir='/'.join(os.path.abspath(__file......
  • Redis常用操作命令
     关于Redis的操作命令实在是太多了,我们不可能每个都说,用的时候查找即可,现在只简单说明一下1.对键操作的命令exists(key):确认一个key是否存在del(key):删除一个keytype(key):返回值的类型keys(pattern):返回满足给定pattern的所有keyrandomkey:随机返回key空间的一个keyrename(o......
  • Redis 客户端连接
     Redis命令用于在redis服务上执行操作。要在redis服务上执行命令需要一个redis客户端。Redis客户端在Redis包中有提供,这个包在我们前面的安装教程中就有安装过了。Redis通过监听一个TCP端口或者Unixsocket的方式来接收来自客户端的连接,当一个连接建立......
  • 初识Redis
    Redis简介Redis(REmoteDIctionaryServer)是一个由SalvatoreSanfilippo写的key-value存储系统。Redis是一个开源的使用ANSIC语言编写、遵守BSD协议、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库,并提供多种语言的API。它通常被称为数据结构服务器,因为值(value)可以......