首页 > 数据库 >【python】mysql操作封装类,sql小白也会操作mysql

【python】mysql操作封装类,sql小白也会操作mysql

时间:2023-01-07 00:45:33浏览次数:63  
标签:.__ name python self mysql value sql table condition

懒得自己写了,转载于Python pymysql 简单封装_Clown程序员的博客-CSDN博客_python 封装pymysql,保存起来
因为原来的写得用起来有点难受,就稍微改了一下
推荐到我的博客网站看:https://www.df100.ltd/286.asp

#类本体:
 
class MysqlManager(object):
 
    def __init__(self, db, user, passwd, host='localhost', port=3306, charset='utf8'):
        """
        数据库配置
        :param db:              数据库名字
        :param user:            链接的用户名
        :param passwd:          链接的密码
        :param host:            IP地址默认是:127.0.0.1  localhost
        :param port:            端口默认:3306 可修改
        :param charset:         默认转码:utf8
        """
        self.__db = db
        self.__user = user
        self.__passwd = passwd
        self.__host = host
        self.__port = port
        self.__charset = charset
        self.__connect = None
        self.__cursor = None
 
    def connect_db(self):
        """
        dbManager._connect_db()
        连接数据库
        :return:
        """
 
        params = pymysql.connect(
            host=self.__host,
            port=self.__port,
            user=self.__user,
            password=self.__passwd,
            database=self.__db,
            use_unicode=True,
            charset=self.__charset
        )
 
        self.__connect = params
        self.__cursor = self.__connect.cursor()
 
    def Close_DB(self):
        """
        dbManager._close_db()
        :return:
        """
        self.__cursor.close()
        self.__connect.close()
 
    def Establish_DB(self, DB_name):
        """
        创建数据库
        :param DB_name: 创建的数据库名字
        :return:
        """
        self.connect_db()
        try:
            self.__cursor.execute("CREATE DATABASE %S" % DB_name)
        except Exception as e:
            print('创建数据库失败,失败原因:', e)
        else:
            print('数据库创建成功')
 
    def Establish_table(self, surface_name, condition):
        """
        创建表
        :param surface_name: 要创建的表名字
        :param condition: 要创建表的条件
        :return:
        """
        self.connect_db()
        try:
            create_table = "CREATE TABLE {table_name} ({value})".format(table_name=surface_name,value=condition)
            self.__cursor.execute(create_table)
        except Exception as e:
            print("创建表失败:",e)
        else:
            print("创建表成功")
 
 
    def Insert_DB(self, table_name, insert_data):
        """
        DBManager.insert(table, insert_data)
        :param table_name: str --> table 为字符串
        :param insert_data: [a:b] --> 为列表中嵌套字典类型
        :return:
        """
        # 用户传入数据自读那列表数据,根据key, value 添加进数据库
        # 连接数据库
        self.connect_db()
        try:
            data = self.Handle_value(insert_data)
            key = data[0]
            value = data[1]
            sql = "INSERT INTO {table_name}({key}) values ({values})".format(table_name=table_name, key=key,
                                                                             values=value)
            self.__cursor.execute(sql)
            self.__connect.commit()
 
        except Exception as e:
            print('数据插入失败,失败原因:', e)
        else:
            self.Close_DB()
            print('数据插入成功')
 
    def Delete(self, table_name, condition):
        """
        dbManager.delete(table, condition)
        传入相应的条件 -- > 删除数据库中的数据
        :param table_name: 表名
        :param condition: 传入条件
        :return:
        """
        self.connect_db()
        condition_Text = ' and '.join(self.Handle_value(condition))
        try:
            # 构建sql语句
            sql = "DELETE FROM {table_name} WHERE {condition}".format(table_name=table_name, condition=condition_Text)
            self.__cursor.execute(sql)
            self.__connect.commit()
        except Exception as e:
            print('删除失败:', e)
        else:
            self.Close_DB()
            print('删除成功')
 
    def Update(self, table_name, data, condition=None):
        """
        dbManager.update(table, date,condition)
        :param table_name: 表名
        :param data: dict -> data 字典类型
        :param condition: dict -> condition 字典类型
        :return:
        """
        self.connect_db()
        update_data = ','.join(self.Handle_value(data))
        try:
            if condition is not None:
                # 处理传入的条件
                condition_data = ' and '.join(self.Handle_value(condition))
                sql = "UPDATE {table} SET {values} WHERE {conditions}".format(table=table_name, values=update_data,
                                                                              conditions=condition_data)
            else:
                sql = "UPDATE {table} SET {values}".format(table=table_name, values=update_data)
            self.__cursor.execute(sql)
            self.__connect.commit()
        except Exception as e:
            print('更新失败:', e)
        else:
            self.Close_DB()
            print('更新成功')
 
    def Select_DB(self, table_name, show_ist, condition=None, get_one=False):
        """
        查数据
        :param table_name: --> str 字符串类型
        :param show_ist: --> 列表类型
        :param condition: --> 字典类型
        :param get_one: --> 布尔类型
        :return:
        """
        self.connect_db()
        # 处理显示的数据
        shou_list = ','.join(show_ist)
        try:
            if condition is not None:
                condition_list = self.Handle_value(condition)
                condition_data = ' and '.join(condition_list)
                sql = "SELECT {key} FROM {table} WHERE {values}".format(key=shou_list, table=table_name,
                                                                        values=condition_data)
            else:
                sql = "SELECT {key} FROM {table}".format(key=shou_list, table=table_name)
 
            self.__cursor.execute(sql)
            result = list()
            for i in self.__cursor.fetchall():
                result.append(i)
                
        except Exception as e:
            print("查询失败:", e)
        else:
            self.Close_DB()
            print("查询成功")
        return result
    # todo 处理传进来的Value
    def Handle_value(self, value):
        """
        处理传进来的value
        self.deal_values(value) --> str or list
        :param value: 传进来的value
        :return:
        """
        result = []
        for k, v in value[0].items():
            if isinstance(k, int):
                if k == 0:
                    content_KEY = []
                    content_VALUE = []
                    for vs in v:
                        for kx, vx in vs.items():
                            value = self.handel_text(value=vx, ks=k)
                            content_KEY.append(str(kx))
                            content_VALUE.append(value)
                    condition_key = ','.join(content_KEY)
                    condition_value = ','.join(content_VALUE)
                    return condition_key, condition_value
                else:
                    for vs in v:
                        for kx, vx in vs.items():
                            res = self.handel_text(key=kx, value=vx, ks=k)
                            result.append(res)
 
        return result
 
    def handel_text(self, value, ks, key=None):
        """
        处理进来的条件
        :param key: 传进来的Key
        :param value: 传进来的Value
        :param ks 传进来的K值
        :return:
        """
        condition = self.Judeg_parameter(ks)
        if isinstance(value, str):
            v = ("'{value}'".format(value=value))
        else:
            v = str(value)
        if ks == 0:
            return v
        else:
            return "{key}{condition}{value}".format(key=key, condition=condition, value=v)
 
    def Judeg_parameter(self, judeg_structure):
        if judeg_structure == 1:
            return "="
        elif judeg_structure == 2:
            return ">"
        elif judeg_structure == 3:
            return "<"
        elif judeg_structure == 4:
            return ">="
        elif judeg_structure == 5:
            return "<="
 
 
