首页 > 其他分享 >阿里dataworks通过pyodps 3获取表元数据及质量稽核

阿里dataworks通过pyodps 3获取表元数据及质量稽核

时间:2024-08-23 16:50:36浏览次数:13  
标签:comment name Column time dataworks table 表元 type pyodps

用途:

本脚本的主要作用就是获取所属工作空间中表字段信息

核心脚本:

本逻辑主要需要五个核心脚本:

00_task_meta_setup_time #用于创建表及设置odps的启动时间

01_task_meta_fields_move #搬迁数据

02_task_meta_tables #表元数据获取及数据量统计

03_task_meta_fields_parallel #字段元数据获取及字段的质量稽核统计

04_task_meta_specification_etl_time #搬迁数据

00_task_meta_setup_time

创建PyODPS 3

from odps import ODPS
from datetime import datetime
from odps.models import Schema,Column,Partition

to_table = 'meta_tables_bak'
colums = [Column(name='table_no',type='int',comment='标序号'),
          Column(name='table_name',type='string',comment='表名'),
		  Column(name='table_comment',type='string',comment='表注释'),
		  Column(name='table_owner',type='string',comment='作者'),
		  Column(name='table_pname',type='string',comment='分区名,多个使用英文逗号分隔'),
		  Column(name='table_ddl_time',type='datetime',comment='最近元数据变更时间'),
		  Column(name='table_mod_time',type='datetime',comment='最近表数据变更时间'),
		  Column(name='table_create_time',type='datetime',comment='表创建时间'),
		  Column(name='batch_time',type='datetime',comment='批次时间'),
		  Column(name='batch_date',type='string',comment='批次日期')]
partitions = [Partition(name='pt_type',type='string',comment='存储方式,HIS表示归档数据,一天内有多次跑批,历时数据都归档到分区当中,CUR表示每天只存一份数据')]
schema = Schema(columns=columns,partitions=partitions)

odps.create_table(to_table,schema,if_not_exists=True)

to_table = 'mete_fields_bak'
columns = [Column(name='table_no',type='int',commnet='表编号'),
           Column(name='field_no',type='int',commnet='字段编号'),
		   Column(name='table_name',type='string',commnet='表英文名'),
		   Column(name='table_comment',type='string',commnet='表中文名'),
		   Column(name='field_name',type='string',commnet='字段英文名'),
		   Column(name='field_type',type='string',commnet='字段类型'),
		   Column(name='field_comment',type='string',commnet='字段中文名'),
		   Column(name='field_cnt',type='string',commnet='字段非空数量'),
		   Column(name='table_cnt',type='string',commnet='表数据量'),
		   Column(name='batch_time',type='datetime',commnet='跑批时间'),
		   Column(name='batch_date',type='string',commnet='跑批日期')]
partitions = [Partition(name='pt_type',type='string',comment='存储方式,HIS表示归档数据,一天内有多次跑批,历时数据都归档到分区当中,CUR表示每天只存一份数据')]
schema = Schema(columns=columns,partitions=partitions)

odps.create_table(to_table,schema,if_not_exists=True)

01_task_meta_fields_move

创建ODPS SQL

insert into table meta_fields_bak partition(pt_type='HIS')
select table_no,field_no,table_name,table_comment,field_name,field_type,field_comment,field_cnt,table_cnt,batch_time,batch_date
from meta_fields_bak where batch_date = from_unixtime(unix_timestamp(),'yyyyMMdd') and pt_type='CUR';

insert overwrite table meta_fields_bak partition(pt_type='HIS')
select table_no,field_no,table_name,table_comment,field_name,field_type,field_comment,field_cnt,table_cnt,batch_time,batch_date
from meta_fields_bak where batch_date <> from_unixtime(unix_timestamp(),'yyyyMMdd') and pt_type='CUR';

alter table meta_fields_bak frop if exists partition (pt_type='TOD');

02_task_meta_tables

创建PyODPS 3

from odps import ODPS
from datetime import datetime
from odps.models import Schema,Column,Partition
import re

