"""Pre-create daily RANGE partitions for every partitioned table in a MySQL database.

For each table that ``SHOW TABLE STATUS`` reports as partitioned, the script
reads the name of its last partition (expected to look like ``pYYYYMMDD``)
and adds one partition per day until the target date is reached.

Usage: ``python <script> [YYYYMMDD]`` -- the date defaults to yesterday.
"""
import datetime
import sys

import pandas as pd
import pymysql
import sqlalchemy.engine.url as engineUrl
from sqlalchemy import create_engine

# Connection settings shared by the SQLAlchemy engine and the raw PyMySQL connection.
DB_INFO = {
    "host": "IP",
    "port": 3306,
    "username": "root",
    "password": "password",
    "database": "database name"
}

# Looks up the highest-ordinal partition of one table.  The schema filter keeps
# a same-named table in another database from being picked up by mistake.
# Placeholders use the driver's pyformat style and are bound by the driver.
_LAST_PARTITION_SQL = (
    "SELECT PARTITION_NAME FROM INFORMATION_SCHEMA.PARTITIONS "
    "WHERE table_schema = %(schema)s AND table_name = %(table)s "
    "ORDER BY partition_ordinal_position DESC LIMIT 1;"
)


def get_db_engine():
    """Build a SQLAlchemy engine for the database described by ``DB_INFO``.

    Returns:
        A new engine created with ``pool_pre_ping=True``.  The caller owns it
        and should call ``dispose()`` when finished.
    """
    db_dict = {key: DB_INFO.get(key)
               for key in ("host", "username", "password", "port", "database")}
    try:
        db_url = engineUrl.URL.create("mysql+pymysql", **db_dict)
    except AttributeError as e:
        # ``URL.create`` is missing here (presumably an older SQLAlchemy
        # release), so fall back to calling the constructor directly.
        print(e)
        db_dict['drivername'] = 'mysql+pymysql'
        db_url = engineUrl.URL(**db_dict)
    return create_engine(db_url, pool_pre_ping=True)


def _parse_partition_date(partition_name):
    """Return the date encoded in a ``pYYYYMMDD`` partition name, or None if it does not parse."""
    if not isinstance(partition_name, str):
        return None
    try:
        return datetime.datetime.strptime(partition_name.replace("p", ""), '%Y%m%d')
    except ValueError:
        return None


def _latest_partitions(engine):
    """Map each partitioned table name to the name of its last partition."""
    show_res = pd.read_sql("show table status;", engine)
    # Keep only the tables reported as partitioned.
    partitioned = show_res[show_res["Create_options"] == "partitioned"]
    name_list = list(partitioned["Name"])
    print(f"info: 包含分区的表: {name_list}")

    # Fetch the last partition name of every partitioned table.
    partition_dict = {}
    for name in name_list:
        last_res = pd.read_sql(
            _LAST_PARTITION_SQL,
            engine,
            params={"schema": DB_INFO.get("database"), "table": name},
        )
        if last_res.empty or pd.isna(last_res["PARTITION_NAME"].iloc[0]):
            # No partition metadata: skip this table instead of aborting the run.
            print(f"warning: table: {name} 未查询到分区信息,已跳过")
            continue
        partition_dict[name] = last_res["PARTITION_NAME"].iloc[0]
    print(f"info: 分区表的最新字段为: {partition_dict}")
    return partition_dict


def _add_partitions(conn, table_name, last_partition, target_date):
    """Add one daily partition after ``last_partition`` for each day up to ``target_date``."""
    partition_date = _parse_partition_date(last_partition)
    if partition_date is None:
        # The day-by-day extension only makes sense for pYYYYMMDD names.
        print(f"warning: table: {table_name} 最新分区 {last_partition} 不是 pYYYYMMDD 格式,已跳过")
        return

    # Identifiers cannot be bound as parameters, so quote the name explicitly.
    quoted_table = "`" + table_name.replace("`", "``") + "`"
    with conn.cursor() as cursor:
        while partition_date < target_date:
            partition_date = partition_date + datetime.timedelta(days=1)
            partition_date_str = "p" + partition_date.strftime("%Y%m%d")
            # Partition pYYYYMMDD holds rows strictly before the following day.
            partition_end_str = (partition_date + datetime.timedelta(days=1)).strftime("%Y-%m-%d")
            create_sql = (
                f"ALTER TABLE {quoted_table} ADD PARTITION "
                f"(PARTITION {partition_date_str} VALUES LESS THAN (TO_DAYS ('{partition_end_str}')));"
            )
            try:
                cursor.execute(create_sql)
                conn.commit()
                print(f"success: table: {table_name} 新增分区成功,当前最新分区为: {partition_date_str}")
            except pymysql.MySQLError as e:
                # Best effort: report the failure and keep going with the next day.
                print(e)
                print(f"failed: table: {table_name} 新增分区失败,失败分区为:{partition_date_str}")


def action(date_str):
    """Create the missing daily partitions up to three days after ``date_str``.

    Example: the scheduled run on Aug 23 (default date Aug 22) creates
    partitions up to and including ``p20220825``.

    Args:
        date_str: Reference date formatted as ``YYYYMMDD``.

    Raises:
        ValueError: If ``date_str`` does not match ``%Y%m%d``.
    """
    target_date = datetime.datetime.strptime(date_str, '%Y%m%d') + datetime.timedelta(days=3)

    # One engine serves all metadata queries and is released afterwards.
    engine = get_db_engine()
    try:
        partition_dict = _latest_partitions(engine)
    finally:
        engine.dispose()

    # The DDL statements run on a plain PyMySQL connection rather than through read_sql.
    conn = pymysql.connect(host=DB_INFO.get("host"),
                           port=DB_INFO.get("port"),
                           user=DB_INFO.get("username"),
                           password=DB_INFO.get("password"),
                           database=DB_INFO.get("database"))
    try:
        for table_name, last_partition in partition_dict.items():
            _add_partitions(conn, table_name, last_partition, target_date)
    finally:
        # Close the connection even if a table fails unexpectedly.
        conn.close()


def main():
    """Run the partition job for the date given on the command line (default: yesterday)."""
    start_time = datetime.datetime.now()
    print("任务开始:", start_time)
    args = sys.argv[1:]
    if args:
        date_str = args[0]
    else:
        date_str = (datetime.datetime.now() - datetime.timedelta(days=1)).strftime('%Y%m%d')
    action(date_str)
    end_time = datetime.datetime.now()
    print("任务结束:", end_time - start_time)


if __name__ == "__main__":
    main()
# Tags: name, get, Python, 分区, partition, datetime, str, Mysql, date
# From: https://www.cnblogs.com/lytcreate/p/17783114.html