首页 > 其他分享 >PyODPS获取MaxComputer数据仓中业务表的最新更新时间并输出至表中

PyODPS获取MaxComputer数据仓中业务表的最新更新时间并输出至表中

时间:2024-11-20 09:29:18浏览次数:1  
标签:COMMENT name system 仓中 time 最新更新 table 至表中 STRING

PyODPS获取MaxComputer数据仓中业务表的最新更新时间并输出

 1.准备两张数据表:

1.1 需要统计的库表中间表:

CREATE TABLE IF NOT EXISTS dim_river_system_business_time (
    `id`  STRING  COMMENT '序号',
    `dept_name`  STRING  COMMENT '',
    `system_name`  STRING  COMMENT '所属系统',
    `table_name`  STRING  COMMENT '数据仓表名称',
    `table_comment`  STRING  COMMENT '表中文名称',
    `column_name`  STRING  COMMENT '根据那个字段取最大值',
    `load_time`  STRING  COMMENT '数据录入时间'
)
COMMENT '业务数据表最新更新时间(配置表)';

1.2 最终结果存储表:

CREATE TABLE IF NOT EXISTS dwd_river_system_business_time (
    `dept_name`  STRING  COMMENT '',
    `system_name`  STRING  COMMENT '系统名称',
    `table_name`  STRING  COMMENT '库表名称',
    `table_comment`  STRING  COMMENT '表中文名称',
    `table_rows`  BIGINT  COMMENT '表数据量',
    `business_maxtime`  STRING  COMMENT '表业务最新更新时间',
    `etl_time`  STRING  COMMENT '取值时间(etl_time)'
)
COMMENT '业务数据表最新更新时间'
PARTITIONED BY (
    `dt`  STRING  COMMENT '按天分区字段'
);

1.3 PyODPS脚本创建:

 

from odps import ODPS
import datetime
import sys

# 解决控制台输出乱码
reload(sys);
sys.setdefaultencoding('utf-8');

# 创建连接
o = ODPS('id', 'key', 'project',endpoint='');

# 获取时间函数
def getDate(format_string="%Y%m%d%"):
    return datetime.datetime.now().strftime(format_string)

# 判断时间是否为空,为空赋值初始值
def timeReplaceIsNUll(time):
    if len(time)==0 or not time :
       return "1998-01-01 23:59:59";
    else :
       return time;

# 字符串截取,处理SQL拼接最后一个UNION ALL
def strjq(strData):
    return strData[0:len(strData)-10]+";";

# 获取分区时间和吸入数据时间
# 1.分区时间
timeHH=getDate("%Y%m%d");
# 2.写入数据时间
times=getDate("%Y-%m-%d %H:%M:%S");


# 组装统计SQL
def SQLCreate(table_name,table_comment,timeRex,system_name,dept_name):
    sql="SELECT '"+ table_name +"' AS table_name,'"+table_comment+"' as table_comment,count(1) as table_rows,'"+system_name+"' as system_name,'"+dept_name+"' as  dept_name,DATE_FORMAT(max("+timeRex+"),'yyyy-MM-dd hh:mm:ss','false') as Business_maxtime,getDATE() as etl_time from "+table_name;
    return sql;

# 批量SQL模板暂存区
SQLTemplateBath=[]
# 读取数据表
with o.execute_sql('select table_name,table_comment,column_name,system_name,dept_name from dim_river_system_business_time').open_reader() as reader:
         for record in reader:   # 处理每一个record。
            SQLTemplateBath.append(SQLCreate(record.table_name,record.table_comment,record.column_name,record.system_name,record.dept_name));

