用途:
本脚本的主要作用就是获取所属工作空间中表字段信息
核心脚本:
本逻辑主要需要五个核心脚本:
00_task_meta_setup_time #用于创建表及设置odps的启动时间
01_task_meta_fields_move #搬迁数据
02_task_meta_tables #表元数据获取及数据量统计
03_task_meta_fields_parallel #字段元数据获取及字段的质量稽核统计
04_task_meta_specification_etl_time #统一同一批次的batch_time并重建TOD分区
00_task_meta_setup_time
创建PyODPS 3
# PyODPS 3 node (00_task_meta_setup_time): create the two metadata tables,
# each partitioned by pt_type (HIS = archived history across intra-day runs,
# CUR = a single copy per day).
# NOTE(review): `odps` is not defined in this script — it is the entry object
# injected by the DataWorks PyODPS 3 runtime.
from odps import ODPS
from datetime import datetime
from odps.models import Schema, Column, Partition

# ---- meta_tables_bak: table-level metadata ----
to_table = 'meta_tables_bak'
# BUG FIX: the variable was misspelled `colums`, so Schema(columns=columns, ...)
# below raised NameError.
columns = [Column(name='table_no', type='int', comment='标序号'),
           Column(name='table_name', type='string', comment='表名'),
           Column(name='table_comment', type='string', comment='表注释'),
           Column(name='table_owner', type='string', comment='作者'),
           Column(name='table_pname', type='string', comment='分区名,多个使用英文逗号分隔'),
           Column(name='table_ddl_time', type='datetime', comment='最近元数据变更时间'),
           Column(name='table_mod_time', type='datetime', comment='最近表数据变更时间'),
           Column(name='table_create_time', type='datetime', comment='表创建时间'),
           Column(name='batch_time', type='datetime', comment='批次时间'),
           Column(name='batch_date', type='string', comment='批次日期')]
partitions = [Partition(name='pt_type', type='string', comment='存储方式,HIS表示归档数据,一天内有多次跑批,历时数据都归档到分区当中,CUR表示每天只存一份数据')]
schema = Schema(columns=columns, partitions=partitions)
odps.create_table(to_table, schema, if_not_exists=True)

# ---- meta_fields_bak: field-level metadata ----
# BUG FIX: the table name was misspelled 'mete_fields_bak'; the downstream SQL
# scripts (01/04) read and write 'meta_fields_bak'.
to_table = 'meta_fields_bak'
# BUG FIX: every keyword argument below was misspelled `commnet`,
# which Column() rejects.
columns = [Column(name='table_no', type='int', comment='表编号'),
           Column(name='field_no', type='int', comment='字段编号'),
           Column(name='table_name', type='string', comment='表英文名'),
           Column(name='table_comment', type='string', comment='表中文名'),
           Column(name='field_name', type='string', comment='字段英文名'),
           Column(name='field_type', type='string', comment='字段类型'),
           Column(name='field_comment', type='string', comment='字段中文名'),
           Column(name='field_cnt', type='string', comment='字段非空数量'),
           Column(name='table_cnt', type='string', comment='表数据量'),
           Column(name='batch_time', type='datetime', comment='跑批时间'),
           Column(name='batch_date', type='string', comment='跑批日期')]
