MV data refresh solution: Redshift 卸载 -> s3 RTA table logic: redshift-> Hive -> s3 hql in Hive: create schema app_bm_graphics_lf_telemetry_${env}_spectrum_stage; create schema app_bm_graphics_lf_telemetry_${env}_spectrum; sql in redshift: create external schema if not exists app_bm_graphics_lf_telemetry_${env}_spectrum_stage from hive metastore database 'app_bm_graphics_lf_telemetry_${env}_spectrum_stage' uri 'EMR_IP' port 9083 iam_role 'arn:aws:iam::{420302173806/851474929481}:role/DEVELOPER'; create external schema if not exists app_bm_graphics_lf_telemetry_${env}_spectrum from hive metastore database 'app_bm_graphics_lf_telemetry_${env}_spectrum' uri 'EMR_IP' port 9083 iam_role 'arn:aws:iam::{420302173806/851474929481}:role/DEVELOPER'; grant all on schema app_bm_graphics_lf_telemetry_${env}_spectrum to {user}; grant all on schema app_bm_graphics_lf_telemetry_${env}_spectrum_stage to {user}; unload 表数据 -> s3: UNLOAD ('${deduplication_select_sql}') TO 's3://seals-lf-normalized-data-lake-${env}/temp/${table_name}/' iam_role 'arn:aws:iam::{420302173806/851474929481}:role/DEVELOPER' FORMAT AS PARQUET PARTITION BY (${partition_columns}); create external table app_bm_graphics_lf_telemetry_${env}_spectrum.${table_name} ( column1 VARCHAR(45), column2 INTEGER, column3 DECIMAL(38,20), column4 DOUBLE PRECISION, column5 4TIMESTAMP ) row format serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde' WITH SERDEPROPERTIES ( 'separatorChar' = '\t' ) stored as textfile location 's3://seals-lf-normalized-data-lake-${env}/temp/${table_name}/'; 需要注意的是: 在Hive里创建外部表 (super user 权限) MV Story -> logic (daily): app_bm_graphics_lf_telemetry_itg_spectrum_stage 存放子表和增量更新的临时表,对应的s3路径是: s3://seals-lf-normalized-data-lake-${env}/stage/${table_name}/ app_bm_graphics_lf_telemetry_itg_spectrum 存放最终的final表对应的s3路径是: s3://seals-lf-normalized-data-lake-${env}/${table_name}/ s3.finalPath=s3://seals-lf-normalized-data-lake-itg/${table_name}/ s3.stagePath=s3://seals-lf-normalized-data-lake-itg/stage/${table_name}/ unload 和create external table 模板: unload_template=UNLOAD ('${sqlUnload}') TO '${s3Path}' \ iam_role 'arn:aws:iam::420302173806:role/DEVELOPER' \ FORMAT AS PARQUET \ PARTITION BY (${partitionColumn}); create_template=${sqlCreate} \ row format delimited fields terminated by '\t' \ stored as parquet \ location '${s3Path}'; #sql模板: # -- sql模板是 xxx.properties ${table}.step= ${table}_step1,${table}_step2,${table}_final # -- step 配置的顺序,就是执行的顺序,(从左往右执行) # --step可以是多个 但是需要注意的是最后一个step的名称,必须是${table}_final 的格式 ${table}_step1.flag=0 # -- (flag的值只能设置0和1,flag=0代表全量刷新, flag=1增量刷新) ${table}_step1.partition_column= ${table}_step1.unload= ${table}_step1.create= ${table}_step1.unload_full= ${table}_step1.create_full= # -- 增量刷新: 有些表中的字段,可能不是update_ts 或者insert_ts # -- 所以这种情况下每天增量更新数据,可能会出现重复的数据,可以在配置sql的时候和final表中的数据进行对比,去重 ${table}_step2.flag=0 ${table}_step2.partition_column= ${table}_step2.unload= ${table}_step2.create= ${table}_step2.unload_full= ${table}_step2.create_full= ${table}_final.flag=1 ${table}_final.partition_column= ${table}_final.unload= ${table}_final.create= ${table}_final.unload_full= ${table}_final.create_full= 如果有新增的MV 需要配置,我们需要做下面两步操作: 1.按照我们提供的sql模板,整理需要跑的sql 放到项目中的 src/main/resources/sqlFile 目录下: 2.按照表名,在url_daily配置url # -- 格式: sync/rta/refresh/redshift/{table}?async=true 具体的代码逻辑设计: 创建一张report_etl_history表,用来存放job执行的历史信息 CREATE TABLE IF NOT EXISTS app_bm_graphics_lf_telemetry.report_etl_history ( view VARCHAR(20) ENCODE RAW --table 或者 view的名称 ,type VARCHAR(20) ENCODE RAW --执行的类型 refresh/validation ,status VARCHAR(10) ENCODE lzo --执行的状态 DONE/FALIED ,insert_ts TIMESTAMP WITHOUT TIME ZONE ENCODE az64 --执行的时间 ,step VARCHAR(10) ENCODE lzo -- ${table}.step 名称 ,message VARCHAR(200) ENCODE lzo --成功或者异常的日志信息 ) DISTSTYLE KEY DISTKEY (view) SORTKEY ( view ) ; 1.首先处理sql模板数据,获取每一个step 最终需要执行的 unload 和 create external table sql语句 # -- ${table_name} = ${table_name}_年_月_日 (当前系统时间) # -- 增量刷新 (从 report_etl_history 表中获取需要增量更新的起始时间 ,终止时间是当前系统时间) 2.获取step falg ,判断是全量刷新,还是增量刷新 # -- 全量刷新: step 执行成功后,删除s3.finalPath下的表数据, 把s3.stagePath 下的数据 move 到 s3.finalPath, 刷新分区 # -- 增量刷新 step 执行成功后,直接把s3.stagePath 下的数据 move 到 s3.finalPath, 刷新分区 # -- move成功后, 删除 app_bm_graphics_lf_telemetry_itg_spectrum_stage 中创建的临时表 # -- 执行中监控每一个step的运行状态 # -- 如果执行失败重试这个step操作 ,如果重试次数达到三次还是失败,结束这个job执行 另外,每一个step执行结束后,无论是成功还是失败都会添加一条执行记录到 report_etl_history 表中 4.数据库连接配置,定时API调度 refresh partition msck repair table app_bm_graphics_lf_telemetry_${env}_spectrum.${table_name};
声明:此博客为个人学习之用,如与其他作品雷同,纯属巧合,转载请指明出处!
标签:lf,同步,redshift,S3,bm,app,--,s3,table From: https://www.cnblogs.com/zhihuifan10/p/18295421