首页 > 数据库 >Celery 任务:SQLAlchemy 会话处理指南

Celery 任务:SQLAlchemy 会话处理指南

时间:2024-09-02 19:56:48浏览次数:11  
标签:engine ... SQLAlchemy self task Celery 会话 session

Celery 任务:SQLAlchemy 会话处理指南_解决方案

最近在做 AI RAG 相关的项目功能,对于 RAG 需要生成一些文本处理的异步任务,使用到了 Celery 。今天就写写关于Celery 任务的文章,SQLAlchemy 的真实情况是:

  • 它的学习曲线比 Django ORM 更陡峭
  • 需要一些示例代码
  • 你需要了解一些较低层次的概念
  • 有一些难以理解的文档

如果您确实使用 `SQLAlchemy`,则必须为 `Celery` 任务编写一些示例文件,其中包含风险意大利面条代码。

在本文中,我将向您介绍一些基本的 SQLAlchemy 概念,并向您展示如何在 Celery 任务中使用 SQLAlchemy,而无需求助于第三方包,这

  • 帮助您了解事物是如何运作的
  • 提供了一个通用的解决方案,即使没有 Web 框架,也可以与 FlaskFastAPI 或其他任何东西一起使用

Celery 任务:SQLAlchemy 会话处理指南_自定义_02

SQLAlchemy

Django ORM 世界中的生活非常简单。数据库操作通过模型对象提供:

from celery import Celery

app = Celery(...)

@app.task()
def my_task():
   book = Book.objects.get(title="To Kill a Mockingbird")
   ...
   book.save()

SQLAlchemy 世界中,情况非常不同。所有数据库操作都是通过会话对象执行的。会话与模型对象严格分开:

from celery import Celery
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

app = Celery(...)
engine = create_engine("...")

@app.task()
def my_task():
   session = Session(engine)
   book = session.query(Book).filter_by(title="To Kill a Mockingbird").one()
   ...
   session.add(book)
   session.commit()
   session.close()

会话建立与数据库的对话,并代表您在其生命周期内加载、创建或操作的所有对象的暂存区域。

会话管理

您可以将 SQLAlchemy 会话视为数据库事务。作为一般规则,会话的生命周期应该与访问和操作数据库数据的函数和对象分开并位于外部。会议应该很短。例如,在传入 Celery 任务请求的上下文中,应在任务代码的开头创建会话并在结束时关闭,而不是无限期地保持打开状态并在任务之间共享。

from celery import Celery
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

app = Celery(...)
engine = create_engine("...")

@app.task()
def my_task():
   session = Session(engine)
   book = session.query(Book).filter_by(title="To Kill a Mockingbird").one()
   ...
   session.add(book)
   session.commit()
   session.close()

或者,使用上下文管理器:

from celery import Celery
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

app = Celery(...)
engine = create_engine("...")

@app.task()
def my_task():
   with Session(engine) as session:
      book = session.query(Book).filter_by(title="To Kill a Mockingbird").one()
      ...
      session.add(book)
      session.commit()

Celery task

我对上述两个选项的问题是,它在每个任务中涉及大量重复的锅炉代码。如果每个 Celery 任务请求都带有一个现成的会话对象,而不必在开始时创建它并在最后关闭它,那就太好了。像这样的事情:

def my_task(session):
   book = session.query(Book).filter_by(title="To Kill a Mockingbird").one()
   ...
   session.add(book)
   session.commit()

事实证明,在运行时注入会话变量是不可能的。不过可以绑定任务。绑定任务始终将任务实例作为其第一个参数。

@app.task(bind=True)
def my_task(self):
   ...

默认情况下,self类型为celery.Taskcelery.Task定义了可用于 Celery 任务的所有方法,例如apply_asyncretry

您的代码和 Celery 任务之间的每次交互以及您的工作线程和 Celery 任务之间的每次交互都是通过这些celery.Task方法发生的。事实上,当你的worker处理一个任务时,它总是遵循以下顺序:

  1. 跑步before_start
  2. 运行任务
  3. 跑步after_return

即使步骤 2 中的任务抛出异常,也after_return能保证运行。您可以使用它来简化 SQLAlchemy 会话的创建和拆卸:

  • 在中创建会话before_start
  • 使会话可用于绑定任务
  • 关闭会话after_return
import celery
from sqlalchemy.orm import Session

class MyTask(celery.Task):
    def __init__(self):
        self.sessions = {}

    def before_start(self, task_id, args, kwargs):
        self.sessions[task_id] = Session(...)
        super().before_start(task_id, args, kwargs)

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        session = self.sessions.pop(task_id)
        session.close()
        super().after_return(status, retval, task_id, args, kwargs, einfo)

    @property
    def session(self):
        return self.sessions[self.request.id]

请注意,每个流程只有一个任务实例,这意味着流程中的每个任务共享相同的任务对象。为了隔离每个任务请求的 SQLAlchemy 会话,我使用字典和唯一的任务请求 ID 作为键。

绑定任务

到目前为止我们有:

  • MyTask,自定义celery.Task实现
  • 一个任务,绑定celery.Task到 Celery 任务

缺少的是绑定MyTask而不是celery.Task任务。为此,Celery 提供了以下base参数:

from celery import Celery
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

app = Celery(...)
engine = create_engine("...")

