- 处理的文件:`label.xlsx`

  | AP ID | Group Borrower【Holding Company ID】 | Group Borrower【Type of Company】 | Watermark 3.0 (Migration client only)【Effective Date】 |
  | --- | --- | --- | --- |
  | 1268 | 1974 | Affiliate | |
  | 1741 | 1268 | Holding Company | |
  | 1890 | 2073 | Affiliate | 2023/12/30 |
  | 1955 | 2136 | Affiliate | |
  | 2008 | | | 2023/11/12 |
  | 2073 | 1890 | Holding Company | |

- 新建文件夹 `inputFiles`,把上面的文件放入到对应的文件夹里面。
- 实现代码,导入数据到 label 相关数据表:
import math
import os
from datetime import datetime

import pandas as pd
import pymysql.cursors

# Database connection settings; replace the placeholders with real values.
db_config = {
    'host': 'your_database_host',
    'user': 'your_database_user',
    'password': 'your_database_password',
    'database': 'your_database_name',
}

# Open the MySQL connection (PyMySQL). DictCursor makes every fetched row a
# dict keyed by column name, which the helpers below rely on (row['id']).
connection = pymysql.connect(
    host=db_config['host'],
    user=db_config['user'],
    password=db_config['password'],
    database=db_config['database'],
    cursorclass=pymysql.cursors.DictCursor,
)

# Folder that holds the input workbook, the workbook name, and the full path.
input_folder = 'inputFiles'
excel_file_name = 'label.xlsx'
excel_file_path = os.path.join(input_folder, excel_file_name)


def _now_str():
    """Return the current local time formatted as 'YYYY-MM-DD HH:MM:SS'."""
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")


def _fetch_one(sql, params):
    """Run a parameterized SELECT and return its first row (dict) or None."""
    with connection.cursor() as cursor:
        cursor.execute(sql, params)
        return cursor.fetchone()


def find_onboard_company_id(ap_id):
    """Return the id of the onboard_companies row with this ap_cid and status=1.

    Returns None when no such row exists.
    """
    row = _fetch_one(
        "SELECT id FROM onboard_companies WHERE ap_cid=%s AND status=%s",
        (ap_id, 1))
    return row['id'] if row else None


def find_label_id(label_name):
    """Return the id of the labels row with this name, or None if not found.

    NOTE(review): labels are filtered by status=2 while every other table in
    this script is filtered by status=1 -- confirm this is intended.
    """
    row = _fetch_one(
        "SELECT id FROM labels WHERE name=%s AND status=%s",
        (label_name, 2))
    return row['id'] if row else None


def find_label_field_id(label_field_name, label_id):
    """Return the id of the label_fields row (status=1) for this name/label.

    Returns None when no such row exists.
    """
    row = _fetch_one(
        "SELECT id FROM label_fields WHERE name=%s AND label_id=%s AND status=1",
        (label_field_name, label_id))
    return row['id'] if row else None


def find_company_label(onboard_company_id, label_id):
    """Return the company_labels row ({'id': ...}) with status=1, or None."""
    row = _fetch_one(
        "SELECT id FROM company_labels WHERE onboard_company_id=%s AND label_id=%s AND status=1",
        (onboard_company_id, label_id))
    return row if row else None


def create_company_label(onboard_company_id, label_id):
    """Insert a company_labels row, commit, and return the new row's id.

    created_by/updated_by are written as 0 and company_label_status/status
    as 1; created_at/updated_at are set to the current local time.
    """
    with connection.cursor() as cursor:
        current_time = _now_str()
        cursor.execute(
            "INSERT INTO company_labels (onboard_company_id, label_id, created_by, updated_by, "
            "company_label_status, status, created_at, updated_at) "
            "VALUES (%s, %s, 0, 0, 1, 1, %s, %s)",
            (onboard_company_id, label_id, current_time, current_time))
        # Read the generated id while the cursor is still open.
        new_id = cursor.lastrowid
    connection.commit()
    return new_id


def find_label_vals(company_label_id, label_field_id):
    """Return the company_label_vals row ({'id': ...}) with status=1, or None."""
    row = _fetch_one(
        "SELECT id FROM company_label_vals WHERE company_label_id=%s AND label_field_id=%s AND status=1",
        (company_label_id, label_field_id))
    return row if row else None


def update_label_val(val_id, val):
    """Set val and updated_at on the company_label_vals row val_id; commit."""
    with connection.cursor() as cursor:
        cursor.execute(
            "UPDATE company_label_vals SET val=%s, updated_at=%s WHERE id=%s",
            (val, _now_str(), val_id))
    connection.commit()


