首页 > 数据库 >SQLAlchemy+aiomysql

SQLAlchemy+aiomysql

时间:2022-12-13 14:48:13浏览次数:56  
标签:engine __ SQLAlchemy name mysql sqlalchemy import aiomysql

1、安装模板

pip install aiomysql
pip install sqlalchemy

2、engine核心

2.1、初始化数据库

# -*- coding: utf-8 -*-


import asyncio

from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import declarative_base

from sqlalchemy import Column
from sqlalchemy import BigInteger, String

Base = declarative_base()


class User(Base):
    __tablename__ = 't_user'
    id = Column(BigInteger, primary_key=True, autoincrement=True)
    username = Column(String(length=6), unique=True, comment='名字')


async def init_db():
    mysql_username = 'test'
    mysql_password = 'test'
    mysql_ip = '192.168.10.8'
    mysql_port = '3306'
    db_name = 'tcp_db'

    engine = create_async_engine(
        "mysql+aiomysql://{username}:{password}@{ip}:{port}/{db_name}?charset=utf8".format(
            username=mysql_username,
            password=mysql_password,
            ip=mysql_ip,
            port=mysql_port,
            db_name=db_name
        ),
        echo=True,
    )

    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)  # 删除所有表
        await conn.run_sync(Base.metadata.create_all)  # 创建所有表

    await engine.dispose()


if __name__ == '__main__':
    asyncio.run(init_db())

2.2、插入数据 

# -*- coding: utf-8 -*-

import asyncio

from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import declarative_base

from sqlalchemy import Column,insert
from sqlalchemy import BigInteger, String

Base = declarative_base()


class User(Base):
    __tablename__ = 't_user'
    id = Column(BigInteger, primary_key=True, autoincrement=True)
    username = Column(String(length=6), unique=True, comment='名字')


async def insert_data():
    mysql_username = 'test'
    mysql_password = 'test'
    mysql_ip = '192.168.10.8'
    mysql_port = '3306'
    db_name = 'tcp_db'

    engine = create_async_engine(
        "mysql+aiomysql://{username}:{password}@{ip}:{port}/{db_name}?charset=utf8".format(
            username=mysql_username,
            password=mysql_password,
            ip=mysql_ip,
            port=mysql_port,
            db_name=db_name
        ),
        echo=True,
    )

    async with engine.begin() as conn:
        await conn.execute(
            insert(User), [{"username": "some-1"}, {"username": "some-2"}]
        )

    await engine.dispose()


if __name__ == '__main__':
    asyncio.run(insert_data())

2.3、查询数据

# -*- coding: utf-8 -*-

import asyncio

from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.future import select
from sqlalchemy.orm import declarative_base

from sqlalchemy import Column, insert
from sqlalchemy import BigInteger, String

Base = declarative_base()

class User(Base):
    __tablename__ = 't_user'
    id = Column(BigInteger, primary_key=True, autoincrement=True)
    username = Column(String(length=6), unique=True, comment='名字')


async def select_data():
    mysql_username = 'test'
    mysql_password = 'test'
    mysql_ip = '192.168.10.8'
    mysql_port = '3306'
    db_name = 'tcp_db'

    engine = create_async_engine(
        "mysql+aiomysql://{username}:{password}@{ip}:{port}/{db_name}?charset=utf8".format(
            username=mysql_username,
            password=mysql_password,
            ip=mysql_ip,
            port=mysql_port,
            db_name=db_name
        ),
        echo=True,
    )
    async with engine.connect() as conn:
        result = await conn.execute(select(User).where(User.id == '1'))

        print(result.fetchall())

    await engine.dispose()

if __name__ == '__main__':
    asyncio.run(select_data())

2.4、更新数据

# -*- coding: utf-8 -*-

import asyncio

from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import declarative_base

from sqlalchemy import Column, update, values
from sqlalchemy import BigInteger, String

Base = declarative_base()

class User(Base):
    __tablename__ = 't_user'
    id = Column(BigInteger, primary_key=True, autoincrement=True)
    username = Column(String(length=6), unique=True, comment='名字')


async def update_data():
    mysql_username = 'test'
    mysql_password = 'test'
    mysql_ip = '192.168.10.8'
    mysql_port = '3306'
    db_name = 'tcp_db'

    engine = create_async_engine(
        "mysql+aiomysql://{username}:{password}@{ip}:{port}/{db_name}?charset=utf8".format(
            username=mysql_username,
            password=mysql_password,
            ip=mysql_ip,
            port=mysql_port,
            db_name=db_name
        ),
        echo=True,
    )
    async with engine.connect() as conn:
        await conn.execute(update(User).where(User.id == '1').values(username='test-1'))

        await conn.commit()

    await engine.dispose()

