首页 > 数据库 >sqlalchemy

sqlalchemy

时间:2024-06-17 20:11:14浏览次数:30  
标签:engine __ sqlalchemy create session import

(一些框架介绍)

 1 # 1 sqlalchemy 企业级orm框架
 2 官网:https://www.sqlalchemy.org/
 3 # 2 python界的orm框架
 4     -1 django-orm     #只能django框架用
 5     -2 peewee          # 小型orm框架:https://docs.peewee-orm.com/en/latest/peewee/quickstart.html
 6     -----同步orm框架-----
 7     -3 sqlalchemy     # 企业级,使用非常广泛:https://docs.sqlalchemy.org/en/20/orm/quickstart.html  # 可以用在flask上,还可以用在fastapi
 8     ---中间态----
 9     -4 Tortoise ORM  # fastapi用的比较多  
10     
11 # 3 django ,flask---》同步框架--》新版本支持异步
12 
13 # 4 fastapi,sanic,tornado---》异步框架--》支持同步的写法--》如果在异步框架上写同步
14     -众多第三方库---》都是同步的--》导致异步框架性能发挥不出来
15     -redis:aioredis  --》redis-py
16     -mysql:aiomysql  --》pymysql

 

[快速使用]

 1 #  1 安装 pip install sqlalchemy   
 2     - 2.0.30版本
 3         
 4 # 2 架构
 5     Engine,框架的引擎
 6     Connection Pooling ,数据库连接池
 7     Dialect,选择连接数据库的DB API种类 
 8     SQL Exprression Language,SQL表达式语言
 9     
10 # 3 链接不同数据库
11    3.1 postgresql
12     # default
13       engine = create_engine("postgresql://scott:tiger@localhost/mydatabase")
14     # psycopg2
15       engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/mydatabase")
16     # pg8000
17       engine = create_engine("postgresql+pg8000://scott:tiger@localhost/mydatabase")
18 
19   3.2 Mysql
20    # default
21      engine = create_engine("mysql://scott:tiger@localhost/foo")
22    # mysqlclient (a maintained fork of MySQL-Python)
23      engine = create_engine("mysql+mysqldb://scott:tiger@localhost/foo")
24   # PyMySQL
25     engine = create_engine("mysql+pymysql://scott:tiger@localhost/foo")
26 
27   3.3 oracle
28     engine = create_engine("oracle://scott:[email protected]:1521/sidname")
29     engine = create_engine("oracle+cx_oracle://scott:tiger@tnsname")
30 
31   3.4 Microsoft SQL Server
32     pyodbc
33       engine = create_engine("mssql+pyodbc://scott:tiger@mydsn")
34     pymssql
35       engine = create_engine("mssql+pymssql://scott:tiger@hostname:port/dbname")
36 
37   3.5 sqlite
38     Unix/Mac - 4 initial slashes in total
39       engine = create_engine("sqlite:////absolute/path/to/foo.db")
40     Windows
41       engine = create_engine("sqlite:///C:\\path\\to\\foo.db")
42     Windows alternative using raw string
43       engine = create_engine(r"sqlite:///C:\path\to\foo.db")

 

 

【sqlalchemy 原生操作】

 1 import threading
 2 #  1 创建 engin
 3 from sqlalchemy import create_engine
 4 
 5 engine = create_engine(
 6     "mysql+pymysql://root:[email protected]:3306/choose_classes?charset=utf8",
 7     max_overflow=0,  # 超过连接池大小外最多创建的连接
 8     pool_size=5,  # 连接池大小
 9     pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
10     pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
11 )
12 
13 
14 # 2 原生操作mysql---》用了连接池---》就可以不用dbutils模块了
15 def task():
16     conn = engine.raw_connection()
17     cursor = conn.cursor()
18     cursor.execute(
19         "select * from school"
20     )
21     result = cursor.fetchall()
22     print(result)
23     cursor.close()  # 把链接放回到链接池
24     conn.close()
25 
26 
27 for i in range(20):
28     t = threading.Thread(target=task)
29     t.start()

 

 

【sqlalchemy操作表】

 1 from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
 2 import datetime
 3 # 第一步:创建基类,以后所有类,都必须继承基类
 4 
 5 # 1 老版本创建基类(不建议)
 6 # from sqlalchemy.ext.declarative import declarative_base
 7 # Base = declarative_base()
 8 
 9 # 2 新版本创建基类
