首页 > 数据库 >sqlalchemy_learn_sqlite

sqlalchemy_learn_sqlite

时间:2023-03-27 22:24:29浏览次数:39  
标签:sqlite sqlalchemy res db email user learn async id

/Users/song/codelearn/sqlalchemy_learn/init_test_data.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import asyncio
import random

from faker import Faker
from loguru import logger

from sqlalchemy import text, select, insert, update, and_, or_, between

from app.database.db_mysql import async_db_session

from app.models import User
from app.models import Email

from app.schemas.user import CreateUser
from app.schemas.email import CreateEmail


class InitData:
    """ 初始化数据 """

    def __init__(self):
        self.fake = Faker('zh_CN')

    async def fake_user(self):
        """ 自动创建锁定普通用户 """
        username = self.fake.user_name()

        user_obj = CreateUser(
            name=username,
        )

        # 2023-03-25 18:55:32.325 | DEBUG    | __main__:fake_user:32 - name='qinxia'
        # 2023-03-25 18:55:32.326 | DEBUG    | __main__:fake_user:33 - {'name': 'qinxia'}
        logger.debug(user_obj)
        logger.debug(user_obj.dict())

        async with async_db_session.begin() as db:
            await db.execute(
                # 方式一:
                # insert(User).values(name='bruce')

                # 方式二:
                # **user_obj.dict()可以将 {'name': 'qinxia'}解包成 ----------> name = 'qinxia'
                insert(User).values(**user_obj.dict())
            )
            await db.commit()

    async def fake_email(self):
        """ 自动创建普通用户 """
        email_obj = CreateEmail(
            pwd=self.fake.password(),
            account=self.fake.email(),
            user_id=random.randint(1, 4)
        )

        logger.debug(email_obj)

        async with async_db_session.begin() as db:
            await db.execute(
                insert(Email).values(**email_obj.dict())
            )
            await db.commit()

    async def get_username(self):
        async with async_db_session.begin() as db:
            stmt = "select username from sys_user"
            sql_res = await db.execute(
                text(stmt)
            )

            # => [('alice',), ('fang06',), ('jiajuan',), ('juan33',), ('min40',), ('min94',), ('otian',), ('song',), ('xiulanmo',), ('xiuying38',)]
            # res = sql_res.fetchall()

            # =>alice
            # res = sql_res.scalar()

            # => [('alice',), ('fang06',), ('jiajuan',), ('juan33',), ('min40',), ('min94',), ('otian',), ('song',), ('xiulanmo',), ('xiuying38',)]
            # res = sql_res.all()

            # => [('alice',), ('fang06',)]
            # res = sql_res.fetchmany(2)

            # => ('alice',)
            # res = sql_res.fetchone()

            # => ('alice',)
            # res = sql_res.first()

            # res = sql_res.freeze()
            # => [('alice',), ('fang06',), ('jiajuan',), ('juan33',), ('min40',), ('min94',), ('otian',), ('song',), ('xiulanmo',), ('xiuying38',)]
            # logger.debug(res.data)

            # => RMKeyView(['username'])
            # res = sql_res.keys()

            # res = sql_res.mappings()
            # => [{'username': 'alice'}, {'username': 'fang06'}, {'username': 'jiajuan'}, {'username': 'juan33'}, {'username': 'min40'}, {'username': 'min94'}, {'username': 'otian'}, {'username': 'song'}, {'username': 'xiulanmo'}, {'username': 'xiuying38'}]
            # logger.debug(res.all())

            # => alice
            # res = sql_res.scalar()
            # logger.debug(res)

    async def get_username_and_email(self):
        async with async_db_session.begin() as db:
            stmt = "select username,email from sys_user"
            sql_res = await db.execute(
                text(stmt)
            )

            # res = sql_res.mappings()
            # => [{'username': 'song', 'email': 'song@qq.com'}, {'username': 'juan33', 'email': 'juanxiong@example.com'}, {'username': 'otian', 'email': 'xiaxiong@example.net'}, {'username': 'min40', 'email': 'xfang@example.net'}, {'username': 'xiuying38', 'email': 'xiulanlong@example.net'}, {'username': 'alice', 'email': 'alice@qq.com'}, {'username': 'min94', 'email': 'xiulanhou@example.com'}, {'username': 'xiulanmo', 'email': 'nashi@example.com'}, {'username': 'fang06', 'email': 'ping56@example.net'}, {'username': 'jiajuan', 'email': 'wei24@example.net'}]

            # res = sql_res.first()
            # logger.debug(res._mapping)
            # 2023-03-25 16:36:47.853 | DEBUG    | __main__:get_username_and_email:130 - {'username': 'song', 'email': 'song@qq.com'}

            res = sql_res.first()
            logger.debug(res._mapping)

    async def select_username(self):
        stmt = "select * from user"
        async with async_db_session.begin() as db:
            sql_res = await db.execute(
                text(stmt)
            )

        res = sql_res.first()
        logger.debug(res._mapping)

    async def select_username2(self):
        stmt = text("select * from tuser where name=:name")
        async with async_db_session.begin() as db:
            sql_res = await db.execute(
                stmt, {'name': 'yuxia'}
            )

        res = sql_res.first()
        logger.debug(res._mapping)

    async def select_username3(self):
        async with async_db_session.begin() as db:
            sql_res = await db.execute(
                # 使用and_,连接两个条件
                # select(User.name, User.id).where(and_(User.name == 'dcai', User.id > 9))
                select(User.name, User.id).where(and_(User.name == 'dcai', User.id > 1))
            )
        res = sql_res.first()
        if not res:
            return None
        logger.debug(res._mapping)

    async def select_username4(self):
        async with async_db_session.begin() as db:
            sql_res = await db.execute(
                # 使用or_,连接两个条件
                select(User.name, User.id).where(or_(User.name == 'dca', User.id == 5))
            )
        res = sql_res.first()
        if not res:
            return None
        logger.debug(res._mapping)

    async def select_username5(self):
        async with async_db_session.begin() as db:
            sql_res = await db.execute(
                # 使用between,范围条件
                select(User.name, User.id).where(User.id.between(2, 3))
            )
        # 获取所有的row的对象形式
        res = sql_res.mappings().all()
        if not res:
            return None
        logger.debug(res)
        # 2023-03-27 21:42:05.195 | DEBUG    | __main__:select_username5:181 - [{'name': 'yuxia', 'id': 2}, {'name': 'xia43', 'id': 3}]

    async def select_username6(self):
        async with async_db_session.begin() as db:
            sql_res = await db.execute(
                # 使用between,范围条件
                select(User.name, User.id).where(User.id.between(2, 3))
            )
        res = sql_res.mappings().all()
        if not res:
            return None
        logger.debug(res)

    async def select_username7(self):
        async with async_db_session.begin() as db:
            sql_res = await db.execute(
                # 使用in_,范围条件
                select(User.name, User.id).where(User.name.in_(['yuxia', 'dcai']))
            )
        res = sql_res.mappings().all()
        if not res:
            return None
        logger.debug(res)
        # 2023-03-27 21:48:47.785 | DEBUG    | __main__:select_username7:203 - [{'name': 'dcai', 'id': 5}, {'name': 'yuxia', 'id': 2}]

    async def insert_email(self):
        async with async_db_session.begin() as db:
            await db.execute(
                # 插入数据
                insert(Email).values(
                    account='song@qq.com',
                    pwd='123456',
                    user_id=1
                )
            )
            await db.commit()

    async def insert_email2(self):
        async with async_db_session.begin() as db:
            # 批量插入数据
            await db.execute(
                insert(Email).values([
                    {"account": "song1@qq.com", "pwd": "1234568", "user_id": 2},
                    {"account": "song2@qq.com", "pwd": "1234569", "user_id": 2},
                ])
            )
            await db.commit()

    async def init_data(self):
        """ 自动创建数据 """
        # await self.fake_user()
        # await self.fake_email()
        # await self.select_username7()

        # await self.get_username()
        # await self.get_username_and_email()
        await self.insert_email2()


