首页 > 数据库 >python 操作 PostgreSQL 数据库,线程并行修改 5w 条数据,性能优化

python 操作 PostgreSQL 数据库,线程并行修改 5w 条数据,性能优化

时间:2023-05-15 21:11:29浏览次数:58  
标签:PostgreSQL python self print 线程 result test data 数据库

python 操作 PostgreSQL 数据库,线程并行修改 5w 条数据,性能优化 1

 

 获取新 xls 表中的所有数据并整理为列表形式返回

其实修改的代码量不大,但是要考虑保留之前我们用的函数和方法还要继续能使用。

excel2sql.py 中:

在我们创建对象 OpExcel 时,为文件 url 和 sheet 名添加默认的值:


class OpExcel(object):

    def __init__(

        self, 

        url:"str类型的文件路径" = config.src_path+"\\data\\2019最新全国城市省市县区行政级别对照表(194).xls", 

        sheet:"excel中的表单名" = "全国城市省市县区域列表"):

当我们不给予参数时,我们打开的文件时原始文件。

当我们打开新的拥有地域名和所有经纬度的 xls 文件时像如下:


test = OpExcel(config.src_path+"\\data\\test.xls","全国城市省市县区域列表")

则我们可以查询到新的文件和数据

之前我们写过的函数 init_SampleViaProvince_name,时通过传参而返回指定省的第 1 级别的地点和地域名(list)形式

我们修改为默认不输入传输则返回所有的数据:


# 通过高德地图API查询每个地域名的经纬度,并返回为list数据

    def init_SampleViaProvince_name(

        self, 

        Province_name:"省名" = None

        ) ->"insert的数据,列表形式[('地域名1','1','经纬度'),('地域名2','1','经纬度')]":

        geo_app = Geo_mapInterface(config.geo_key)

        all_data = [self.sh_data.row_values(i) for i in range(self.rows)]

        print(all_data)

        if Province_name:# 生成测试用数据

            cities_data=[ [ ["".join(i[0:3]),1,'test'], ["".join(i[1:3]),1,'test']][i[0]==i[1]] for i in all_data if i[0] == Province_name]

        else: # 生成实际数据

            # 生成第1级别地点和经纬度

            cities_data=[[["".join(i[0:3]),1,i[4]],["".join(i[1:3]),1,i[4]]][i[0]==i[1]] for i in all_data[1:]]

            # 继续添加第0级别的地点和经纬度:

            for i in all_data:

                cities_data.append(["".join(i[0:2-(i[0]==i[1])]),0,i[4]])

        # for i in cities_data:

        #     i.append(geo_app.get_coordinatesViaaddress("".join(i[0])))

        return cities_data

测试一下:


f:/workspace/env/Scripts/python.exe f:/workspace/城市距离爬取/process_data/excel2sql.py

-----

犁哈萨克自治州', 0, 12345.0], ['新疆省伊犁哈萨克自治州', 0, 12345.0], ['新疆省伊犁哈萨

(展示部分数据)

---

成功。

新的函数已经定义好,那么我们可以在主函数中同样利用之前的方法将所有的数据传入到数据库中。

 优化代码,写入更便捷的打印日志系统

在 config.py 文件中写入:


# 构造打印日志格式,调用config.logger.info()即可

