首页 > 数据库 >python3数据库操作及ORM框架sqlalchemy使用

python3数据库操作及ORM框架sqlalchemy使用

时间:2024-10-09 17:02:52浏览次数:1  
标签:session sqlalchemy name ORM User query id python3 user

sqlite3

import sqlite3

try:
    # 连接到SQLite数据库,数据库文件是test.db,如果文件不存在,会自动在当前目录创建:
    conn = sqlite3.connect("test.db")

    # 创建一个Cursor
    cursor = conn.cursor()

    # 执行SQL语句
    cursor.execute("create table user (id varchar(20) primary key, name varchar(20))")
    cursor.execute("insert into user (id, name) values ('1', 'Michael')")

    # 提交事务
    conn.commit()

    # 获得插入的行数
    print(cursor.rowcount)

    # 查询
    cursor.execute("select * from user where id=?", ("1",))
    values = cursor.fetchall()
    print(values)

except:
    pass

finally:
    # 关闭Cursor
    cursor.close()

    # 关闭Connection
    conn.close()

mysql

# pip install mysql-connector-python

import mysql.connector

try:
     # 连接mysql数据库
    conn = mysql.connector.connect(user="root", password="", database="test")

    # 创建一个Cursor
    cursor = conn.cursor()

    # 执行SQL语句
    cursor.execute("create table test (id varchar(20) primary key, name varchar(20))")
    cursor.execute("insert into test (id, name) values ('1', 'Michael')")

    # 提交事务
    conn.commit()

    # 获得插入的行数
    print(cursor.rowcount)

    cursor.execute("delete from test where id=1")
    # 回滚事务
    conn.rollback()

    # 查询
    cursor.execute("select * from user where id = %s", ("1",))
    values = cursor.fetchall()
    print(values)

except:
    pass

finally:
    # 关闭Cursor
    cursor.close()

    # 关闭Connection
    conn.close()

 ORM

# SQLAlchemy是一个基于Python实现的ORM框架。该框架建立在 DB API之上,使用关系对象映射进行数据库操作,
# 简言之便是:将类和对象转换成SQL,然后使用 DB API执行SQL并获取执行结果
# pip install sqlalchemy

from sqlalchemy import create_engine
from sqlalchemy import Column, Integer, String, DateTime, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import declarative_base, scoped_session
from sqlalchemy.sql import text, func
from sqlalchemy import and_, or_
import datetime