if __name__ == '__main__':
    init = InitData()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(init.init_data())

/Users/song/codelearn/sqlalchemy_learn/app/database/base_class.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import uuid
from datetime import datetime
from typing import Optional

from sqlalchemy import func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, declared_attr, MappedAsDataclass
from typing_extensions import Annotated

# 通用 Mapped 类型主键, 需手动添加,参考以下使用方式
# MappedBase -> id: Mapped[id_key]
# DataClassBase && Base -> id: Mapped[id_key] = mapped_column(init=False)
id_key = Annotated[int, mapped_column(primary_key=True, index=True, autoincrement=True, comment='主键id')]


class _BaseMixin:
    """
    Mixin 数据类

    Mixin: 一种面向对象编程概念, 使结构变得更加清晰, `Wiki <https://en.wikipedia.org/wiki/Mixin/>`__
    """
    # create_user: Mapped[int] = mapped_column(comment='创建者')
    # update_user: Mapped[Optional[int]] = mapped_column(default=None, comment='修改者')
    created_time: Mapped[datetime] = mapped_column(init=False, default=func.now(), comment='创建时间')
    updated_time: Mapped[Optional[datetime]] = mapped_column(init=False, onupdate=func.now(), comment='更新时间')


class MappedBase(DeclarativeBase):
    """
    声明性基类, 原始 DeclarativeBase 类, 作为所有基类或数据模型类的父类而存在

    `DeclarativeBase <https://docs.sqlalchemy.org/en/20/orm/declarative_config.html>`__
    `mapped_column() <https://docs.sqlalchemy.org/en/20/orm/mapping_api.html#sqlalchemy.orm.mapped_column>`__
    """

    @declared_attr.directive
    def __tablename__(cls) -> str:  # noqa
        return cls.__name__.lower()


