首页 > 数据库 >sql server导入mysql,使用python多线程

sql server导入mysql,使用python多线程

时间:2024-08-25 17:49:53浏览次数:16  
标签:NULL python DimAccount server RESTRICT DEFAULT 导入 清空 多线程

概述

在上一篇文章中,链接:https://www.cnblogs.com/xiao987334176/p/18377915

使用工具SQLyog进行导入,传输过程是单进程的,一个表一个表的传,一条条数据插入,所以传输速度会比较慢。

如果sql server mdf文件在200m左右,传输需要花费30分钟左右。

如果来了一个10GB左右的mdf的文件,需要25个小时,时间太漫长了。

 

mysql表结构重构

如果使用python多进程导入,那么导入顺序是错乱的。如果表结构包含外键关联,例如:

CREATE TABLE `DimAccount` (
  `AccountKey` int NOT NULL AUTO_INCREMENT,
  `ParentAccountKey` int DEFAULT NULL,
  `AccountCodeAlternateKey` int DEFAULT NULL,
  `ParentAccountCodeAlternateKey` int DEFAULT NULL,
  `AccountDescription` varchar(50) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
  `AccountType` varchar(50) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
  `Operator` varchar(50) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
  `CustomMembers` text COLLATE utf8mb4_unicode_ci,
  `ValueType` varchar(50) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
  `CustomMemberOptions` varchar(200) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
  PRIMARY KEY (`AccountKey`),
  KEY `FK_DimAccount_DimAccount` (`ParentAccountKey`),
  CONSTRAINT `DimAccount_ibfk_1` FOREIGN KEY (`ParentAccountKey`) REFERENCES `DimAccount` (`AccountKey`) ON DELETE RESTRICT ON UPDATE RESTRICT,
  CONSTRAINT `DimAccount_ibfk_10` FOREIGN KEY (`ParentAccountKey`) REFERENCES `DimAccount` (`AccountKey`) ON DELETE RESTRICT ON UPDATE RESTRICT,
  CONSTRAINT `DimAccount_ibfk_11` FOREIGN KEY (`ParentAccountKey`) REFERENCES `DimAccount` (`AccountKey`) ON DELETE RESTRICT ON UPDATE RESTRICT,
  CONSTRAINT `DimAccount_ibfk_2` FOREIGN KEY (`ParentAccountKey`) REFERENCES `DimAccount` (`AccountKey`) ON DELETE RESTRICT ON UPDATE RESTRICT,
  CONSTRAINT `DimAccount_ibfk_3` FOREIGN KEY (`ParentAccountKey`) REFERENCES `DimAccount` (`AccountKey`) ON DELETE RESTRICT ON UPDATE RESTRICT,
  CONSTRAINT `DimAccount_ibfk_4` FOREIGN KEY (`ParentAccountKey`) REFERENCES `DimAccount` (`AccountKey`) ON DELETE RESTRICT ON UPDATE RESTRICT,
  CONSTRAINT `DimAccount_ibfk_5` FOREIGN KEY (`ParentAccountKey`) REFERENCES `DimAccount` (`AccountKey`) ON DELETE RESTRICT ON UPDATE RESTRICT,
  CONSTRAINT `DimAccount_ibfk_6` FOREIGN KEY (`ParentAccountKey`) REFERENCES `DimAccount` (`AccountKey`) ON DELETE RESTRICT ON UPDATE RESTRICT,
  CONSTRAINT `DimAccount_ibfk_7` FOREIGN KEY (`ParentAccountKey`) REFERENCES `DimAccount` (`AccountKey`) ON DELETE RESTRICT ON UPDATE RESTRICT,
  CONSTRAINT `DimAccount_ibfk_8` FOREIGN KEY (`ParentAccountKey`) REFERENCES `DimAccount` (`AccountKey`) ON DELETE RESTRICT ON UPDATE RESTRICT,
  CONSTRAINT `DimAccount_ibfk_9` FOREIGN KEY (`ParentAccountKey`) REFERENCES `DimAccount` (`AccountKey`) ON DELETE RESTRICT ON UPDATE RESTRICT
) ENGINE=InnoDB AUTO_INCREMENT=102 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

 

那么就需要将外键关联全部删除掉,改造结果如下:

CREATE TABLE `DimAccount` (
  `AccountKey` int NOT NULL AUTO_INCREMENT,
  `ParentAccountKey` int DEFAULT NULL,
  `AccountCodeAlternateKey` int DEFAULT NULL,
  `ParentAccountCodeAlternateKey` int DEFAULT NULL,
  `AccountDescription` varchar(50) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
  `AccountType` varchar(50) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
  `Operator` varchar(50) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
  `CustomMembers` text COLLATE utf8mb4_unicode_ci,
  `ValueType` varchar(50) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
  `CustomMemberOptions` varchar(200) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
  PRIMARY KEY (`AccountKey`)
) ENGINE=InnoDB AUTO_INCREMENT=102 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

  