# 初始化数据库连接:
engine = create_engine(
    "mysql+mysqlconnector://root:@localhost:3306/test?charset=utf8mb4",
    max_overflow=0,  # 超过连接池大小外最多可以创建的连接数量
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1,  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
# 获取原生连接
# conn = engine.raw_connection()

# 创建对象的基类
Base = declarative_base()


# 定义User对象:
class User(Base):
    # 表的名字:
    __tablename__ = "user"

    # 表的结构:
    id = Column(Integer, primary_key=True)
    name = Column(String(20), index=True, nullable=False)
    age = Column(Integer)
    email = Column(String(32), unique=True, nullable=False)
    ctime = Column(DateTime, default=datetime.datetime.now)
    group = Column(String(50), nullable=False)

    # 建立索引,指定字符集和引擎
    __table_args__ = (
        UniqueConstraint("group", "name", name="uidx_id_name"),  # 联合唯一
        Index("idx_name_email", "name", "email"),  # 索引
        {
            "mysql_charset": "utf8mb4",
            "mysql_engine": "InnoDB",
        },
    )


# 删除所有表
# Base.metadata.drop_all(engine)
# 把表同步到数据库
Base.metadata.create_all(engine)

# 创建DBSession类型:
DBSession = sessionmaker(bind=engine)

# 创建session对象 线程安全:
session = scoped_session(DBSession)

# 创建新User对象:
new_user = User(name="susie1", age=22, email="[email protected]", group="A")
new_user2 = User(name="daisy1", age=25, email="[email protected]", group="A")
new_user3 = User(name="lily1", age=24, email="[email protected]", group="B")
new_user4 = User(name="bob1", age=23, email="[email protected]", group="B")

# 添加到session:
# session.add(new_user)
session.add_all([new_user, new_user2, new_user3, new_user4])

# 原生sql查询
user = session.query(User).from_statement(text("SELECT * FROM user where email=:email")).params(email="[email protected]").first()

# 查询所有
users = session.query(User).all()  # 是个普通列表

# 查询某些字段
user = session.query(User.name.label("Name"), User.email).first()

# 查询:filter表达式
user = session.query(User).filter(User.name == "susie").one()
print(user.id, user.name)

user = session.query(User).filter(User.name == "susie").first()
print(user.id, user.name)

users = session.query(User).filter(User.name == "susie").all()
user = users[0]
print(user.id, user.name)

user_name = session.query(User.name).select_from(User).filter(User.id == 1).scalar()
print(user_name)

user_num = session.query(func.count("*")).select_from(User).scalar()
print(user_num)

# 查询:filter_by参数
user = session.query(User).filter_by(name="lily").first()
user = session.query(User).filter_by(id=6).first()

# 占位符
user = session.query(User).filter(text("id<:value or name=:name")).params(value=10, name="lily").first()

# and 表达式条件
user = session.query(User).filter(User.name == "susie", User.email == "[email protected]").first()

# between
users = session.query(User).filter(User.id.between(1, 8)).all()

# in
users = session.query(User).filter(User.id.in_([2, 3, 4])).all()

# 非
users = session.query(User).filter(~User.id.in_([1, 3, 4])).all()

# and or条件
users = session.query(User).filter(or_(User.id < 10, and_(User.name == "lily", User.id > 3), User.email.like("%@test.com"))).all()

# 排序
users = session.query(User).order_by(User.email.desc()).all()
# 多字段排序
users = session.query(User).order_by(User.name.desc(), User.id.asc())

# 分页
users = session.query(User)[0:10]
users = session.query(User).offset(1).limit(4).all()

# 分组查询 聚合函数
users = session.query(User.group, func.max(User.id), func.sum(User.id), func.min(User.id)).group_by(User.group).all()

# having
users = session.query(User.group, func.max(User.id), func.sum(User.id), func.min(User.id)).group_by(User.group).having(User.group == "A").all()

# 二次筛选
users = session.query(User).filter(User.id.in_(session.query(User.id).filter_by(name="lily"))).all()

# 修改
res = session.query(User).filter_by(id=1).update({"age": 24})
session.commit()
print(res)  # 影响的行数

session.query(User).filter(User.id > 0).update({User.age: User.age + 1})
session.query(User).filter(User.id > 0).update({"age": User.age + 1})

# 删除
res = session.query(User).filter_by(id=6).delete()
session.commit()
print(res)  # 影响的行数

user = session.query(User).filter_by(id=7).first()
session.delete(user)
session.flush()  # 写数据库,但并不提交
session.rollback()  # 回滚

# 批量操作
session.execute(
    User.__table__.insert(),
    [
        {"name": "john", "age": 34, "email": "[email protected]", "group": "D"},
    ],
)

# 提交即保存到数据库:
session.commit()

# 关闭session:
session.close()

关联关系

from sqlalchemy import create_engine
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy.orm import declarative_base, scoped_session
import datetime

# 初始化数据库连接:
engine = create_engine("mysql+mysqlconnector://root:@localhost:3306/test?charset=utf8mb4")

# 创建对象的基类
Base = declarative_base()


# 定义User对象:
class User(Base):
    # 表的名字:
    __tablename__ = "user"

    # 表的结构:
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(20), index=True, nullable=False, comment="名字")
    age = Column(Integer)
    email = Column(String(32), unique=True, nullable=False)
    ctime = Column(DateTime, default=datetime.datetime.now)
    group = Column(String(50), nullable=False)

    # 关联关系一对多
    books = relationship("Book", backref="person")

    # 关联关系多对多
    hobbys = relationship("Hobby", secondary="user_bobby", backref="pers")

    # 建立索引,指定字符集和引擎
    __table_args__ = (
        UniqueConstraint("group", "name", name="uidx_id_name"),  # 联合唯一
        Index("idx_name_email", "name", "email"),  # 索引
        {
            "mysql_charset": "utf8mb4",
            "mysql_engine": "InnoDB",
        },
    )


class Book(Base):
    __tablename__ = "book"

    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(20), unique=True, nullable=False)
    # 关联关系通过外键关联到user表的
    user_id = Column(Integer, ForeignKey("user.id"))

    __table_args__ = (
        {
            "mysql_charset": "utf8mb4",
            "mysql_engine": "InnoDB",
        },
    )


class Hobby(Base):
    __tablename__ = "hobby"

    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(20), unique=True, nullable=False)

    __table_args__ = (
        {
            "mysql_charset": "utf8mb4",
            "mysql_engine": "InnoDB",
        },
    )


class UserHobby(Base):
    __tablename__ = "user_bobby"

    id = Column(Integer, primary_key=True, autoincrement=True)
    user_id = Column(Integer, ForeignKey("user.id"))
    hobby_id = Column(Integer, ForeignKey("hobby.id"))


# 删除所有表
Base.metadata.drop_all(engine)
# 把表同步到数据库
Base.metadata.create_all(engine)

# 创建DBSession类型:
DBSession = sessionmaker(bind=engine)

# 创建session对象:
session = scoped_session(DBSession)

# 创建新对象:
new_user = User(name="susie", age=22, email="[email protected]", group="A")
new_user2 = User(name="daisy", age=25, email="[email protected]", group="A")
new_user3 = User(name="lily", age=24, email="[email protected]", group="B")
new_user4 = User(name="bob", age=23, email="[email protected]", group="B")
book = Book(name="dd", user_id=3)
book2 = Book(name="ee", user_id=2)

# 添加到session:
session.add_all([new_user, new_user2, new_user3, new_user4])
session.add_all([book, book2])

user = session.query(User).filter_by(name="lily").first()
user.hobbys = [Hobby(name="足球"), Hobby(name="篮球")]
user.books = [Book(name="执行力"), Book(name="纸飞机")]
session.add(user)
session.commit()