partitions = [Partition(name='pt_type', type='string', comment='存储方式,HIS表示归档数据,一天内有多次跑批,历时数据都归档到分区当中,CUR表示每天只存一份数据')]
schema = Schema(columns=columns, partitions=partitions)
odps.create_table(to_table, schema, if_not_exists=True)
01_task_meta_fields_move
创建ODPS SQL
-- ODPS SQL node (01_task_meta_fields_move): before a new batch, archive today's
-- earlier CUR rows into HIS, drop them from CUR, and clear the TOD partition.
-- Step 1: append today's rows from CUR into the HIS archive.
insert into table meta_fields_bak partition(pt_type='HIS')
select table_no,field_no,table_name,table_comment,field_name,field_type,field_comment,field_cnt,table_cnt,batch_time,batch_date
from meta_fields_bak where batch_date = from_unixtime(unix_timestamp(),'yyyyMMdd') and pt_type='CUR';
-- Step 2: rewrite CUR keeping only rows that are NOT from today.
-- BUG FIX: the target partition was pt_type='HIS', which would have clobbered
-- the archive written in step 1; the equivalent move in 02_task_meta_tables
-- (delcur_str) targets CUR, matching the "keep non-today rows" filter here.
insert overwrite table meta_fields_bak partition(pt_type='CUR')
select table_no,field_no,table_name,table_comment,field_name,field_type,field_comment,field_cnt,table_cnt,batch_time,batch_date
from meta_fields_bak where batch_date <> from_unixtime(unix_timestamp(),'yyyyMMdd') and pt_type='CUR';
-- BUG FIX: `frop` is not a keyword; it must be `drop`.
alter table meta_fields_bak drop if exists partition (pt_type='TOD');
02_task_meta_tables
创建PyODPS 3
# PyODPS 3 node (02_task_meta_tables): collect table-level metadata for every
# table in the workspace, archive earlier intra-day runs into HIS, and reload
# the CUR and TOD partitions of meta_tables_bak.
# NOTE(review): `odps` (alias `o`) is the entry object injected by the
# DataWorks PyODPS 3 runtime.
from odps import ODPS
from datetime import datetime
from odps.models import Schema, Column, Partition
import re

to_table = 'meta_tables_bak'
# BUG FIX: the variable was misspelled `colums`, so Schema(columns=columns, ...)
# below raised NameError.
columns = [Column(name='table_no', type='int', comment='标序号'),
           Column(name='table_name', type='string', comment='表名'),
           Column(name='table_comment', type='string', comment='表注释'),
           Column(name='table_owner', type='string', comment='作者'),
           Column(name='table_pname', type='string', comment='分区名,多个使用英文逗号分隔'),
           Column(name='table_ddl_time', type='datetime', comment='最近元数据变更时间'),
           Column(name='table_mod_time', type='datetime', comment='最近表数据变更时间'),
           Column(name='table_create_time', type='datetime', comment='表创建时间'),
           Column(name='batch_time', type='datetime', comment='批次时间'),
           Column(name='batch_date', type='string', comment='批次日期')]
partitions = [Partition(name='pt_type', type='string', comment='存储方式,HIS表示归档数据,一天内有多次跑批,历时数据都归档到分区当中,CUR表示每天只存一份数据')]
schema = Schema(columns=columns, partitions=partitions)

records = []
tm = datetime.now()
# Tables whose names match this pattern (ods_*_his / ods_*_t1) are EXCLUDED
# below — adjust the regex to the workspace's needs.
pattern = re.compile('^(ods)_.+_(his|t1)$')
n = 1
for tbl in odps.list_tables():
    # skip views and the excluded ods_* tables
    if tbl.view_text is None and pattern.match(tbl.name) is None:
        # BUG FIX: was `tbl.schema.partitons` (typo -> AttributeError).
        partition_str = ''
        if tbl.schema.partitions:
            for part in tbl.schema.partitions:
                partition_str = partition_str + part.name + ','
        records.append([n,
                        tbl.name,
                        tbl.comment,
                        tbl.owner.split(':')[-1],  # strip the account prefix
                        partition_str[0:-1] if tbl.schema.partitions else None,
                        tbl.last_meta_modified_time.strftime('%Y-%m-%d %H:%M:%S'),
                        tbl.last_modified_time.strftime('%Y-%m-%d %H:%M:%S'),
                        tbl.creation_time.strftime('%Y-%m-%d %H:%M:%S'),
                        tm.strftime('%Y-%m-%d %H:%M:%S'),
                        # BUG FIX: batch_date must be 'yyyyMMdd' (the SQL below
                        # and scripts 01/04 compare it against that format),
                        # not a full timestamp.
                        tm.strftime('%Y%m%d')])
        n += 1

to_tbl = odps.create_table(to_table, schema, if_not_exists=True)
partition_cur = '%s=\'%s\'' % ('pt_type', 'CUR')
partition_tod = '%s=\'%s\'' % ('pt_type', 'TOD')
partition_his = '%s=\'%s\'' % ('pt_type', 'HIS')
cols = ('table_no,table_name,table_comment,table_owner,table_pname,'
        'table_ddl_time,table_mod_time,table_create_time,batch_time,batch_date')