if __name__ == '__main__':
    asyncio.run(update_data())

2.5、删除数据

# -*- coding: utf-8 -*-

import asyncio

from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import declarative_base

from sqlalchemy import Column, delete
from sqlalchemy import BigInteger, String

Base = declarative_base()


class User(Base):
    __tablename__ = 't_user'
    id = Column(BigInteger, primary_key=True, autoincrement=True)
    username = Column(String(length=6), unique=True, comment='名字')


async def delete_data():
    mysql_username = 'test'
    mysql_password = 'test'
    mysql_ip = '192.168.10.8'
    mysql_port = '3306'
    db_name = 'tcp_db'

    engine = create_async_engine(
        "mysql+aiomysql://{username}:{password}@{ip}:{port}/{db_name}?charset=utf8".format(
            username=mysql_username,
            password=mysql_password,
            ip=mysql_ip,
            port=mysql_port,
            db_name=db_name
        ),
        echo=True,
    )
    async with engine.connect() as conn:
        await conn.execute(delete(User).where(User.id == '1'))
        await conn.commit()
    await engine.dispose()

if __name__ == '__main__':
    asyncio.run(delete_data())

2.6、查询返回异步可迭代对象

# -*- coding: utf-8 -*-

import asyncio

from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import declarative_base

from sqlalchemy import Column, select
from sqlalchemy import BigInteger, String

Base = declarative_base()

class User(Base):
    __tablename__ = 't_user'
    id = Column(BigInteger, primary_key=True, autoincrement=True)
    username = Column(String(length=6), unique=True, comment='名字')

async def select_data():
    mysql_username = 'test'
    mysql_password = 'test'
    mysql_ip = '192.168.10.8'
    mysql_port = '3306'
    db_name = 'tcp_db'

    engine = create_async_engine(
        "mysql+aiomysql://{username}:{password}@{ip}:{port}/{db_name}?charset=utf8".format(
            username=mysql_username,
            password=mysql_password,
            ip=mysql_ip,
            port=mysql_port,
            db_name=db_name
        ),
        echo=True,
    )
    async with engine.connect() as conn:
        async_result = await conn.stream(select(User))
        async for result in async_result:
            print(result)
    await engine.dispose()

if __name__ == '__main__':
    asyncio.run(select_data())

3、异步会话ORM介绍

3.1、初始化数据库

# -*- coding: utf-8 -*-

import asyncio

from sqlalchemy import Column
from sqlalchemy import DateTime
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.future import select
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import relationship
from sqlalchemy.orm import selectinload
from sqlalchemy.orm import sessionmaker

Base = declarative_base()


class Student(Base):
    __tablename__ = "t_student"

    id = Column(Integer, primary_key=True)
    name = Column(String(length=64))
    create_date = Column(DateTime, server_default=func.now())
    clazz = relationship("Clazz")

    # 在刷新之后访问具有服务器默认值或SQL表达式默认值的列,而不会触发过期加载
    __mapper_args__ = {"eager_defaults": True}


class Clazz(Base):
    __tablename__ = "t_clazz"
    id = Column(Integer, primary_key=True)
    student_id = Column(ForeignKey("t_student.id"))
    name = Column(String(length=64))


def get_engine():
    mysql_username = 'test'
    mysql_password = 'test'
    mysql_ip = '192.168.10.8'
    mysql_port = '3306'
    db_name = 'tcp_db'

    return create_async_engine(
        "mysql+aiomysql://{username}:{password}@{ip}:{port}/{db_name}?charset=utf8".format(
            username=mysql_username,
            password=mysql_password,
            ip=mysql_ip,
            port=mysql_port,
            db_name=db_name
        ),
        echo=True,
    )


async def init_db():
    engine = get_engine()
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
        await conn.run_sync(Base.metadata.create_all)

    await engine.dispose()

if __name__ == '__main__':
    # 初始化数据库
    asyncio.run(init_db())

3.2、批量插入数据

# -*- coding: utf-8 -*-
import asyncio
from sqlalchemy import Column
from sqlalchemy import DateTime
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.future import select
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import relationship
from sqlalchemy.orm import selectinload
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

