日志记录工具
代码(log_util.py)
import os
import logging
import logging.config
# Optional custom base directory for the log folder; leave empty to use the
# directory that contains this module.
USER_PATH = ''
# Directory that contains this module.
BASIC_PATH = os.path.dirname(os.path.abspath(__file__))

# A configured USER_PATH must already exist; only the "logs" sub-folder is
# created automatically.
if USER_PATH and not os.path.isdir(USER_PATH):
    raise Exception('USER_PATH 目录不存在')

# An explicit USER_PATH wins, otherwise fall back to the module directory.
# (Previously the two branches were swapped, so an empty USER_PATH produced a
# "logs" folder relative to the current working directory.)
PATH = os.path.join(USER_PATH or BASIC_PATH, 'logs')
os.makedirs(PATH, exist_ok=True)
class Logger:
    """Holder of the ``logging`` configuration and factory for named loggers.

    ``config_dic`` follows the ``logging.config.dictConfig`` schema and
    declares:

    * formatters ``simple`` and ``standard``;
    * handlers ``console`` (stream, ``simple`` format) plus ``default_file``
      and ``app_file`` (rotating files, ``standard`` format, 20 MiB per file,
      one backup, UTF-8);
    * loggers ``default`` and ``app``, both at INFO level with propagation
      switched off.
    """

    # Compact layout used by the console handler.
    simple = '[%(asctime)s][%(levelname)s][%(filename)s:%(lineno)d]>>>: %(message)s'
    # Verbose layout (path, process and thread details) used by the file handlers.
    standard = '%(levelname)s [%(asctime)s][%(pathname)s:%(lineno)d][%(process)d][%(processName)s][%(thread)d][%(threadName)s]>>>: %(message)s'

    # Both log files sit directly inside the module-level PATH directory.
    default_log_path = '/'.join((PATH, 'log.log'))
    app_log_path = '/'.join((PATH, 'app.log'))

    config_dic = {
        'version': 1,
        'disable_existing_loggers': False,
        'formatters': {
            'simple': {'format': simple, 'datefmt': '%H:%M:%S'},
            'standard': {'format': standard, 'datefmt': '%Y-%m-%d %H:%M:%S'},
        },
        'handlers': {
            'console': {
                'level': 'DEBUG',
                'class': 'logging.StreamHandler',
                'formatter': 'simple',
            },
            'default_file': {
                'level': 'DEBUG',
                'class': 'logging.handlers.RotatingFileHandler',
                'formatter': 'standard',
                'filename': default_log_path,
                'maxBytes': 20 * 1024 * 1024,
                'backupCount': 1,
                'encoding': 'utf-8',
            },
            'app_file': {
                'level': 'DEBUG',
                'class': 'logging.handlers.RotatingFileHandler',
                'formatter': 'standard',
                'filename': app_log_path,
                'maxBytes': 20 * 1024 * 1024,
                'backupCount': 1,
                'encoding': 'utf-8',
            },
        },
        'loggers': {
            'default': {
                'handlers': ['console', 'default_file'],
                'level': 'INFO',
                'propagate': False,
            },
            'app': {
                'handlers': ['console', 'app_file'],
                'level': 'INFO',
                'propagate': False,
            },
        },
    }

    @classmethod
    def load_logger(cls, logger_name='default'):
        """Apply ``config_dic`` and return the logger named *logger_name*.

        The configuration is passed to ``logging.config.dictConfig`` on every
        call before the logger is looked up.

        :param logger_name: name handed to ``logging.getLogger``.
        :return: the ``logging.Logger`` for that name.
        """
        logging.config.dictConfig(cls.config_dic)
        requested = logging.getLogger(logger_name)
        return requested