to_table = 'meta_tables_bak'
colums = [Column(name='table_no',type='int',comment='标序号'),
          Column(name='table_name',type='string',comment='表名'),
		  Column(name='table_comment',type='string',comment='表注释'),
		  Column(name='table_owner',type='string',comment='作者'),
		  Column(name='table_pname',type='string',comment='分区名,多个使用英文逗号分隔'),
		  Column(name='table_ddl_time',type='datetime',comment='最近元数据变更时间'),
		  Column(name='table_mod_time',type='datetime',comment='最近表数据变更时间'),
		  Column(name='table_create_time',type='datetime',comment='表创建时间'),
		  Column(name='batch_time',type='datetime',comment='批次时间'),
		  Column(name='batch_date',type='string',comment='批次日期')]
partitions = [Partition(name='pt_type',type='string',comment='存储方式,HIS表示归档数据,一天内有多次跑批,历时数据都归档到分区当中,CUR表示每天只存一份数据')]
schema = Schema(columns=columns,partitions=partitions)

records = []
tm = datetime.now()
# 结合实际需求做正则判定需要哪些表
pattern = re.compile('^(ods)_.+_(his|t1)$')
n = 1
for tbl in odps.list_tables():
	if tbl.view_text is None and pattern.match(tbl.name) is None:
		partition_str = ''
		if tbl.schema.partitons:
			for i in range(len(tbl.schema.partitions)):
				partition_str = partition_str + tbl.schema.partitions[i].name + ','
		records.append([n,
						tbl.name,
						tbl.comment,
						tbl.owner.split(':')[-1],
						partition_str[0:-1] if tbl.schema.partitions else None,
						tbl.last_meta_modified_time.strftime('%Y-%m-%d %H:%M:%S'),
						tbl.last_modified_time.strftime('%Y-%m-%d %H:%M:%S'),
						tbl.creation_time.strftime('%Y-%m-%d %H:%M:%S'),
						tm.strftime('%Y-%m-%d %H:%M:%S'),
						tm.strftime('%Y-%m-%d %H:%M:%S')]),
		n += 1
to_tbl = odps.create_table(to_table,schema,if_not_exists=True)
partition_cur = '%s=\'%s\'' % ('pt_type','CUR')
partition_tod = '%s=\'%s\'' % ('pt_type','TOD')
partition_his = '%s=\'%s\'' % ('pt_type','HIS')
#to_tbl.truncate()
movehis_str = 'insert into table ' + to_table + ' partition (' + partition_his + ') select table_no,table_name,table_comment,table_owner,table_pname,table_ddl_time,table_mod_time,table_create_time,batch_time,batch_date from ' + to_table + ' where batch_date = ' + tm.strftime(%Y%m%d) + ' and ' +partition_cur
delcur_str = 'insert overwrite table ' + to_table + ' partition (' + partition_cur + ') select table_no,table_name,table_comment,table_owner,table_pname,table_ddl_time,table_mod_time,table_create_time,batch_time,batch_date from ' + to_table + ' where batch_date <> ' + tm.strftime(%Y%m%d) + ' and ' +partition_cur
print(movehis_str)
print(delcur_str)
o.execute_sql(movehis_str)
o.execute_sql(delcur_str)
#odps.write_table(to_table,records)
odps.write_table(to_table,records,partition=partition_cur,create_partition=True)
to_tbl.delete_partition(partition_tod,if_exists=True)
odps.write_table(to_table,records,partition=partition_tod,create_partition=True)
tm1 = datetime.now()

print(tm.strftime('%Y-%m-%d %H:%M:%S'))
print(tm1.strftime('%Y-%m-%d %H:%M:%S'))

03_task_meta_fields_parallel

创建PyODPS 3

由于该脚本会涉及大量的运算,因此此脚本在有大量表的情况最好进行分批跑,故需要在Pyodps 3中设置参数parallel= batchnum=

from odps import ODPS
from datetime import datetime
from odps.models import Schema,Column,Partition
import re

i = 0
k = 0
records = []
# 结合实际需求做正则判定需要哪些表
pattern = re.compile('^(ods)_.+_(his|t1)$')

start_tm = datetime.now()