# 存储批量脚本运行结果
Datas=[]
# 运行脚本存储过程数据
bath1="";
bath2="";
bath3="";
bath4="";
bath5="";
bath6="";
bath7="";
# 获取表输入输出流【目的表】
table = o.get_table('dwd_river_system_business_time', project='FGDN_odps')
for i in range(len(SQLTemplateBath)):
    if i <= 300 :
      bath1 += SQLTemplateBath[i]+" UNION ALL ";
      print(">>> 组装第一批次SQL当前下标:{}".format(i));
    if i > 300 and i <= 600 :
      bath2 += SQLTemplateBath[i]+" UNION ALL ";
      print(">>> 组装第二批次SQL当前下标:{}".format(i));
    if i > 600 and i <= 1000 :
      bath3 += SQLTemplateBath[i]+" UNION ALL ";
      print(">>> 组装第三批次SQL当前下标:{}".format(i));
    if i > 1000 and i <= 1300 :
      bath4 += SQLTemplateBath[i]+" UNION ALL ";
      print(">>> 组装第四批次SQL当前下标:{}".format(i));
    if i > 1300 and i <= 1600 :
      bath5 += SQLTemplateBath[i]+" UNION ALL ";
      print(">>> 组装第五批次SQL当前下标:{}".format(i));
    if i > 1600 and i<=1900 :
      bath6 += SQLTemplateBath[i]+" UNION ALL ";
      print(">>> 组装第六批次SQL当前下标:{}".format(i));
    if i > 1900 and i <= 2200 :
      bath7 += SQLTemplateBath[i]+" UNION ALL ";
      print(">>> 组装第七批次SQL当前下标:{}".format(i));
    if i > 2200
     break;
     print(">>> 配置批量数据超出限制:{}".format(i));


# 配置启动脚本
def startBaths():
    if len(bath1) > 1 :
     print("<<< 开始启动批量脚本 1.....");
      with o.execute_sql('%s'%strjq(bath1)).open_reader() as reader:
       for record in reader:
         re=[record.dept_name,record.system_name,record.table_name,record.table_comment,record.table_rows,timeReplaceIsNUll(record.business_maxtime),record.etl_time,timeHH];
         Datas.append(re);
    if len(bath2) > 1 :
     print("<<< 开始启动批量脚本 2.....");
     with o.execute_sql('%s'%strjq(bath2)).open_reader() as reader:
      for record in reader:
         re=[record.dept_name,record.system_name,record.table_name,record.table_comment,record.table_rows,timeReplaceIsNUll(record.business_maxtime),record.etl_time,timeHH];
         Datas.append(re);
    if len(bath3) > 1 :
     print("<<< 开始启动批量脚本 3.....");
      with o.execute_sql('%s'%strjq(bath3)).open_reader() as reader:
       for record in reader:
         re=[record.dept_name,record.system_name,record.table_name,record.table_comment,record.table_rows,timeReplaceIsNUll(record.business_maxtime),record.etl_time,timeHH];
         Datas.append(re);
    
    if len(bath4) > 1 :
     print("<<< 开始启动批量脚本 4.....");
      with o.execute_sql('%s'%strjq(bath4)).open_reader() as reader:
       for record in reader:
         re=[record.dept_name,record.system_name,record.table_name,record.table_comment,record.table_rows,timeReplaceIsNUll(record.business_maxtime),record.etl_time,timeHH];
         Datas.append(re);
    
    if len(bath5) > 1 :
     print("<<< 开始启动批量脚本 5.....");
      with o.execute_sql('%s'%strjq(bath5)).open_reader() as reader:
       for record in reader:
         re=[record.dept_name,record.system_name,record.table_name,record.table_comment,record.table_rows,timeReplaceIsNUll(record.business_maxtime),record.etl_time,timeHH];
         Datas.append(re);

    if len(bath6) > 1 :
     print("<<< 开始启动批量脚本 6.....");
      with o.execute_sql('%s'%strjq(bath6)).open_reader() as reader:
       for record in reader:
         re=[record.dept_name,record.system_name,record.table_name,record.table_comment,record.table_rows,timeReplaceIsNUll(record.business_maxtime),record.etl_time,timeHH];
         Datas.append(re);
    
    if len(bath7) > 1 :
     print("<<< 开始启动批量脚本 7.....");
      with o.execute_sql('%s'%strjq(bath7)).open_reader() as reader:
       for record in reader:
         re=[record.dept_name,record.system_name,record.table_name,record.table_comment,record.table_rows,timeReplaceIsNUll(record.business_maxtime),record.etl_time,timeHH];
         Datas.append(re);

# 启动脚本
startBaths();


# 写数据到库表
if range(len(Datas))>=1:
  with table.open_writer(partition="dt='%s'"%timeHH,create_partition=True) as writer:
       writer.write(Datas)

print("<<< 数据写入结束!!")

 

标签:COMMENT,name,system,仓中,time,最新更新,table,至表中,STRING
From: https://www.cnblogs.com/zhuzhu-you/p/17699965.html

