(一些框架介绍)
1 # 1 sqlalchemy 企业级orm框架 2 官网:https://www.sqlalchemy.org/ 3 # 2 python界的orm框架 4 -1 django-orm #只能django框架用 5 -2 peewee # 小型orm框架:https://docs.peewee-orm.com/en/latest/peewee/quickstart.html 6 -----同步orm框架----- 7 -3 sqlalchemy # 企业级,使用非常广泛:https://docs.sqlalchemy.org/en/20/orm/quickstart.html # 可以用在flask上,还可以用在fastapi 8 ---中间态---- 9 -4 Tortoise ORM # fastapi用的比较多 10 11 # 3 django ,flask---》同步框架--》新版本支持异步 12 13 # 4 fastapi,sanic,tornado---》异步框架--》支持同步的写法--》如果在异步框架上写同步 14 -众多第三方库---》都是同步的--》导致异步框架性能发挥不出来 15 -redis:aioredis --》redis-py 16 -mysql:aiomysql --》pymysql
[快速使用]
1 # 1 安装 pip install sqlalchemy 2 - 2.0.30版本 3 4 # 2 架构 5 Engine,框架的引擎 6 Connection Pooling ,数据库连接池 7 Dialect,选择连接数据库的DB API种类 8 SQL Exprression Language,SQL表达式语言 9 10 # 3 链接不同数据库 11 3.1 postgresql 12 # default 13 engine = create_engine("postgresql://scott:tiger@localhost/mydatabase") 14 # psycopg2 15 engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/mydatabase") 16 # pg8000 17 engine = create_engine("postgresql+pg8000://scott:tiger@localhost/mydatabase") 18 19 3.2 Mysql 20 # default 21 engine = create_engine("mysql://scott:tiger@localhost/foo") 22 # mysqlclient (a maintained fork of MySQL-Python) 23 engine = create_engine("mysql+mysqldb://scott:tiger@localhost/foo") 24 # PyMySQL 25 engine = create_engine("mysql+pymysql://scott:tiger@localhost/foo") 26 27 3.3 oracle 28 engine = create_engine("oracle://scott:[email protected]:1521/sidname") 29 engine = create_engine("oracle+cx_oracle://scott:tiger@tnsname") 30 31 3.4 Microsoft SQL Server 32 pyodbc 33 engine = create_engine("mssql+pyodbc://scott:tiger@mydsn") 34 pymssql 35 engine = create_engine("mssql+pymssql://scott:tiger@hostname:port/dbname") 36 37 3.5 sqlite 38 Unix/Mac - 4 initial slashes in total 39 engine = create_engine("sqlite:////absolute/path/to/foo.db") 40 Windows 41 engine = create_engine("sqlite:///C:\\path\\to\\foo.db") 42 Windows alternative using raw string 43 engine = create_engine(r"sqlite:///C:\path\to\foo.db")
【sqlalchemy 原生操作】
1 import threading 2 # 1 创建 engin 3 from sqlalchemy import create_engine 4 5 engine = create_engine( 6 "mysql+pymysql://root:[email protected]:3306/choose_classes?charset=utf8", 7 max_overflow=0, # 超过连接池大小外最多创建的连接 8 pool_size=5, # 连接池大小 9 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 10 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) 11 ) 12 13 14 # 2 原生操作mysql---》用了连接池---》就可以不用dbutils模块了 15 def task(): 16 conn = engine.raw_connection() 17 cursor = conn.cursor() 18 cursor.execute( 19 "select * from school" 20 ) 21 result = cursor.fetchall() 22 print(result) 23 cursor.close() # 把链接放回到链接池 24 conn.close() 25 26 27 for i in range(20): 28 t = threading.Thread(target=task) 29 t.start()
【sqlalchemy操作表】
1 from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index 2 import datetime 3 # 第一步:创建基类,以后所有类,都必须继承基类 4 5 # 1 老版本创建基类(不建议) 6 # from sqlalchemy.ext.declarative import declarative_base 7 # Base = declarative_base() 8 9 # 2 新版本创建基类 10 from sqlalchemy.orm import DeclarativeBase 11 12 13 class Base(DeclarativeBase): 14 pass 15 16 17 # 第二步:写个类,继承 18 # 第三步:写字段:传统方式,类型方式 19 class User(Base): 20 __tablename__ = 'user' # 指定表名 21 id = Column(Integer, primary_key=True, autoincrement=True) 22 name = Column(String(32), index=True, nullable=False) # name列varchar32,索引,不可为空 23 email = Column(String(32), unique=True) # email 列,varchar32,唯一 24 # datetime.datetime.now不能加括号,加了括号,以后永远是当前时间 25 ctime = Column(DateTime, default=datetime.datetime.now) 26 extra = Column(Text, nullable=True) 27 28 29 if __name__ == '__main__': 30 from sqlalchemy import create_engine 31 32 engine = create_engine( 33 "mysql+pymysql://root:[email protected]:3306/test2?charset=utf8", 34 max_overflow=0, # 超过连接池大小外最多创建的连接 35 pool_size=5, # 连接池大小 36 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 37 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) 38 ) 39 # 第四步:创建表(不能创建库,只能新增或删除表,不能增加删除字段) 40 Base.metadata.create_all(engine) 41 42 # 第五步:删除表 43 # Base.metadata.drop_all(engine)
【增删查改】
模型表:
1 from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index 2 import datetime 3 # 第一步:创建基类,以后所有类,都必须继承基类 4 5 # 1 老版本创建基类(不建议) 6 # from sqlalchemy.ext.declarative import declarative_base 7 # Base = declarative_base() 8 9 # 2 新版本创建基类 10 from sqlalchemy.orm import DeclarativeBase 11 12 13 class Base(DeclarativeBase): 14 pass 15 16 17 # 第二步:写个类,继承 18 # 第三步:写字段:传统方式,类型方式 19 class User(Base): 20 __tablename__ = 'user' # 指定表名 21 id = Column(Integer, primary_key=True, autoincrement=True) 22 name = Column(String(32), index=True, nullable=False) # name列varchar32,索引,不可为空 23 email = Column(String(32), unique=True) # email 列,varchar32,唯一 24 # datetime.datetime.now不能加括号,加了括号,以后永远是当前时间 25 ctime = Column(DateTime, default=datetime.datetime.now) 26 extra = Column(Text, nullable=True) 27 28 def __str__(self): 29 return self.name 30 31 # 放在别的对象里面触发的是repr 32 def __repr__(self): 33 return self.name 34 35 36 if __name__ == '__main__': 37 from sqlalchemy import create_engine 38 39 engine = create_engine( 40 "mysql+pymysql://root:[email protected]:3306/test2?charset=utf8", 41 max_overflow=0, # 超过连接池大小外最多创建的连接 42 pool_size=5, # 连接池大小 43 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 44 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) 45 ) 46 # 第四步:创建表(不能创建库,只能新增或删除表,不能增加删除字段) 47 Base.metadata.create_all(engine) 48 49 # 第五步:删除表 50 # Base.metadata.drop_all(engine)
1 from models import User 2 # 第一步:创建engine 3 from sqlalchemy import create_engine 4 5 engine = create_engine( 6 "mysql+pymysql://root:[email protected]:3306/test2?charset=utf8", 7 max_overflow=0, # 超过连接池大小外最多创建的连接 8 pool_size=5, # 连接池大小 9 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 10 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) 11 ) 12 13 # 第二步:创建 session对象---老方式 14 # from sqlalchemy.orm import sessionmaker 15 # Session = sessionmaker(bind=engine) 16 # session=Session() 17 # 第二步:创建 session对象---新方式(推荐) 18 from sqlalchemy.orm import Session 19 20 session = Session(engine) 21 22 # 第三步:使用session对象 23 # 1 新增 24 user1 = User(name='jh', email='[email protected]') 25 user2 = User(name='cxx', email='[email protected]') 26 # session.add(user) 27 session.add_all([user1, user2]) 28 session.commit() # 提交事务 29 session.close() 30 31 # 2 搜索--》简单 32 res = session.query(User).filter_by(name='cxx').all() 33 res = session.query(User).filter_by(id=1).all() 34 print(res) # [<models.User object at 0x000002601E3A8610>] 35 print(res[0]) 36 # 打印的都是对象,重写方法 37 # def __repr__(self): 38 # return self.name 39 40 # 3 删除 41 res = session.query(User).filter_by(name='cxx').delete() 42 print(res) 43 session.commit() 44 45 # 4 修改 46 res = session.query(User).filter_by(id=1).update({'name': 'ooo'}) 47 session.commit() 48 user = session.query(User).filter_by(id=1).first() 49 print(user.id) 50 user.name = 'uuii' 51 session.add(user) # [id字段在不在] 如果对象不存在,就是新增,如果对象存在,就是修改 52 session.commit()
【表关系】
一对多关系
模型表基于上述模型表新增
1 from sqlalchemy.orm import relationship 2 3 # 一对多关系 4 class Hobby(Base): 5 __tablename__ = 'hobby' 6 id = Column(Integer, primary_key=True) 7 caption = Column(String(50), default='篮球') 8 9 def __str__(self): 10 return self.caption 11 12 13 class Person(Base): 14 __tablename__ = 'person' 15 nid = Column(Integer, primary_key=True) 16 name = Column(String(32), index=True, nullable=True) 17 # hobby指的是tablename而不是类名 18 hobby_id = Column(Integer, ForeignKey("hobby.id")) # 一个爱好,可以被多个人喜欢,一个人只能喜欢一个爱好 19 20 # 跟数据库无关,不会新增字段,只用于快速链表操作 21 # 类名,backref用于反向查询 22 hobby = relationship('Hobby', backref='pers') 23 24 def __str__(self): 25 return self.name 26 27 def __repr__(self): 28 return self.name
1 from models import Person, Hobby 2 # 第一步:创建engine 3 from sqlalchemy import create_engine 4 5 engine = create_engine( 6 "mysql+pymysql://root:[email protected]:3306/test2?charset=utf8", 7 max_overflow=0, # 超过连接池大小外最多创建的连接 8 pool_size=5, # 连接池大小 9 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 10 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) 11 ) 12 13 # 第二步:创建 session对象---老方式 14 from sqlalchemy.orm import Session 15 16 session = Session(engine) 17 18 if __name__ == '__main__': 19 # 笨办法增加 20 # 1 先增加一个hobby 21 hobby = Hobby() 22 session.add(hobby) 23 session.commit() 24 # 2 增加Person---》必须要有hobby_id 25 person = Person(name='wxx', hobby_id=1) 26 session.add(person) 27 session.commit() 28 29 # 简便方法--》增加person的同时,增加了Hobby 30 person = Person(name='lqz', hobby=Hobby(caption='乒乓球')) 31 session.add(person) 32 session.commit() 33 hobby = session.query(Hobby).filter_by(id=1).first() 34 person = Person(name='xh', hobby=hobby) 35 session.add(person) 36 session.commit() 37 38 # 基于对象的跨表查询--->正向 39 person = session.query(Person).filter_by(nid=2).first() 40 print(person) 41 print(person.hobby_id) 42 print(person.hobby) 43 44 # 基于对象的跨表查询--->反向 45 hobby = session.query(Hobby).filter_by(id=1).first() 46 print(hobby.caption) 47 print(hobby.pers)
【scoped线程安全】
# 1 如果我们把session 做成全局 单例
-每个视图函数,用同一个session,有问题
#2 如果在每个视图函数中,都对session实例化一次
-代码有点麻烦
# 3 全局就用一个session对象,它在不同线程中---》都是这个线程自己的
1 from models import User 2 import threading 3 # 第一步:创建engine 4 from sqlalchemy import create_engine 5 6 engine = create_engine( 7 "mysql+pymysql://root:[email protected]:3306/sqlalchemy001?charset=utf8", 8 max_overflow=0, # 超过连接池大小外最多创建的连接 9 pool_size=5, # 连接池大小 10 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 11 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) 12 ) 13 14 # 第二步:创建 线程安全的 session 15 from sqlalchemy.orm import sessionmaker 16 from sqlalchemy.orm import scoped_session 17 18 Session = sessionmaker(bind=engine) 19 session = scoped_session(Session) 20 21 22 # 第三步:正常使用-->再flask中,使用全局的session即可,实现:不同线程使用线程自己的session对象 23 def task(se, i): 24 session = se() 25 session.add(User(name='xxx', email=f'{i}@qq.com')) 26 session.commit() 27 print('=========', session) 28 29 30 if __name__ == '__main__': 31 l = [] 32 for i in range(10): 33 t = threading.Thread(target=task, args=[session, i]) 34 t.start() 35 l.append(t) 36 for i in l: 37 i.join()
标签:engine,__,sqlalchemy,create,session,import From: https://www.cnblogs.com/liuliu1/p/18253128