10 from sqlalchemy.orm import DeclarativeBase
11 
12 
13 class Base(DeclarativeBase):
14     pass
15 
16 
17 # 第二步:写个类,继承
18 # 第三步:写字段:传统方式,类型方式
19 class User(Base):
20     __tablename__ = 'user'  # 指定表名
21     id = Column(Integer, primary_key=True, autoincrement=True)
22     name = Column(String(32), index=True, nullable=False)  # name列varchar32,索引,不可为空
23     email = Column(String(32), unique=True)  # email 列,varchar32,唯一
24     # datetime.datetime.now不能加括号,加了括号,以后永远是当前时间
25     ctime = Column(DateTime, default=datetime.datetime.now)
26     extra = Column(Text, nullable=True)
27 
28 
29 if __name__ == '__main__':
30     from sqlalchemy import create_engine
31 
32     engine = create_engine(
33         "mysql+pymysql://root:[email protected]:3306/test2?charset=utf8",
34         max_overflow=0,  # 超过连接池大小外最多创建的连接
35         pool_size=5,  # 连接池大小
36         pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
37         pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
38     )
39     # 第四步:创建表(不能创建库,只能新增或删除表,不能增加删除字段)
40     Base.metadata.create_all(engine)
41 
42     # 第五步:删除表
43     # Base.metadata.drop_all(engine)

 

 

 

 

【增删查改】

模型表:

 1 from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
 2 import datetime
 3 # 第一步:创建基类,以后所有类,都必须继承基类
 4 
 5 # 1 老版本创建基类(不建议)
 6 # from sqlalchemy.ext.declarative import declarative_base
 7 # Base = declarative_base()
 8 
 9 # 2 新版本创建基类
10 from sqlalchemy.orm import DeclarativeBase
11 
12 
13 class Base(DeclarativeBase):
14     pass
15 
16 
17 # 第二步:写个类,继承
18 # 第三步:写字段:传统方式,类型方式
19 class User(Base):
20     __tablename__ = 'user'  # 指定表名
21     id = Column(Integer, primary_key=True, autoincrement=True)
22     name = Column(String(32), index=True, nullable=False)  # name列varchar32,索引,不可为空
23     email = Column(String(32), unique=True)  # email 列,varchar32,唯一
24     # datetime.datetime.now不能加括号,加了括号,以后永远是当前时间
25     ctime = Column(DateTime, default=datetime.datetime.now)
26     extra = Column(Text, nullable=True)
27 
28     def __str__(self):
29         return self.name
30 
31     # 放在别的对象里面触发的是repr
32     def __repr__(self):
33         return self.name
34 
35 
36 if __name__ == '__main__':
37     from sqlalchemy import create_engine
38 
39     engine = create_engine(
40         "mysql+pymysql://root:[email protected]:3306/test2?charset=utf8",
41         max_overflow=0,  # 超过连接池大小外最多创建的连接
42         pool_size=5,  # 连接池大小
43         pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
44         pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
45     )
46     # 第四步:创建表(不能创建库,只能新增或删除表,不能增加删除字段)
47     Base.metadata.create_all(engine)
48 
49     # 第五步:删除表
50     # Base.metadata.drop_all(engine)

 

 

 1 from models import User
 2 # 第一步:创建engine
 3 from sqlalchemy import create_engine
 4 
 5 engine = create_engine(
 6     "mysql+pymysql://root:[email protected]:3306/test2?charset=utf8",
 7     max_overflow=0,  # 超过连接池大小外最多创建的连接
 8     pool_size=5,  # 连接池大小
 9     pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
10     pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
11 )
12 
13 # 第二步:创建 session对象---老方式
14 # from sqlalchemy.orm import sessionmaker
15 # Session = sessionmaker(bind=engine)
16 # session=Session()
17 # 第二步:创建 session对象---新方式(推荐)
18 from sqlalchemy.orm import Session
19 
20 session = Session(engine)
21 
22 # 第三步:使用session对象
23 # 1 新增
24 user1 = User(name='jh', email='[email protected]')
25 user2 = User(name='cxx', email='[email protected]')
26 # session.add(user)
27 session.add_all([user1, user2])
28 session.commit()  # 提交事务
29 session.close()
30 
31 # 2 搜索--》简单
32 res = session.query(User).filter_by(name='cxx').all()
33 res = session.query(User).filter_by(id=1).all()
34 print(res)  # [<models.User object at 0x000002601E3A8610>]
35 print(res[0])
36 # 打印的都是对象,重写方法
37 # def __repr__(self):
38 #     return self.name
39 
40 # 3 删除
41 res = session.query(User).filter_by(name='cxx').delete()
42 print(res)
43 session.commit()
44 
45 # 4 修改
46 res = session.query(User).filter_by(id=1).update({'name': 'ooo'})
47 session.commit()
48 user = session.query(User).filter_by(id=1).first()
49 print(user.id)
50 user.name = 'uuii'
51 session.add(user)  # [id字段在不在] 如果对象不存在,就是新增,如果对象存在,就是修改
52 session.commit()

 

 

 

