前言
本地跑一个小项目当然不想还拿mysql了
说不定要发给别人还得让别人搞个mysql,再不济得搞云
又或者整sqllite,但学习成本.............
所以要找个代替的...要是几年前,我一定会说txt都行啊,json也行啊
但是现在,我一定推荐yaml
不仅简单,而且高级!对,就是高级!
直接上代码:
# !/usr/bin/env python
# -*-coding:utf-8 -*-
"""
@File : mysql_yaml.py
@Time :2022/9/21 下午4:20
@author :umbrella
@email : 2260334995@qq.com
@version :python 3.8
@Description:使用yaml作为mysql的类,使用方法:from mysql_yaml import mysql
"""
import yaml, time
from copy import deepcopy
class MySQLYaml:
"""
需要加表时更新这句话:mysql = MySQLYaml(table_set={table_name, table_name, ...})
该文件下方需要有table_name.yml文件
增加数据:mysql.create_by_filed
删除数据:mysql.delete_by_filed
查找数据:mysql.search_by_filed
改变数据:mysql.update_by_filed
保存数据:mysql.save_data
注意:增删改数据时,尽量少用save=True,要更新整个yml文件,比较久
因此,在必要时,save=True即可。非必要时,只要操作到最后调用mysql.save_data即可
"""
def __init__(self, *, table_set: set):
self.table_set = set(table_set) # {'cp'}
self.all_filed = {}
self.table_len = {}
self.last_id = {}
self.sql_data = self._get_sql_data()
def _get_sql_data(self):
_sql_data = {}
for table in self.table_set:
with open(f'{table}.yml', 'r', encoding='utf-8') as yam_file:
result = yaml.load(yam_file, Loader=yaml.Loader)
_sql_data[table] = result
self.all_filed.update({table: list(result[0].keys())})
self.table_len[table] = len(result)
self.last_id[table] = result[-1]['id']
return _sql_data
def search_by_filed(self, table=None, need=None, **kwargs):
"""
暂时只支持一个数据表的,当然了后续可以输入数据表名字来决定
:param need: 返回的个数
:param table: 数据表名
:param kwargs: filed1: {value: xxx, extra: True}, filed1: {value: xxx, extra: False},extra表示是否精确搜索
:return:
"""
if table not in self.table_set or table not in self.sql_data:
return '找不到这张表'
if need is None or need == 0:
return []
# 找到正确的key和value
searches = {key: value for key, value in kwargs.items() if key in self.all_filed[table] and 'value' in value}
result = []
for i in range(self.table_len[table] // 1000 + 1):
result += self._do_search(searches, self.sql_data[table][1000 * i: 1000 * (i + 1)], need, 0)
return result[:need]
def _do_search(self, searches, result, num, found):
"""
搜索接口,递归,但是次数最多1000次
:param found: 已经找到的个数
:param num: 返回的个数
:param searches: filed1: {value: xxx, extra: True}, filed1: {value: xxx, extra: False},extra表示是否精确搜索
:param result: 递归的结果
:return:
"""
if not searches: # 递归结束
return result
if found >= num:
return result
_searches = deepcopy(searches)
for search_key, search_value in _searches.items():
extra = False if 'extra' not in search_value else bool(search_value['extra'])
if extra:
result = [table_data for table_data in result if search_value['value'] == table_data[search_key]]
else:
result = [table_data for table_data in result if search_value['value'] in table_data[search_key]]
searches.pop(search_key)
found += len(result)
return self._do_search(searches, result, num, found)
def delete_by_filed(self, table, del_id=None, record=None, save=False):
"""
删除整个记录,如果传id,也能删除
:param table: 数据表名
:param del_id: 要删除的id
:param record: 要删除的记录
:param save: 是否保存
:return:
"""
if table not in self.table_set or table not in self.sql_data:
return '找不到这张表'
if not record and not del_id:
return "没有记录"
if del_id: # 如果传输了id,就不看record了,也就是id优先
for data in self.sql_data[table]:
if data['id'] == del_id:
record = data
break<p>
if not record:
return '记录查找失败'
self.sql_data[table].remove(record)
if del_id:
if del_id == self.last_id[table]:
self.last_id[table] = self.sql_data[table][-1]['id']
elif record:
if record['id'] == self.last_id[table]:
self.last_id[table] = self.sql_data[table][-1]['id']
if save:
self.save_data(table)
return record
def create_by_filed(self, table, record: dict, save=False):
"""
创建整个记录,id会失效
:param table: 数据表名
:param record: 要删除的记录
:param save: 是否保存
:return:
"""
if table not in self.table_set or table not in self.sql_data:
return '找不到这张表'
record_id = self.last_id[table] + 1
record['id'] = record_id
self.sql_data[table].append(record)
self.last_id[table] += 1
if save:
self.save_data(table)
def update_by_filed(self, table, update_id, record, save=False):
"""
更新整个记录
:param table: 数据表名
:param update_id: 要更新的id
:param record: 要更新的记录
:param save: 是否保存
:return:
"""
if table not in self.table_set or table not in self.sql_data:
return '找不到这张表'
if not update_id:
return "请输入更新的id"
if not record:
return "没有记录"
if 'id' in record and update_id != record['id']:
return '不能更新id'
for data in self.sql_data[table]:
if data['id'] == update_id:
data.update(record)
break
if save:
self.save_data(table)
def save_data(self, table):
self.sql_data[table].sort(key=lambda x: x['id'])
with open(f'{table}.yml', 'w', encoding='utf-8') as sql_file:
yaml.dump(self.sql_data[table], sql_file, allow_unicode=True)
mysql = MySQLYaml(table_set={'cp'})
备注都在代码里,一学就会!
标签:return,record,self,yaml,代替,mysql,table,data,id From: https://www.cnblogs.com/zaxl932946/p/16723890.html