class MyTask(celery.Task):
    def __init__(self):
        self.sessions = {}

    def before_start(self, task_id, args, kwargs):
        self.sessions[task_id] = Session(...)
        super().before_start(task_id, args, kwargs)

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        session = self.sessions.pop(task_id)
        session.close()
        super().after_return(status, retval, task_id, args, kwargs, einfo)

    @property
    def session(self):
        return self.sessions[self.request.id] 

@app.task(bind=True, base=MyTask)
def my_task(self):
    book = self.session.query(Book).filter_by(title="To Kill a Mockingbird").one()
    ...
    self.session.add(book)
    self.session.commit()

这是一个通用解决方案,将 SQLAlchemy 会话处理委托给自定义任务类。它使您的任务代码免受重复的样板代码的影响。你怎么认为?可以在评论区讨论

标签:engine,...,SQLAlchemy,self,task,Celery,会话,session
From: https://blog.51cto.com/demo007x/10803640

相关文章

  • Winserver 阴影会话,远程协助相关
    场景(方便管理协助其他远程会话)用户都是运行在winserver上的一个一个远程会话,有时需要看一下客户操作了什么。换做本地客户端的情况,可以让客户开远程工具。方式1:远程协助 有一个轻松连接,这个目前没弄出来。效果跟金万维的一样,只要知道对方IP、计算机名。还有6位连接码就可以......
  • 使用Flask快速构建Web后端项目:Python、Flask、Mysql、Migrate、SQLAlchemy、Login、Se
    Flask是一个用Python编写的轻量级Web应用框架。它设计简单且易于扩展,如果与Jinja2模板引擎和WerkzeugWSGI工具集结合使用,Flask可以用来快速开发小型到中型的网站。Flask鼓励快速开发和简洁的代码,同时保持了扩展性和灵活性。本文旨在如何使用Flask及其相关组件快......
  • AutoGen:微软AI多智能体会话框架的新变革
            在人工智能发展的今天,对话系统和自然语言处理技术的创新日新月异。微软推出了名为AutoGen的框架,它标志着在使用大型预训练语言模型(LLM)开发应用程序方面的一次飞跃。AutoGen不仅因其技术突破而受到业界的广泛关注,并且在短短的时间里,已经被TheSequence评为2023年......
  • 【爬虫实战】——利用bs4和sqlalchemy操作mysql数据库,实现网站多行数据表格爬取数据
    前言此篇接上一篇的内容,在其基础上爬取网站的多行表格数据,以及把数据写入到mysql数据库中目录一、定位表格查找元素二、提取数据三、写入mysql数据库四、附录一、定位表格查找元素首先打开网站,如图需要爬取多行数据的表格,利用查找元素定位,看图中分析得知我要爬取的是tr......
  • 在 SQLAlchemy 中实现数据处理的时候,实现表自引用、多对多、联合查询,有序id等常见的一
    有时候,我们在使用SQLAlchemy操作某些表的时候,需要使用外键关系来实现一对多或者多对多的关系引用,以及对多表的联合查询,有序列的uuid值或者自增id值,字符串的分拆等常见处理操作。1、在SQLAlchemy中定义具有嵌套children关系的表要在SQLAlchemy中定义具有嵌套children关系......
  • Python开发中,SQLAlchemy 的同步操作和异步操作封装,以及常规CRUD的处理。
    在我们使用Python来和数据库打交道中,SQLAlchemy是一个非常不错的ORM工具,通过它我们可以很好的实现多种数据库的统一模型接入,而且它提供了非常多的特性,通过结合不同的数据库驱动,我们可以实现同步或者异步的处理封装。1、SQLAlchemy介绍SQLAlchemy 是一个功能强大且灵活的Python......
  • 第一次项目搭建笔记&路由导航守卫&web前后端会话跟踪
    1.重新搭建后端项目在IDEA中重新创建一个JavaEE项目,记得勾选Webprofile之后在java文件中重新搭建分级的文件夹按照标准创建com.xxxx.dorm文件夹并创建dao(数据处理),filter(过滤器),model(模型),util(工具),web(服务端)等文件夹进行不同功能部分的分类搭建完基本的框......
  • SQLALchemy ORM 的关联关系之 ORM 中的多对多
    SQLALchemyORM的关联关系之ORM中的多对多场景示例实现多对多关系定义模型插入和查询数据总结在SQLAlchemyORM中,多对多(Many-to-Many)关联关系是一种常见的关系类型,它表示两个表中的行可以相互关联,即一个表中的多行可以与另一个表中的多行相......
  • 由于没有远程桌面授权服务器可以提供许可,远程会话已断开。请跟服务器管理员联系
    原创文档编写不易,未经许可请勿转载,目前仅发布于博客园,其他平台均为非法转载。文档中有疑问的可以邮件联系我文章。邮箱:[email protected]说明文档适用于win2008和win2012,用于远程登录提示“由于没有远程桌面授权服务器可以提供许可,远程会话已断开。请跟服务器管理员联系”报错。......
  • 使用 Flask、Celery 和 Python 实现每月定时任务
    为了创建一个使用Flask、Celery和Python实现的每月定时任务,我们需要按照以下步骤进行:1.安装必要的库我们需要安装Flask、Celery和Redis(作为消息代理)。我们可以使用pip来安装它们:bash复制代码​pipinstallflaskceleryredis2.设置Flask和Celery首先,我们需......