controller.py
from model import HistoryModel
from view import HistoryView
class HistoryController:
def __init__(self):
self.model = HistoryModel()
self.view = HistoryView()
def show_records(self):
# view视图,获取记录,model取数据,view展示
records = self.model.get_all_records()
self.view.show_records(records)
def show_paging(self, page_size: int, page_number: int):
page_info, page_number_data = self.model.all_records_paging2(
page_size, page_number
)
self.view.show_paging(page_info, page_number_data)
def like_search(self, k):
s = self.model.like_search_records(k)
self.view.like_search(s)
def glob_search(self, k):
s = self.model.glob_search_records(k)
self.view.glob_search(s)
def in_search(self, k):
s = self.model.in_search_records(k)
self.view.in_search(s)
def export_xlsx(self, k):
if k == "Y" :
filewhere = self.model.export_op()
self.export_xlsx(filewhere)
def run(self):
while True:
# view视图,展示给用户读数据。所见即所得
user_input = self.view.prompt_for_input()
if user_input.lower() == "exit":
break
if user_input.lower() == "delete":
id = input("id:")
self.model.delete_one_record(id)
self.show_records()
continue
if user_input.lower() == "clear":
self.model.clear_all_record()
self.show_records()
continue
if user_input.lower() == "q":
s1 = int(input("数:"))
s2 = int(input("位:"))
self.show_paging(page_size=s1, page_number=s2)
continue
if user_input.lower() == "s":
s1 = input("s:")
self.like_search(s1)
self.in_search(s1)
self.glob_search(s1)
continue
if user_input.lower() == "export":
s1 = input("export:Y or N")
self.export_xlsx(s1)
continue
# 调用model.add_record添加记录
self.model.add_record(user_input)
self.show_paging(10, 2)
self.model.close()
if __name__ == "__main__":
controller = HistoryController()
controller.run()
model.py
import sqlite3
import pandas as pd
# 假设数据库文件是 history.db
db_path = "history.db"
# 定义一个上下文管理器,创建与数据库的连接
class DatabaseConnection:
def __init__(self, path):
self.path = path
def __enter__(self):
self.connection = sqlite3.connect(self.path)
self.cursor = self.connection.cursor()
return self.cursor
def __exit__(self, exc_type, exc_value, traceback):
self.connection.commit()
self.connection.close()
class Pagination:
"""
分页使用的类,其中list列表分页
属性:
方法:
list_pagination(self, data: list, page_size: int, page_number: int): 列表分页
"""
def __init__(self) -> None:
pass
def list_pagination(self, data: list, page_size: int, page_number: int):
"""
对给定数据列表第一维进行分页处理。
:param data: 要分页的数据列表
:param page_size: 每一页的大小
:param page_number: 当前页码
:return: pagination_info, page_number_data
当前页的数据列表
分页结果的字典: 数据总条数、总页面数、所在当前页面、当前数据范围、剩余多少页面、前有多少页面
"""
# 计算总页数
total_count = len(data) # 总条数
total_pages = (total_count + page_size - 1) // page_size # 总页面数,向上取整
# 获取分页页码总数后,检查当前页码是否有效,如果不是,则修正为有效的页码
page_number = max(1, min(page_number, total_pages))
# 计算当前页的起始索引和结束索引
start_index = (page_number - 1) * page_size
end_index = min(start_index + page_size, total_count)
# 获取当前页的数据
page_number_data = data[start_index:end_index]
# 当前数据范围
page_number_data_rang = (start_index + 1, end_index)
# 分页结果的字典: 数据总条数、总页面数、所在当前页面、当前数据范围、剩余多少页面、前有多少页面
pagination_info = {
"total_count": total_count,
"total_pages": total_pages,
"current_page": page_number,
"page_number_data_rang": page_number_data_rang,
"have_next": page_number < total_pages,
"have_prev": page_number > 1,
}
return pagination_info, page_number_data
class HistoryModel:
"""
历史记录model, 增删改查
属性:
self.auto_incremented_data = [] 全局变量,给分页后数据缓存
方法:
"""
def __init__(self):
self.create_table()
self.create_trigger()
# 将不连续的ID转换为自增的ID,加进列表重新排序
self.auto_incremented_data = []
def auto_incremented_data_use(self, rows):
"""把获取到的压入列表"""
# 为每一行数据添加连续的自增ID
self.auto_incremented_data.clear()
for index, row in enumerate(rows):
self.auto_incremented_data.append((index + 1, row[0], row[1]))
# 建表
def create_table(self):
with DatabaseConnection(db_path) as self.cursor:
self.cursor.execute(
"""
CREATE TABLE IF NOT EXISTS history (
id INTEGER PRIMARY KEY,
record TEXT NOT NULL
)
"""
)
self.cursor.execute(
"""
CREATE TABLE IF NOT EXISTS audit(
his_id INT NOT NULL,
his_date TEXT NOT NULL
)
"""
)
# 创建触发器
def create_trigger(self):
with DatabaseConnection(db_path) as self.cursor:
self.cursor.execute(
"""
CREATE TRIGGER IF NOT EXISTS audit_log AFTER INSERT ON history
BEGIN
INSERT INTO audit(his_id, his_date) VALUES (new.ID, datetime('now'));
END
"""
)
def add_record(self, record):
"""加入数据"""
with DatabaseConnection(db_path) as self.cursor:
self.cursor.execute("INSERT INTO history (record) VALUES (?);", (record,))
def get_all_records(self):
"""
获取所有数据,并压入列表排序
"""
with DatabaseConnection(db_path) as self.cursor:
self.cursor.execute("SELECT * FROM history")
rows = self.cursor.fetchall()
self.auto_incremented_data_use(rows)
return self.auto_incremented_data
def like_search_records(self, search_term):
"""LIKE查询"""
search_term = "%" + search_term + "%"
with DatabaseConnection(db_path) as self.cursor:
self.cursor.execute(
"SELECT * FROM history WHERE record LIKE ?", (search_term,)
)
rows = self.cursor.fetchall()
return rows
def glob_search_records(self, search_term):
"""
GLOB查询
http://www.17bigdata.com/book/sqlite/SQLITEJiChuJiaoCheng/SQLITEGLOBZiJu.html
sqlite 的 GLOB 运算符是用来匹配通配符指定模式的文本值。如果搜索表达式与模式表达式匹配,GLOB 运算符
将返回真(true),也就是 1。与 LIKE 运算符不同的是,GLOB 是大小写敏感的,对于下面的通配符,它遵循 UNIX 的语法。
星号 (*)
问号 (?)
星号(*)代表零个、一个或多个数字或字符。问号(?)代表一个单一的数字或字符。这些符号可以被组合使用。
"""
search_term = "*" + search_term + "*"
with DatabaseConnection(db_path) as self.cursor:
self.cursor.execute(
"SELECT * FROM history WHERE record GLOB ?", (search_term,)
)
rows = self.cursor.fetchall()
return rows
def in_search_records(self, search_term):
"""IN查询"""
with DatabaseConnection(db_path) as self.cursor:
self.cursor.execute(
"SELECT * FROM history WHERE record in(?)", (search_term,)
)
rows = self.cursor.fetchall()
return rows
def all_records_paging1(self, data: list, page_size: int, page_number: int):
"""
进行分页1,list实现
"""
self.get_all_records()
page_number_data, pagination_info = Pagination().list_pagination(
data, page_size, page_number
)
return pagination_info, page_number_data
def all_records_paging2(self, page_size: int, page_number: int):
"""
进行分页2,sql实现
"""
# 计算总条数、分页查询数据
with DatabaseConnection(db_path) as self.cursor:
sql_total = "SELECT COUNT(*) FROM history"
self.cursor.execute(sql_total)
total_records = self.cursor.fetchall()[0][0]
print(total_records)
# 计算分页总数
total_pages = total_records // page_size
if total_records % page_size > 0:
total_pages += 1
# 检查分页是否超出范围
if page_number < 1 or total_pages == 0:
page_number = 1
elif page_number > total_pages:
page_number = total_pages
# 执行分页查询 # OFFSET
offset = (page_number - 1) * page_size
sql_paging = "SELECT * FROM history LIMIT ? OFFSET ?"
self.cursor.execute(
sql_paging,
(
page_size,
offset,
),
)
page_number_data = self.cursor.fetchall()
# 当前数据范围
page_number_data_end = offset + len(page_number_data)
page_number_data_rang = (
min(offset + 1, page_number_data_end),
page_number_data_end,
)
# 分页结果的字典: 数据总条数、总页面数、所在当前页面、当前数据范围、剩余多少页面、前有多少页面
pagination_info = {
"total_count": total_records,
"total_pages": total_pages,
"current_page": page_number,
"page_number_data_rang": page_number_data_rang,
"have_next": page_number < total_pages,
"have_prev": page_number > 1,
}
return pagination_info, page_number_data
def views_create(self): ...
def delete_one_record(self, delete_auto_id):
"""
删除已序列化的某一行,即用列表又排了一次序的
"""
try:
# 检查输入类型
delete_auto_id = int(delete_auto_id)
if not isinstance(delete_auto_id, int):
raise TypeError("arg must be of type int")
# 获取次列表缓存,更新对应关系
except Exception as e:
return
self.get_all_records()
with DatabaseConnection(db_path) as self.cursor:
# 获取要删除的原始ID
# print(self.auto_incremented_data)
original_id_to_delete = self.auto_incremented_data[delete_auto_id - 1][1]
# print(original_id_to_delete)
self.cursor.execute(
"DELETE FROM history WHERE id = (?);", (original_id_to_delete,)
)
self.cursor.fetchall()
def export_op(self):
conn = sqlite3.connect(db_path)
table_name = "data"
df = pd.read_sql_query(f"SELECT * FROM history", conn)
df.to_excel(f"{table_name}.xlsx", index=False)
conn.close()
return f"{table_name}.xlsx"
def clear_all_record(self):
"""清除所有数据"""
with DatabaseConnection(db_path) as self.cursor:
self.cursor.execute("DELETE FROM history;")
self.cursor.fetchall()
def close(self):
pass
view.py
class HistoryView:
def show_records(self, records):
for record in records:
print(f"{record[0]}: {record[2]}")
def show_paging(self, page_info, page_number_data):
for i in page_number_data:
print(i[1])
print(page_info)
def like_search(self, search_term):
print(search_term)
def glob_search(self, search_term):
print(search_term)
def in_search(self, search_term):
print(search_term)
def export_print(self, file_where):
print(file_where)
def prompt_for_input(self):
return input("请输入新的历史记录:")
标签:历史记录,SQLite,search,self,number,MVC,data,page,def
From: https://www.cnblogs.com/huangjinbang1996/p/18624302