class Student(Base):
    __tablename__ = "t_student"

    id = Column(Integer, primary_key=True)
    name = Column(String(length=64))
    create_date = Column(DateTime, server_default=func.now())
    clazz = relationship("Clazz")

    # 在刷新之后访问具有服务器默认值或SQL表达式默认值的列,而不会触发过期加载
    __mapper_args__ = {"eager_defaults": True}

class Clazz(Base):
    __tablename__ = "t_clazz"
    id = Column(Integer, primary_key=True)
    student_id = Column(ForeignKey("t_student.id"))
    name = Column(String(length=64))

def get_engine():
    mysql_username = 'test'
    mysql_password = 'test'
    mysql_ip = '192.168.10.8'
    mysql_port = '3306'
    db_name = 'tcp_db'

    return create_async_engine(
        "mysql+aiomysql://{username}:{password}@{ip}:{port}/{db_name}?charset=utf8".format(
            username=mysql_username,
            password=mysql_password,
            ip=mysql_ip,
            port=mysql_port,
            db_name=db_name
        ),
        echo=True,
    )

async def batch_insert_data():
    # 批量增加数据
    engine = get_engine()

    # expire_on_commit=False:表示修改提交值后,后面引这个变量的值,是修改后的值
    async_session = sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)

    async with async_session() as session:
        async with session.begin():
            # 批量增加数据
            session.add_all(
                [
                    Student(clazz=[Clazz(name='班级1'), Clazz(name='班级2')], name="学生-张三"),
                    Student(clazz=[Clazz(name='班级3')], name="学生-王五"),
                    Student(clazz=[Clazz(name='班级4'), Clazz(name='班级5')], name="学生-李四"),
                ]
            )
    await engine.dispose()

if __name__ == '__main__':
    asyncio.run(batch_insert_data())

3.3、查询

# -*- coding: utf-8 -*-

import asyncio

from sqlalchemy import Column
from sqlalchemy import DateTime
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.future import select
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import relationship
from sqlalchemy.orm import selectinload
from sqlalchemy.orm import sessionmaker

Base = declarative_base()


class Student(Base):
    __tablename__ = "t_student"

    id = Column(Integer, primary_key=True)
    name = Column(String(length=64))
    create_date = Column(DateTime, server_default=func.now())
    clazz = relationship("Clazz")

    # 在刷新之后访问具有服务器默认值或SQL表达式默认值的列,而不会触发过期加载
    __mapper_args__ = {"eager_defaults": True}


class Clazz(Base):
    __tablename__ = "t_clazz"
    id = Column(Integer, primary_key=True)
    student_id = Column(ForeignKey("t_student.id"))
    name = Column(String(length=64))


def get_engine():
    mysql_username = 'test'
    mysql_password = 'test'
    mysql_ip = '192.168.10.8'
    mysql_port = '3306'
    db_name = 'tcp_db'

    return create_async_engine(
        "mysql+aiomysql://{username}:{password}@{ip}:{port}/{db_name}?charset=utf8".format(
            username=mysql_username,
            password=mysql_password,
            ip=mysql_ip,
            port=mysql_port,
            db_name=db_name
        ),
        echo=True,
    )

async def select_data():
    # 查询数据
    engine = get_engine()

    # expire_on_commit=False:表示修改提交值后,后面引这个变量的值,是修改后的值
    async_session = sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)

    async with async_session() as session:

        # 查询学生表,再查询学生对应的班级
        stmt = select(Student).options(selectinload(Student.clazz))

        result = await session.execute(stmt)

        for stu in result.scalars():
            for clazz_item in stu.clazz:  # 查询每个学生对应的班级
                print(f'学生名字:{stu.name},班级:{clazz_item.name},创建时间:{stu.create_date}')
    await engine.dispose()


if __name__ == '__main__':
    asyncio.run(select_data())

3.4、查询并且更新

# -*- coding: utf-8 -*-

import asyncio

from sqlalchemy import Column
from sqlalchemy import DateTime
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.future import select
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import relationship
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

class Student(Base):
    __tablename__ = "t_student"

    id = Column(Integer, primary_key=True)
    name = Column(String(length=64))
    create_date = Column(DateTime, server_default=func.now())
    clazz = relationship("Clazz")

    # 在刷新之后访问具有服务器默认值或SQL表达式默认值的列,而不会触发过期加载
    __mapper_args__ = {"eager_defaults": True}

class Clazz(Base):
    __tablename__ = "t_clazz"
    id = Column(Integer, primary_key=True)
    student_id = Column(ForeignKey("t_student.id"))
    name = Column(String(length=64))

