from sqlalchemy import create_engine,Column,String,Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# Connection settings for the MySQL database used by this demo.
# NOTE(review): credentials are hard-coded here; consider loading them from
# the environment or a config file before using this outside a sandbox.
HOSTNAME = "127.0.0.1"
PORT = "3306"
USERNAME = "root"
PASSWORD = "123456"
DATABASE = "xt_fastapi"

# SQLAlchemy URL for the PyMySQL driver: user:password@host:port/database.
DB_URI = "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
    USERNAME,
    PASSWORD,
    HOSTNAME,
    PORT,
    DATABASE,
)

engine = create_engine(DB_URI)

# Declarative base for the ORM models; the engine is handed over as its bind.
Base = declarative_base(bind=engine)

# Single module-level session shared by the functions in this file.
# This one-liner is equivalent to the two-step form:
#     Session = sessionmaker(engine)
#     session = Session()
session = sessionmaker(bind=engine)()
class Person(Base):
    """ORM model mapped to the ``person`` table."""

    __tablename__ = "person"

    # Auto-incrementing integer primary key.
    id = Column(Integer, primary_key=True, autoincrement=True)
    # String(50) corresponds to VARCHAR(50).
    name = Column(String(50))
    age = Column(Integer)
    country = Column(String(50))

    def __str__(self):
        """Return a one-line text summary with name, age and country."""
        template = "<Person(name:%s,age:%s,country:%s)>"
        values = (self.name, self.age, self.country)
        return template % values
# Session-based CRUD operations
# Create
def add_data():
    """Insert two sample ``Person`` rows and commit them.

    If the commit fails, the session is rolled back so the shared
    module-level session remains usable, and the original exception is
    re-raised to the caller.
    """
    people = [
        Person(name="angle1", age=16, country="china"),
        Person(name="angle2", age=20, country="china"),
    ]
    # add_all() stages several objects at once; session.add(obj) stages one.
    session.add_all(people)
    try:
        session.commit()
    except Exception:
        # Undo the failed transaction before propagating the error.
        session.rollback()
        raise
# Read
def serach_data():
    """Print the first ``Person`` row of the result set.

    Alternative query forms that can be substituted here:

    * ``session.query(Person).all()`` - every row of the table
    * ``session.query(Person).filter_by(name='angle1').all()`` - keyword filter
    * ``session.query(Person).filter(Person.name.contains('angle'))`` -
      filter by a condition expression
    * ``session.query(Person).get(1)`` - the row whose id is 1
    """
    # first() fetches only the first record of the result set.
    print(session.query(Person).first())
# Update
def update_data():
    """Rename the first ``Person`` row to ``'angle_update'`` and commit.

    Does nothing when the query returns no row. If the commit fails, the
    session is rolled back and the original exception is re-raised.
    """
    person = session.query(Person).first()
    if person is None:
        # Empty result: there is no row to modify.
        return
    person.name = "angle_update"
    try:
        session.commit()
    except Exception:
        # Undo the failed transaction before propagating the error.
        session.rollback()
        raise
# Delete
def delete_data():
    """Delete the first ``Person`` row and commit.

    Does nothing when the query returns no row. If the commit fails, the
    session is rolled back and the original exception is re-raised.
    """
    person = session.query(Person).first()
    if person is None:
        # Empty result: passing None to session.delete() would raise.
        return
    session.delete(person)
    try:
        session.commit()
    except Exception:
        # Undo the failed transaction before propagating the error.
        session.rollback()
        raise
if __name__ == "__main__":
    # Demo entry point: uncomment the step(s) to run against the database.
    # Only the delete step is currently enabled.
    # Base.metadata.create_all()
    # add_data()
    # serach_data()
    # update_data()
    delete_data()
# Tags: __, SQLAlchemy, name, person, fastapi, crud, Person, session, data
# From: https://www.cnblogs.com/edeny/p/17746721.html