首页 > 编程语言 >python从旧库中导出csv并导入新库

python从旧库中导出csv并导入新库

时间:2024-11-15 10:29:16浏览次数:1  
标签:新库 旧库 name database python connection table path csv

在线的游戏, 迁移数据库, 数据比较大, 游戏不能停很久, 先使用sqldump导入不变的表, 再使用python导出可变的表到csv文件, 导入到新库. 找出各表中csv中最大的id, 然后停服,  然后根据各表的id, 从id位置开始再导出新增数据, 再导入到新库.

export.py

"""
导出msql表格
"""
import csv
import os
import shutil

import pymysql


#删除文件夹
def deleteFolder(path):
    if not os.path.isdir(path):
        return
    try:
        shutil.rmtree(path)
        print(f"成功删除文件夹 {path}")
    except OSError as e:
        print(f"删除文件夹 {path} 失败: {e}")
def clearDir(path):
    if not os.path.isdir(path):
        return
    try:
        files = os.listdir(path)
        for cur in files:
            fullPath = os.path.join(path, cur)
            if os.path.isfile(fullPath):
                os.remove(fullPath)
            elif os.path.isdir(fullPath):
                deleteFolder(fullPath)
        print(f"成功清空文件夹 {path}")
    except OSError as e:
        print(f"清空文件夹 {path} 失败: {e}")

def createEmptyFolder(folderName):
    """
    创建空文件夹
    """
    if os.path.exists(folderName):
        clearDir(folderName)
    else:
        os.makedirs(folderName, exist_ok=True)
def export_table(db_config, table_name, id_col, export_start_id, chunk_size = 100000):
    # 创建数据库连接
    connection = pymysql.connect(**db_config)
    cursor = connection.cursor()

    print(f"Exporting {table_name}...")

    folder_dir = f'export/{table_name}'
    createEmptyFolder(folder_dir)
    # 导出数据
    page_num = 1
    while True:
        cursor.execute(f"SELECT * FROM {table_name} where {id_col} > {export_start_id} order by {id_col} asc limit {chunk_size}")
        with open(f"{folder_dir}/{table_name + '_' + str(page_num)}.csv", 'w', newline='', encoding='utf-8') as f:
            # 获取查询结果
            results = cursor.fetchall()
            if results and len(results) > 0:
                export_start_id = results[len(results) - 1][id_col]
                print(f"{table_name} Last id: {export_start_id}")
                # 写入CSV文件
                writer = csv.DictWriter(f, fieldnames=results[0].keys())  # 取第一行数据的键作为列名
                writer.writeheader()
                for row in results:
                    writer.writerow(row)
                page_num = page_num + 1
                if len(results) < chunk_size:
                    break
            else:
                break

    print(f"Exported {table_name} to CSV.")

    # 关闭数据库连接
    cursor.close()
    connection.close()

if __name__ == "__main__":
    db_config = {
        'host': '127.0.0.1',
        'user': 'game',
        'password': '123',
        'database': 'game',
        'port': 3306,  # 默认MySQL端口是3306
        'charset': 'utf8mb4',
        'cursorclass': pymysql.cursors.DictCursor
    }
    export_table(db_config, 'd_user', 'uid', 265219, 10 * 10000)
    export_table(db_config, 'd_user_bind', 'uid', 265219, 10 * 10000)

    # 从2024-11-05 00:00:00 第一条id开始
    export_table(db_config, 'd_desk_user', 'id', 13379855, 10 * 10000)
    export_table(db_config, 'coin_log', 'id', 40797349, 10 * 10000)

 

import.py

"""
导入mysql表格
"""
import pandas as pd
import os
from datetime import datetime
import pymysql
import csv

def now_time():
    # 获取当前时间
    now = datetime.now()
    # 使用f-string格式化时间输出
    formatted_now = f"[{now:%Y-%m-%d %H:%M:%S}]"
    return formatted_now
def get_mysql_connection(database_username, database_password, database_ip, database_name):
    # 数据库连接参数
    db_config = {
        'host': database_ip,
        'user': database_username,
        'password': database_password,
        'database': database_name,
        'charset': 'utf8mb4',
        'cursorclass': pymysql.cursors.DictCursor
    }
    # 连接到MySQL数据库
    connection = pymysql.connect(**db_config)
    return connection

def import_one_csv(connection, csv_file, table_name):
    # 创建目标数据库连接
    target_cursor = connection.cursor()

    # 导入CSV文件
    print(f"Importing {csv_file}...")
    with open(csv_file, 'r', newline='', encoding='utf-8') as file:
        reader = csv.reader(file)
        headers = next(reader)  # 读取表头
        placeholders = ', '.join(['%s'] * len(headers))  # 创建占位符
        data = list(reader)  # 读取所有数据行

        # 插入数据
        for row in data:
            target_cursor.execute(f"INSERT INTO {table_name} ({', '.join(headers)}) VALUES ({placeholders})", row)
        connection.commit()  # 提交事务

    print(f"Imported {csv_file} ")

    # 关闭目标数据库连接
    target_cursor.close()