logging.basicConfig(stream=open(src_path + '/log/syserror.log', encoding="utf-8", mode="a"), level = logging.DEBUG, format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')

logger = logging.getLogger(__name__)

再需要打印日志的时候再来打印日志。

比如我们初始化数据库的链接的代码:

opsql.py


#!/usr/bin/python

# -*- coding: utf-8 -*-

#__author__: stray_camel

'''

定义对mysql数据库基本操作的封装

1.数据插入

2.表的清空

3.查询表的所有数据

'''

import logging,datetime,time,sys,os

import psycopg2

import asyncio,multiprocessing,threading

# 线程池

from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED

absPath = os.path.abspath(__file__)   #返回代码段所在的位置,肯定是在某个.py文件中

temPath = os.path.dirname(absPath)    #往上返回一级目录,得到文件所在的路径

temPath = os.path.dirname(temPath)    #在往上返回一级,得到文件夹所在的路径

sys.path.append(temPath)

from public import config

class OperationDbInterface(object):

    #定义初始化连接数据库

    def __init__(self, 

    host_db : '数据库服务主机' = 'localhost', 

    user_db: '数据库用户名' = 'postgres', 

    passwd_db: '数据库密码' = '1026shenyang', 

    name_db: '数据库名称' = 'linezone', 

    port_db: '端口号,整型数字'=5432):

        # 创建任务池

        self.all_task = []

        # 创建默认返回result,默认为失败

        result={'code':'9999','message':'默认message','data':'默认data'}

        try:

            self.conn=psycopg2.connect(database=name_db, user=user_db, password=passwd_db, host=host_db, port=port_db)#创建数据库链接

            print("创建数据库成功|postgresql, %s,user:%s,host:%s,port:%s"%(name_db,user_db,host_db,port_db))

            self.cur=self.conn.cursor()

        except psycopg2.Error as e:

            self.conn = ""

            self.cur = ""

            print("创建数据库连接失败,退出运行|postgresql Error,请看系统日志!")

            config.logger.exception(e)

            sys.exit(0)

修改我们之前 OP 数据库的时候的清理某表数据和查询函数:


#定义对表的清空

def ini_table(self,

tablename:"表名")->"清空表数据结果":

    try:

        rows_affect = self.cur.execute("select count(*) from {}".format(tablename))

        test = self.cur.fetchone()  # 获取一条结果

        self.cur.execute("truncate table {}".format(tablename))

        self.conn.commit()

        result={'code':'0000','message':'执行清空表操作成功','data':test[0]}

        config.logger.info("清空{}表,操作数据{}条,操作:{}!".format(tablename,result['data'],result['message']))

    except psycopg2.Error as e:

        self.conn.rollback()  # 执行回滚操作

        result={'code':'9999','message':'执行批量插入异常','data':[]}

        print ("数据库错误|insert_data : %s" % (e.args[0]))

        config.logger.exception(e)

    return result

    #定义对表的清空

    def ini_table(self,

    tablename:"表名")->"清空表数据结果":

        try:

            rows_affect = self.cur.execute("select count(*) from {}".format(tablename))

            test = self.cur.fetchone()  # 获取一条结果

            self.cur.execute("truncate table {}".format(tablename))

            self.conn.commit()

            result={'code':'0000','message':'执行清空表操作成功','data':test[0]}

            config.logger.info("清空{}表,操作数据{}条,操作:{}!".format(tablename,result['data'],result['message']))

        except psycopg2.Error as e:

            self.conn.rollback()  # 执行回滚操作

            result={'code':'9999','message':'执行批量插入异常','data':[]}

            print ("数据库错误|insert_data : %s" % (e.args[0]))

            config.logger.exception(e)

        return result

    #查询表的所有数据

    def select_all(self, 

    tablename:"表名")->"返回list,存放查询的结果":

        try:

            rows_affect = self.cur.execute("select * from {}".format(tablename))

            test = self.cur.fetchall()

            self.conn.commit()

            result={'code':'0000','message':'查询表成功','data':test}

            config.logger.info("查询{}表,共查询数据{}条,操作:{}!".format(tablename,len(result['data']),result['message']))

        except psycopg2.Error as e:

            self.conn.rollback()  # 执行回滚操作

            result={'code':'9999','message':'查询数据异常','data':[]}

            print ("数据库错误| select_all,请看系统日志!")

            config.logger.exception(e)

        return result

以及调用完毕后的链接断开代码:


    #数据库关闭

    def __del__(self):

        # 情况线程池的任务

        if self.all_task != []:

            wait(self.all_task, return_when=ALL_COMPLETED)

            print(len(self.all_task))

        if self.conn:

            self.conn.close()

            print("数据库断开链接...")

            config.logger.info("数据库断开链接...")

测试一下清空操作:


if __name__ == "__main__":

    op_postgre = OperationDbInterface()

    # 初始化样表

    result =op_postgre.ini_table("sample_data")

    if result['code']=='0000':

        print("操作数据{}条,操作:{}!".format(result['data'],result['message']))

    else:

        print(result)


(env) PS F:\workspace> & f:/workspace/env/Scripts/python.exe f:/workspace/城市距离爬取/op_postgresql/opsql.py

创建数据库成功|postgresql, linezone,user:postgres,host:localhost,port:5432

操作数据3206条,操作:执行清空表操作成功!

(env) PS F:\workspace>

ok!nice

 同样采用线程并发池,来 op 数据库,修改之前的代码,优化

标签:PostgreSQL,python,self,print,线程,result,test,data,数据库
From: https://www.cnblogs.com/yaoyangding/p/17403140.html

相关文章

  • python openpyxl 操作excel
     openpyxl是从1开始计数,遍历操作时请用1为起始点,且总数加一,例:foriinrange(1,n+1) 单元格样式所需导入模块fromopenpyxl.stylesimportAlignment,Font,Border,Side,PatternFill样式#设置边框线align=Alignment(horizontal='center',vertical='c......
  • SICP:元循环求值器(Python实现)
    求值器完整实现代码我已经上传到了GitHub仓库:TinySCM,感兴趣的童鞋可以前往查看。这里顺便强烈推荐UCBerkeley的同名课程CS61A。在这个层次结构的最底层是对象语言。对象语言只涉及特定的域,而不涉及对象语言本身(比如它们的文法规则,或其中的其体句子)。如要涉及它们,则要有一种元......
  • Java并发(六)----线程start、run、state方法
    1、start与run调用runpublicstaticvoidmain(String[]args){  Threadt1=newThread("t1"){    @Override    publicvoidrun(){      log.debug(Thread.currentThread().getName());//打印线程名称      FileRe......
  • Python基础语法入门
    Python基础语法入门1、Python的注释符号1、什么是注释?#学习任何一门代码,先学注释,注释是代码之母注释就是对一段代码的解释说明,它不会参与到代码的运行,只是起到提示作用2、如何注释?2.1、#单行注释#它可以使用快捷键帮助我们把代码写的更加规范快捷键:Ctrl+alt+1(格式......
  • Java并发(五)----线程常见方法总结
    常见方法方法名static功能说明注意start() 启动一个新线程,在新的线程运行run方法中的代码start方法只是让线程进入就绪,里面代码不一定立刻运行(CPU的时间片还没分给它)。每个线程对象的start方法只能调用一次,如果调用了多次会出现IllegalThreadStateException......
  • 了解Python的基本数据类型
    引入 我们学习变量是为了让计算机能够记忆事物的某种状态,而变量的值就是用来存储事物状态的,很明显事物的状态是分成不同种类的(例如水的液态,气态和固态),所以变量值也应该有不同的类型 一、数字类型int(整型)float(浮点型)不可变数据类型int,整型,是没有小数点的数字......
  • < Python全景系列-2 > Python数据类型大盘点
    <Python全景系列-2>Python数据类型大盘点欢迎来到我们的系列博客《Python全景系列》!在这个系列中,我们将带领你从Python的基础知识开始,一步步深入到高级话题,帮助你掌握这门强大而灵活的编程语法。无论你是编程新手,还是有一定基础的开发者,这个系列都将提供你需要的知识和技能。Py......
  • python基础学习-读写CSV文件
    CSV文件介绍参考:Python-Core-50-Courses/第23课:用Python读写CSV文件.mdatmaster·jackfrued/Python-Core-50-Courses(github.com)CSV 全称逗号分隔值文件是一种简单、通用的文件格式,被广泛的应用于应用程序(数据库、电子表格等)数据的导入和导出以及异构系统之间的数据......
  • python基础学习-用Python操作Word和PowerPoint
    参考链接:Python-Core-50-Courses/第26课:用Python操作Word文件和PowerPoint.mdatmaster·jackfrued/Python-Core-50-Courses(github.com)......
  • python基础学习-用Python读写Excel文件
    参考链接:Python-Core-50-Courses/第24课:用Python读写Excel文件-1.mdatmaster·jackfrued/Python-Core-50-Courses(github.com)Python-Core-50-Courses/第25课:用Python读写Excel文件-2.mdatmaster·jackfrued/Python-Core-50-Courses(github.com)......