由于只是数据导入,我并不需要外键关联。外键关联特别麻烦,插入一条数据,还需要校验父表中的关联id,如果不匹配,就会导致插入数据失败。

上面只是举例了其中一个表,其他表需要一个个检查,由外键关键的,就全部删掉。

 

python多线程导入

这里使用python 3.x版本

安装模块

pip install pymssql
pip install pymysql

  

test.py

import pymssql
import pymysql
import json
from datetime import datetime
import multiprocessing
import time


def write_table_data(table_name):
    try:
        # 数据库名
        database_name='AdventureWorksDW2014'

        # 连接数据库
        mssql_conn = pymssql.connect(server='192.168.20.131', user='sa', password='Y.saabcd@1234', database=database_name)
        
        # 连接到MySQL数据库
        mysql_conn = pymysql.connect(
            host='192.168.20.131',  # 替换为你的数据库主机名
            user='root',  # 替换为你的数据库用户名
            password='root',  # 替换为你的数据库密码
            database=database_name  # 替换为你的数据库名
        )

        # 创建cursor对象
        mssql_cursor = mssql_conn.cursor()
        # 创建一个cursor对象
        mysql_cursor = mysql_conn.cursor()

        # 清空表
        print("清空表%s"%table_name)
        mysql_cursor.execute('TRUNCATE TABLE %s;'%table_name)
        mysql_conn.commit()

        # 执行SQL查询
        # mssql_cursor.execute("SELECT top 1 * FROM %s"%table_name)
        mssql_cursor.execute("SELECT * FROM %s"%table_name)

        mssql_rows = mssql_cursor.fetchall()
        values = ', '.join(['%s'] * len(mssql_rows[0]))
        insert_query = f'INSERT INTO {database_name}.{table_name} VALUES ({values})'
        # print(insert_query)
        #批量插入数据
        mysql_cursor.executemany(insert_query, mssql_rows)
            
        # 提交更改,在所有插入操作完成后只调用一次,减少与数据库的交互次数,提高整体性能
        mysql_conn.commit()
        print("%s 表数据导入完成"%table_name)
        
        # 关闭cursor和连接
        mssql_cursor.close()
        mssql_conn.close()

        mysql_cursor.close()
        mysql_conn.close()

    except Exception as e:
        print("程序异常",e)

if __name__ == "__main__":
    # 记录程序开始执行时间
    start_time = time.time()

    # 指定进程数
    num_processes = 30

    table_list=['FactFinance','DimAccount','DatabaseLog', 'AdventureWorksDWBuildVersion', 'DimCurrency', 'DimCustomer', 'DimDate', 'DimDepartmentGroup', 'DimEmployee', 'DimGeography', 'DimOrganization', 'DimProduct', 'DimProductCategory', 'DimProductSubcategory', 'DimPromotion', 'DimReseller', 'DimSalesReason', 'DimSalesTerritory', 'DimScenario', 'FactAdditionalInternationalProductDescription', 'FactCallCenter', 'FactCurrencyRate', 'FactInternetSales', 'FactInternetSalesReason', 'FactProductInventory', 'FactResellerSales', 'FactSalesQuota', 'FactSurveyResponse', 'NewFactCurrencyRate', 'ProspectiveBuyer']
    # table_list=['DatabaseLog']
    
    # 创建进程池,指定最大进程数
    with multiprocessing.Pool(processes=num_processes) as pool:
        # 使用pool.apply_async异步执行函数
        for table_name in table_list:
            pool.apply_async(write_table_data, args=(table_name,))

        # 等待所有异步操作完成
        pool.close()
        pool.join()


    # 记录程序执行结束的时间
    end_time = time.time()

    # 计算程序执行所需的时间
    execution_time = end_time - start_time

    print(f"程序执行时间为:{execution_time}秒")

  

执行python文件

python test.py

  

输出结果:

清空表DimScenario清空表DimSalesReason
清空表DimSalesTerritory