class DataClassBase(MappedAsDataclass, MappedBase):
    """
    声明性数据类基类, 它将带有数据类集成, 允许使用更高级配置, 但你必须注意它的一些特性, 尤其是和 DeclarativeBase 一起使用时

    `MappedAsDataclass <https://docs.sqlalchemy.org/en/20/orm/dataclasses.html#orm-declarative-native-dataclasses>`__
    """
    __abstract__ = True


class Base(_BaseMixin, MappedAsDataclass, MappedBase):
    """
    声明性 Mixin 数据类基类, 带有数据类集成, 并包含 MiXin 数据类基础表结构, 你可以简单的理解它为含有基础表结构的数据类基类
    """

    __abstract__ = True


def use_uuid() -> str:
    """
    使用uuid

    :return:
    """
    return uuid.uuid4().hex

/Users/song/codelearn/sqlalchemy_learn/app/database/db_mysql.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker

from ..common.log import log
from ..core.conf import settings
from ..core.path_conf import SqlitePath
from ..database.base_class import MappedBase

""" 
说明:SqlAlchemy
"""

SQLALCHEMY_DATABASE_URL = f'sqlite+aiosqlite:///{SqlitePath}/test.db'

try:
    # 数据库引擎
    async_engine = create_async_engine(SQLALCHEMY_DATABASE_URL, echo=settings.DB_ECHO, future=True)
    # log.success('数据库连接成功')
except Exception as e:
    log.error('❌ 数据库链接失败 {}', e)
    sys.exit()
else:
    async_db_session = async_sessionmaker(bind=async_engine, autoflush=False, expire_on_commit=False)


async def get_db() -> AsyncSession:
    """
    session 生成器

    :return:
    """
    session = async_db_session()
    try:
        yield session
    except Exception as se:
        await session.rollback()
        raise se
    finally:
        await session.close()


async def create_table():
    """
    创建数据库表
    """
    async with async_engine.begin() as coon:
        await coon.run_sync(MappedBase.metadata.create_all)

/Users/song/codelearn/sqlalchemy_learn/app/core/path_conf.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
from pathlib import Path

# 获取项目根目录
# 或使用绝对路径,指到backend目录为止,例如windows:BasePath = D:\git_project\fastapi_mysql\backend
BasePath = Path(__file__).resolve().parent.parent

# 迁移文件存放路径
Versions = os.path.join(BasePath, 'app', 'alembic', 'versions')

# 日志文件路径
LogPath = os.path.join(BasePath.parent, 'log')

# sqlite的路径
SqlitePath = os.path.join(BasePath.parent, 'sqlite_db')

/Users/song/codelearn/sqlalchemy_learn/app/core/conf.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from functools import lru_cache

from pydantic import BaseSettings


