"""Validate that mapped tables in a source and a target MySQL database match.

For every ``source table -> target table`` pair in ``table_mapping`` the mapped
columns are fetched from both databases and compared, and a pass/fail line is
printed per pair.
"""

import pandas as pd
import pymysql

# Database connection settings, passed as keyword arguments to pymysql.connect.
source_db_config = {
    'host': 'source_db_host',
    'user': 'source_db_user',
    'password': 'source_db_password',
    'database': 'source_db_name',
}

target_db_config = {
    'host': 'target_db_host',
    'user': 'target_db_user',
    'password': 'target_db_password',
    'database': 'target_db_name',
}

# Table mapping: source table name -> target table name.
table_mapping = {
    'source_table1': 'target_table1',
    'source_table2': 'target_table2',
}

# Field mapping, keyed by source table: source column name -> target column name.
field_mapping = {
    'source_table1': {'field1': 'target_field1', 'field2': 'target_field2'},
    'source_table2': {'field3': 'target_field3', 'field4': 'target_field4'},
}


def fetch_data_from_db(config, table, fields):
    """Return the given columns of ``table`` as a DataFrame.

    Args:
        config: Keyword arguments for ``pymysql.connect``.
        table: Name of the table to read.
        fields: Column names to select, in the desired column order.

    Returns:
        A ``pandas.DataFrame`` holding the result of
        ``SELECT <fields> FROM <table>``.

    Note:
        The identifiers are interpolated into the SQL text as-is, so they must
        come from trusted configuration (here: the module-level mappings).
    """
    query = f"SELECT {', '.join(fields)} FROM {table}"
    connection = pymysql.connect(**config)
    try:
        return pd.read_sql(query, connection)
    finally:
        # Close the connection even when the query fails.
        connection.close()


def _normalize_rows(frame):
    """Return ``frame`` sorted by all of its columns with a fresh 0..n-1 index."""
    if frame.empty:
        return frame.reset_index(drop=True)
    return frame.sort_values(by=list(frame.columns)).reset_index(drop=True)


def _tables_match(source_data, target_data, column_map):
    """Return True when both frames hold the same rows under ``column_map``.

    ``DataFrame.equals`` also compares column labels, so the source columns
    are renamed to their target names first; otherwise frames whose columns
    are named differently can never compare equal. The queries carry no
    ORDER BY, so both frames are put into a canonical row order before the
    comparison.
    """
    renamed_source = source_data.rename(columns=column_map)
    return _normalize_rows(renamed_source).equals(_normalize_rows(target_data))


def main():
    """Compare every mapped table pair and print one pass/fail line per pair."""
    for source_table, target_table in table_mapping.items():
        column_map = field_mapping[source_table]
        source_fields = list(column_map)
        target_fields = list(column_map.values())

        # Fetch the mapped columns from the source database.
        source_data = fetch_data_from_db(source_db_config, source_table, source_fields)

        # Fetch the corresponding columns from the target database.
        target_data = fetch_data_from_db(target_db_config, target_table, target_fields)

        # Compare the two result sets.
        if _tables_match(source_data, target_data, column_map):
            print(f"数据校验通过: {source_table} 和 {target_table}")
        else:
            print(f"数据校验失败: {source_table} 和 {target_table}")


if __name__ == "__main__":
    main()
标签:target,data,db,校验,source,TEST,table,config,数据 From: https://www.cnblogs.com/gina11/p/18039317