清空表DimDate
清空表DimCurrency
清空表DatabaseLog
清空表DimEmployee
清空表FactCurrencyRate
清空表DimProduct
清空表FactCallCenter
清空表AdventureWorksDWBuildVersion
清空表FactFinance
清空表FactProductInventory
清空表DimGeography
清空表DimOrganization
清空表DimDepartmentGroup
清空表DimProductSubcategory
清空表DimReseller
清空表FactAdditionalInternationalProductDescription
清空表DimAccount
清空表ProspectiveBuyer
清空表FactInternetSalesReason
清空表FactSalesQuota
清空表FactInternetSales
清空表NewFactCurrencyRate
清空表DimCustomer
清空表DimProductCategory
清空表FactSurveyResponse
清空表DimPromotion
DimScenario 表数据导入完成
清空表FactResellerSales
DimSalesReason 表数据导入完成
DimCurrency 表数据导入完成
DatabaseLog 表数据导入完成
DimSalesTerritory 表数据导入完成
AdventureWorksDWBuildVersion 表数据导入完成
DimDepartmentGroup 表数据导入完成
DimAccount 表数据导入完成
FactCallCenter 表数据导入完成
DimProductSubcategory 表数据导入完成
DimDate 表数据导入完成
DimOrganization 表数据导入完成
DimPromotion 表数据导入完成
DimProductCategory 表数据导入完成
FactSalesQuota 表数据导入完成
DimGeography 表数据导入完成
FactSurveyResponse 表数据导入完成
NewFactCurrencyRate 表数据导入完成
DimReseller 表数据导入完成
ProspectiveBuyer 表数据导入完成
FactCurrencyRate 表数据导入完成
FactAdditionalInternationalProductDescription 表数据导入完成
DimEmployee 表数据导入完成
DimProduct 表数据导入完成
FactFinance 表数据导入完成
FactInternetSalesReason 表数据导入完成
DimCustomer 表数据导入完成
FactResellerSales 表数据导入完成
FactInternetSales 表数据导入完成
FactProductInventory 表数据导入完成
程序执行时间为:37.30717396736145秒

  

从结果上来看,运行花费了37秒。

比用工具SQLyog,花了30分钟,快了48倍左右

 

标签:NULL,python,DimAccount,server,RESTRICT,DEFAULT,导入,清空,多线程
From: https://www.cnblogs.com/xiao987334176/p/18379220

相关文章

  • 计算机毕业设计推荐- 基于Python的高校岗位招聘数据分析平台
    ......
  • 面试官问什么?Python基础与进阶?介绍Django框架?MySQL数据库索引?
    Catalog自我介绍Python001.Python支持哪些数据类型?01.数字类型02.序列类型03.映射类型04.集合类型05.其他类型06.特点总结002.什么是模块(module),如何导入一个模块?01.导入模块02.`import`和`from...import...`的区别003.高频发问题004.数据去重005.Python中......
  • 基于python+flask框架的社区团购平台(开题+程序+论文) 计算机毕设
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容研究背景随着互联网技术的飞速发展和智能设备的普及,电子商务已渗透到人们生活的方方面面,社区团购作为一种新兴的电商模式,近年来迅速崛起并受到广泛......
  • 基于python+flask框架的基于推荐系统的电影网站系统小程序前端(开题+程序+论文) 计算机
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容研究背景在数字化时代,互联网已成为人们获取信息和娱乐的主要途径之一,电影作为大众喜爱的文化消费形式,其在线观看和推荐需求日益增长。随着电影产业......
  • 基于python+flask框架的基于WEB的咖啡销售系统(开题+程序+论文) 计算机毕设
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容研究背景在快节奏的现代生活中,咖啡已成为许多人日常生活中不可或缺的饮品,不仅因为其独特的口感和提神醒脑的功效,更因其承载了社交、休闲等多种文化......
  • 基于python+flask框架的网上电影购票系统(开题+程序+论文) 计算机毕设
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容研究背景随着互联网技术的迅猛发展和智能设备的普及,线上娱乐消费已成为人们日常生活的重要组成部分。电影作为深受大众喜爱的文化娱乐形式之一,其购......
  • 3-python之字符串
    字符串基本特点1:字符串的本质是:字符序列。2:Python不支持单字符类型,单字符也是作为一个字符串使用的。        引号创建字符串                我们可以通过单引号或双引号创建字符串a='love'print(a)#结果是:love           ......
  • 4:python之序列 (列表篇)
    序列        列表简介                1:列表:用于存储任意数目、任意类型的数据集合。        2:列表是内置可变序列,是包含多个元素的有序连续的内存空间列表的标准语法格式:a=[1,2,3,'qwe','你好']方法要点描述a.append(x)增加元......
  • Android fork 进程 process(init/Zygote/SystemServer)
    ##Android的init/Zygote/SystemServer Android手机先开机,init/Zygote/SystemServer,然后启动Framework,然后启动Launcher;【安装APP(PMS),】然后启动APP(AMS)。所有的Android应用进程都是有Zygote进程fork出来的。Android系统启动流程(一)解析init进程-http://blog.csdn.net/itach......
  • 基于python+flask框架的力悦月子会所管理系统(开题+程序+论文) 计算机毕设
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容研究背景近年来,随着人们生活水平的提高和健康意识的增强,母婴健康服务行业迎来了前所未有的发展机遇。力悦月子会所作为高端母婴护理服务的代表,其服......