class Settings(BaseSettings):
    """ 配置类 """
    # FastAPI
    TITLE: str = 'FastAPI'
    VERSION: str = 'v0.0.1'
    DESCRIPTION: str = """fastapi_sqlalchemy_mysql"""
    DOCS_URL: str = '/v1/docs'
    REDOCS_URL: str = None
    OPENAPI_URL: str = '/v1/openapi'

    # Uvicorn
    UVICORN_HOST: str = '127.0.0.1'
    UVICORN_PORT: int = 8000
    UVICORN_RELOAD: bool = True
    # 如果此处为True,在 @app.on_event("startup") 时发生异常,则程序不会终止,详情:https://github.com/encode/starlette/issues/486

    # Static Server
    STATIC_FILES: bool = True

    # DB
    DB_ECHO: bool = False
    DB_HOST: str = '127.0.0.1'
    DB_PORT: int = 3306
    DB_USER: str = 'root'
    DB_PASSWORD: str = '123456'
    DB_DATABASE: str = 'fsm'
    DB_CHARSET: str = 'utf8mb4'

    # Redis
    REDIS_OPEN: bool = False
    REDIS_HOST: str = '127.0.0.1'
    REDIS_PORT: int = 6379
    REDIS_PASSWORD: str = ''
    REDIS_DATABASE: int = 0
    REDIS_TIMEOUT: int = 5

    # Token
    TOKEN_ALGORITHM: str = 'HS256'  # 算法
    TOKEN_SECRET_KEY: str = '1VkVF75nsNABBjK_7-qz7GtzNy3AMvktc9TCPwKczCk'  # 密钥 secrets.token_urlsafe(32))
    TOKEN_EXPIRE_MINUTES: int = 60 * 24 * 1  # token 时效 60 * 24 * 1 = 1 天

    # Email
    EMAIL_DESCRIPTION: str = 'fastapi_sqlalchemy_mysql'  # 默认发件说明
    EMAIL_SERVER: str = 'smtp.qq.com'
    EMAIL_PORT: int = 465
    EMAIL_USER: str = '729519678@qq.com'
    EMAIL_PASSWORD: str = 'gmrvkkppberzbega'  # 授权密码,非邮箱密码
    EMAIL_SSL: bool = True

    # 邮箱登录验证码过期时间
    EMAIL_LOGIN_CODE_MAX_AGE: int = 60 * 2  # 时效 60 * 2 = 2 分钟

    # Cookies
    COOKIES_MAX_AGE: int = 60 * 5  # cookies 时效 60 * 5 = 5 分钟

    # Middleware
    MIDDLEWARE_CORS: bool = True
    MIDDLEWARE_GZIP: bool = True
    MIDDLEWARE_ACCESS: bool = False


@lru_cache
def get_settings():
    """ 读取配置优化写法 """
    return Settings()


settings = get_settings()

/Users/song/codelearn/sqlalchemy_learn/app/core/__init__.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

/Users/song/codelearn/sqlalchemy_learn/app/models/user.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from datetime import datetime
from typing import Optional, List

from sqlalchemy import func, String, ForeignKey
from sqlalchemy.dialects.mysql import LONGTEXT
from sqlalchemy.orm import Mapped, mapped_column, relationship

from app.database.base_class import use_uuid, id_key, DataClassBase


class User(DataClassBase):
    """ User Table """
    __tablename__ = 'tuser'

    id: Mapped[id_key] = mapped_column(init=False)
    name: Mapped[str] = mapped_column(String(20), unique=True, index=True, comment='用户名')

    emails: Mapped[List['Email']] = relationship('Email', back_populates='user', cascade='all,delete-orphan')

