首页 > 数据库 >sqlalchemy_装饰器获取session

sqlalchemy_装饰器获取session

时间:2023-02-19 12:23:45浏览次数:43  
标签:sqlalchemy name fastapi session import async 装饰

/Users/codelearn/fastapi-tutorial-fastapi_with_async_sqlalchemy/run.py

# uvicorn backend.app.main:app --host 127.0.0.1 --port 8000
# uvicorn src.main:app --host 127.0.0.1 --port 8000 --reload

/Users/codelearn/fastapi-tutorial-fastapi_with_async_sqlalchemy/src/main.py

from fastapi import FastAPI
from .routers import category


# Application object for this module (the commented uvicorn command in run.py
# refers to it as `src.main:app`).
app = FastAPI()
# Attach the routes declared on the `router` object of the category module.
app.include_router(category.router)

/Users/codelearn/fastapi-tutorial-fastapi_with_async_sqlalchemy/src/routers/category.py

import time
import asyncio
from fastapi import APIRouter, HTTPException, Response


from ..queries.async_category import async_fetch_categories


# Router that collects the endpoints declared in this module; the decorators
# below register handlers on it.
router = APIRouter()


# GET /async_categories handler: logs the `name` argument, pauses briefly, then
# returns whatever async_fetch_categories(name) produces.
# (Documented with comments rather than a docstring so the generated API
# description is left untouched.)
@router.get('/async_categories', status_code=200)
async def list_async_categories(name:str):
    print('router:name = ' + name)
    # Non-blocking 0.2 s pause before the query.
    # NOTE(review): looks like artificial demo latency - confirm it is wanted.
    await asyncio.sleep(0.2)
    categories = await async_fetch_categories(name)
    return categories

/Users/codelearn/fastapi-tutorial-fastapi_with_async_sqlalchemy/src/queries/async_category.py

from sqlalchemy import select
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from ..db.async_base import async_session
from ..models.category import Category

# Decorator that obtains a DB session and passes it in as the first argument.
@async_session
async def async_fetch_categories(async_session:AsyncSession,name:str):
    """Run a SELECT on ``Category`` and return the resulting ORM objects.

    Args:
        async_session: Session supplied by the ``@async_session`` decorator
            (inside this function the parameter shadows the decorator's name).
        name: Value passed by the caller; it is printed below.

    Returns:
        The list produced by ``result.scalars().all()``.
    """
    print('queries: name' + name)
    # NOTE(review): `name` is only logged. The argument-less .filter() adds no
    # criteria, so every row is selected - confirm whether filtering by name
    # was intended.
    statement = select(Category).filter()
    outcome = await async_session.execute(statement)
    records = outcome.scalars()
    return records.all()

/Users/codelearn/fastapi-tutorial-fastapi_with_async_sqlalchemy/src/models/category.py

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import INTEGER, VARCHAR, Column, TIMESTAMP, func


# Declarative base class; the ORM model(s) defined in this module subclass it.
Base = declarative_base()


class Category(Base):
    """ORM mapping for the ``category`` table."""

    __tablename__ = 'category'

    # Column names are not spelled out: the declarative mapper names each
    # column after the attribute it is assigned to.
    id = Column(INTEGER, primary_key=True)
    name = Column(VARCHAR(30), nullable=False)
    description = Column(VARCHAR(255))
    # Server-side default: the database fills in now() when no value is given.
    timestamp = Column(TIMESTAMP, server_default=func.now())

/Users/codelearn/fastapi-tutorial-fastapi_with_async_sqlalchemy/src/db/async_base.py

from curses import echo
import os

from contextlib import asynccontextmanager
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker



# SQLite database accessed through the aiosqlite driver.
# NOTE(review): the path is relative ('../../test.db'); presumably resolved
# against the process working directory - confirm for the way the app is started.
db_url = 'sqlite+aiosqlite:///../../test.db'

# echo=True is the engine's own keyword argument (unrelated to the `echo` name
# imported from curses above); it enables SQLAlchemy's statement logging.
async_engine = create_async_engine(db_url, echo=True)

# Factory for AsyncSession objects bound to the engine above.
AsyncLocalSession = sessionmaker(
    bind=async_engine,
    class_=AsyncSession,
    expire_on_commit=False,
)


@asynccontextmanager
async def get_async_session():
    """Yield a new session from ``AsyncLocalSession`` and always close it.

    Intended for ``async with get_async_session() as session:``.

    Yields:
        A session created by the ``AsyncLocalSession`` factory.

    Raises:
        Exception: Any exception raised inside the ``async with`` body is
            printed, the session is rolled back, and the exception is then
            re-raised to the caller.
    """
    session = AsyncLocalSession()
    try:
        print('开启数据库会话')
        yield session
    except Exception as e:
        print(e)
        await session.rollback()
        # Re-raise after cleanup. A generator-based context manager that
        # catches an exception without re-raising it tells the `async with`
        # statement the error was handled, so the failure would be silently
        # swallowed and the caller would carry on as if the body had succeeded.
        raise
    finally:
        # Runs on both the success and the failure path.
        await session.close()
        print('关闭数据库会话')


def async_session(func):
    """Decorate an async function so it receives a DB session as first argument.

    The returned coroutine function opens a session with
    ``get_async_session()``, calls ``func(session, *args, **kwargs)`` inside
    that context and returns its result.

    Args:
        func: Async callable whose first positional parameter is the session.

    Returns:
        An async wrapper that takes the remaining arguments of ``func``. It
        keeps ``func``'s ``__name__``/``__doc__`` and exposes the original as
        ``__wrapped__``.
    """
    # Imported here so the decorator does not depend on the module's import
    # header being changed.
    from functools import wraps

    @wraps(func)
    async def wrapper(*args, **kwargs):
        async with get_async_session() as session:
            return await func(session, *args, **kwargs)

    return wrapper

输出结果

  1. 每次查询都会打开一个数据库会话,查询完毕之后就关闭这个数据库会话
    img

标签:sqlalchemy,name,fastapi,session,import,async,装饰
From: https://www.cnblogs.com/zhuoss/p/17134504.html

相关文章