相关文章

  • Next.js 零基础开发入门教程2 构建基础脚手架 2024最新更新中|曲速引擎 Warp Drive
    开发目标我们将构建一个简化版本的财务仪表板,其内容包括:公共主页、登录页面、受身份验证保护的仪表板页面、用户可以添加、编辑和删除发票这篇文章先创建一个简单的nextjs脚手架页面安装pnpm包管理器接上一篇,开发环境都准备好之后,我们来做创建项目的准备,首先先判断上一篇的环......
  • ChatGPT 国内使用知识【最新更新】
    ChatGPT很强大,聊聊天、写论文、搞翻译、写代码、写文案、审合同等,真是无所不能~近两年人工智能AI非常火热,可以这么说:已经从“能用”到“好用”再到“离不开”了。。。已经用上AI的,尤其用上ChatGPT的人,真的是享受到了AI发展的红利,并且还一直不断领先~~~相信未来几年,随着技术......
  • 如何使用 Midjourney?2024年最新更新
    一:基础篇1:注册首先,你需要注册一个 Discord 账号,然后加入Midjourney的 Discord服务器。或者去Midjourney的官网点击右下角的JointheBeta:​2:在Discord公共服务器里使用注册并进入到Midjourney的服务器后,有可能需要完成各种任务(这个取决于Midjourney的运营......
  • 2024年性价比高的服务器多少钱?腾讯云最新更新
    在当今这个数据驱动的时代,选择一款合适的服务器对于企业和个人开发者来说至关重要。腾讯云,作为国内领先的云服务提供商,为广大用户提供了多样化的服务器选择。那么,在腾讯云的众多服务器产品中,我们该如何做出明智的选择呢?首先,对于轻量级应用或初创项目,轻量应用服务器无疑是一个经......
  • 详解如何在数仓中搭建细粒度容灾应用
    本文分享自华为云社区《GaussDB(DWS)细粒度容灾使用介绍》,作者:天蓝蓝。1.前言适用版本:【8.2.1.210及以上】当前数仓承载的客户业务越来越多,从而导致客户对于数仓的可靠性要求不断增加。尤其在金融领域,容灾备份机制是信息系统必须提供的能力之一。本文介绍了在云上环境的双集......
  • 实例详解构建数仓中的行列转换
    本文分享自华为云社区《GaussDB数据库SQL系列-行列转换》,作者:Gauss松鼠会小助手2。一、前言在构建数据仓库或做数据分析时,需要对原始数据的结构进行一定的处理,有时涉及到“行转列”,有时涉及到“列转行”,那么这两个转换的方式具体是什么,有什么差异,怎么实现,今天我们将以GaussDB数......
  • SWD与JTAG区别及使用情况 最新更新时间
      上图是SEGGER说明书中给出的Jlink引脚图,可以对照着看SWD引脚与JTAG引脚的关系。 这是我手边开发板上的JTAG连接图,这个肯定是能用的。  这个是从网上找来的标准的JTAG连接图,供对照参考。  调试方式既可以用JTAG,也可以用SWD。 以下是一段转自:(http://showvi.c......
  • 2个数仓中不等值关联优化案例
    本文分享自华为云社区《GaussDB(DWS)性能调优:不等值关联优化》,作者:门前一棵葡萄树。场景1使用场景:本案例适合满足以下条件的场景关联条件使用OR连接关联条件中使用同一列做数据筛选原始语句SELECTt2.PARTNER_CHANNEL_CODEASCHANNEL_ID,t1.COUNTRY_CODE,t1.BRAND......
  • 9月22日深夜(UTC+8),Telegram CEO兼创始人Durov发文宣布Telegram的最新更新
    更新主要有如下内容:1.用户可以通过给予"Boost"来授予他们喜爱的频道发布故事的能力。*每个Telegram大会员的订户都拥有一个“Boost”,可以被用户分配给任意一个频道,为频道“升级”。*每升一个级别,频道每天就可以额外发布一个故事。*频道可以通过特殊链接向用户乞求“Boost”......
  • 谷歌2023年4月19日最新更新规则及算法
    多年来,谷歌的的核心排名系统一直致力于奖励能够提供良好网页体验的内容,正如谷歌在 2011年提供的2019年更新,并于去年加入创建实用、可靠、以用户为中心的内容帮助页面的指南中所述。该帮助页面是谷歌的搜索要素的关键资源。会定期推荐希望使用Google搜索取得理想成效的用户,仔......