python 操作 PostgreSQL 数据库,线程并行修改 5w 条数据,性能优化 110
获取新 xls 表中的所有数据并整理为列表形式返回
其实修改的代码量不大,但是要考虑保留之前我们用的函数和方法还要继续能使用。
excel2sql.py 中:
在我们创建对象 OpExcel 时,为文件 url 和 sheet 名添加默认的值:
class OpExcel(object):
def __init__(
self,
url:"str类型的文件路径" = config.src_path+"\\data\\2019最新全国城市省市县区行政级别对照表(194).xls",
sheet:"excel中的表单名" = "全国城市省市县区域列表"):
当我们不给予参数时,我们打开的文件时原始文件。
当我们打开新的拥有地域名和所有经纬度的 xls 文件时像如下:
test = OpExcel(config.src_path+"\\data\\test.xls","全国城市省市县区域列表")
则我们可以查询到新的文件和数据
之前我们写过的函数 init_SampleViaProvince_name,时通过传参而返回指定省的第 1 级别的地点和地域名(list)形式
我们修改为默认不输入传输则返回所有的数据:
# 通过高德地图API查询每个地域名的经纬度,并返回为list数据
def init_SampleViaProvince_name(
self,
Province_name:"省名" = None
) ->"insert的数据,列表形式[('地域名1','1','经纬度'),('地域名2','1','经纬度')]":
geo_app = Geo_mapInterface(config.geo_key)
all_data = [self.sh_data.row_values(i) for i in range(self.rows)]
print(all_data)
if Province_name:# 生成测试用数据
cities_data=[ [ ["".join(i[0:3]),1,'test'], ["".join(i[1:3]),1,'test']][i[0]==i[1]] for i in all_data if i[0] == Province_name]
else: # 生成实际数据
# 生成第1级别地点和经纬度
cities_data=[[["".join(i[0:3]),1,i[4]],["".join(i[1:3]),1,i[4]]][i[0]==i[1]] for i in all_data[1:]]
# 继续添加第0级别的地点和经纬度:
for i in all_data:
cities_data.append(["".join(i[0:2-(i[0]==i[1])]),0,i[4]])
# for i in cities_data:
# i.append(geo_app.get_coordinatesViaaddress("".join(i[0])))
return cities_data
测试一下:
f:/workspace/env/Scripts/python.exe f:/workspace/城市距离爬取/process_data/excel2sql.py
-----
犁哈萨克自治州', 0, 12345.0], ['新疆省伊犁哈萨克自治州', 0, 12345.0], ['新疆省伊犁哈萨
(展示部分数据)
---
成功。
新的函数已经定义好,那么我们可以在主函数中同样利用之前的方法将所有的数据传入到数据库中。
优化代码,写入更便捷的打印日志系统
在 config.py 文件中写入:
# 构造打印日志格式,调用config.logger.info()即可
logging.basicConfig(stream=open(src_path + '/log/syserror.log', encoding="utf-8", mode="a"), level = logging.DEBUG, format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')
logger = logging.getLogger(__name__)
再需要打印日志的时候再来打印日志。
比如我们初始化数据库的链接的代码:
opsql.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
#__author__: stray_camel
'''
定义对mysql数据库基本操作的封装
1.数据插入
2.表的清空
3.查询表的所有数据
'''
import logging,datetime,time,sys,os
import psycopg2
import asyncio,multiprocessing,threading
# 线程池
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED
absPath = os.path.abspath(__file__) #返回代码段所在的位置,肯定是在某个.py文件中
temPath = os.path.dirname(absPath) #往上返回一级目录,得到文件所在的路径
temPath = os.path.dirname(temPath) #在往上返回一级,得到文件夹所在的路径
sys.path.append(temPath)
from public import config
class OperationDbInterface(object):
#定义初始化连接数据库
def __init__(self,
host_db : '数据库服务主机' = 'localhost',
user_db: '数据库用户名' = 'postgres',
passwd_db: '数据库密码' = '1026shenyang',
name_db: '数据库名称' = 'linezone',
port_db: '端口号,整型数字'=5432):
# 创建任务池
self.all_task = []
# 创建默认返回result,默认为失败
result={'code':'9999','message':'默认message','data':'默认data'}
try:
self.conn=psycopg2.connect(database=name_db, user=user_db, password=passwd_db, host=host_db, port=port_db)#创建数据库链接
print("创建数据库成功|postgresql, %s,user:%s,host:%s,port:%s"%(name_db,user_db,host_db,port_db))
self.cur=self.conn.cursor()
except psycopg2.Error as e:
self.conn = ""
self.cur = ""
print("创建数据库连接失败,退出运行|postgresql Error,请看系统日志!")
config.logger.exception(e)
sys.exit(0)
修改我们之前 OP 数据库的时候的清理某表数据和查询函数:
#定义对表的清空
def ini_table(self,
tablename:"表名")->"清空表数据结果":
try:
rows_affect = self.cur.execute("select count(*) from {}".format(tablename))
test = self.cur.fetchone() # 获取一条结果
self.cur.execute("truncate table {}".format(tablename))
self.conn.commit()
result={'code':'0000','message':'执行清空表操作成功','data':test[0]}
config.logger.info("清空{}表,操作数据{}条,操作:{}!".format(tablename,result['data'],result['message']))
except psycopg2.Error as e:
self.conn.rollback() # 执行回滚操作
result={'code':'9999','message':'执行批量插入异常','data':[]}
print ("数据库错误|insert_data : %s" % (e.args[0]))
config.logger.exception(e)
return result
#定义对表的清空
def ini_table(self,
tablename:"表名")->"清空表数据结果":
try:
rows_affect = self.cur.execute("select count(*) from {}".format(tablename))
test = self.cur.fetchone() # 获取一条结果
self.cur.execute("truncate table {}".format(tablename))
self.conn.commit()
result={'code':'0000','message':'执行清空表操作成功','data':test[0]}
config.logger.info("清空{}表,操作数据{}条,操作:{}!".format(tablename,result['data'],result['message']))
except psycopg2.Error as e:
self.conn.rollback() # 执行回滚操作
result={'code':'9999','message':'执行批量插入异常','data':[]}
print ("数据库错误|insert_data : %s" % (e.args[0]))
config.logger.exception(e)
return result
#查询表的所有数据
def select_all(self,
tablename:"表名")->"返回list,存放查询的结果":
try:
rows_affect = self.cur.execute("select * from {}".format(tablename))
test = self.cur.fetchall()
self.conn.commit()
result={'code':'0000','message':'查询表成功','data':test}
config.logger.info("查询{}表,共查询数据{}条,操作:{}!".format(tablename,len(result['data']),result['message']))
except psycopg2.Error as e:
self.conn.rollback() # 执行回滚操作
result={'code':'9999','message':'查询数据异常','data':[]}
print ("数据库错误| select_all,请看系统日志!")
config.logger.exception(e)
return result
以及调用完毕后的链接断开代码:
#数据库关闭
def __del__(self):
# 情况线程池的任务
if self.all_task != []:
wait(self.all_task, return_when=ALL_COMPLETED)
print(len(self.all_task))
if self.conn:
self.conn.close()
print("数据库断开链接...")
config.logger.info("数据库断开链接...")
测试一下清空操作:
if __name__ == "__main__":
op_postgre = OperationDbInterface()
# 初始化样表
result =op_postgre.ini_table("sample_data")
if result['code']=='0000':
print("操作数据{}条,操作:{}!".format(result['data'],result['message']))
else:
print(result)
(env) PS F:\workspace> & f:/workspace/env/Scripts/python.exe f:/workspace/城市距离爬取/op_postgresql/opsql.py
创建数据库成功|postgresql, linezone,user:postgres,host:localhost,port:5432
操作数据3206条,操作:执行清空表操作成功!
(env) PS F:\workspace>
ok!nice