【表关系】

一对多关系

模型表基于上述模型表新增

 1 from sqlalchemy.orm import relationship
 2 
 3 # 一对多关系
 4 class Hobby(Base):
 5     __tablename__ = 'hobby'
 6     id = Column(Integer, primary_key=True)
 7     caption = Column(String(50), default='篮球')
 8 
 9     def __str__(self):
10         return self.caption
11 
12 
13 class Person(Base):
14     __tablename__ = 'person'
15     nid = Column(Integer, primary_key=True)
16     name = Column(String(32), index=True, nullable=True)
17     # hobby指的是tablename而不是类名
18     hobby_id = Column(Integer, ForeignKey("hobby.id"))  # 一个爱好,可以被多个人喜欢,一个人只能喜欢一个爱好
19 
20     # 跟数据库无关,不会新增字段,只用于快速链表操作
21     # 类名,backref用于反向查询
22     hobby = relationship('Hobby', backref='pers')
23 
24     def __str__(self):
25         return self.name
26 
27     def __repr__(self):
28         return self.name

 

 

 1 from models import Person, Hobby
 2 # 第一步:创建engine
 3 from sqlalchemy import create_engine
 4 
 5 engine = create_engine(
 6     "mysql+pymysql://root:[email protected]:3306/test2?charset=utf8",
 7     max_overflow=0,  # 超过连接池大小外最多创建的连接
 8     pool_size=5,  # 连接池大小
 9     pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
10     pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
11 )
12 
13 # 第二步:创建 session对象---老方式
14 from sqlalchemy.orm import Session
15 
16 session = Session(engine)
17 
18 if __name__ == '__main__':
19     # 笨办法增加
20     # 1 先增加一个hobby
21     hobby = Hobby()
22     session.add(hobby)
23     session.commit()
24     # 2 增加Person---》必须要有hobby_id
25     person = Person(name='wxx', hobby_id=1)
26     session.add(person)
27     session.commit()
28 
29     # 简便方法--》增加person的同时,增加了Hobby
30     person = Person(name='lqz', hobby=Hobby(caption='乒乓球'))
31     session.add(person)
32     session.commit()
33     hobby = session.query(Hobby).filter_by(id=1).first()
34     person = Person(name='xh', hobby=hobby)
35     session.add(person)
36     session.commit()
37 
38     # 基于对象的跨表查询--->正向
39     person = session.query(Person).filter_by(nid=2).first()
40     print(person)
41     print(person.hobby_id)
42     print(person.hobby)
43 
44     # 基于对象的跨表查询--->反向
45     hobby = session.query(Hobby).filter_by(id=1).first()
46     print(hobby.caption)
47     print(hobby.pers)

 

【scoped线程安全】

# 1 如果我们把session 做成全局 单例   

   -每个视图函数,用同一个session,有问题   

#2 如果在每个视图函数中,都对session实例化一次   

   -代码有点麻烦   

# 3 全局就用一个session对象,它在不同线程中---》都是这个线程自己的

 

 1 from models import User
 2 import threading
 3 # 第一步:创建engine
 4 from sqlalchemy import create_engine
 5 
 6 engine = create_engine(
 7     "mysql+pymysql://root:[email protected]:3306/sqlalchemy001?charset=utf8",
 8     max_overflow=0,  # 超过连接池大小外最多创建的连接
 9     pool_size=5,  # 连接池大小
10     pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
11     pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
12 )
13 
14 # 第二步:创建 线程安全的 session
15 from sqlalchemy.orm import sessionmaker
16 from sqlalchemy.orm import scoped_session
17 
18 Session = sessionmaker(bind=engine)
19 session = scoped_session(Session)
20 
21 
22 # 第三步:正常使用-->再flask中,使用全局的session即可,实现:不同线程使用线程自己的session对象
23 def task(se, i):
24     session = se()
25     session.add(User(name='xxx', email=f'{i}@qq.com'))
26     session.commit()
27     print('=========', session)
28 
29 
30 if __name__ == '__main__':
31     l = []
32     for i in range(10):
33         t = threading.Thread(target=task, args=[session, i])
34         t.start()
35         l.append(t)
36     for i in l:
37         i.join()

 