# BUG FIX: was tm.strftime(%Y%m%d) — a syntax error — and the value was
# concatenated unquoted although batch_date is a string column.
batch_date = tm.strftime('%Y%m%d')
# Archive today's earlier runs from CUR into HIS, then rewrite CUR without them.
movehis_str = ('insert into table ' + to_table + ' partition (' + partition_his +
               ') select ' + cols + ' from ' + to_table +
               " where batch_date = '" + batch_date + "' and " + partition_cur)
delcur_str = ('insert overwrite table ' + to_table + ' partition (' + partition_cur +
              ') select ' + cols + ' from ' + to_table +
              " where batch_date <> '" + batch_date + "' and " + partition_cur)
print(movehis_str)
print(delcur_str)
odps.execute_sql(movehis_str)
odps.execute_sql(delcur_str)
odps.write_table(to_table, records, partition=partition_cur, create_partition=True)
# TOD always holds exactly the current run: drop and rewrite it.
to_tbl.delete_partition(partition_tod, if_exists=True)
odps.write_table(to_table, records, partition=partition_tod, create_partition=True)
tm1 = datetime.now()
print(tm.strftime('%Y-%m-%d %H:%M:%S'))
print(tm1.strftime('%Y-%m-%d %H:%M:%S'))
03_task_meta_fields_parallel
创建PyODPS 3
由于该脚本会涉及大量的运算,因此此脚本在有大量表的情况最好进行分批跑,故需要在Pyodps 3中设置参数parallel= batchnum=
# PyODPS 3 node (03_task_meta_fields_parallel): collect field-level metadata
# plus per-field non-null counts for every table and load meta_fields_bak.
# The count queries are expensive, so the node is meant to run in parallel
# slices: node parameters `parallel` (slice count) and `batchnum` (this slice)
# arrive via the DataWorks `args` dict; `odps` is the injected entry object.
from odps import ODPS
from datetime import datetime
from odps.models import Schema, Column, Partition
import re

i = 0            # index of the current candidate table (drives slice selection)
k = 0            # total number of field rows appended so far
records = []
# Tables matching this pattern (ods_*_his / ods_*_t1) are EXCLUDED below —
# adjust the regex to the workspace's needs.
pattern = re.compile('^(ods)_.+_(his|t1)$')
start_tm = datetime.now()
# BUG FIX: a second assignment immediately overrode this with the misspelled
# 'mete_fields_bak'; every other script uses 'meta_fields_bak'.
to_table = 'meta_fields_bak'
# BUG FIX: every keyword argument below was misspelled `commnet`,
# which Column() rejects.
columns = [Column(name='table_no', type='int', comment='表编号'),
           Column(name='field_no', type='int', comment='字段编号'),
           Column(name='table_name', type='string', comment='表英文名'),
           Column(name='table_comment', type='string', comment='表中文名'),
           Column(name='field_name', type='string', comment='字段英文名'),
           Column(name='field_type', type='string', comment='字段类型'),
           Column(name='field_comment', type='string', comment='字段中文名'),
           Column(name='field_cnt', type='string', comment='字段非空数量'),
           Column(name='table_cnt', type='string', comment='表数据量'),
           Column(name='batch_time', type='datetime', comment='跑批时间'),
           Column(name='batch_date', type='string', comment='跑批日期')]
partitions = [Partition(name='pt_type', type='string', comment='存储方式,HIS表示归档数据,一天内有多次跑批,历时数据都归档到分区当中,CUR表示每天只存一份数据')]
schema = Schema(columns=columns, partitions=partitions)

