首页 > 其他分享 >数仓基础知识_拉链表的详细讲解

数仓基础知识_拉链表的详细讲解

时间:2024-08-01 09:28:59浏览次数:18  
标签:数仓 xinniu zip 拉链 基础知识 date table endTime

拉链表

没错,就像衣服的拉链一样重要,实用性非常强,使用频率非常高。

拉链表核心思想,像个拉链,支持开链,支持闭链,支持退链,我们通常将最新的数据称为开链数据,历史数据称为闭链数据,拉链表支持历史数据查询,且空间占用较小,但是数据加工处理较为繁琐,属于时间换空间的设计方式,拉链表一个时间维度中同一个用户只保存一条用户状态。拉链表通常会增加三个技术字段“开始日期starttime、结束日期endtime、状态标识mark”。通过主键(PK)与历史数据进行对比,判断当前数据与历史数据是否发生变化,如果发生变化或者新增则进行相应的开链、闭链操作。

1.1 用户基础表拉链示例
以下使用用户基本信息表对拉链表操作进行深入刨析
2021-01-01用户基础表原始数据如下:

userID为主键,可变化字段为登录名(userName)、电话号码(phoneNum)、账号状态(status)、最近登录日期(lastLoginDate)。

setp1:首先设计拉链表的主键,根据原始数据表的表结构,选择userID作为拉链表PK键;
setp2:选择phoneNum、status作为notPK键,notPK作为对比字段,这里去除了lastLoginDate与dataTime这两个经常变化的字段,如果需要跟踪登陆日期lastLoginDate数据,请使用事件表,notPK键选择原则需要同时满足如下条件:

会发生变化的字段,且满足缓慢变化维SCD;

不能选择每天都发生变化的无对比意义字段(如dataTime字段,对于原始数据表这个字段是每天都变化的,所以没有对比意义);

setp3:确定数仓技术字段,本次加入数据加工日期etlTimestamp、开始时间startTime、结束时间endTime、标识位mark("i":新增,"u":修改,"d":删除)四个技术字段;

数据采集方式为T+1方式(今天计算昨天的数据)。

2021-01-02用户拉链表如下:

第一次加载因为拉链表历史数据为空,所以所有数据都为新增数据,标识位标为新增状态"i",开始时间为数据日期2021/1/1,当前所有数据都为最新数据,多以结束时间设置为一个较大的时间2999/12/31作为开链时间标识日期。

2021-01-02用户基础表原始数据如下:

使用原始数据表主键userID关联拉链表中开链数据(where endTime=2999/12/31)的userID,对比notPK字段是否相同,我们选择的notPK字段为phoneNum和status,发生变化的数据:
coolniu2021a0001用户电话号码发生变化;
coolniu2021a0005用户电话号码发生变化;

coolniu2021a0007用户状态发生变化;

新增coolniu2021a0008、coolniu2021a0009、coolniu2021a0010三位用户;

2021-01-03用户拉链表如下:

coolniu2021a0001用户拉链分析

coolniu2021a0001电话号码发生变化,在拉链表中将结束日期修改为变化时间的数据日期2021/1/2

代表coolniu2021a0001用户上一个状态结束。
又新增了一行coolniu2021a0001的数据

针对这条数据新的状态开始时间startTime为变化日期的数据日期2021/1/2,结束时间标识为2999/12/31(代表最新开链状态),标识位"u"(代表这条数据是修改状态)。
coolniu2021a0005用户拉链分析
coolniu2021a0005与coolniu2021a0001用户一样,都是电话号码发生了变化,拉链处理方式与coolniu2021a0001一致。

coolniu2021a0007用户拉链分析

coolniu2021a0007用户状态由1变成了销户状态"2",拉链表处理:

coolniu2021a0007账号状态为1的数据,结束时间修改为数据发生变化的时间2021/1/2

新增一条coolniu2021a0007记录,标识位mark标识为"d",开始时间为数据变化时间2021/1/2,结束时间修改为2021/1/2,表示该条数据已经闭链。

新增用户拉链分析
coolniu2021a0008、coolniu2021a0009、coolniu2021a0010三个用户为2021-01-02日新增用户,开始时间为2021/1/2,结束时间为2999/12/31(标识该数据为最新开链状态),mark位标为新增标识"i"。

