首页 > 数据库 >sqlalchemy

sqlalchemy

时间:2022-12-15 20:14:27浏览次数:66  
标签:engine __ sqlalchemy name session Base

1 sqlalchemy介绍和快速使用

# sqlalchemy:orm框架
    -django orm:只能给django用,不能独立用
    -sqlalchemy:独立使用,集成到web项目中
    -peewee:小
    -tortoise-orm :异步orm框架
    
    
# 安装:pip3 install sqlalchemy

# 组成部分:
    Engine,框架的引擎
    Connection Pooling ,数据库连接池
    Dialect,选择连接数据库的DB API种类:mysql,sqllite。。。
    Schema/Types,架构和类型
    SQL Exprression Language,SQL表达式语言
    
# 能够操作的关系型数据库
    pymysql
        mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]

    MySQL-Connector
        mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>

    cx_Oracle
        oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]

    更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html

1.1 原生操作的快速使用

# 使用步骤:
# 第一步:导入包
from threading import Thread
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine

# 第二步:实例化得到一个engine
engine = create_engine(
    "mysql+pymysql://root:[email protected]:3306/luffy?charset=utf8",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有,线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)

# 第三步:通过engine拿到一个链接
# 拿到一个conn对象,从连接池中取出一个链接
def task():
    conn = engine.raw_connection()
    cursor = conn.cursor()
    cursor.execute("select * from luffy_banner")
    print(cursor.fetchall())

##第三步:多线程测试
for i in range(20):
    t=Thread(target=task)
    t.start()

2 创建操作数据表