# 关联关系
user = session.query(User).filter_by(name="lily").first()
for row in user.books:
    print(row.name)
for row in user.hobbys:
    print(row.name)
print("-------------------")

book = session.query(Book).filter_by(name="ee").first()
print(book.person.name)
print("-------------------")

hobby = session.query(Hobby).filter_by(name="足球").first()
for row in hobby.pers:
    print(row.name)
print("-------------------")

# 连表查询
res = session.query(User, Book).filter(Book.user_id == User.id).all()
for row in res:
    print(row[0].name, row[1].name)
print("-------------------")

# join表,默认是inner join,自动按外键关联
res = session.query(User).join(Book).all()
for row in res:
    print(row.name)
print("-------------------")

# left join
res = session.query(User).join(Book, isouter=True).all()
for row in res:
    print(row.name)
print("-------------------")

# left join指定关联的字段
res = session.query(User).join(Book, Book.user_id == User.id, isouter=True).all()
for row in res:
    print(row.name)
print("-------------------")

# 多对多关系连表
res = session.query(User, Hobby, UserHobby).filter(User.id == UserHobby.user_id, Hobby.id == UserHobby.hobby_id).all()
res = session.query(User).join(UserHobby).join(Hobby).filter(User.id >= 2).all()


# 关闭session:
session.close()

 

标签:session,sqlalchemy,name,ORM,User,query,id,python3,user
From: https://www.cnblogs.com/caroline2016/p/18454654

相关文章

  • python3常用库之itertools使用
    无限迭代器importitertools#无限迭代器#无限序列只有在for迭代时才会无限地迭代下去,如果只是创建了一个迭代对象,它不会事先把无限个元素生成出来,事实上也不可能在内存中创建无限多个元素。natuals=itertools.count(1)odd=itertools.count(0,2)cs=itertools.cycl......
  • python3常用内置函数及常用库functools使用
    常用内置函数#lambda函数-----------------------------add=lambdaa,b,c:a+b+cprint(add(1,2,3))#6#sorted函数-----------------------------a_l=[1,3,5,7,0,-1,-9,-4,-5,8]print(sorted(a_l))#[-9,-5,-4,-1,0,1,3,5,7,8]p......
  • python3常用库之哈希hashlib和hmac使用
    hashlibimporthashlib#MD5是最常见的哈希算法,速度很快,生成结果是固定的128bit/16字节,通常用一个32位的16进制字符串表示。md5=hashlib.md5()md5.update("hello".encode())print(md5.hexdigest())#5d41402abc4b2a76b9719d911017c592#数据量很大时分块多次调用up......
  • python3常用库之collections集合库
    namedtuple#namedtuple是一个函数,它用来创建一个自定义的tuple对象,并且规定了tuple元素的个数,并可以用属性而不是索引来引用tuple的某个元素。#用namedtuple可以很方便地定义一种数据类型,它具备tuple的不变性,又可以根据属性来引用Coord=collections.namedtuple("Coord",["......
  • python3常用库之datetime库
    日期时间fromdatetimeimportdatetime,timedelta,timezonenow=datetime.now()print(now)#2024-02-0214:27:12.247121dt=datetime(2023,12,31,12,30,00)print(dt)#2023-12-3112:30:00#时间戳,和时区无关ts=dt.timestamp()print(ts)#17039970......
  • python3常用库之解析命令行参数argparse
    在命令行程序中需要获取命令行参数可以使用sys库和argparse库。sys库可用于处理简单的命令行参数,argparse库可用于处理复杂的命令行参数。#argparse解析命令行参数importargparse,sysdefmain():#定义一个ArgumentParser实例:参数分别为程序名、描述、说明信息......
  • python3常用库之Base64编码
    Base64是一种用64个字符来表示任意二进制数据的方法。importbase64by="abc中文".encode()b=base64.b64encode(by)print(by)#b'abc\xe4\xb8\xad\xe6\x96\x87'print(b)#b'YWJj5Lit5paH'by2=base64.b64decode(b)print(by2)#b'abc\xe......
  • flask_sqlalchemy连接建表
    database.pyfromflask_sqlalchemyimportSQLAlchemydb=SQLAlchemy()config.pyimportosfromdatabaseimportdbfromflaskimportFlaskbasedir=os.path.abspath(os.path.dirname(__name__))app=Flask(__name__)#Dabaseconfigurationapp.config[&#......
  • SQLAlchemy模块
    1、执行原生SQLfromsqlalchemyimportcreate_engine,text#创建engine对象engine=create_engine("sqlite:///demo.db",echo=False)withengine.connect()ascon:#先删除persons表con.execute(text('droptableifexistspersons'))#创建一个p......
  • SQLAlchemy入门:详细介绍SQLAlchemy的安装、配置及基本使用方法
    SQLAlchemy是一个流行的PythonSQL工具包和对象关系映射(ORM)框架,它为开发人员提供了一种高效、灵活的方式来与数据库进行交互。本文将详细介绍SQLAlchemy的安装、配置及基本使用方法,并通过代码示例和案例分析,帮助新手朋友快速上手。一、SQLAlchemy简介SQLAlchemy由MikeBa......