标签:engine,__,sqlalchemy,create,session,import
From: https://www.cnblogs.com/liuliu1/p/18253128

相关文章

  • NoSuchModuleError: Can‘t load plugin: sqlalchemy.dialects:clickhouse解决方案
    NoSuchModuleError:Can'tloadplugin:sqlalchemy.dialects:clickhouse解决方案:全面解析问题概述当您使用SQLAlchemy连接ClickHouse数据库时,遇到NoSuchModuleError:Can'tloadplugin:sqlalchemy.dialects:clickhouse错误时,这意味着无法加载ClickHouse方言插件。......
  • 关于 python 循环和 sqlalchemy
    defgetBeforePoint(userId):today=datetime.now()子查询=(db.session.query(T_user_point.acquired_at、M_promotion_code.valid_days、T_user_promotion_code.promotion_code_id、T_user_point.user......
  • sqlite 不支持毫秒怎么办,可以用sqlalchemy自定义类型
    fromsqlalchemyimportDECIMAL,Index,String,Date,Integer,Text,CHAR,SmallInteger,Float,Time,case,and_,extract,Boolean,Enum,TypeDecorator#自定义类型classDateTimeString(TypeDecorator):impl=Stringdefprocess_bind_param(self,value......
  • SQLAlchemy
    SQLAlchemy是一个Python的ORM(对象关系映射)库,它提供了一种将关系型数据库中的表映射为Python对象的方式。在SQLAlchemy中,joinedload和subqueryload是两种常用的加载策略,用于优化关联数据的加载方式。joinedloadjoinedload是一种预先加载(eagerloading)策略,它使用JOIN......
  • db.create_all() 报错上下文?flask_sqlalchemy创建数据库找不到上下文?
    问题报错:RuntimeError:Workingoutsideofapplicationcontext.Thistypicallymeansthatyouattemptedtousefunctionalitythatneededthecurrentapplication.Tosolvethis,setupanapplicationcontextwithapp.app_context().Seethedocumentationform......
  • mysqlalchemy audit extension
    mysqlalchemyauditextensionhttps://sqlalchemy-declarative-extensions.readthedocs.io/en/stable/audit_tables.htmlfromsqlalchemyimportColumn,typesfromsqlalchemy.ormimportdeclarative_basefromsqlalchemy_declarative_extensionsimportdeclarative_......
  • 【flask sqlalchemy】A,B两个模型,A是父级模型,B是子级模型。 B创建依赖A模型的id。 如何
    fromflaskimportFlaskfromflask_sqlalchemyimportSQLAlchemyapp=Flask(__name__)app.config['SQLALCHEMY_DATABASE_URI']='数据库连接字符串'db=SQLAlchemy(app)#定义父级模型AclassA(db.Model):id=db.Column(db.Integer,primary_key=Tr......
  • SQLAlchemy中filter()和filter_by()有什么区别
    1.filter用类名.属性名,比较用==,filter_by直接用属性名,比较用=2.filter不支持组合查询,只能连续调用filter来变相实现。session.query(Dashboard).filter(Dashboard.id.in_(dashboard_ids_int)) .all()dashboard=(db.session.query(Dashboard).filter_by(id=dashboard_......
  • SQLAlchemy 2.0 中文文档翻译完成
    SqlAlchemy2.0中文文档概述SQLAlchemyUnifiedTutorial建立连接-Engine处理事务和DBAPI处理数据库元数据处理数据使用插入语句使用SELECT语句使用UPDATE和DELETE语句使用ORM进行数据操作处理ORM相关对象进一步阅读SQLAlchemyORMORM快速入门ORM......
  • pandas读取sql文件出现:告警UserWarning: pandas only supports SQLAlchemy connectabl
    ​错误原因:导入sql的方式更新了解决方法:importpandasaspdfromsqlalchemyimportcreate_engineMYSQL_HOST='localhost'MYSQL_PORT='3306'MYSQL_USER='root'MYSQL_PASSWORD='123456'MYSQL_DB='cldk_data'engine=......