背景
在数据中台中,有时为了核对数据,需要每天批量统计MySQL数据库中表的数据量,但是DMS中没有周期调度功能。
MySQL创建表
--统计的表清单 CREATE TABLE `dws_table_list` ( `table_name` varchar(255) DEFAULT NULL, `flag` varchar(255) DEFAULT NULL ); --每天的数据量 CREATE TABLE `dws_table_count` ( `table_name` varchar(255) DEFAULT NULL, `table_count` bigint DEFAULT NULL, `ds` varchar(255) DEFAULT NULL );
dws_table_list表中插入数据
说明:test_table1、test_table2、test_table3这些表,需要具有相同的日期字段(比如:ds)。
Python代码
import pymysql import sys from datetime import datetime, timedelta if __name__ == '__main__': if len(sys.argv) == 2: input_str = sys.argv[1] else: print("参数个数不对!") exit() # MySQL中表不存在的列表 table_not_exists_list = [] # 没有业务数据日期的列表 dataDate_no_data_list = [] # 业务数据日期 data_date = datetime.strptime(input_str, "%Y%m%d") data_date_str = data_date.strftime("%Y%m%d") print(f"业务数据日期: {data_date_str}") # 日期减去特定的时间间隔 data_date_1days_ago_str = (data_date - timedelta(days=1)).strftime('%Y%m%d') data_date_2days_ago_str = (data_date - timedelta(days=2)).strftime('%Y%m%d') data_date_3days_ago_str = (data_date - timedelta(days=3)).strftime('%Y%m%d') data_date_4days_ago_str = (data_date - timedelta(days=4)).strftime('%Y%m%d') data_date_5days_ago_str = (data_date - timedelta(days=5)).strftime('%Y%m%d') data_date_6days_ago_str = (data_date - timedelta(days=6)).strftime('%Y%m%d') data_date_7days_ago_str = (data_date - timedelta(days=7)).strftime('%Y%m%d') # 连接MySQL数据库 config = { 'user': 'root', 'password': 'Root@1234', 'host': 'localhost', 'database': 'test' } conn = pymysql.connect(**config) # 创建一个游标对象 cursor = conn.cursor() # 执行一个查询 query = "SELECT table_name from dws_table_list ORDER BY table_name ;" cursor.execute(query) # 打印表清单中的所有表的数量 print("dws_table_list表中的表个数: ",cursor.rowcount) print("**"*100) # 可取出指针结果集中的所有行,返回的结果集一个元组。 result = cursor.fetchall() # 删除业务日期当天的数据,防止重跑时数据重复 cursor.execute("DELETE FROM dws_table_count WHERE ds = %s ;", data_date_str) # 遍历指针结果集 for rows in result: table_name = rows[0] query_count = f"SELECT '{table_name}' table_name,COUNT(0),ds table_count FROM {table_name} WHERE ds = '{data_date_str}' GROUP BY ds ;" print(query_count) try: cursor.execute(query_count) result2 = cursor.fetchall() if cursor.rowcount > 0: # 插入数据 insert_sql = "INSERT INTO dws_table_count VALUES (%s,%s,%s)" for rows2 in result2: cursor.execute(insert_sql, rows2) else: print("\033[93m" + f"Table {table_name} does not have data for {data_date_str} " + "\033[0m") dataDate_no_data_list.append(table_name) except pymysql.MySQLError as e: if "exist" in str(e): print("\033[31m" + "MySQL error: " + str(e) + "\033[0m") table_not_exists_list.append(table_name) else: print("\033[31m" + "MySQL error: " + str(e) + "\033[0m") exit() # 删除7天之前的数据 cursor.execute("DELETE FROM dws_table_count WHERE ds = %s ;",data_date_7days_ago_str) # 根据表中的数据展示需要的结果 final_query = (f"""SELECT table_name, '{data_date_str}' ds, SUM(IF(ds = '{data_date_str}',table_count,0)) data_{data_date_str}, SUM(IF(ds = '{data_date_1days_ago_str}',table_count,0)) data_{data_date_1days_ago_str}, SUM(IF(ds = '{data_date_2days_ago_str}',table_count,0)) data_{data_date_2days_ago_str}, SUM(IF(ds = '{data_date_3days_ago_str}',table_count,0)) data_{data_date_3days_ago_str}, SUM(IF(ds = '{data_date_4days_ago_str}',table_count,0)) data_{data_date_4days_ago_str}, SUM(IF(ds = '{data_date_5days_ago_str}',table_count,0)) data_{data_date_5days_ago_str}, SUM(IF(ds = '{data_date_6days_ago_str}',table_count,0)) data_{data_date_6days_ago_str} FROM dws_table_count WHERE ds BETWEEN '{data_date_6days_ago_str}' AND '{data_date_str}' GROUP BY table_name ; """) cursor.execute(final_query) # 可取出指针结果集中的所有行,返回的结果集一个元组。 result3 = cursor.fetchall() print("**"*100) print("\033[92m" + "MySQL表数据量统计:" + "\033[0m") print("--"*60) # 获取列名 columns = [column[0] for column in cursor.description] # 打印列名 print('xh',end='|') for col in columns: print(col,end='|') print() # 打印结果 count = 1 for rows3 in result3: print(count,end='|') for j in range(len(rows3)): print(rows3[j],end='|') count = count + 1 print() print("**" * 100) print("\033[92m" + "MySQL中不存在的表:" + "\033[0m") print("--"*60) for index,table in enumerate(table_not_exists_list,start=1): print(index,table,sep='|') print("**" * 100) print(f"\033[92m" + f"业务日期{data_date_str}没有数据的表: " + "\033[0m") print("--"*60) for index,table in enumerate(dataDate_no_data_list,start=1): print(index,table,sep='|') # 关闭游标和连接 cursor.close() conn.commit() conn.close()
执行结果
标签:count,str,Python,中表,print,数据量,date,table,data From: https://www.cnblogs.com/yeyuzhuanjia/p/18375375