def get_engine():
    mysql_username = 'test'
    mysql_password = 'test'
    mysql_ip = '192.168.10.8'
    mysql_port = '3306'
    db_name = 'tcp_db'

    return create_async_engine(
        "mysql+aiomysql://{username}:{password}@{ip}:{port}/{db_name}?charset=utf8".format(
            username=mysql_username,
            password=mysql_password,
            ip=mysql_ip,
            port=mysql_port,
            db_name=db_name
        ),
        echo=True,
    )

async def select_and_update():
    # 查询并且更新
    engine = get_engine()

    # expire_on_commit=False:表示修改提交值后,后面引这个变量的值,是修改后的值
    async_session = sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)

    async with async_session() as session:
        # 查询出学生并且排序
        result = await session.execute(select(Student).order_by(Student.id))

        stu_first = result.scalars().first()

        # 修改第一学生的名字
        stu_first.name = "new data"

        await session.commit()

        # 修改后的值
        print(stu_first.name)

    await engine.dispose()

if __name__ == '__main__':
    asyncio.run(select_and_update())

4、异步session调用普通函数传入普通session工作

# -*- coding: utf-8 -*-

import asyncio

from sqlalchemy import Column
from sqlalchemy import DateTime
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.future import select
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import relationship
from sqlalchemy.orm import sessionmaker

Base = declarative_base()


class Student(Base):
    __tablename__ = "t_student"

    id = Column(Integer, primary_key=True)
    name = Column(String(length=64))
    create_date = Column(DateTime, server_default=func.now())
    clazz = relationship("Clazz")

    # 在刷新之后访问具有服务器默认值或SQL表达式默认值的列,而不会触发过期加载
    __mapper_args__ = {"eager_defaults": True}


class Clazz(Base):
    __tablename__ = "t_clazz"
    id = Column(Integer, primary_key=True)
    student_id = Column(ForeignKey("t_student.id"))
    name = Column(String(length=64))


def get_engine():
    mysql_username = 'test'
    mysql_password = 'test'
    mysql_ip = '192.168.10.8'
    mysql_port = '3306'
    db_name = 'tcp_db'

    return create_async_engine(
        "mysql+aiomysql://{username}:{password}@{ip}:{port}/{db_name}?charset=utf8".format(
            username=mysql_username,
            password=mysql_password,
            ip=mysql_ip,
            port=mysql_port,
            db_name=db_name
        ),
        echo=False,
    )


def query_first(session):
    """
    :param session: 同步的session:<class 'sqlalchemy.orm.session.Session'>
    :return:
    """
    stmt = select(Student)

    result = session.execute(stmt)

    # 查询所有所学对象
    for stu in result.scalars():
        print(stu)

        # lazy loads,遍历学生下面的班级
        for clazz_item in stu.clazz:
            print(clazz_item)

    # legacy Query use 查询学生对象,排序好,取一个值,修改姓名
    stu_obj = session.query(Student).order_by(Student.id).first()

    stu_obj.name = "new data"


async def select_and_update():
    # 查询并且更新
    engine = get_engine()

    # expire_on_commit=False:表示修改提交值后,后面引这个变量的值,是修改后的值
    async_session = sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)

    async with async_session() as session:
        # session:异步的session,<class 'sqlalchemy.orm.session.AsyncSession'>

        await session.run_sync(query_first)  # 调用同步session可以使用些函数
        await session.commit()

    await engine.dispose()


if __name__ == '__main__':
    asyncio.run(select_and_update())

5、使用asyncio扩展的事件

5.1、将异步engine、connection、session转为普通同步函数

AsyncEngine.sync_engine
AsyncConnection.sync_connection
AsyncConnection.sync_engine
AsyncSession.sync_session


使用示例:
engine = get_engine() # 自己生成的engine
async_session = AsyncSession(engine)
sync_session = async_session.sync_session
print(type(sync_session))  # <class 'sqlalchemy.orm.session.Session'>

5.2、Core events的示例

# -*- coding: utf-8 -*-

import asyncio

from sqlalchemy import event, text
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import declarative_base
from sqlalchemy.engine import Engine

Base = declarative_base()

## Core events ##
def get_async_engine():
    mysql_username = 'test'
    mysql_password = 'test'
    mysql_ip = '192.168.10.8'
    mysql_port = '3306'
    db_name = 'tcp_db'

    return create_async_engine(
        "mysql+aiomysql://{username}:{password}@{ip}:{port}/{db_name}?charset=utf8".format(
            username=mysql_username,
            password=mysql_password,
            ip=mysql_ip,
            port=mysql_port,
            db_name=db_name
        ),
        echo=False,
    )