def insert_label_val(company_label_id, label_field_id, val):
    """Insert a company_label_vals row with status=1 and commit."""
    with connection.cursor() as cursor:
        current_time = _now_str()
        cursor.execute(
            "INSERT INTO company_label_vals (company_label_id, label_field_id, val, status, created_at, updated_at) "
            "VALUES (%s, %s, %s, 1, %s, %s)",
            (company_label_id, label_field_id, val, current_time, current_time))
    connection.commit()


def find_or_create_company_label(onboard_company_id, label_id, label_field_id, val):
    """Upsert one label value for a company.

    Reuses the active company_labels row for (onboard_company_id, label_id)
    or creates it, then updates the active company_label_vals row for
    label_field_id if one exists, otherwise inserts a new one.
    """
    company_label_data = find_company_label(onboard_company_id, label_id)
    if company_label_data:
        company_label_id = company_label_data['id']
    else:
        company_label_id = create_company_label(onboard_company_id, label_id)

    company_label_vals_data = find_label_vals(company_label_id, label_field_id)
    if company_label_vals_data:
        update_label_val(company_label_vals_data['id'], val)
    else:
        insert_label_val(company_label_id, label_field_id, val)


def _holding_company_id_text(value):
    """Render a Holding Company ID cell as an integer string.

    pandas reads a numeric column containing blanks as float, so 1974.0 must
    become '1974'.
    """
    return str(int(value))


def _type_of_company_text(value):
    """Render a Type of Company cell as text."""
    return f"{value}"


def _effective_date_text(value):
    """Render an Effective Date cell as 'YYYY-MM-DD'.

    Cells that Excel stored as text (e.g. '2023/12/30') are parsed first;
    datetime-like cells are formatted directly.
    NOTE(review): ambiguous text dates such as '01/02/2023' are parsed with
    pandas' default (month-first) rules -- confirm the workbook's format.
    """
    if isinstance(value, str):
        value = pd.to_datetime(value)
    return value.strftime("%Y-%m-%d")


# One entry per imported Excel column:
# (Excel column, label name, label field name, cell-to-text converter).
_FIELD_SPECS = (
    ('Group Borrower【Holding Company ID】', "Group Borrower",
     "Holding Company ID", _holding_company_id_text),
    ('Group Borrower【Type of Company】', "Group Borrower",
     "Type of Company", _type_of_company_text),
    ('Watermark 3.0 (Migration client only)【Effective Date】',
     "Watermark 3.0 (Migration client only)",
     "Effective Date", _effective_date_text),
)


def _resolve_targets(specs):
    """Map each Excel column to (label_id, label_field_id), or to None.

    The ids do not depend on the row, so they are looked up once here instead
    of once per row. A column whose label or field is missing maps to None so
    the caller can skip it rather than write rows with NULL ids.
    """
    label_ids = {}
    targets = {}
    for column, label_name, field_name, _ in specs:
        if label_name not in label_ids:
            label_ids[label_name] = find_label_id(label_name)
        label_id = label_ids[label_name]
        if label_id is None:
            print(f"Label '{label_name}' not found; column '{column}' will be skipped.")
            targets[column] = None
            continue
        label_field_id = find_label_field_id(field_name, label_id)
        if label_field_id is None:
            print(f"Label field '{field_name}' of label '{label_name}' not found; "
                  f"column '{column}' will be skipped.")
            targets[column] = None
            continue
        targets[column] = (label_id, label_field_id)
    return targets


try:
    # Load the workbook into a DataFrame.
    df = pd.read_excel(excel_file_path)
    targets = _resolve_targets(_FIELD_SPECS)

    # Import every row of the sheet.
    for index, row in df.iterrows():
        ap_id = row['AP ID']
        onboard_company_id = find_onboard_company_id(ap_id)
        if onboard_company_id is None:
            print(f"AP ID {ap_id} 对应的 onboard_company_id 未找到.")
            continue

        for column, _label_name, _field_name, to_text in _FIELD_SPECS:
            value = row[column]
            # pd.isnull covers both NaN (empty numeric/text cell) and NaT.
            if pd.isnull(value):
                print(f"{ap_id} '{column}' Value is NaN or Nat , skipping...")
                continue

            target = targets[column]
            if target is None:
                # Missing label/field was already reported by _resolve_targets.
                continue

            try:
                text = to_text(value)
            except (ValueError, TypeError, AttributeError) as exc:
                # One malformed cell should not abort the import half-way,
                # after earlier rows have already been committed.
                print(f"{ap_id} '{column}' Value {value!r} could not be converted ({exc}), skipping...")
                continue

            label_id, label_field_id = target
            # Values are stored wrapped in double quotes.
            find_or_create_company_label(
                onboard_company_id, label_id, label_field_id, f'"{text}"')
finally:
    # Always release the database connection, even if the import fails.
    connection.close()