/Users/song/codelearn/sqlalchemy_learn/app/models/__init__.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
# 导入所有模型,并将 Base 放在最前面, 以便 Base 拥有它们
# imported by Alembic
"""
from app.database.base_class import MappedBase  #

from app.models.user import User
from app.models.email import Email

/Users/song/codelearn/sqlalchemy_learn/app/models/email.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from datetime import datetime
from typing import List, Optional

from sqlalchemy import ForeignKey, func, String
from sqlalchemy.dialects.mysql import LONGTEXT
from sqlalchemy.orm import Mapped, mapped_column, relationship

from app.database.base_class import id_key, DataClassBase


class Email(DataClassBase):
    """ Email Table """
    __tablename__ = 'temail'

    id: Mapped[id_key] = mapped_column(init=False)

    account: Mapped[str] = mapped_column(String(20), unique=True, index=True, comment='email account')
    pwd: Mapped[str] = mapped_column(String(20), unique=True, index=True, comment='email password')

    user_id: Mapped[int] = mapped_column(ForeignKey('tuser.id'), nullable=True, comment='the email owner id')

    # User 表示是的定义表的那个类,不是表名,特别是表名和类名一样的时候,特别容易弄错,所以首字母需要大写
    user: Mapped['User'] = relationship('User', back_populates='emails')

/Users/song/codelearn/sqlalchemy_learn/app/schemas/user.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import datetime
from typing import Optional

from pydantic import BaseModel,  EmailStr


class CreateUser(BaseModel):
    name: str


class Auth(BaseModel):
    username: str
    password: str


class ELCode(BaseModel):
    email: EmailStr


class Auth2(ELCode):
    code: str


class UpdateUser(BaseModel):
    username: str
    email: str
    mobile_number: Optional[str] = None
    wechat: Optional[str] = None
    qq: Optional[str] = None
    blog_address: Optional[str] = None
    introduction: Optional[str] = None


class GetUserInfo(UpdateUser):
    id: int
    uid: str
    avatar: Optional[str] = None
    time_joined: datetime.datetime = None
    last_login: Optional[datetime.datetime] = None
    is_superuser: bool
    is_active: bool

    class Config:
        orm_mode = True


class DeleteUser(BaseModel):
    id: int


class ResetPassword(BaseModel):
    code: str
    password1: str
    password2: str

/Users/song/codelearn/sqlalchemy_learn/app/schemas/email.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import datetime
from typing import Optional

from pydantic import BaseModel,  EmailStr


class CreateEmail(BaseModel):
    account: str
    pwd: str
    user_id: int

/Users/song/codelearn/sqlalchemy_learn/app/common/log.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os

from loguru import logger
from app.core import path_conf


class Logger:

    @staticmethod
    def log() -> logger:
        if not os.path.exists(path_conf.LogPath):
            os.mkdir(path_conf.LogPath)

        # 日志文件
        log_file = os.path.join(path_conf.LogPath, "FastBlog.log")

        # loguru日志
        # more: https://github.com/Delgan/loguru#ready-to-use-out-of-the-box-without-boilerplate
        logger.add(
            log_file,
            encoding='utf-8',
            level="DEBUG",
            rotation='00:00',  # 每天 0 点创建一个新日志文件
            retention="7 days",  # 定时自动清理文件
            enqueue=True,  # 异步安全
            backtrace=True,  # 错误跟踪
            diagnose=True,
        )

        return logger


log = Logger().log()

/Users/song/codelearn/sqlalchemy_learn/app/crud/crud_user.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Optional, NoReturn

from sqlalchemy import func, select, update, delete, desc
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.sql import Select

from app.models import User
from app.schemas.user import CreateUser, DeleteUser, UpdateUser


async def get_user_by_id(db: AsyncSession, user_id: int) -> Optional[User]:
    user = await db.execute(select(User).where(User.id == user_id))
    return user.scalars().first()


async def get_user_by_username(db: AsyncSession, username: str) -> Optional[User]:
    user = await db.execute(select(User).where(User.name == username))
    return user.scalars().first()


async def update_user_login_time(db: AsyncSession, username: str) -> int:
    user = await db.execute(
        update(User)
        .where(User.name == username)
        .values(last_login=func.now())
    )
    return user.rowcount


async def get_email_by_username(db: AsyncSession, username: str) -> str:
    user = await get_user_by_username(db, username)
    return user.email


async def get_username_by_email(db: AsyncSession, email: str) -> str:
    user = await db.execute(select(User).where(User.email == email))
    return user.scalars().first().name


async def get_avatar_by_username(db: AsyncSession, username: str) -> str:
    user = await db.execute(select(User).where(User.name == username))
    return user.scalars().first().avatar


async def create_user(db: AsyncSession, create: CreateUser) -> NoReturn:
    create.password = jwt.get_hash_password(create.password)
    new_user = User(**create.dict())
    db.add(new_user)


async def update_userinfo(db: AsyncSession, current_user: User, obj: UpdateUser) -> int:
    user = await db.execute(
        update(User)
        .where(User.id == current_user.id)
        .values(**obj.dict())
    )
    return user.rowcount


async def update_avatar(db: AsyncSession, current_user: User, avatar: str) -> int:
    user = await db.execute(
        update(User)
        .where(User.id == current_user.id)
        .values(avatar=avatar)
    )
    return user.rowcount


async def delete_user(db: AsyncSession, user_id: DeleteUser) -> int:
    user = await db.execute(delete(User).where(User.id == user_id))
    return user.rowcount


async def check_email(db: AsyncSession, email: str) -> User:
    mail = await db.execute(select(User).where(User.email == email))
    return mail.scalars().first()


async def delete_avatar(db: AsyncSession, user_id: int) -> int:
    user = await db.execute(
        update(User)
        .where(User.id == user_id)
        .values(avatar=None)
    )
    return user.rowcount


async def reset_password(db: AsyncSession, username: str, password: str) -> int:
    user = await db.execute(
        update(User)
        .where(User.name == username)
        .values(password=jwt.get_hash_password(password))
    )
    return user.rowcount


def get_users() -> Select:
    return select(User).order_by(desc(User.time_joined))


async def get_user_is_super(db: AsyncSession, user_id: int) -> bool:
    user = await get_user_by_id(db, user_id)
    return user.is_superuser


async def get_user_is_active(db: AsyncSession, user_id: int) -> bool:
    user = await get_user_by_id(db, user_id)
    return user.is_active


async def super_set(db: AsyncSession, user_id: int) -> int:
    super_status = await get_user_is_super(db, user_id)
    user = await db.execute(
        update(User)
        .where(User.id == user_id)
        .values(is_superuser=False if super_status else True)
    )
    return user.rowcount


async def active_set(db: AsyncSession, user_id: int) -> int:
    active_status = await get_user_is_active(db, user_id)
    user = await db.execute(
        update(User)
        .where(User.id == user_id)
        .values(is_active=False if active_status else True)
    )
    return user.rowcount

/Users/song/codelearn/sqlalchemy_learn/alembic/env.py

from app.models import MappedBase
from logging.config import fileConfig

from sqlalchemy import engine_from_config
from sqlalchemy import pool

from alembic import context

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)

# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = MappedBase.metadata


# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.


def run_migrations_offline():
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well.  By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.

    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )

    with context.begin_transaction():
        context.run_migrations()


def run_migrations_online():
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.

    """
    connectable = engine_from_config(
        config.get_section(config.config_ini_section),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    with connectable.connect() as connection:
        context.configure(
            connection=connection, target_metadata=target_metadata
        )

        with context.begin_transaction():
            context.run_migrations()


if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

/Users/song/codelearn/sqlalchemy_learn/alembic/versions/3ec04987db9e_create_user_and_email_table.py

"""create user and email table

Revision ID: 3ec04987db9e
Revises: 
Create Date: 2023-03-25 18:11:20.807740

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '3ec04987db9e'
down_revision = None
branch_labels = None
depends_on = None


def upgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table('user',
    sa.Column('id', sa.Integer(), nullable=False),
    sa.Column('name', sa.String(length=20), nullable=False, comment='用户名'),
    sa.PrimaryKeyConstraint('id')
    )
    op.create_index(op.f('ix_user_id'), 'user', ['id'], unique=False)
    op.create_index(op.f('ix_user_name'), 'user', ['name'], unique=True)
    op.create_table('email',
    sa.Column('id', sa.Integer(), nullable=False),
    sa.Column('account', sa.String(length=20), nullable=False, comment='email account'),
    sa.Column('pwd', sa.String(length=20), nullable=False, comment='email password'),
    sa.Column('user_id', sa.Integer(), nullable=True, comment='the email owner id'),
    sa.ForeignKeyConstraint(['user_id'], ['user.id'], ),
    sa.PrimaryKeyConstraint('id')
    )
    op.create_index(op.f('ix_email_account'), 'email', ['account'], unique=True)
    op.create_index(op.f('ix_email_id'), 'email', ['id'], unique=False)
    op.create_index(op.f('ix_email_pwd'), 'email', ['pwd'], unique=True)
    # ### end Alembic commands ###


def downgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_index(op.f('ix_email_pwd'), table_name='email')
    op.drop_index(op.f('ix_email_id'), table_name='email')
    op.drop_index(op.f('ix_email_account'), table_name='email')
    op.drop_table('email')
    op.drop_index(op.f('ix_user_name'), table_name='user')
    op.drop_index(op.f('ix_user_id'), table_name='user')
    op.drop_table('user')
    # ### end Alembic commands ###

/Users/song/codelearn/sqlalchemy_learn/alembic/versions/896025c8be4b_rename_user_table_2_tuser_table_email_2_.py

"""rename user table 2 tUser table,email 2 tEmail table

Revision ID: 896025c8be4b
Revises: 3ec04987db9e
Create Date: 2023-03-25 19:51:26.049487

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '896025c8be4b'
down_revision = '3ec04987db9e'
branch_labels = None
depends_on = None


def upgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table('tuser',
    sa.Column('id', sa.Integer(), nullable=False),
    sa.Column('name', sa.String(length=20), nullable=False, comment='用户名'),
    sa.PrimaryKeyConstraint('id')
    )
    op.create_index(op.f('ix_tuser_id'), 'tuser', ['id'], unique=False)
    op.create_index(op.f('ix_tuser_name'), 'tuser', ['name'], unique=True)
    op.create_table('temail',
    sa.Column('id', sa.Integer(), nullable=False),
    sa.Column('account', sa.String(length=20), nullable=False, comment='email account'),
    sa.Column('pwd', sa.String(length=20), nullable=False, comment='email password'),
    sa.Column('user_id', sa.Integer(), nullable=True, comment='the email owner id'),
    sa.ForeignKeyConstraint(['user_id'], ['tuser.id'], ),
    sa.PrimaryKeyConstraint('id')
    )
    op.create_index(op.f('ix_temail_account'), 'temail', ['account'], unique=True)
    op.create_index(op.f('ix_temail_id'), 'temail', ['id'], unique=False)
    op.create_index(op.f('ix_temail_pwd'), 'temail', ['pwd'], unique=True)
    op.drop_index('ix_email_account', table_name='email')
    op.drop_index('ix_email_id', table_name='email')
    op.drop_index('ix_email_pwd', table_name='email')
    op.drop_table('email')
    op.drop_index('ix_user_id', table_name='user')
    op.drop_index('ix_user_name', table_name='user')
    op.drop_table('user')
    # ### end Alembic commands ###


def downgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table('user',
    sa.Column('id', sa.INTEGER(), nullable=False),
    sa.Column('name', sa.VARCHAR(length=20), nullable=False),
    sa.PrimaryKeyConstraint('id')
    )
    op.create_index('ix_user_name', 'user', ['name'], unique=False)
    op.create_index('ix_user_id', 'user', ['id'], unique=False)
    op.create_table('email',
    sa.Column('id', sa.INTEGER(), nullable=False),
    sa.Column('account', sa.VARCHAR(length=20), nullable=False),
    sa.Column('pwd', sa.VARCHAR(length=20), nullable=False),
    sa.Column('user_id', sa.INTEGER(), nullable=True),
    sa.ForeignKeyConstraint(['user_id'], ['user.id'], ),
    sa.PrimaryKeyConstraint('id')
    )
    op.create_index('ix_email_pwd', 'email', ['pwd'], unique=False)
    op.create_index('ix_email_id', 'email', ['id'], unique=False)
    op.create_index('ix_email_account', 'email', ['account'], unique=False)
    op.drop_index(op.f('ix_temail_pwd'), table_name='temail')
    op.drop_index(op.f('ix_temail_id'), table_name='temail')
    op.drop_index(op.f('ix_temail_account'), table_name='temail')
    op.drop_table('temail')
    op.drop_index(op.f('ix_tuser_name'), table_name='tuser')
    op.drop_index(op.f('ix_tuser_id'), table_name='tuser')
    op.drop_table('tuser')
    # ### end Alembic commands ###

标签:sqlite,sqlalchemy,res,db,email,user,learn,async,id
From: https://www.cnblogs.com/zhuoss/p/17263232.html

相关文章