to_table = 'meta_fields_bak'
to_table = 'mete_fields_bak'
columns = [Column(name='table_no',type='int',commnet='表编号'),
           Column(name='field_no',type='int',commnet='字段编号'),
		   Column(name='table_name',type='string',commnet='表英文名'),
		   Column(name='table_comment',type='string',commnet='表中文名'),
		   Column(name='field_name',type='string',commnet='字段英文名'),
		   Column(name='field_type',type='string',commnet='字段类型'),
		   Column(name='field_comment',type='string',commnet='字段中文名'),
		   Column(name='field_cnt',type='string',commnet='字段非空数量'),
		   Column(name='table_cnt',type='string',commnet='表数据量'),
		   Column(name='batch_time',type='datetime',commnet='跑批时间'),
		   Column(name='batch_date',type='string',commnet='跑批日期')]
partitions = [Partition(name='pt_type',type='string',comment='存储方式,HIS表示归档数据,一天内有多次跑批,历时数据都归档到分区当中,CUR表示每天只存一份数据')]
schema = Schema(columns=columns,partitions=partitions)

for tbl in odps.list_tables():
	if tbl.view_text is None and pattern.match(tbl.name) is None:
		if i%int(args['parallel']) == int(args['batchnum']):
			print(tbl.name)
			sql_str = 'select '
			table_name = tbl.name
			table_desc = tbl.comment
			j = 1
			for fed in tbl.schema:
				field_name = fed.name
				field_type = str(fed.type).lower()
				field_comment = fed.comment
				records.append([i+1,j,table_name,table_desc,field_name,field_type,field_coment])
				sql_str = sql_str + "count(" + fed.name + "),"
				j += 1
				k += 1
			sql_str = sql_str + "count(0) from " + tbl.name
			try:
				with o.execute_sql(sql_str).open_reader() as reader:
					for record in reader:
						for n in range(len(record)-1):
							records[k-j+1+n].append(record[n])
							records[k-j+1+n].append(record[-1])
							records[k-j+1+n].append(start_tm.strftime('%Y-%m-%d %H:%M:%S'))
							records[k-j+1+n].append(start_tm.strftime('%Y%m%d))
			except:
				continue
		i += 1
#print(records)

to_tbl = odps.create_table(to_table,schema,if_not_exists=True)
partition_cur = '%s=\'%s\'' % ('pt_type','CUR')
partition_tod = '%s=\'%s\'' % ('pt_type','TOD')
#to_tbl.truncate()
#odps.write_table(to_table,records)
odps.write_table(to_table, records, partition=partition_cur,create_partition=True)
#to_tbl.delete_partition(partition_tod, if_exists=True)
#odps.write_table(to_table,records,partition=partition_tod,create_partition=True)
end_tm = datetime.now()

print(start_tm.strftime('%Y-%m-%d %H:%M:%S'))
print(end_tm.strftime('%Y-%m-%d %H:%M:%S'))

04_task_meta_specification_etl_time

创建ODPS SQL

-- 该脚本的主要目的将统一批次的batch_time更新为一致,以便区分。背景原因并行跑批导致的时间不一致
-- 以下脚本从CUR分区表拿数据的目的有两个:
-- 理论上从TOD也可以拿到批次的数据,但考虑到断点续跑的完整性,故从CUR分区来获取数据来更新batch_time
-- 担心不断地insert into会导致小文件过多影响查询效率,故通过overwrite来实现
with tmp as
(select table_no,table_name,table_comment,table_owner,table_pname,table_ddl_time,table_mod_time,table_create_time,batch_time,batch_date,'N' as flag
from meta_fields_bak where batch_date <> from_unixtime(unix_timestamp(),'yyyyMMdd') and pt_type='CUR'
union all
select table_no,table_name,table_comment,table_owner,table_pname,table_ddl_time,table_mod_time,table_create_time,
from_unixtime(unix_timestamp(),'yyyy-MM-dd HH:mm:ss') as batch_time,
batch_date,'Y' as flag
from meta_fields_bak where batch_date = from_unixtime(unix_timestamp(),'yyyyMMdd') and pt_type='CUR'
from tmp
insert overwrite table meta_fields_bak partition (pt_type='CUR')
select table_no,table_name,table_comment,table_owner,table_pname,table_ddl_time,table_mod_time,table_create_time,batch_time,batch_date
insert overwrite table meta_fields_bak partition (pt_type='TOD')
select table_no,table_name,table_comment,table_owner,table_pname,table_ddl_time,table_mod_time,table_create_time,batch_time,batch_date
where flag = 'Y'
)

标签:comment,name,Column,time,dataworks,table,表元,type,pyodps
From: https://www.cnblogs.com/tobeeasyman/p/18376386

相关文章

  • 力扣: 移除链表元素
    文章目录需求虚拟头结点法原头结点法结尾需求给你一个链表的头节点head和一个整数val,请你删除链表中所有满足Node.val==val的节点,并返回新的头节点。示例1:输入:head=[1,2,6,3,4,5,6],val=6输出:[1,2,3,4,5]示例2:输入:head=[],val=1输......
  • 代码随想录day3 | LeetCode203. 移除链表元素、LeetCode707. 设计链表、LeetCode206.
    代码随想录day3|LeetCode203.移除链表元素、LeetCode707.设计链表、LeetCode206.反转链表为了防止早上写博客上传图片失败,今天试试下午写,发现图片上传正常链表基础文章链接:链表基础C/C++的定义链表节点方式,如下所示://单链表structListNode{intval;/......
  • 随想录day3:203.移除链表元素|707.设计链表 |206.反转链表
    203.移除链表元素方法一:直接遍历,永远记得处理head,删除链表必须有前驱。/***Definitionforsingly-linkedlist.*publicclassListNode{*intval;*ListNodenext;*ListNode(){}*ListNode(intval){this.val=val;}*ListNode......
  • 【代码随想录】二、链表:1、移除链表元素
    部分图文参考于:代码随想录-203.移除链表元素。C++编程中记得要手动释放结点内存。链表操作中,可以使用原链表来直接进行删除操作,也可以设置一个虚拟头结点再进行删除操作。1.题目链接203.移除链表元素2.思路以链表1424来举例,移除元素4。如果使用C,C++编程语言的话,......
  • 203. 移除链表元素 给你一个链表的头节点 head 和一个整数 val ,请你删除链表中所有满
    在链表中,每个节点都有一个指向下一个节点的指针。删除一个节点的本质是将前一个节点的指针指向要删除节点的下一个节点,从而跳过要删除的节点。以下是详细解释为什么以及如何这样做:1.**链表的结构**:  一个链表节点包含两个部分:存储的数据和指向下一个节点的指针。  ``......
  • 代码随想录算法训练营day03|203.移除链表元素,707.设计链表,206.反转链表
    203.移除链表元素题目链接:https://leetcode.cn/problems/remove-linked-list-elements/description/我的代码(分头节点和中间节点两种情况操作):/***Definitionforsingly-linkedlist.*structListNode{*intval;*ListNode*next;*ListNode():val......
  • 移除链表元素
    这里注意我们操作链表的时候都要使用临时指针来进行遍历链表的操作,不然会改变链表的原始数据,这里我使用两种方式来进行删除的操作原链表删除元素classSolution{publicListNoderemoveElements(ListNodehead,intval){//if(head==null){//ret......
  • 代码随想录算法训练营第三天 | Leetcode 203 移除链表元素 Leetcode 206 翻转链表
    前言今天的两道题目都不难,但细节需要注意。如移除链表元素用到的虚拟头节点,翻转链表的思路。翻转链表真是写了忘,忘了写,希望这次能记住。除此之外我决定每天的记录里面增加一个总结八股的部分,将来二刷再翻看文章的时候顺便也能复习八股知识点。Leetcode203移除链表元素题目......
  • 删除字典值中的列表元素会影响该字典中的其他值吗?
    我在python中有一个字典,看起来像这样my_dict={'100':[['a',[10],[5]]],'101':[['a',[10],[7]]],'102':[['a',[10],[11]]],'103':[['a',[10],[4]]],}要创建my_dic......
  • 【代码随想录训练营第42期 Day3打卡 LeetCode 203.移除链表元素,707.设计链表,206.反转
    一、做题感受今天是打卡的第三天,前两天题目主要考察数组相关知识,现在已经来到了链表的学习。题目共有三道,都是以考察单链表为主,整体来说难度不大,但是思路很灵活,尤其是反转链表的考察,双指针的新用法。今天做题总体感觉不错,能有自己的思考和理解。二、链表相关知识1.常见链表......