# Ready-made logger objects for other modules to import: one bound to the
# 'default' logger configuration and one bound to 'app'.
logger = Logger.load_logger(logger_name='default')
logger_app = Logger.load_logger(logger_name='app')
"""
配置 USER_PATH 参数
from ... import logger,logger_app
logger.info('hello')
logger_app.info('hello app')
"""
RSA加解密
代码(rsa_code.py)
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# @author: A.L.Kun
# @file :
# @time : 2022/4/25 6:22
from Crypto import Random
from Crypto.PublicKey import RSA
# Random-bytes reader handed to the key generator.
random_generator = Random.new().read

# Generate a 2048-bit RSA key; this object holds the private key.
rsa = RSA.generate(2048, random_generator)

# Export the private key and the public key derived from it.
private_key = rsa.exportKey()
public_key = rsa.publickey().exportKey()

# Write each exported key to its own file in the current working directory.
for pem_name, pem_bytes in (
    ('rsa_private_key.pem', private_key),
    ('rsa_public_key.pem', public_key),
):
    with open(pem_name, 'wb') as f:
        f.write(pem_bytes)
# !/usr/bin/env python
# -*- coding: UTF-8 -*-
# @author: A.L.Kun
# @file :
# @time : 2022/4/25 6:22
import base64
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_v1_5 as PKCS1_cipher
def get_key(key_file):
    """Load an RSA key from a key file.

    :param key_file: path of the file holding the exported key.
    :return: the key object built by ``RSA.importKey`` from the file content.
    """
    with open(key_file) as key_handle:
        key_text = key_handle.read()
    return RSA.importKey(key_text)
def encrypt_data(msg):
    """Encrypt *msg* with the public key stored in ``rsa_public_key.pem``.

    :param msg: text to encrypt; it is converted with ``str.encode()`` first.
    :return: the ciphertext, Base64-encoded and decoded to ``str``.
    """
    encryptor = PKCS1_cipher.new(get_key('rsa_public_key.pem'))
    ciphertext = encryptor.encrypt(msg.encode())
    return base64.b64encode(ciphertext).decode()
def decrypt_data(encrypt_msg):
    """Decrypt a Base64 ciphertext with the key in ``rsa_private_key.pem``.

    :param encrypt_msg: Base64-encoded ciphertext.
    :return: the decrypted text as ``str``.
    :raises ValueError: if the ciphertext cannot be decrypted with the
        private key.
    """
    private_key = get_key('rsa_private_key.pem')
    cipher = PKCS1_cipher.new(private_key)
    # ``decrypt`` hands back the sentinel instead of raising when the padding
    # check fails.  A unique object lets that case be detected explicitly; an
    # int sentinel would reach ``.decode()`` and fail with AttributeError.
    failure = object()
    plain = cipher.decrypt(base64.b64decode(encrypt_msg), failure)
    if plain is failure:
        raise ValueError('RSA decryption failed: invalid ciphertext or wrong key')
    return plain.decode()
# Round-trip demo: encrypt a sample message, decrypt it again and print the
# recovered text followed by the ciphertext.
msg = "A.L.Kun"
encrypt_text = encrypt_data(msg)
decrypt_text = decrypt_data(encrypt_text)
print(decrypt_text, encrypt_text, sep=' ')
AES 加密
代码(ase_code.py)
import base64
from Crypto.Cipher import AES
# Demo values; both iv and key are 16 characters long.
# NOTE(review): key and IV are hard-coded here — confirm this is demo-only use.
# Initialisation vector passed to AES.new in CBC mode.
iv = '1234567887654321'
# AES key.
key = 'miyaoxuyao16ziji'
# Sample plaintext used by the __main__ demo.
data = 'hello world'
# Pad the plaintext with spaces up to a 16-byte boundary.
def pad(data):
    """Right-pad *data* with spaces so its UTF-8 encoding fills whole blocks.

    The length is measured on the UTF-8 bytes rather than on characters, so
    text containing multi-byte characters also ends on a 16-byte boundary.

    :param data: text to pad.
    :return: *data* followed by 1 to 16 spaces; input that already sits on a
        16-byte boundary (including the empty string) gets 16 more spaces.
    """
    byte_len = len(data.encode('utf-8'))
    return data + ' ' * (16 - byte_len % 16)