未变化用户拉链分析
用户coolniu2021a0002、coolniu2021a0003、coolniu2021a0004、coolniu2021a0006用户notPK数据未变化,所以保持不变。

2021-01-03用户基础表原始数据如下:

1月3日新增了一条coolniu2021a0011数据。
2021-01-04用户拉链表如下:

因为其他天没有发生变化,所以在拉链表中保持不变,只新增一条coolniu2021a0011的记录。

1.2 拉链表算法示例
1.2.1 建表
source_table

CREATE TABLE source_table(
userid string,
loginname string,
regiondate string,
phonenum string,
birthday string,
status string,
lastlogindate string)
PARTITIONED BY (datatime string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘|’
STORED AS TEXTFILE;

1.2.2 load数据

load data local INPATH ‘/tmp/xinniu/20210101’ into table source_table partition (datatime=‘20210101’);
load data local INPATH ‘/tmp/xinniu/20210102’ into table source_table partition (datatime=‘20210102’);
load data local INPATH ‘/tmp/xinniu/20210103’ into table source_table partition (datatime=‘20210103’);

1.2.3 创建拉链表

CREATE TABLE zip_table(
userid string,
loginname string,
regiondate string,
phonenum string,
birthday string,
status string,
lastlogindate string,
datatime string,
etltimestamp string,
starttime string,
endtime string,
mark string)

1.2.4 创建算法文件sqlfile

vim /tmp/xinniu/sqlfile

填写如下内容

– 创建一张拉链表的备份表 备份拉链表历史开链数据
CREATE TABLE IF NOT EXISTS xinniu.zip_table_bk stored AS orc tblproperties (“orc.compress” = “SNAPPY”) AS
SELECT
userID,
loginName,
regionDate,
phoneNum,
birthday,
status,
lastLoginDate,
dataTime,
etlTimestamp ,
startTime ,
endTime ,
mark
FROM
xinniu.zip_table
WHERE
1 = 0 ;

– 将拉链表历史开链数据插入到bk备份表中
– 卡拉链条件:startTime < to_date(from_unixtime(unix_timestamp(‘ h i v e c o n f : b a t c h d a t e ′ , ′ y y y y M M d d ′ ) ) ) A N D e n d T i m e > = t o d a t e ( f r o m u n i x t i m e ( u n i x t i m e s t a m p ( ′ {hiveconf:batch_date}' ,'yyyyMMdd'))) AND endTime >= to_date(from_unixtime(unix_timestamp(' hiveconf:batchd​ate′,′yyyyMMdd′)))ANDendTime>=tod​ate(fromu​nixtime(unixt​imestamp(′{hiveconf:batch_date}’ ,‘yyyyMMdd’)))
– 备份表非空判断:(select count(1) from xinniu.zip_table_bk limit 1) = 0 判断备份表非空才插入 此处必须判空 不能使用drop或者truncate的方式清空备份表 会导致失败重跑时丢数
INSERT
INTO
xinniu.zip_table_bk
SELECT
userID,
loginName,
regionDate,
phoneNum,
birthday,
status,
lastLoginDate,
dataTime,
etlTimestamp ,
startTime ,
endTime ,
mark
FROM
xinniu.zip_table
join (select count(1) cnt from xinniu.zip_table_bk limit 1) b
WHERE
startTime < to_date(from_unixtime(unix_timestamp(‘ h i v e c o n f : b a t c h d a t e ′ , ′ y y y y M M d d ′ ) ) ) A N D e n d T i m e > = t o d a t e ( f r o m u n i x t i m e ( u n i x t i m e s t a m p ( ′ {hiveconf:batch_date}' ,'yyyyMMdd'))) AND endTime >= to_date(from_unixtime(unix_timestamp(' hiveconf:batchd​ate′,′yyyyMMdd′)))ANDendTime>=tod​ate(fromu​nixtime(unixt​imestamp(′{hiveconf:batch_date}’ ,‘yyyyMMdd’)))
AND b.cnt = 0
;

– 创建拉链表闭链数据备份表bf
CREATE TABLE IF NOT EXISTS xinniu.zip_table_bf stored AS orc tblproperties (“orc.compress” = “SNAPPY”) AS
SELECT
userID,
loginName,
regionDate,
phoneNum,
birthday,
status,
lastLoginDate,
dataTime,
etlTimestamp ,
startTime ,
endTime ,
mark
FROM
xinniu.zip_table
WHERE
1 = 0 ;
– 备份拉链表中历史闭链数据 卡拉链条件:endTime < to_date(to_timestamp(‘${hiveconf:batch_date}’ , ‘yyyyMMdd’))

– 备份表非空判断:(select count(1) from xinniu.zip_table_bf limit 1) = 0 同上,不能使用drop或者truncate的方式清空备份表
INSERT
INTO
xinniu.zip_table_bf
SELECT
userID,
loginName,
regionDate,
phoneNum,
birthday,
status,
lastLoginDate,
dataTime,
etlTimestamp ,
startTime ,
endTime ,
mark
FROM
xinniu.zip_table
join
(
SELECT
count(1) cnt
FROM
xinniu.zip_table_bf
LIMIT 1) b
WHERE
endTime < to_date(from_unixtime(unix_timestamp(‘${hiveconf:batch_date}’ ,‘yyyyMMdd’)))
AND b.cnt = 0 ;

– 中间加工表清空
DROP TABLE IF EXISTS xinniu.zip_table_nw;
DROP TABLE IF EXISTS xinniu.zip_table_od;

– 创建中间表 新增变化修改中间表
CREATE TABLE IF NOT EXISTS xinniu.zip_table_nw stored AS orc tblproperties (“orc.compress” = “SNAPPY”) AS
SELECT
userID,
loginName,
regionDate,
phoneNum,
birthday,
status,
lastLoginDate,
dataTime,
etlTimestamp ,
startTime ,
endTime ,
mark
FROM
xinniu.zip_table
WHERE
0 = 1;

– 创建中间表 未变化中间表
CREATE TABLE IF NOT EXISTS xinniu.zip_table_od stored AS orc tblproperties (“orc.compress” = “SNAPPY”) AS
SELECT
userID,
loginName,
regionDate,
phoneNum,
birthday,
status,
lastLoginDate,
dataTime,
etlTimestamp ,
startTime ,
endTime ,
mark
FROM
xinniu.zip_table
WHERE
0 = 1;

– 新增、修改、删除变化数据插入变化中间表nw
– 原始数据表与拉链表进行full join关联,通过主键is_pk关联 根据不同情况生成对应的startTime与endTime及mark三个技术字段
– 字段值选择原始表与目标表的非空字段值 nvl(n.@{source_column_names}, o.@{xinniu.zip_table_column_names})
– 本逻辑中mark分为I、D两种,新增与修改为I,删除为D
INSERT
INTO
TABLE xinniu.zip_table_nw
SELECT
nvl(n.userID,o.userID) ,
nvl(n.loginName,o.loginName) ,
nvl(n.regionDate,o.regionDate) ,
nvl(n.phoneNum,o.phoneNum) ,
nvl(n.birthday,o.birthday) ,
nvl(n.status,o.status) ,
nvl(n.lastLoginDate,o.lastLoginDate) ,
nvl(n.dataTime,o.dataTime) ,
current_date AS etlTimestamp ,
CASE
WHEN n.dataTime IS NULL THEN o.startTime
ELSE to_date(from_unixtime(unix_timestamp(‘ h i v e c o n f : b a t c h d a t e ′ , ′ y y y y M M d d ′ ) ) ) E N D A S s t a r t T i m e , C A S E W H E N n . d a t a T i m e I S N U L L T H E N t o d a t e ( f r o m u n i x t i m e ( u n i x t i m e s t a m p ( ′ {hiveconf:batch_date}' ,'yyyyMMdd'))) END AS startTime , CASE WHEN n.dataTime IS NULL THEN to_date(from_unixtime(unix_timestamp(' hiveconf:batchd​ate′,′yyyyMMdd′)))ENDASstartTime,CASEWHENn.dataTimeISNULLTHENtod​ate(fromu​nixtime(unixt​imestamp(′{hiveconf:batch_date}’ ,‘yyyyMMdd’)))
ELSE to_date(from_unixtime(unix_timestamp(‘29991231’ ,‘yyyyMMdd’)))
END AS endTime ,
CASE
WHEN ( n.userID is null ) THEN ‘D’
ELSE ‘I’
END AS mark
FROM
(
SELECT
userID,
loginName,
regionDate,
phoneNum,
birthday,
status,
lastLoginDate,
dataTime
FROM
xinniu.source_table
WHERE
dataTime = ‘${hiveconf:batch_date}’ ) n
FULL JOIN xinniu.zip_table_bk o ON
o.userID = n.userID
WHERE
(
o.userID IS NULL )
OR (
n.userID IS NULL )
OR (
nvl( CAST(o.phoneNum AS string) , ‘’ ) <> nvl( CAST(n.phoneNum AS string) , ‘’ )
OR nvl( CAST(o.status AS string) , ‘’ ) <> nvl( CAST(n.status AS string) , ‘’ )
)
;

– 闭链发生变化的数据 endTime改为hiveconf:batch_date
– 未变化数据保持原来状态 新增与修改状态统一"I"
– 发生变化的endTime逻辑:when n.startTime is not null then to_date(to_timestamp(‘ h i v e c o n f : b a t c h d a t e ′ , ′ y y y y M M d d ′ ) ) − − 没发生变化的 e n d T i m e 逻辑: w h e n o . e n d T i m e > = t o d a t e ( t o t i m e s t a m p ( ′ {hiveconf:batch_date}' , 'yyyyMMdd')) -- 没发生变化的endTime逻辑:when o.endTime >= to_date(to_timestamp(' hiveconf:batchd​ate′,′yyyyMMdd′))−−没发生变化的endTime逻辑:wheno.endTime>=tod​ate(tot​imestamp(′{hiveconf:batch_date}’ , ‘yyyyMMdd’)) then to_date(to_timestamp(‘29991231’,‘yyyyMMdd’))
INSERT
INTO
TABLE xinniu.zip_table_od
SELECT
o.userID,
o.loginName,
o.regionDate,
o.phoneNum,
o.birthday,
o.status,
o.lastLoginDate,
o.dataTime,
o.etlTimestamp ,
o.startTime ,
CASE
WHEN n.startTime IS NOT NULL THEN to_date(from_unixtime(unix_timestamp(‘ h i v e c o n f : b a t c h d a t e ′ , ′ y y y y M M d d ′ ) ) ) W H E N o . e n d T i m e > = t o d a t e ( f r o m u n i x t i m e ( u n i x t i m e s t a m p ( ′ {hiveconf:batch_date}' ,'yyyyMMdd'))) WHEN o.endTime >= to_date(from_unixtime(unix_timestamp(' hiveconf:batchd​ate′,′yyyyMMdd′)))WHENo.endTime>=tod​ate(fromu​nixtime(unixt​imestamp(′{hiveconf:batch_date}’ ,‘yyyyMMdd’)))
THEN to_date(from_unixtime(unix_timestamp(‘29991231’ ,‘yyyyMMdd’)))
ELSE o.endTime
END AS endTime ,
‘I’ AS mark
FROM
xinniu.zip_table_bk o
LEFT JOIN xinniu.zip_table_nw n ON
o.userID = n.userID
WHERE
nvl(n.endTime,to_date(from_unixtime(unix_timestamp(‘29991231’ ,‘yyyyMMdd’)))) <> to_date(from_unixtime(unix_timestamp(‘${hiveconf:batch_date}’ ,‘yyyyMMdd’)))
;

– 清空拉链表
TRUNCATE TABLE xinniu.zip_table;

– 插入数据到拉链表
INSERT
INTO
TABLE xinniu.zip_table
SELECT
*
FROM
xinniu.zip_table_nw
UNION ALL
SELECT
*
FROM
xinniu.zip_table_od
UNION ALL
SELECT
*
FROM
xinniu.zip_table_bf ;

– 清空临时表
DROP TABLE xinniu.zip_table_bk;

DROP TABLE xinniu.zip_table_bf;

DROP TABLE xinniu.zip_table_nw;

DROP TABLE xinniu.zip_table_od;

1.2.5 执行跑批任务

hive -hiveconf batch_date=20210101 -f /tmp/xinniu/sqlfile && hive -hiveconf batch_date=20210102 -f /tmp/xinniu/sqlfile && hive -hiveconf batch_date=20210103 -f /tmp/xinniu/sqlfile

1.3 效果验证
1.3.1 全表验证

select * from xinniu.zip_table;

1.3.2 卡拉链,查询历史某一节点数据

select * from xinniu.zip_table where startTime<=‘2021-01-01’ and endTime>‘2021-01-01’;

select * from xinniu.zip_table where startTime<=‘2021-01-02’ and endTime>‘2021-01-02’;

标签:数仓,xinniu,zip,拉链,基础知识,date,table,endTime
From: https://blog.csdn.net/qq_22201881/article/details/140838716

相关文章

  • 【会计基础知识】销售相关业务会计分录
    1.普通销售1.1销售发货单在存货核算模块的会计分录:借:主营业务成本贷:库存商品1.2销售发票在应收账款模块的会计分录:借:应收账款贷:主营业务收入、应交税金--应交增值税--销项税额2.分期收款2.1在存货核算模块的会计分录:分期收款发货单:借:发出商品贷:库存商品分期收款......
  • java基础知识汇总(二)
    PART1:变量与数据类型Java语言是强类型语言,对于每一种数据都定义了明确的具体数据类型,在内存总分配了不同大小的内存空间。整数默认:int小数默认:double使用变量注意事项:作用域:变量定义在哪一级大括号中,哪个大括号的范围就是这个变量的作用域。相同的作用域中不能......
  • vue基础知识总结(2)--- axios的使用
    一.下载Vue3:选择自己想要下载的项目文件夹,cmd回车打开命令栏,执行:cnpminitvue@latest然后等待一会就可以创建一个项目,并更改项目名:√请输入项目名称:...vue-project之后按照提示输入对应的语句:cdvue-projectcnpminstall我们等待几秒Vue3项目就成功创建出来了......
  • Java基础知识
    @目录一、第一个java程序二、Java中标识符的使用三、Java中的数据类型1.基本数据类型2.引用数据类型3.String类型变量的使用(字符串类型)四、Java中的运算符1.算数运算符2.赋值运算符3.比较运算符4.逻辑运算符5.位运算符6.三元运算符一、第一个java程序publicclasshello{......
  • Python基础知识笔记——常用函数
    一、range()函数range()函数用于生成一个整数序列。它通常用于循环结构中,例如for循环,以提供循环的迭代次数。range()函数可以有1到3个参数。#range(start,stop,step)range(2,6,2)#生成从2开始,到6结束(不包括6),步长为2的一串数字#参数指定不完全时,默认从0开始,步长......
  • Java基础知识
    Java基础知识一、第一个java程序二、Java中标识符的使用三、Java中的数据类型1.基本数据类型2.引用数据类型3.String类型变量的使用(字符串类型)四、Java中的运算符1.算数运算符2.赋值运算符**3.比较运算符****4.逻辑运算符****5.位运算符****6.三元运算符**一、第一......
  • Python基础知识笔记---保留字
    保留字,也称关键字,是指被编程语言内部定义并保留使用的标识符。一、保留字概览  二、保留字用途 1.`False`:表示布尔值假。2.`None`:表示空值或无值。3.`True`:表示布尔值真。4.`not`:布尔逻辑操作符,对条件表达式取反。5.`or`:布尔逻辑操作符,用于连接两个条件表达式......
  • ETL数据集成丨将DB2数据同步至Postgres数仓实践
    随着企业数字化转型的加速,数据已成为企业的重要资产。为了更好地挖掘数据价值,企业纷纷建立自己的数据仓库,以便于数据分析和决策。在众多数据库中,DB2和Postgres作为两款备受欢迎的数据库,如何实现它们之间的数据同步,成为了企业关注的焦点。本文将为您介绍如何使用ETLCloud将DB2数据......
  • [SDR] GNU Radio 系列教程 —— GNU Radio TX PDU (发送数据包操作)的基础知识(超全)
    目录1PDU概述2Demo详解2.1RandomPDUGenerator2.2AsyncCRC322.3ProtocolFormatter(Async)2.4将header和payload合并输出2.5对PDU实施突发填充和渐变2.6RRC滤波与多相任意重采样2.6.1FIR滤波器例子2.6.2滤波器基础及RRC知识1)什么时候用RRC?2)什么是RRC?3......