1 sqlalchemy介绍和快速使用
# sqlalchemy:orm框架 -django orm:只能给django用,不能独立用 -sqlalchemy:独立使用,集成到web项目中 -peewee:小 -tortoise-orm :异步orm框架 # 安装:pip3 install sqlalchemy # 组成部分: Engine,框架的引擎 Connection Pooling ,数据库连接池 Dialect,选择连接数据库的DB API种类:mysql,sqllite。。。 Schema/Types,架构和类型 SQL Exprression Language,SQL表达式语言 # 能够操作的关系型数据库 pymysql mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>] MySQL-Connector mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname> cx_Oracle oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...] 更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html
1.1 原生操作的快速使用
# 使用步骤: # 第一步:导入包 from threading import Thread import sqlalchemy from sqlalchemy import create_engine from sqlalchemy.engine.base import Engine # 第二步:实例化得到一个engine engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/luffy?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有,线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) # 第三步:通过engine拿到一个链接 # 拿到一个conn对象,从连接池中取出一个链接 def task(): conn = engine.raw_connection() cursor = conn.cursor() cursor.execute("select * from luffy_banner") print(cursor.fetchall()) ##第三步:多线程测试 for i in range(20): t=Thread(target=task) t.start()
2 创建操作数据表
#通过类 创建和删除表 -第一步:导入一些依赖 -第二步:创建成一个Base:Base = declarative_base() -第三步:写类:都集成Base class User(Base): -第四步:写字段,字段都是Column类的对象,通过参数控制字段类型,是否可以为空,是否索引。。。 id = Column(Integer, primary_key=True) # id 主键 name = Column(String(32), index=True, nullable=False) -第五步:定义表名,联合唯一,联合索引 __tablename__ = 'users' # 数据库表名称 # 定义联合索引,联合唯一 __table_args__ = ( UniqueConstraint('id', 'name', name='uix_id_name'), # 联合唯一 Index('ix_id_name', 'name', 'email'), # 联合索引 ) -第六步:把被Base管理的所有表,同步到数据库中[不能创建数据库,不能删除修改字段] engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/aaa?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) # 创建出所有被Base管理的表 Base.metadata.create_all(engine) -第七步:删除被Base管理的所有表 # 删除所有被Base管理的表 Base.metadata.drop_all(engine)
# 写一个个类,继承某个父类,写字段 # 第一步:导入一些依赖 from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index import datetime from sqlalchemy import create_engine # 第二步:创建一个父类 Base = declarative_base() # 第三步:写类,继承父类 class User(Base): # 第四步:写字段,所有字段都是Column的对象,在里面通过参数控制类型 id = Column(Integer, primary_key=True) # id 主键 name = Column(String(32), index=True, nullable=False) # varchar32 name列,索引,不可为空 email = Column(String(32), unique=True) # 唯一 # datetime.datetime.now不能加括号,加了括号,以后永远是当前时间 ctime = Column(DateTime, default=datetime.datetime.now) extra = Column(Text, nullable=True) # 定义表名字 __tablename__ = 'users' # 数据库表名称 # 定义联合索引,联合唯一 __table_args__ = ( UniqueConstraint('id', 'name', name='uix_id_name'), # 联合唯一 Index('ix_id_name', 'name', 'email'), # 联合索引 ) class Book(Base): __tablename__ = 'books' # 数据库表名称 id = Column(Integer, primary_key=True) # id 主键 name = Column(String(32), index=True, nullable=False) # varchar32 name列,索引,不可为空 price = Column(Integer) class Publish(Base): __tablename__ = 'publish' # 数据库表名称 id = Column(Integer, primary_key=True) # id 主键 name = Column(String(32), nullable=True) # 第5步:sqlalchemy没有迁移一说,只能创建出被Base管理的所有表,和删除被Base管理的所有表 # sqlalchemy不能创建数据库,不能修改,删除字段,只能创建表,和删除表 def init_db(): engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/aaa?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) # 创建出所有被Base管理的表 Base.metadata.create_all(engine) def drop_db(): engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/aaa?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) # 删除所有被Base管理的表 Base.metadata.drop_all(engine) if __name__ == '__main__': init_db() # drop_db()
2.2 sqlalchemy快速插入数据
from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from models import User,Book # 第一步:创建engine engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/aaa", max_overflow=0, pool_size=5) # 第二步:通过engine,获得session对象:跟之前学的cookie,session不是一个东西 Session = sessionmaker(bind=engine) # 每次执行数据库操作时,都需要创建一个Connection session = Session() # 第三步,通过session操作插入数据 # book=Book(name='',pri水浒传ce=33) user=User(name='lqz',email='3@qq.com',extra='很帅') session.add(user) session.commit() session.close()
3 scoped_session线程安全
from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from models import User, Book # 第一步:创建engine engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/aaa", max_overflow=0, pool_size=5) # 第二步:通过engine,获得session对象:跟之前学的cookie,session不是一个东西 Session = sessionmaker(bind=engine) # session是链接对象,如果集成到flask中,我们是吧session定义成全局,还是每个视图函数一个session呢?正常来讲要每个视图函数定义一个session,有些麻烦 # sqlalchemy 帮咱提供了一个只要定义一次的session,能够做到在不同线程中,使用的是自己的session,底层基于local from sqlalchemy.orm import scoped_session from threading import Thread # 原来 # session=Session() #不是线程安全 # 以后咱们使用这个它做到了线程安全 session = scoped_session(Session) def task(i): user = User(name='彭于晏%s' % i, email='%s@qq.com' % i, extra='很丑') session.add(user) session.commit() session.close() for i in range(50): t = Thread(target=task, args=[i, ]) t.start()