# Encrypt text with AES in CBC mode.
def aes(key, data):
    """Encrypt *data* with AES-CBC and return the ciphertext as Base64 text.

    The module-level ``iv`` is used as the initialisation vector.

    :param key: AES key as text; it is UTF-8 encoded before use.
    :param data: plaintext to encrypt; it is UTF-8 encoded and right-padded
        with spaces to a multiple of 16 bytes.
    :return: Base64-encoded ciphertext as ``str``.
    """
    plaintext = data.encode("utf-8")
    # CBC works on whole 16-byte blocks, so pad whenever the encoded byte
    # length is off a block boundary (not only for inputs shorter than 16).
    shortfall = -len(plaintext) % 16
    if not plaintext:
        # Empty input is encrypted as one full block of spaces.
        shortfall = 16
    plaintext += b' ' * shortfall
    # Create the cipher object.
    cipher = AES.new(key.encode("utf-8"), AES.MODE_CBC, iv.encode("utf-8"))
    # Encrypt, Base64-encode, then turn the result into a str.
    return base64.b64encode(cipher.encrypt(plaintext)).decode("utf-8")
def aes_de(key, data):
    """Decrypt Base64 AES-CBC ciphertext and return the plaintext.

    The steps mirror the encryption in reverse.  The module-level ``iv`` is
    used as the initialisation vector.

    :param key: AES key as text; it is UTF-8 encoded before use.
    :param data: Base64-encoded ciphertext as ``str``.
    :return: the decrypted text with trailing space padding removed.  Trailing
        spaces that belonged to the message cannot be told apart from padding
        and are removed as well.
    """
    # Re-encode the Base64 text to bytes and undo the Base64 layer.
    ciphertext = base64.b64decode(data.encode("utf-8"))
    # Create the cipher object.
    cipher = AES.new(key.encode("utf-8"), AES.MODE_CBC, iv.encode("utf-8"))
    padded = cipher.decrypt(ciphertext)
    # Drop only the trailing space padding; a bare strip() would also remove
    # leading whitespace and trailing tabs/newlines that are part of the text.
    return padded.rstrip(b' ').decode("utf-8")
if __name__ == '__main__':
    # Round-trip demo: the module-level ``data`` is replaced by its
    # ciphertext and then by the decrypted text, printing each in turn.
    data = aes(key, data)
    print(data)  # encrypted text
    data = aes_de(key, data)
    print(data)  # decrypted text
连接mysql 连接池
代码(config.py)
import os
class Config(object):
    """Project configuration."""

    # Project path: the parent of the directory that contains this file,
    # computed after replacing backslashes in the absolute path with '/'.
    ROOT_PATH = os.path.dirname(
        os.path.dirname(
            os.path.abspath(__file__).replace('\\', '/')
        )
    )

    # Database settings, keyed by environment name.
    _Mysql = {
        'test': dict(
            MYSQL_HOST='localhost',
            MYSQL_PORT=3306,
            MYSQL_USER='root',
            MYSQL_PASSWD='mysql',
            MYSQL_DBNAME='localhostdata',
            MYSQL_CHARSET='utf8mb4',
        ),
        'pro': dict(
            MYSQL_HOST='地址',
            MYSQL_PORT=3306,
            MYSQL_USER='root',
            MYSQL_PASSWD='123',
            MYSQL_DBNAME='data',
            MYSQL_CHARSET='utf8mb4',
        ),
    }

    # Environment whose settings ``get_mysql_config`` hands out.
    _Mysql_Type = 'pro'

    @classmethod
    def get_mysql_config(cls):
        """Return the settings dict of the environment named by ``_Mysql_Type``.

        :return: the matching entry of ``_Mysql``, or ``None`` when there is
            no entry under that name.
        """
        active_env = cls._Mysql_Type
        return cls._Mysql.get(active_env)
