/Users/song/codelearn/sqlalchemy_learn/init_test_data.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import asyncio
import random
from faker import Faker
from loguru import logger
from sqlalchemy import text, select, insert, update, and_, or_, between
from app.database.db_mysql import async_db_session
from app.models import User
from app.models import Email
from app.schemas.user import CreateUser
from app.schemas.email import CreateEmail
class InitData:
""" 初始化数据 """
def __init__(self):
self.fake = Faker('zh_CN')
async def fake_user(self):
""" 自动创建锁定普通用户 """
username = self.fake.user_name()
user_obj = CreateUser(
name=username,
)
# 2023-03-25 18:55:32.325 | DEBUG | __main__:fake_user:32 - name='qinxia'
# 2023-03-25 18:55:32.326 | DEBUG | __main__:fake_user:33 - {'name': 'qinxia'}
logger.debug(user_obj)
logger.debug(user_obj.dict())
async with async_db_session.begin() as db:
await db.execute(
# 方式一:
# insert(User).values(name='bruce')
# 方式二:
# **user_obj.dict()可以将 {'name': 'qinxia'}解包成 ----------> name = 'qinxia'
insert(User).values(**user_obj.dict())
)
await db.commit()
async def fake_email(self):
""" 自动创建普通用户 """
email_obj = CreateEmail(
pwd=self.fake.password(),
account=self.fake.email(),
user_id=random.randint(1, 4)
)
logger.debug(email_obj)
async with async_db_session.begin() as db:
await db.execute(
insert(Email).values(**email_obj.dict())
)
await db.commit()
async def get_username(self):
async with async_db_session.begin() as db:
stmt = "select username from sys_user"
sql_res = await db.execute(
text(stmt)
)
# => [('alice',), ('fang06',), ('jiajuan',), ('juan33',), ('min40',), ('min94',), ('otian',), ('song',), ('xiulanmo',), ('xiuying38',)]
# res = sql_res.fetchall()
# =>alice
# res = sql_res.scalar()
# => [('alice',), ('fang06',), ('jiajuan',), ('juan33',), ('min40',), ('min94',), ('otian',), ('song',), ('xiulanmo',), ('xiuying38',)]
# res = sql_res.all()
# => [('alice',), ('fang06',)]
# res = sql_res.fetchmany(2)
# => ('alice',)
# res = sql_res.fetchone()
# => ('alice',)
# res = sql_res.first()
# res = sql_res.freeze()
# => [('alice',), ('fang06',), ('jiajuan',), ('juan33',), ('min40',), ('min94',), ('otian',), ('song',), ('xiulanmo',), ('xiuying38',)]
# logger.debug(res.data)
# => RMKeyView(['username'])
# res = sql_res.keys()
# res = sql_res.mappings()
# => [{'username': 'alice'}, {'username': 'fang06'}, {'username': 'jiajuan'}, {'username': 'juan33'}, {'username': 'min40'}, {'username': 'min94'}, {'username': 'otian'}, {'username': 'song'}, {'username': 'xiulanmo'}, {'username': 'xiuying38'}]
# logger.debug(res.all())
# => alice
# res = sql_res.scalar()
# logger.debug(res)
async def get_username_and_email(self):
async with async_db_session.begin() as db:
stmt = "select username,email from sys_user"
sql_res = await db.execute(
text(stmt)
)
# res = sql_res.mappings()
# => [{'username': 'song', 'email': 'song@qq.com'}, {'username': 'juan33', 'email': 'juanxiong@example.com'}, {'username': 'otian', 'email': 'xiaxiong@example.net'}, {'username': 'min40', 'email': 'xfang@example.net'}, {'username': 'xiuying38', 'email': 'xiulanlong@example.net'}, {'username': 'alice', 'email': 'alice@qq.com'}, {'username': 'min94', 'email': 'xiulanhou@example.com'}, {'username': 'xiulanmo', 'email': 'nashi@example.com'}, {'username': 'fang06', 'email': 'ping56@example.net'}, {'username': 'jiajuan', 'email': 'wei24@example.net'}]
# res = sql_res.first()
# logger.debug(res._mapping)
# 2023-03-25 16:36:47.853 | DEBUG | __main__:get_username_and_email:130 - {'username': 'song', 'email': 'song@qq.com'}
res = sql_res.first()
logger.debug(res._mapping)
async def select_username(self):
stmt = "select * from user"
async with async_db_session.begin() as db:
sql_res = await db.execute(
text(stmt)
)
res = sql_res.first()
logger.debug(res._mapping)
async def select_username2(self):
stmt = text("select * from tuser where name=:name")
async with async_db_session.begin() as db:
sql_res = await db.execute(
stmt, {'name': 'yuxia'}
)
res = sql_res.first()
logger.debug(res._mapping)
async def select_username3(self):
async with async_db_session.begin() as db:
sql_res = await db.execute(
# 使用and_,连接两个条件
# select(User.name, User.id).where(and_(User.name == 'dcai', User.id > 9))
select(User.name, User.id).where(and_(User.name == 'dcai', User.id > 1))
)
res = sql_res.first()
if not res:
return None
logger.debug(res._mapping)
async def select_username4(self):
async with async_db_session.begin() as db:
sql_res = await db.execute(
# 使用or_,连接两个条件
select(User.name, User.id).where(or_(User.name == 'dca', User.id == 5))
)
res = sql_res.first()
if not res:
return None
logger.debug(res._mapping)
async def select_username5(self):
async with async_db_session.begin() as db:
sql_res = await db.execute(
# 使用between,范围条件
select(User.name, User.id).where(User.id.between(2, 3))
)
# 获取所有的row的对象形式
res = sql_res.mappings().all()
if not res:
return None
logger.debug(res)
# 2023-03-27 21:42:05.195 | DEBUG | __main__:select_username5:181 - [{'name': 'yuxia', 'id': 2}, {'name': 'xia43', 'id': 3}]
async def select_username6(self):
async with async_db_session.begin() as db:
sql_res = await db.execute(
# 使用between,范围条件
select(User.name, User.id).where(User.id.between(2, 3))
)
res = sql_res.mappings().all()
if not res:
return None
logger.debug(res)
async def select_username7(self):
async with async_db_session.begin() as db:
sql_res = await db.execute(
# 使用in_,范围条件
select(User.name, User.id).where(User.name.in_(['yuxia', 'dcai']))
)
res = sql_res.mappings().all()
if not res:
return None
logger.debug(res)
# 2023-03-27 21:48:47.785 | DEBUG | __main__:select_username7:203 - [{'name': 'dcai', 'id': 5}, {'name': 'yuxia', 'id': 2}]
async def insert_email(self):
async with async_db_session.begin() as db:
await db.execute(
# 插入数据
insert(Email).values(
account='song@qq.com',
pwd='123456',
user_id=1
)
)
await db.commit()
async def insert_email2(self):
async with async_db_session.begin() as db:
# 批量插入数据
await db.execute(
insert(Email).values([
{"account": "song1@qq.com", "pwd": "1234568", "user_id": 2},
{"account": "song2@qq.com", "pwd": "1234569", "user_id": 2},
])
)
await db.commit()
async def init_data(self):
""" 自动创建数据 """
# await self.fake_user()
# await self.fake_email()
# await self.select_username7()
# await self.get_username()
# await self.get_username_and_email()
await self.insert_email2()
if __name__ == '__main__':
init = InitData()
loop = asyncio.get_event_loop()
loop.run_until_complete(init.init_data())
/Users/song/codelearn/sqlalchemy_learn/app/database/base_class.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import uuid
from datetime import datetime
from typing import Optional
from sqlalchemy import func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, declared_attr, MappedAsDataclass
from typing_extensions import Annotated
# 通用 Mapped 类型主键, 需手动添加,参考以下使用方式
# MappedBase -> id: Mapped[id_key]
# DataClassBase && Base -> id: Mapped[id_key] = mapped_column(init=False)
id_key = Annotated[int, mapped_column(primary_key=True, index=True, autoincrement=True, comment='主键id')]
class _BaseMixin:
"""
Mixin 数据类
Mixin: 一种面向对象编程概念, 使结构变得更加清晰, `Wiki <https://en.wikipedia.org/wiki/Mixin/>`__
"""
# create_user: Mapped[int] = mapped_column(comment='创建者')
# update_user: Mapped[Optional[int]] = mapped_column(default=None, comment='修改者')
created_time: Mapped[datetime] = mapped_column(init=False, default=func.now(), comment='创建时间')
updated_time: Mapped[Optional[datetime]] = mapped_column(init=False, onupdate=func.now(), comment='更新时间')
class MappedBase(DeclarativeBase):
"""
声明性基类, 原始 DeclarativeBase 类, 作为所有基类或数据模型类的父类而存在
`DeclarativeBase <https://docs.sqlalchemy.org/en/20/orm/declarative_config.html>`__
`mapped_column() <https://docs.sqlalchemy.org/en/20/orm/mapping_api.html#sqlalchemy.orm.mapped_column>`__
"""
@declared_attr.directive
def __tablename__(cls) -> str: # noqa
return cls.__name__.lower()
class DataClassBase(MappedAsDataclass, MappedBase):
"""
声明性数据类基类, 它将带有数据类集成, 允许使用更高级配置, 但你必须注意它的一些特性, 尤其是和 DeclarativeBase 一起使用时
`MappedAsDataclass <https://docs.sqlalchemy.org/en/20/orm/dataclasses.html#orm-declarative-native-dataclasses>`__
"""
__abstract__ = True
class Base(_BaseMixin, MappedAsDataclass, MappedBase):
"""
声明性 Mixin 数据类基类, 带有数据类集成, 并包含 MiXin 数据类基础表结构, 你可以简单的理解它为含有基础表结构的数据类基类
"""
__abstract__ = True
def use_uuid() -> str:
"""
使用uuid
:return:
"""
return uuid.uuid4().hex
/Users/song/codelearn/sqlalchemy_learn/app/database/db_mysql.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from ..common.log import log
from ..core.conf import settings
from ..core.path_conf import SqlitePath
from ..database.base_class import MappedBase
"""
说明:SqlAlchemy
"""
SQLALCHEMY_DATABASE_URL = f'sqlite+aiosqlite:///{SqlitePath}/test.db'
try:
# 数据库引擎
async_engine = create_async_engine(SQLALCHEMY_DATABASE_URL, echo=settings.DB_ECHO, future=True)
# log.success('数据库连接成功')
except Exception as e:
log.error('❌ 数据库链接失败 {}', e)
sys.exit()
else:
async_db_session = async_sessionmaker(bind=async_engine, autoflush=False, expire_on_commit=False)
async def get_db() -> AsyncSession:
"""
session 生成器
:return:
"""
session = async_db_session()
try:
yield session
except Exception as se:
await session.rollback()
raise se
finally:
await session.close()
async def create_table():
"""
创建数据库表
"""
async with async_engine.begin() as coon:
await coon.run_sync(MappedBase.metadata.create_all)
/Users/song/codelearn/sqlalchemy_learn/app/core/path_conf.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
from pathlib import Path
# 获取项目根目录
# 或使用绝对路径,指到backend目录为止,例如windows:BasePath = D:\git_project\fastapi_mysql\backend
BasePath = Path(__file__).resolve().parent.parent
# 迁移文件存放路径
Versions = os.path.join(BasePath, 'app', 'alembic', 'versions')
# 日志文件路径
LogPath = os.path.join(BasePath.parent, 'log')
# sqlite的路径
SqlitePath = os.path.join(BasePath.parent, 'sqlite_db')
/Users/song/codelearn/sqlalchemy_learn/app/core/conf.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from functools import lru_cache
from pydantic import BaseSettings
class Settings(BaseSettings):
""" 配置类 """
# FastAPI
TITLE: str = 'FastAPI'
VERSION: str = 'v0.0.1'
DESCRIPTION: str = """fastapi_sqlalchemy_mysql"""
DOCS_URL: str = '/v1/docs'
REDOCS_URL: str = None
OPENAPI_URL: str = '/v1/openapi'
# Uvicorn
UVICORN_HOST: str = '127.0.0.1'
UVICORN_PORT: int = 8000
UVICORN_RELOAD: bool = True
# 如果此处为True,在 @app.on_event("startup") 时发生异常,则程序不会终止,详情:https://github.com/encode/starlette/issues/486
# Static Server
STATIC_FILES: bool = True
# DB
DB_ECHO: bool = False
DB_HOST: str = '127.0.0.1'
DB_PORT: int = 3306
DB_USER: str = 'root'
DB_PASSWORD: str = '123456'
DB_DATABASE: str = 'fsm'
DB_CHARSET: str = 'utf8mb4'
# Redis
REDIS_OPEN: bool = False
REDIS_HOST: str = '127.0.0.1'
REDIS_PORT: int = 6379
REDIS_PASSWORD: str = ''
REDIS_DATABASE: int = 0
REDIS_TIMEOUT: int = 5
# Token
TOKEN_ALGORITHM: str = 'HS256' # 算法
TOKEN_SECRET_KEY: str = '1VkVF75nsNABBjK_7-qz7GtzNy3AMvktc9TCPwKczCk' # 密钥 secrets.token_urlsafe(32))
TOKEN_EXPIRE_MINUTES: int = 60 * 24 * 1 # token 时效 60 * 24 * 1 = 1 天
# Email
EMAIL_DESCRIPTION: str = 'fastapi_sqlalchemy_mysql' # 默认发件说明
EMAIL_SERVER: str = 'smtp.qq.com'
EMAIL_PORT: int = 465
EMAIL_USER: str = '729519678@qq.com'
EMAIL_PASSWORD: str = 'gmrvkkppberzbega' # 授权密码,非邮箱密码
EMAIL_SSL: bool = True
# 邮箱登录验证码过期时间
EMAIL_LOGIN_CODE_MAX_AGE: int = 60 * 2 # 时效 60 * 2 = 2 分钟
# Cookies
COOKIES_MAX_AGE: int = 60 * 5 # cookies 时效 60 * 5 = 5 分钟
# Middleware
MIDDLEWARE_CORS: bool = True
MIDDLEWARE_GZIP: bool = True
MIDDLEWARE_ACCESS: bool = False
@lru_cache
def get_settings():
""" 读取配置优化写法 """
return Settings()
settings = get_settings()
/Users/song/codelearn/sqlalchemy_learn/app/core/__init__.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
/Users/song/codelearn/sqlalchemy_learn/app/models/user.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from datetime import datetime
from typing import Optional, List
from sqlalchemy import func, String, ForeignKey
from sqlalchemy.dialects.mysql import LONGTEXT
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.database.base_class import use_uuid, id_key, DataClassBase
class User(DataClassBase):
""" User Table """
__tablename__ = 'tuser'
id: Mapped[id_key] = mapped_column(init=False)
name: Mapped[str] = mapped_column(String(20), unique=True, index=True, comment='用户名')
emails: Mapped[List['Email']] = relationship('Email', back_populates='user', cascade='all,delete-orphan')
/Users/song/codelearn/sqlalchemy_learn/app/models/__init__.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
# 导入所有模型,并将 Base 放在最前面, 以便 Base 拥有它们
# imported by Alembic
"""
from app.database.base_class import MappedBase #
from app.models.user import User
from app.models.email import Email
/Users/song/codelearn/sqlalchemy_learn/app/models/email.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from datetime import datetime
from typing import List, Optional
from sqlalchemy import ForeignKey, func, String
from sqlalchemy.dialects.mysql import LONGTEXT
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.database.base_class import id_key, DataClassBase
class Email(DataClassBase):
""" Email Table """
__tablename__ = 'temail'
id: Mapped[id_key] = mapped_column(init=False)
account: Mapped[str] = mapped_column(String(20), unique=True, index=True, comment='email account')
pwd: Mapped[str] = mapped_column(String(20), unique=True, index=True, comment='email password')
user_id: Mapped[int] = mapped_column(ForeignKey('tuser.id'), nullable=True, comment='the email owner id')
# User 表示是的定义表的那个类,不是表名,特别是表名和类名一样的时候,特别容易弄错,所以首字母需要大写
user: Mapped['User'] = relationship('User', back_populates='emails')
/Users/song/codelearn/sqlalchemy_learn/app/schemas/user.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import datetime
from typing import Optional
from pydantic import BaseModel, EmailStr
class CreateUser(BaseModel):
name: str
class Auth(BaseModel):
username: str
password: str
class ELCode(BaseModel):
email: EmailStr
class Auth2(ELCode):
code: str
class UpdateUser(BaseModel):
username: str
email: str
mobile_number: Optional[str] = None
wechat: Optional[str] = None
qq: Optional[str] = None
blog_address: Optional[str] = None
introduction: Optional[str] = None
class GetUserInfo(UpdateUser):
id: int
uid: str
avatar: Optional[str] = None
time_joined: datetime.datetime = None
last_login: Optional[datetime.datetime] = None
is_superuser: bool
is_active: bool
class Config:
orm_mode = True
class DeleteUser(BaseModel):
id: int
class ResetPassword(BaseModel):
code: str
password1: str
password2: str
/Users/song/codelearn/sqlalchemy_learn/app/schemas/email.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import datetime
from typing import Optional
from pydantic import BaseModel, EmailStr
class CreateEmail(BaseModel):
account: str
pwd: str
user_id: int
/Users/song/codelearn/sqlalchemy_learn/app/common/log.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
from loguru import logger
from app.core import path_conf
class Logger:
@staticmethod
def log() -> logger:
if not os.path.exists(path_conf.LogPath):
os.mkdir(path_conf.LogPath)
# 日志文件
log_file = os.path.join(path_conf.LogPath, "FastBlog.log")
# loguru日志
# more: https://github.com/Delgan/loguru#ready-to-use-out-of-the-box-without-boilerplate
logger.add(
log_file,
encoding='utf-8',
level="DEBUG",
rotation='00:00', # 每天 0 点创建一个新日志文件
retention="7 days", # 定时自动清理文件
enqueue=True, # 异步安全
backtrace=True, # 错误跟踪
diagnose=True,
)
return logger
log = Logger().log()
/Users/song/codelearn/sqlalchemy_learn/app/crud/crud_user.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Optional, NoReturn
from sqlalchemy import func, select, update, delete, desc
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.sql import Select
from app.models import User
from app.schemas.user import CreateUser, DeleteUser, UpdateUser
async def get_user_by_id(db: AsyncSession, user_id: int) -> Optional[User]:
user = await db.execute(select(User).where(User.id == user_id))
return user.scalars().first()
async def get_user_by_username(db: AsyncSession, username: str) -> Optional[User]:
user = await db.execute(select(User).where(User.name == username))
return user.scalars().first()
async def update_user_login_time(db: AsyncSession, username: str) -> int:
user = await db.execute(
update(User)
.where(User.name == username)
.values(last_login=func.now())
)
return user.rowcount
async def get_email_by_username(db: AsyncSession, username: str) -> str:
user = await get_user_by_username(db, username)
return user.email
async def get_username_by_email(db: AsyncSession, email: str) -> str:
user = await db.execute(select(User).where(User.email == email))
return user.scalars().first().name
async def get_avatar_by_username(db: AsyncSession, username: str) -> str:
user = await db.execute(select(User).where(User.name == username))
return user.scalars().first().avatar
async def create_user(db: AsyncSession, create: CreateUser) -> NoReturn:
create.password = jwt.get_hash_password(create.password)
new_user = User(**create.dict())
db.add(new_user)
async def update_userinfo(db: AsyncSession, current_user: User, obj: UpdateUser) -> int:
user = await db.execute(
update(User)
.where(User.id == current_user.id)
.values(**obj.dict())
)
return user.rowcount
async def update_avatar(db: AsyncSession, current_user: User, avatar: str) -> int:
user = await db.execute(
update(User)
.where(User.id == current_user.id)
.values(avatar=avatar)
)
return user.rowcount
async def delete_user(db: AsyncSession, user_id: DeleteUser) -> int:
user = await db.execute(delete(User).where(User.id == user_id))
return user.rowcount
async def check_email(db: AsyncSession, email: str) -> User:
mail = await db.execute(select(User).where(User.email == email))
return mail.scalars().first()
async def delete_avatar(db: AsyncSession, user_id: int) -> int:
user = await db.execute(
update(User)
.where(User.id == user_id)
.values(avatar=None)
)
return user.rowcount
async def reset_password(db: AsyncSession, username: str, password: str) -> int:
user = await db.execute(
update(User)
.where(User.name == username)
.values(password=jwt.get_hash_password(password))
)
return user.rowcount
def get_users() -> Select:
return select(User).order_by(desc(User.time_joined))
async def get_user_is_super(db: AsyncSession, user_id: int) -> bool:
user = await get_user_by_id(db, user_id)
return user.is_superuser
async def get_user_is_active(db: AsyncSession, user_id: int) -> bool:
user = await get_user_by_id(db, user_id)
return user.is_active
async def super_set(db: AsyncSession, user_id: int) -> int:
super_status = await get_user_is_super(db, user_id)
user = await db.execute(
update(User)
.where(User.id == user_id)
.values(is_superuser=False if super_status else True)
)
return user.rowcount
async def active_set(db: AsyncSession, user_id: int) -> int:
active_status = await get_user_is_active(db, user_id)
user = await db.execute(
update(User)
.where(User.id == user_id)
.values(is_active=False if active_status else True)
)
return user.rowcount
/Users/song/codelearn/sqlalchemy_learn/alembic/env.py
from app.models import MappedBase
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = MappedBase.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline():
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online():
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection, target_metadata=target_metadata
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
/Users/song/codelearn/sqlalchemy_learn/alembic/versions/3ec04987db9e_create_user_and_email_table.py
"""create user and email table
Revision ID: 3ec04987db9e
Revises:
Create Date: 2023-03-25 18:11:20.807740
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '3ec04987db9e'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('user',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(length=20), nullable=False, comment='用户名'),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_user_id'), 'user', ['id'], unique=False)
op.create_index(op.f('ix_user_name'), 'user', ['name'], unique=True)
op.create_table('email',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('account', sa.String(length=20), nullable=False, comment='email account'),
sa.Column('pwd', sa.String(length=20), nullable=False, comment='email password'),
sa.Column('user_id', sa.Integer(), nullable=True, comment='the email owner id'),
sa.ForeignKeyConstraint(['user_id'], ['user.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_email_account'), 'email', ['account'], unique=True)
op.create_index(op.f('ix_email_id'), 'email', ['id'], unique=False)
op.create_index(op.f('ix_email_pwd'), 'email', ['pwd'], unique=True)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_email_pwd'), table_name='email')
op.drop_index(op.f('ix_email_id'), table_name='email')
op.drop_index(op.f('ix_email_account'), table_name='email')
op.drop_table('email')
op.drop_index(op.f('ix_user_name'), table_name='user')
op.drop_index(op.f('ix_user_id'), table_name='user')
op.drop_table('user')
# ### end Alembic commands ###
/Users/song/codelearn/sqlalchemy_learn/alembic/versions/896025c8be4b_rename_user_table_2_tuser_table_email_2_.py
"""rename user table 2 tUser table,email 2 tEmail table
Revision ID: 896025c8be4b
Revises: 3ec04987db9e
Create Date: 2023-03-25 19:51:26.049487
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '896025c8be4b'
down_revision = '3ec04987db9e'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('tuser',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(length=20), nullable=False, comment='用户名'),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_tuser_id'), 'tuser', ['id'], unique=False)
op.create_index(op.f('ix_tuser_name'), 'tuser', ['name'], unique=True)
op.create_table('temail',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('account', sa.String(length=20), nullable=False, comment='email account'),
sa.Column('pwd', sa.String(length=20), nullable=False, comment='email password'),
sa.Column('user_id', sa.Integer(), nullable=True, comment='the email owner id'),
sa.ForeignKeyConstraint(['user_id'], ['tuser.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_temail_account'), 'temail', ['account'], unique=True)
op.create_index(op.f('ix_temail_id'), 'temail', ['id'], unique=False)
op.create_index(op.f('ix_temail_pwd'), 'temail', ['pwd'], unique=True)
op.drop_index('ix_email_account', table_name='email')
op.drop_index('ix_email_id', table_name='email')
op.drop_index('ix_email_pwd', table_name='email')
op.drop_table('email')
op.drop_index('ix_user_id', table_name='user')
op.drop_index('ix_user_name', table_name='user')
op.drop_table('user')
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('user',
sa.Column('id', sa.INTEGER(), nullable=False),
sa.Column('name', sa.VARCHAR(length=20), nullable=False),
sa.PrimaryKeyConstraint('id')
)
op.create_index('ix_user_name', 'user', ['name'], unique=False)
op.create_index('ix_user_id', 'user', ['id'], unique=False)
op.create_table('email',
sa.Column('id', sa.INTEGER(), nullable=False),
sa.Column('account', sa.VARCHAR(length=20), nullable=False),
sa.Column('pwd', sa.VARCHAR(length=20), nullable=False),
sa.Column('user_id', sa.INTEGER(), nullable=True),
sa.ForeignKeyConstraint(['user_id'], ['user.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index('ix_email_pwd', 'email', ['pwd'], unique=False)
op.create_index('ix_email_id', 'email', ['id'], unique=False)
op.create_index('ix_email_account', 'email', ['account'], unique=False)
op.drop_index(op.f('ix_temail_pwd'), table_name='temail')
op.drop_index(op.f('ix_temail_id'), table_name='temail')
op.drop_index(op.f('ix_temail_account'), table_name='temail')
op.drop_table('temail')
op.drop_index(op.f('ix_tuser_name'), table_name='tuser')
op.drop_index(op.f('ix_tuser_id'), table_name='tuser')
op.drop_table('tuser')
# ### end Alembic commands ###
标签:sqlite,sqlalchemy,res,db,email,user,learn,async,id
From: https://www.cnblogs.com/zhuoss/p/17263232.html