#通过类 创建和删除表
    -第一步:导入一些依赖
    -第二步:创建成一个Base:Base = declarative_base()
    -第三步:写类:都集成Base
        class User(Base):
    -第四步:写字段,字段都是Column类的对象,通过参数控制字段类型,是否可以为空,是否索引。。。
        id = Column(Integer, primary_key=True)  # id 主键
        name = Column(String(32), index=True, nullable=False)
    -第五步:定义表名,联合唯一,联合索引
         __tablename__ = 'users'  # 数据库表名称
        # 定义联合索引,联合唯一
        __table_args__ = (
            UniqueConstraint('id', 'name', name='uix_id_name'),  # 联合唯一
            Index('ix_id_name', 'name', 'email'),  # 联合索引
        )
    -第六步:把被Base管理的所有表,同步到数据库中[不能创建数据库,不能删除修改字段]
         engine = create_engine(
            "mysql+pymysql://root:[email protected]:3306/aaa?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
        # 创建出所有被Base管理的表
        Base.metadata.create_all(engine)
    -第七步:删除被Base管理的所有表
        # 删除所有被Base管理的表
        Base.metadata.drop_all(engine)
# 写一个个类,继承某个父类,写字段
# 第一步:导入一些依赖
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
import datetime
from sqlalchemy import create_engine

# 第二步:创建一个父类
Base = declarative_base()


# 第三步:写类,继承父类
class User(Base):
    # 第四步:写字段,所有字段都是Column的对象,在里面通过参数控制类型
    id = Column(Integer, primary_key=True)  # id 主键
    name = Column(String(32), index=True, nullable=False)  # varchar32  name列,索引,不可为空
    email = Column(String(32), unique=True)  # 唯一
    # datetime.datetime.now不能加括号,加了括号,以后永远是当前时间
    ctime = Column(DateTime, default=datetime.datetime.now)
    extra = Column(Text, nullable=True)

    # 定义表名字
    __tablename__ = 'users'  # 数据库表名称
    # 定义联合索引,联合唯一
    __table_args__ = (
        UniqueConstraint('id', 'name', name='uix_id_name'),  # 联合唯一
        Index('ix_id_name', 'name', 'email'),  # 联合索引
    )


class Book(Base):
    __tablename__ = 'books'  # 数据库表名称
    id = Column(Integer, primary_key=True)  # id 主键
    name = Column(String(32), index=True, nullable=False)  # varchar32  name列,索引,不可为空
    price = Column(Integer)


class Publish(Base):
    __tablename__ = 'publish'  # 数据库表名称
    id = Column(Integer, primary_key=True)  # id 主键
    name = Column(String(32), nullable=True)

# 第5步:sqlalchemy没有迁移一说,只能创建出被Base管理的所有表,和删除被Base管理的所有表


# sqlalchemy不能创建数据库,不能修改,删除字段,只能创建表,和删除表
def init_db():
    engine = create_engine(
        "mysql+pymysql://root:[email protected]:3306/aaa?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
    # 创建出所有被Base管理的表
    Base.metadata.create_all(engine)


def drop_db():
    engine = create_engine(
        "mysql+pymysql://root:[email protected]:3306/aaa?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
    # 删除所有被Base管理的表
    Base.metadata.drop_all(engine)


if __name__ == '__main__':
    init_db()
    # drop_db()

2.2 sqlalchemy快速插入数据

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import User,Book
# 第一步:创建engine
engine = create_engine("mysql+pymysql://root:[email protected]:3306/aaa", max_overflow=0, pool_size=5)

# 第二步:通过engine,获得session对象:跟之前学的cookie,session不是一个东西
Session = sessionmaker(bind=engine)
# 每次执行数据库操作时,都需要创建一个Connection
session = Session()

# 第三步,通过session操作插入数据
# book=Book(name='',pri水浒传ce=33)
user=User(name='lqz',email='[email protected]',extra='很帅')
session.add(user)
session.commit()
session.close()

3 scoped_session线程安全

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import User, Book

# 第一步:创建engine
engine = create_engine("mysql+pymysql://root:[email protected]:3306/aaa", max_overflow=0, pool_size=5)

# 第二步:通过engine,获得session对象:跟之前学的cookie,session不是一个东西
Session = sessionmaker(bind=engine)


# session是链接对象,如果集成到flask中,我们是吧session定义成全局,还是每个视图函数一个session呢?正常来讲要每个视图函数定义一个session,有些麻烦

# sqlalchemy 帮咱提供了一个只要定义一次的session,能够做到在不同线程中,使用的是自己的session,底层基于local

from sqlalchemy.orm import scoped_session
from threading import Thread

# 原来
# session=Session() #不是线程安全
# 以后咱们使用这个它做到了线程安全
session = scoped_session(Session)
def task(i):
    user = User(name='彭于晏%s' % i, email='%[email protected]' % i, extra='很丑')
    session.add(user)
    session.commit()
    session.close()

for i in range(50):
    t = Thread(target=task, args=[i, ])
    t.start()

 

标签:engine,__,sqlalchemy,name,session,Base
From: https://www.cnblogs.com/shangxin-bai/p/16985919.html

相关文章

  • SQLAlchemy+aiomysql
    1、安装模板pipinstallaiomysqlpipinstallsqlalchemy2、engine核心2.1、初始化数据库#-*-coding:utf-8-*-importasynciofromsqlalchemy.ext.asynci......
  • Ubuntu20.04安装python3-pip后安装sqlalchemy报错AttributeError: module 'platform'
    解决方法如下:1.首先卸载已经安装python3-pipsudoaptremovepython3-pip2.安装python3.8-pipsudopython3.8-measy_installpip3.安装sqlalchemywang@wang:~$sudopip3.......
  • sqlalchemy - sqlalchemy中执行原生sql - 传参方式避免了sql注入(转)
    https://blog.csdn.net/xuezhangjun0121/article/details/103993135 defget_data_all(user_id,name,start_time,end_time,page=1,limit=10):"""sqlalc......
  • flask SQLAlchemy 增删改查
    前言一直在用flask+ SQLAlchemy,每次数据联动,因为踩过坑,就更新一下自己的认识,若有错误,请谅解准备模块click==8.1.3Flask==1.1.2Flask-SQLAlchem......
  • Flask-SQLAlchemy
    一.介绍SQLAlchemy是一个基于Python实现的ORM框架。该框架建立在DBAPI之上,使用关系对象映射进行数据库操作,简言之便是:将类和对象转换成SQL,然后使用数据API执行SQL并获......
  • Flask 学习-97.Flask-SQLAlchemy 排序 order_by()
    前言order_by()对查询结果排序按字段排序根据id字段排序,默认是正序a=Students.query.order_by(Students.id).all()print(a)使用asc()函数正序a=Student......
  • Flask 学习-96.Flask-SQLAlchemy 判断查询结果是否存在的几种方式
    前言在查询的时候,经常需要先判断是否存在结果,再进行下一步操作。这里总结了判断查询结果是否存在的几种方式count()统计个数count()方法返回记录条数,使用示例withap......
  • Flask 学习-95.Flask-SQLAlchemy 查询今天当天的数据
    前言查询今天的数据,或者查询某一天的数据SQLDATE()function使我们能够从特定的历史或当前时间戳值访问日期值。DATE()函数Date()函数返回从传递的datetime表达式中提......
  • sqlalchemy的连接方式
    这是比较推荐的连接方式,基于threading.local实现的。#-*-coding:utf-8-*-fromsqlalchemyimportcreate_enginefromsqlalchemy.ormimportsessionmaker,sc......
  • sqlalchemy--m2m
    这一块先建个表,后面再讨论#-*-coding:utf-8-*-fromsqlalchemy.ext.declarativeimportdeclarative_basefromsqlalchemyimportcreate_engine,Column,String,......