代码(connection.py)
import pymysql
from dbutils.pooled_db import PooledDB
from config import Config
# MySQL settings returned by Config.get_mysql_config(); read by create_pool().
mysql_config = Config().get_mysql_config()
def create_pool():
    """Build the connection pool from the module-level ``mysql_config``.

    :return: a ``PooledDB`` instance that uses ``pymysql`` as its creator.
    """
    # Connection arguments taken from the settings.
    connect_args = {
        'host': mysql_config.get("MYSQL_HOST"),
        'port': mysql_config.get("MYSQL_PORT"),
        'user': mysql_config.get("MYSQL_USER"),
        'passwd': mysql_config.get("MYSQL_PASSWD"),
        'db': mysql_config.get("MYSQL_DBNAME"),
        'charset': mysql_config.get("MYSQL_CHARSET"),
    }
    pool_options = {
        # Connections created when the pool is initialised; 0 (the default)
        # creates none up front.
        'mincached': 0,
        # Maximum number of idle connections kept in the pool; 0 (the
        # default) means no limit.
        'maxcached': 0,
        # Maximum number of shared connections in the pool.
        'maxshared': 20,
        # Maximum number of connections allowed.
        'maxconnections': 100,
        # Whether a request blocks once the maximum is reached.  The default
        # False raises an error instead; True waits until a connection is
        # released.
        'blocking': True,
        # Maximum number of uses per connection; 0 (the default) means no
        # limit.
        'maxusage': 0,
        'setsession': None,
    }
    return PooledDB(creator=pymysql, **connect_args, **pool_options)
class Connection:
    """Holder of the shared connection pool.

    The pool is built by ``create_pool()`` once, when this class body runs.
    """

    _pool = create_pool()

    @classmethod
    def get_connection(cls):
        """Return a connection obtained from the shared pool."""
        shared_pool = cls._pool
        return shared_pool.connection()
代码(mysqlbase.py)
# coding=utf-8
from connection import Connection
class MysqlBase(object):
    """Helper that runs SQL on one pooled connection.

    An instance takes a connection and a cursor when it is constructed.
    ``execute``, ``execute_batch``, ``query_one`` and ``query_all`` close both
    when they finish, so an instance is meant for a single such call.
    ``query`` leaves them open; call ``close`` afterwards.
    """

    def __init__(self):
        self._connection = Connection.get_connection()
        self.connect = self._connection
        self.cursor = self._connection.cursor()

    def close(self):
        """Close the cursor and then the connection.

        The connection is closed even when closing the cursor raises, so it
        is never left checked out because of a cursor error.
        """
        try:
            self.cursor.close()
        finally:
            self.connect.close()

    @staticmethod
    def parse_res(raw):
        """Convert fetched rows into plain lists.

        :param raw: sequence of row sequences as returned by ``fetchall``;
            may be empty or ``None``.
        :return: ``[]`` for empty input; a flat list of values when the first
            row has exactly one column; otherwise a list of row lists.
        """
        if not raw:
            return []
        if len(raw[0]) == 1:
            return [item[0] for item in raw]
        return [list(item) for item in raw]

    def execute_batch(self, sql, kwargs_list):
        """Run *sql* once per parameter set and commit.

        :param sql: statement passed to ``cursor.executemany``.
        :param kwargs_list: sequence of parameter sets for the statement.
        :raises Exception: any error from execution or commit is re-raised
            after the transaction has been rolled back.
        """
        try:
            self.cursor.executemany(sql, kwargs_list)
            self.connect.commit()
        except Exception:
            self.connect.rollback()
            raise
        finally:
            self.close()

    def query(self, sql, **kwargs):
        """Execute *sql* and return all rows parsed by ``parse_res``.

        The cursor and connection stay open.

        :param sql: statement passed to ``cursor.execute``.
        :param kwargs: passed to ``cursor.execute`` as the parameter mapping.
        """
        self.cursor.execute(sql, kwargs)
        return self.parse_res(self.cursor.fetchall())

    def query_one(self, sql, **kwargs):
        """Return the first parsed row of the result, then close.

        :return: the first element of the ``query`` result, or ``''`` when
            the result is empty.
        """
        try:
            res = self.query(sql, **kwargs)
            return res[0] if res else ''
        finally:
            self.close()

    def query_all(self, sql, **kwargs):
        """Return every parsed row of the result, then close."""
        try:
            return self.query(sql, **kwargs)
        finally:
            self.close()

    def execute(self, sql, **kwargs):
        """Run a single statement and commit, then close.

        :param sql: statement passed to ``cursor.execute``.
        :param kwargs: passed to ``cursor.execute`` as the parameter mapping.
        :raises Exception: any error from execution or commit is re-raised
            after the transaction has been rolled back.
        """
        try:
            self.cursor.execute(sql, kwargs)
            self.connect.commit()
        except Exception:
            self.connect.rollback()
            raise
        finally:
            self.close()