for tbl in odps.list_tables():
    # skip views and the excluded ods_* tables
    if tbl.view_text is None and pattern.match(tbl.name) is None:
        # process only this slice's share of the candidate tables
        if i % int(args['parallel']) == int(args['batchnum']):
            print(tbl.name)
            sql_str = 'select '
            table_name = tbl.name
            table_desc = tbl.comment
            j = 1
            for fed in tbl.schema:
                field_name = fed.name
                field_type = str(fed.type).lower()
                field_comment = fed.comment
                # BUG FIX: was `field_coment` (typo -> NameError).
                records.append([i + 1, j, table_name, table_desc,
                                field_name, field_type, field_comment])
                sql_str = sql_str + "count(" + fed.name + "),"
                j += 1
                k += 1
            sql_str = sql_str + "count(0) from " + tbl.name
            try:
                # The aggregate returns one row: count(col) per field,
                # then count(0) (the table row count) last.
                with odps.execute_sql(sql_str).open_reader() as reader:
                    for record in reader:
                        for m in range(len(record) - 1):
                            # this table's rows start at index k - (j - 1)
                            idx = k - j + 1 + m
                            records[idx].append(record[m])   # field_cnt
                            records[idx].append(record[-1])  # table_cnt
                            records[idx].append(start_tm.strftime('%Y-%m-%d %H:%M:%S'))
                            # BUG FIX: the format string was missing its
                            # closing quote (syntax error).
                            records[idx].append(start_tm.strftime('%Y%m%d'))
            # BUG FIX: was a bare `except: continue`, which also skipped the
            # `i += 1` below and desynchronized slice numbering, and left this
            # table's 7-element rows mixed in with complete 11-element rows.
            except Exception:
                # count query failed (e.g. no read permission): discard this
                # table's partially-built rows
                del records[k - j + 1:]
                k -= j - 1
        i += 1

#print(records)
to_tbl = odps.create_table(to_table, schema, if_not_exists=True)
partition_cur = '%s=\'%s\'' % ('pt_type', 'CUR')
odps.write_table(to_table, records, partition=partition_cur, create_partition=True)
# The TOD partition is rebuilt by 04_task_meta_specification_etl_time instead
# of being written here by each parallel slice.
end_tm = datetime.now()
print(start_tm.strftime('%Y-%m-%d %H:%M:%S'))
print(end_tm.strftime('%Y-%m-%d %H:%M:%S'))
04_task_meta_specification_etl_time
创建ODPS SQL
-- ODPS SQL node (04_task_meta_specification_etl_time): unify batch_time across
-- the current batch (script 03 runs in parallel slices, each with its own start
-- time) and rebuild the TOD partition from today's rows.
-- Reading from CUR (rather than TOD) keeps restart-after-failure complete, and
-- INSERT OVERWRITE avoids the small-file buildup of repeated INSERT INTO.
-- BUG FIX: the CTE's closing parenthesis sat after the final SELECT, making the
-- statement unparseable; it must close before the multi-insert `from tmp`.
-- BUG FIX: the select lists carried meta_tables_bak's columns (table_owner,
-- table_pname, ...) although this script targets meta_fields_bak, whose schema
-- is table_no, field_no, ... field_cnt, table_cnt, batch_time, batch_date.
with tmp as
(select table_no,field_no,table_name,table_comment,field_name,field_type,field_comment,field_cnt,table_cnt,batch_time,batch_date,'N' as flag
from meta_fields_bak where batch_date <> from_unixtime(unix_timestamp(),'yyyyMMdd') and pt_type='CUR'
union all
select table_no,field_no,table_name,table_comment,field_name,field_type,field_comment,field_cnt,table_cnt,
from_unixtime(unix_timestamp(),'yyyy-MM-dd HH:mm:ss') as batch_time,
batch_date,'Y' as flag
from meta_fields_bak where batch_date = from_unixtime(unix_timestamp(),'yyyyMMdd') and pt_type='CUR'
)
from tmp
insert overwrite table meta_fields_bak partition (pt_type='CUR')
select table_no,field_no,table_name,table_comment,field_name,field_type,field_comment,field_cnt,table_cnt,batch_time,batch_date
insert overwrite table meta_fields_bak partition (pt_type='TOD')
select table_no,field_no,table_name,table_comment,field_name,field_type,field_comment,field_cnt,table_cnt,batch_time,batch_date
where flag = 'Y';
标签:comment,name,Column,time,dataworks,table,表元,type,pyodps
From: https://www.cnblogs.com/tobeeasyman/p/18376386