engine = get_async_engine()

@event.listens_for(engine.sync_engine, 'connect')
def my_connect(sync_conn, conn_record):
    """
        连接时,需要做的处理
    :param new_conn: 连接对象
    :param conn_record: 链接池的记录
    :return:
    """
    print('1、连接前的操作:', sync_conn)
    cursor = sync_conn.cursor()

    # sync样式API用于适应DBAPI连接/游标
    cursor.execute("select '2、execute from event'")
    print(cursor.fetchone()[0])

# before_execute event on all Engine instances
@event.listens_for(Engine, "before_execute")
def my_before_execute(
        conn,
        clauseelement,
        multiparams,
        params,
        execution_options,
):
    # 执行SQL之前的操作
    print("3、执行SQL之前的操作,before execute!")

session = AsyncSession(engine)

async def task():
    result = await session.execute(text('select 1'))
    print(f'4、{result}')
    await session.close()
    await engine.dispose()

if __name__ == '__main__':
    asyncio.run(task())

运行结果:
1、连接前的操作: <AdaptedConnection <aiomysql.connection.Connection object at 0x000001A1A36851F0>>
2、execute from event
3、执行SQL之前的操作,before execute!
4、<sqlalchemy.engine.cursor.CursorResult object at 0x000001A1A3685EE0>

5.3、ORM events示例

# -*- coding: utf-8 -*-

import asyncio

from sqlalchemy import event, text
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import Session

Base = declarative_base()

## Core events ##

def get_async_engine():
    mysql_username = 'test'
    mysql_password = 'test'
    mysql_ip = '192.168.10.8'
    mysql_port = '3306'
    db_name = 'tcp_db'

    return create_async_engine(
        "mysql+aiomysql://{username}:{password}@{ip}:{port}/{db_name}?charset=utf8".format(
            username=mysql_username,
            password=mysql_password,
            ip=mysql_ip,
            port=mysql_port,
            db_name=db_name
        ),
        echo=False,
    )

engine = get_async_engine()

session = AsyncSession(engine)

# before_commit event on instance of Session
@event.listens_for(session.sync_session, "before_commit")
def my_before_commit(session):
    """
        提交之前,使用同步session进行操作
    :param session:
    :return:
    """
    print("before commit!")

    # sync style API use on Session
    connection = session.connection()

    # sync style API use on Connection
    result = connection.execute(text("select 'execute from event'"))
    print(result.first())


# after_commit event on all Session instances
@event.listens_for(Session, "after_commit")
def my_after_commit(session):
    """
        提交之后,使用同步session进行操作
    :param session:
    :return:
    """
    print("after commit!")

async def task():
    await session.commit()
    await session.close()
    await engine.dispose()

if __name__ == '__main__':
    asyncio.run(task())


# 运行结果
before commit!
('execute from event',)
after commit!

6、使用多个异步事件循环

# -*- coding: utf-8 -*-

from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import declarative_base
from sqlalchemy.pool import NullPool

Base = declarative_base()

## Core events ##

def get_async_engine():
    mysql_username = 'test'
    mysql_password = 'test'
    mysql_ip = '192.168.10.8'
    mysql_port = '3306'
    db_name = 'tcp_db'

    return create_async_engine(
        "mysql+aiomysql://{username}:{password}@{ip}:{port}/{db_name}?charset=utf8".format(
            username=mysql_username,
            password=mysql_password,
            ip=mysql_ip,
            port=mysql_port,
            db_name=db_name
        ),
        echo=False,
        poolclass=NullPool # 此处设置非连接池
    )

7、异步会话作用域

# -*- coding: utf-8 -*-
import asyncio
from asyncio import current_task

from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy.ext.asyncio import async_scoped_session, create_async_engine
from sqlalchemy.ext.asyncio import AsyncSession

Base = declarative_base()

def get_async_engine():
    mysql_username = 'test'
    mysql_password = 'test'
    mysql_ip = '192.168.10.8'
    mysql_port = '3306'
    db_name = 'tcp_db'

    return create_async_engine(
        "mysql+aiomysql://{username}:{password}@{ip}:{port}/{db_name}?charset=utf8".format(
            username=mysql_username,
            password=mysql_password,
            ip=mysql_ip,
            port=mysql_port,
            db_name=db_name
        ),
        echo=False,
    )