代码(mysqlservice.py) -- 目前没用到
class MysqlService(object):
    """Wrapper around one pooled connection with explicit transaction calls.

    Unlike a one-shot helper, nothing here commits or closes automatically:
    the caller invokes ``commit``/``rollback`` and finally ``close``.
    """

    def __init__(self):
        self._connection = Connection.get_connection()
        self.connect = self._connection
        self.cursor = self._connection.cursor()

    def rollback(self):
        """Roll back the current transaction on the connection."""
        self._connection.rollback()

    def commit(self):
        """Commit the current transaction on the connection."""
        self._connection.commit()

    def transform_res(self, rows):
        """Convert fetched rows into plain lists.

        :param rows: sequence of row sequences; may be empty or ``None``.
        :return: ``[]`` for empty input (so the result is always a list); a
            flat list of values when the first row has exactly one column;
            otherwise a list of row lists.
        """
        if not rows:
            return []
        if len(rows[0]) == 1:
            return [item[0] for item in rows]
        return [list(item) for item in rows]

    def close(self):
        """Close the cursor and then the connection.

        The connection is closed even when closing the cursor raises.
        """
        try:
            self.cursor.close()
        finally:
            self.connect.close()
代码(change_insert_sql)
# =================== 整理批量sql插入语句 ==================
def insert_sql_to_dic_str(insert_sql):
    """
    Turn a copied INSERT statement into a named-placeholder template.

    The statement must contain an explicit column list, e.g.
    ``INSERT INTO t(`a`, `b`) VALUES (1, 2);``.

    :param insert_sql: the copied INSERT statement
    :return: _lst, _dic, _string -- the list of column names, a dict mapping
        each column name to '', and the statement with its VALUES part
        replaced by ``%(column)s`` placeholders
    :raises ValueError: if no parenthesised column list is found
    """
    start = insert_sql.find('(')
    end = insert_sql.find(')', start + 1)
    if start == -1 or end == -1:
        raise ValueError('no column list found in INSERT statement')
    # Column names may be wrapped in backticks or quotes, or be bare.
    candidates = (
        part.strip('`"\' \t\r\n')
        for part in insert_sql[start + 1:end].split(',')
    )
    _lst = [name for name in candidates if name]
    # Dict with every column preset to ''.
    _dic = dict.fromkeys(_lst, '')
    _result = '({})'.format(",".join('%({})s'.format(item) for item in _lst))
    # Keep everything up to and including the column list.  Cutting at the
    # closing parenthesis works whatever the letter case of the VALUES
    # keyword is and leaves exactly one space before the new VALUES clause.
    _string = '{} VALUES {};'.format(insert_sql[:end + 1], _result)
    return _lst, _dic, _string
if __name__ == '__main__':
    # Demo: convert one copied INSERT statement and print the three results
    # in order -- the column list, the blank-value dict and the SQL template
    # with %(column)s placeholders.
    insert_sql = """INSERT INTO `pc_yz_vip_level`(`id`, `level_name`, `status`) values (1, NULL, NULL);"""
    lst, dic, string = insert_sql_to_dic_str(insert_sql)
    for part in (lst, dic, string):
        print(part)