def import_one_csv_with_null_values(connection, csv_file, table_name):
    # 创建目标数据库连接
    cursor = connection.cursor()
    # 获取表结构
    cursor.execute(f"DESCRIBE {table_name}")
    columns = cursor.fetchall()

    # 创建一个字典来存储列名和对应的数据类型
    column_types = {column['Field']: column['Type'] for column in columns}

    # 读取 CSV 文件
    df = pd.read_csv(csv_file)
    # 遍历 DataFrame 中的每一行
    for index, row in df.iterrows():
        # 构建 SQL 插入语句的参数列表
        params = []
        # 构建 SQL 插入语句的列名和值部分
        columns_part = []
        values_part = []
        for column, value in row.items():
            # 检查列的数据类型
            if column_types[column].startswith('int') and value == '':
                # 整数列,空字符串转换为 None
                params.append(None)
            elif pd.isnull(value):
                # 处理 NaN 值
                params.append(None)
            else:
                # 其他列(如 varchar),保留空字符串
                params.append(value if value != '' else None)
            columns_part.append(f"`{column}`")
            values_part.append('%s')

        # 构建完整的 SQL 插入语句
        sql = f"INSERT INTO {table_name} ({', '.join(columns_part)}) VALUES ({', '.join(values_part)})"
        # 执行 SQL 语句
        cursor.execute(sql, params)
    # 提交事务
    connection.commit()
    print(f"Imported {csv_file} ")

def import_table(connection, table_name, folder_path):
    num = 0
    for filename in os.listdir(folder_path):
        if filename.endswith(".csv") and table_name in filename:
            num = num + 1
    for i in range(1, num + 1):
        filename = f"{table_name}_{i}.csv"
        file_path = os.path.join(folder_path, filename)
        print(now_time(), f"开始导入 csv文件:{filename}")
        if table_name == 'd_user' or table_name == 'd_user_bind' or table_name == 'd_desk_user':
            import_one_csv_with_null_values(connection, file_path, table_name)
        else:
            import_one_csv(connection, file_path, table_name)


def import_game_data(database_ip, database_username, database_password):
    database_name = 'game'
    connection = get_mysql_connection(database_username, database_password, database_ip, database_name)
    tables = ['d_user', 'd_user_bind', 'd_desk_user', 'coin_log']
    for table in tables:
        import_table(connection, table, table)
    connection.close()

if __name__ == "__main__":
    database_username = 'root'
    database_password = '123'
    database_ip = '127.0.0.1'

    now = datetime.now()
    print(now_time(), f"开始导入hub数据")
    import_game_data(database_ip, database_username, database_password)
    print(now_time(), f"导入完成,耗时:{end - now}")

 

标签:新库,旧库,name,database,python,connection,table,path,csv
From: https://www.cnblogs.com/barrysgy/p/18547523

相关文章

  • 从零开始:数学建模算法汇总之MATLAB与Python在建模中的应用对比
    目录从零开始:数学建模算法汇总之MATLAB与Python在建模中的应用对比前言最小二乘法数值分析方法数值分析方法图论算法线性规划整数规划动态规划贪心算法分支定界法蒙特卡洛方法随机游走算法遗传算法粒子群算法神经网络算法人工智能算法模糊数学时间序列分析......
  • python多线程和网络编程
    一、多线程1.进程、线程和并行执行学习目标:了解什么是进程、线程,了解什么是并行执行进程比作公司,线程比作员工,多线程并行执行就比作公司的不同员工在同一时间去做不同的事。总结2.多线程编程学习目标:掌握使用threading模块完成多线程编程当你想实现唱歌和跳舞一......
  • 软件测试笔记|Python自动化测试|python中的数值运算有何特点?
    一、类型方面特点1.类型丰富:支持整数(int)、浮点数(float)、复数(complex)等多种数值类型。2.动态类型:声明变量时无需指定类型,运行时确定类型。二、精度相关特点1.整数精度:整数类型不会溢出,可处理任意大小整数,受机器内存限制。2.浮点数精度:通常用双精度浮点数表示,符合IEEE7......
  • 软件测试笔记|Python自动化测试|isinstance与type有什么区别,分别有什么特点?
    一、区别isinstance和type都可用于判断对象的类型,但它们有明显区别:1.判断方式•type:直接返回对象的类型,是通过比较对象的类型是否完全相同来判断,更关注对象确切的类型本身。•isinstance:判断一个对象是否是指定类型(或其派生类型)的实例,考虑了继承关系,更灵活些。2.对继......
  • [oeasy]python0041_输出ASCII码表_英文字符编码_键盘字符_ISO_646
    输出ASCII码表_英文字符编码_键盘字符_ISO_646回忆上次内容上次输出了从0到122序号对应的所有字符 fornuminrange(123):print(num,chr(num),sep=":")字符类型包括数字大小写字母符号   添加图片注释,不超过14......
  • 基于 Python 的机器学习的新闻文本分类系统,附源码
    博主介绍:✌程序员徐师兄、7年大厂程序员经历。全网粉丝12w+、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌......
  • 基于 Python Django 的二手房间可视化系统分析
    博主介绍:✌程序员徐师兄、7年大厂程序员经历。全网粉丝12w+、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌......
  • 2024年09月CCF-GESP编程能力等级认证Python编程二级真题解析
    本文收录于专栏《Python等级认证CCF-GESP真题解析》,专栏总目录:点这里,订阅后可阅读专栏内所有文章。一、单选题(每题2分,共30分)第1题据有关资料,山东大学于1972年研制成功DJL-1计算机,并于1973年投入运行,其综合性能居当时全国第三位。DJL-1计算机运算控制部分所使用的......
  • Python-BMI指数计算
    BMI指数身体质量质数(BMI)。计算公式:BMI=体重/身高的平方(体重单位:千克;身高单位:米)。|人体胖瘦程度|消瘦|正常值|超重||BIM数值|<18.5kg/m²|18.5-24kg/m²|>24kg/m²|Python代码`"""BMI计算"""defbmi(weight,tall):bmi=weight/tall**2......
  • Python中的正则表达式教程
    一、正则表达式基础1。1。概念介绍正则表达式是用于处理字符串的强大工具,它并不是Python的一部分。其他编程语言中也有正则表达式的概念,区别只在于不同的编程语言实现支持的语法数量不同。它拥有自己独特的语法以及一个独立的处理引擎,在提供了正则表达式的语言里,正则表......