def 增(dbManager,表名:str,insert_data):
    '''
     0 代表是插入数据,因为插入数据和 其他 查询 修改 更新数据条件不一样
    insert_data = [{
        0: [
            {"name": 'GFGF'},
            {"age": 3333},
            {"sex": 1}
        ]
    }]
    '''
    # 增
    return dbManager.Insert_DB(table_name=表名, insert_data=insert_data)
 
 
def 删(dbManager,表名:str,WHERE):
    # 删
    # 如果 条件很多 就在字典里加条件就可以,如果只有一条数据,就写一个字典就好
    # 0 没有任何条件
    # 1 是 = 号
    # 2 是 > 号
    # 剩下具体看 Judeg_parameter方法
    '''
    WHERE = [{
        1: [{
            "name": 'FFFF',
            "sex": 13
        }],
        2: [{
            "age": 300
        }]
    }]
    '''
    return dbManager.Delete(table_name=表名, condition=WHERE)
 
 
def 改(dbManager,表名,数据,WHERE=None):
    # 改
    # 一个是带处理条件的查询,一个是不带处理条件的查询
    # 0 没有任何条件
    # 1 是 = 号
    # 2 是 > 号
    # 剩下具体看 Judeg_parameter方法
    '''
    WHERE = [{
        1: [{
            "ID": 10,
            "name": "RRR"
        }]
    }]
    数据 = [{
        1: [{
            "name": 'TTT',
            "sex": 6
        }]
    }]
    '''
    return dbManager.Update(table_name=表名, condition=WHERE, data=数据)
    # dbManager.Update(table_name='user', data=data)                # 这个是直接修改某个表里面 字段的所有参数
 
 
def 查(dbManager,表名:str,字段:list,WHERE=None):
    """
    condition: 传入None 则没有查询条件, 传入查询条件,则查询传入条件的规则
    get_one: 查询是否查一条 还是查询所有 False 是符合条件的所有东西  True是查符合条件的一条数据
    字段 = ['用户名','密码']
    WHERE = [{
        1: [{
            "ID": 4,
            "name": "DDD"
        }]
    }]
    """
    return dbManager.Select_DB(table_name=表名, show_ist=字段, condition=WHERE, get_one=True)
 
#下面是使用代码示例
if __name__ == '__main__':
    '''
    下面是增删改查的所有使用方法
    因为可以添加多种查询条件,所以 在传入值的时候会稍微复杂一点点,但也不难明白
    [{
        0: [
            {"name": 'GFGF'},
            {"age": 3333},
            {"sex": 1}
        ],
        1: [
            {"name": 'GFGF'},
            {"age": 3333},
            {"sex": 1}
        ]
    }]
    可能是这样是数据结构。 这个KEY = 0 或者 1 是什么意思呢??
    它就是传入条件 比如 某个条件可能 xx > 1 and ff = '某某某' 
    1 2 3 4 5 这些是用来区分 = > < 符号的,具体看 Judeg_parameter 这个方法。什么对应什么
    
    插数据是因为只需要字符串就可以,所以做特殊处理。用 0 表示
    
    下面 每个方法 删 改 查 都有写 怎么使用
    '''
    db = 'mysql_test' #数据库名
    user = 'root' #用户
    passwd = '' #密码
    dbManager = MysqlManager(db='mysql_test', user='root', passwd='')
    print(查(dbManager=dbManager,表名='消息记录',字段=['*'],WHERE=[{1:[{'信息发送者':'127'}]}]))
    print(删(dbManager=dbManager,表名='消息记录',WHERE=[{1:[{'信息发送者':'127'}]}]))
    '''
    PRIMARY KEY AUTO_INCREMENT    这句话的意思是 把这个字段设置成主KEY  并且自增长
    VARCHAR(255)                  字符串类型 长度 255
    '''
    # condition = "ID INT NOT NULL PRIMARY KEY AUTO_INCREMENT,name VARCHAR(255),sex INT,age INT"
    # dbManager.Establish_table(surface_name='user', condition=condition)

标签:.__,name,python,self,mysql,value,sql,table,condition
From: https://www.cnblogs.com/xhsz/p/17032008.html

相关文章