async def run_session():
    engine = get_async_engine()
    async_session_factory = sessionmaker(engine, class_=AsyncSession)
    AsyncScopedSession = async_scoped_session(async_session_factory, scopefunc=current_task)

    some_async_session = AsyncScopedSession()

    # 增加数据
    some_async_session.add('SQL object')

    # 从上下文中,提交数据
    await AsyncScopedSession.commit()

    # 从上下文 中,删除该会话
    await AsyncScopedSession.remove()

if __name__ == '__main__':
    asyncio.run(run_session())

8、使用检查器检查对象模式

# -*- coding: utf-8 -*-
import asyncio

from sqlalchemy import inspect
from sqlalchemy.orm import declarative_base
from sqlalchemy.ext.asyncio import create_async_engine

Base = declarative_base()


def get_async_engine():
    mysql_username = 'test'
    mysql_password = 'test'
    mysql_ip = '192.168.10.8'
    mysql_port = '3306'
    db_name = 'tcp_db'

    return create_async_engine(
        "mysql+aiomysql://{username}:{password}@{ip}:{port}/{db_name}?charset=utf8".format(
            username=mysql_username,
            password=mysql_password,
            ip=mysql_ip,
            port=mysql_port,
            db_name=db_name
        ),
        echo=False,
    )


def use_inspector(conn):
    inspector = inspect(conn)
    # 获取视图列表
    print(inspector.get_view_names())

    # 返回当前所有的表名
    return inspector.get_table_names()


async def async_main():
    engine = get_async_engine()
    async with engine.connect() as conn:
        tables = await conn.run_sync(use_inspector)
        print(tables)

    await engine.dispose()

if __name__ == '__main__':
    asyncio.run(async_main())

 

标签:engine,__,SQLAlchemy,name,mysql,sqlalchemy,import,aiomysql
From: https://www.cnblogs.com/ygbh/p/16972845.html

相关文章

  • Ubuntu20.04安装python3-pip后安装sqlalchemy报错AttributeError: module 'platform'
    解决方法如下:1.首先卸载已经安装python3-pipsudoaptremovepython3-pip2.安装python3.8-pipsudopython3.8-measy_installpip3.安装sqlalchemywang@wang:~$sudopip3.......
  • sqlalchemy - sqlalchemy中执行原生sql - 传参方式避免了sql注入(转)
    https://blog.csdn.net/xuezhangjun0121/article/details/103993135 defget_data_all(user_id,name,start_time,end_time,page=1,limit=10):"""sqlalc......
  • flask SQLAlchemy 增删改查
    前言一直在用flask+ SQLAlchemy,每次数据联动,因为踩过坑,就更新一下自己的认识,若有错误,请谅解准备模块click==8.1.3Flask==1.1.2Flask-SQLAlchem......
  • Flask-SQLAlchemy
    一.介绍SQLAlchemy是一个基于Python实现的ORM框架。该框架建立在DBAPI之上,使用关系对象映射进行数据库操作,简言之便是:将类和对象转换成SQL,然后使用数据API执行SQL并获......
  • Flask 学习-97.Flask-SQLAlchemy 排序 order_by()
    前言order_by()对查询结果排序按字段排序根据id字段排序,默认是正序a=Students.query.order_by(Students.id).all()print(a)使用asc()函数正序a=Student......
  • Flask 学习-96.Flask-SQLAlchemy 判断查询结果是否存在的几种方式
    前言在查询的时候,经常需要先判断是否存在结果,再进行下一步操作。这里总结了判断查询结果是否存在的几种方式count()统计个数count()方法返回记录条数,使用示例withap......
  • Flask 学习-95.Flask-SQLAlchemy 查询今天当天的数据
    前言查询今天的数据,或者查询某一天的数据SQLDATE()function使我们能够从特定的历史或当前时间戳值访问日期值。DATE()函数Date()函数返回从传递的datetime表达式中提......
  • sqlalchemy的连接方式
    这是比较推荐的连接方式,基于threading.local实现的。#-*-coding:utf-8-*-fromsqlalchemyimportcreate_enginefromsqlalchemy.ormimportsessionmaker,sc......
  • sqlalchemy--m2m
    这一块先建个表,后面再讨论#-*-coding:utf-8-*-fromsqlalchemy.ext.declarativeimportdeclarative_basefromsqlalchemyimportcreate_engine,Column,String,......
  • 原生sqlalchemy
    一、简介SQLAlchemy是一个基于Python实现的ORM框架。该框架建立在DBAPI之上,使用关系对象映射进行数据库操作,简言之便是:将类和对象转换成SQL,然后使用数据API执行SQL并......