概念
定义
- 针对数据仓库设计中表存储数据的方式而定义的,设计表的时候添加start_date和end_date两个字段,数据更新时,通过修改end_date来设置数据的有效时间
- 所谓拉链,就是记录历史,记录一个事物从开始一直到当前状态的所有变化的信息
- 可以使用这张表拿到最新的当天的最新数据以及之前的历史数据
- 既能满足反映数据的历史状态,又可以最大限度地节省存储空间
目的
解决SCD(Slowly Changing Dimensions)缓慢变化维
收益
最大程度的节省存储,快速,高效的获取历史上任意一天的快照数据
适用场景
- 数据量很大且业务系统不会长期保留历史数据,需要在大数据平台保存
- 表字段会被update更新操作
- 需要查看某一个时间点或者时间段的历史快照信息
- 表中的记录变化的比例和频率很小
拉链表和流水表
流水表存放的是一个用户的变更记录,比如在一张流水表中,一天的数据中,会存放一个用户的每条修改记录,但是在拉链表中只有一条记录
缓慢变化维
定义
- 缓慢变化维,简称SCD(Slowly Changing Dimensions)
- 一些维度表的数据不是静态的,而是会随着时间而缓慢地变化,这里的缓慢是相对事实表而言,事实表数据变化的速度比维度表快,这种随着时间发生变化的维度称之为缓慢变化维
- 把处理维度表数据历史变化的问题,称为缓慢变化维问题,简称SCD问题
示例
编号 | 用户ID | 用户名 | 出生日期 | 住址 |
---|---|---|---|---|
001 | 1111 | 张三 | 1998-01-01 | 深圳市 |
这个用户的数据不是一直不变的,而是有可能发生变化,例如用户修改了用户名或者修改了住址
怎么解决SCD
保留初始值(不允许修改)
- 用户体验不好
- 例如在上述示例中,出生日期的数据,始终按照用户第一次填写的数据为准
改写属性值
- 该方法的前提:用户不关心数据变化
- 对其相应需要重写维度行中的旧值,以当前值替换,因此其始终反映最近的情况
- 当一个维度值的数据源发生变化,并且不需要在维度表中保留变化历史时,通常用新数据来覆盖旧数据,这样的处理使属性所反映的中是最新的赋值
增加维度新行
- 典型代表:拉链表
修改前:
编号 | 用户ID | 用户名 | 出生日期 | 住址 |
---|---|---|---|---|
001 | 1111 | 张三 | 1998-01-01 | 深圳市 |
修改后:
编号 | 用户ID | 用户名 | 出生日期 | 住址 |
---|---|---|---|---|
001 | 1111 | 张三 | 1998-01-01 | 深圳市 |
001 | 1111 | 张三 | 1998-01-01 | 深圳市坪山区 |
- 如何区分哪条数据是最新数据?可以考虑拉链表的方式,在后面新增两列
增加维度新列
- 用不同的字段来保存不同的值,就是在表中增加一个字段,这个字段用来保存变化后的当前值,而原来的值则被称为变化前的值
修改前:
编号 | 用户ID | 用户名 | 出生日期 | 住址 |
---|---|---|---|---|
001 | 1111 | 张三 | 1998-01-01 | 深圳市 |
修改后:
编号 | 用户ID | 用户名 | 出生日期 | 住址 | 现住址 |
---|---|---|---|---|---|
001 | 1111 | 张三 | 1998-01-01 | 深圳市 | 深圳市坪山区 |
使用历史表
- 另外建一个表来保存历史记录,这种方式就是将历史数据与当前数据完全分开来,在维度中只保存当前最新的数据
- 优点:可以同时分析当前及前一次变化的属性值
- 缺点:只保留了最后一次变化信息
修改前:
编号 | 用户ID | 用户名 | 出生日期 | 住址 |
---|---|---|---|---|
001 | 1111 | 张三 | 1998-01-01 | 深圳市 |
修改后:
编号 | 用户ID | 用户名 | 出生日期 | 住址 |
---|---|---|---|---|
001 | 1111 | 张三 | 1998-01-01 | 深圳市坪山区 |
修改后历史表:
编号 | 用户ID | 用户名 | 出生日期 | 住址 |
---|---|---|---|---|
001 | 1111 | 张三 | 1998-01-01 | 深圳市 |
拉链表的使用场景
在数据仓库的数据模型设计过程中,经常会遇到下面这种表的设计:
- 有一些表的数据量很大,比如一张用户表,大约10亿条记录,50个字段,这种表,即使使用ORC压缩,单张表的存储也会超过100G,在HDFS使用双备份或者三备份的话就更大一些
- 表中的部分字段会被update更新操作,如用户联系方式,产品的描述信息,订单的状态等等
- 需要查看某一个时间点或者时间段的历史快照信息,比如,查看某一个订单在历史某一个时间点的状态
- 表中的记录变化的比例和频率不是很大,比如,总共有10亿的用户,每天新增和发生变化的有200万左右,变化的比例占的很小
设计方案:
方案一:每天只留最新的一份,比如我们每天抽取最新的一份全量数据到Hive中
- 优点:节省空间,一些普通的使用也很方便,不用在选择表的时候加一个时间分区什么的
- 缺点:没有历史数据,想翻翻旧账只能通过其它方式,比如从流水表里面抽
方案二:每天保留一份全量的切片数据
- 优点:每天一份全量的切片是一种比较稳妥的方案,而且历史数据也在
- 缺点:存储空间占用量太大,如果对这边表每天都保留一份全量,那么每次全量中会保存很多不变的信息,对存储是极大的浪费
方案三:使用拉链表
- 优点:在空间上做了一个取舍,虽说不像方案一那样占用量那么小,但是它每日的增量可能只有方案二的千分之一甚至是万分之一;既能获取最新的数据,也能添加筛选条件也获取历史的数据
拉链表的设计和实现
- 2024-10-11 首次抽取
- 2024-10-12 修改出生日期
- 2024-10-13 修改地址
编号 | 用户ID | 用户名 | 出生日期 | 住址 | 更新时间 |
---|---|---|---|---|---|
001 | 1111 | 张三 | 1998-01-01 | 深圳市 | 2024-10-01 00:00:00 |
001 | 1111 | 张三 | 1998-10-11 | 深圳市 | 2024-10-12 00:00:00 |
001 | 1111 | 张三 | 1998-10-11 | 深圳市坪山区 | 2024-10-13 00:00:00 |
普通拉链表
在后面增加两个字段,即start_date和end_date
编号 | 用户ID | 用户名 | 出生日期 | 住址 | 更新时间 | start_date | end_date |
---|---|---|---|---|---|---|---|
001 | 1111 | 张三 | 1998-01-01 | 深圳市 | 2024-10-01 00:00:00 | 2024-10-11 | 2024-10-12 |
001 | 1111 | 张三 | 1998-10-11 | 深圳市 | 2024-10-12 00:00:00 | 2024-10-12 | 2024-10-13 |
001 | 1111 | 张三 | 1998-10-11 | 深圳市坪山区 | 2024-10-13 00:00:00 | 2024-10-13 | 9999-12-31 |
优化
新增三个字段start_date,end_date,dp,同时拉链表会有个分区字段,分别为end_date和status
- start_date:备注该记录的起始时间,不代表该记录的创建时间,因此该字段为非业务时间
- end_date:记录数据有效期的截止日期
- status:表示数据的当前状态,线上(active),过期(expired),如果数据量太大,可以加一个history归档
增量拉链
CREATE TABLE tmp.temp_ods_user(
`id` int comment 'id',
`user_id` int comment '用户id',
`user_name` string comment '用户名称',
`date_of_birth` string comment '出生日期',
`address_of_birth` string comment '出生地址',
`update_time` string comment '更新时间'
)
comment '测试拉链表,用户信息表ods抽取层'
PARTITIONED BY ( `dt` string COMMENT '增量抽取日期')
stored as orc
;
CREATE TABLE tmp.temp_dw_user_chain(
`start_date` string comment '起始日期',
`change_code` string comment '字段MD5值',
`id` int comment 'id',
`user_id` int comment '用户id',
`user_name` string comment '用户名称',
`date_of_birth` string comment '出生日期',
`address_of_birth` string comment '出生地址',
`update_time` string comment '更新时间'
)
comment '测试拉链表,用户信息表dw明细层拉链处理'
PARTITIONED BY ( `status` string COMMENT '状态'
,`end_date` string COMMENT '截止日期'
)
stored as orc
;
第一次抽取
-- 模拟第一次抽取 ods
insert overwrite table tmp.temp_ods_user partition (dt='2024-10-11')
select 001 as id
,1111 as user_id
,'张三' as user_name
,'1998-01-01' as date_of_birth
,'深圳市' as address_of_birth
,'2024-10-01 00:00:00' as update_time
;
-- 模拟第一次抽取 拉链
insert overwrite table tmp.temp_dw_user_chain partition (status='expired',end_date='2023-05-01')
select
case when h.change_code<>c.change_code then h.start_date else e.start_date end as start_date
,case when h.change_code<>c.change_code then h.change_code else e.change_code end as change_code
,case when h.change_code<>c.change_code then h.id else e.id end as id
,case when h.change_code<>c.change_code then h.user_id else e.user_id end as user_id
,case when h.change_code<>c.change_code then h.user_name else e.user_name end as user_name
,case when h.change_code<>c.change_code then h.date_of_birth else e.date_of_birth end as date_of_birth
,case when h.change_code<>c.change_code then h.address_of_birth else e.address_of_birth end as address_of_birth
,case when h.change_code<>c.change_code then h.update_time else e.update_time end as update_time
from(select *
from tmp.temp_dw_user_chain
where status = 'active'
and id is not null
) h -- 上次的active数据
full join(select `(dt|rank)?+.+`
from (select id,user_id,user_name,date_of_birth,address_of_birth,update_time,change_code
,row_number(id) as rank
from (select *,md5(concat_ws('_',id,user_id,user_name,date_of_birth,address_of_birth,update_time)) as change_code
from tmp.temp_ods_user
where dt = '2024-10-11'
and id is not null
distribute by id sort by id desc
) x
) t
where t.rank = 1
) c -- 抽取的增量数据
on h.id = c.id
full join(select *
from tmp.temp_dw_user_chain
where status='expired'
and end_date='2023-05-01'
) e -- 过期数据
on e.id = c.id
where h.id is not null and c.id is not null and (( h.change_code <> c.change_code ) or ( h.change_code = c.change_code and e.id is not null))
;
insert overwrite table tmp.temp_dw_user_chain partition (status='active',end_date='9999-12-31')
select if(h.id is null or (c.id is not null and (h.change_code <> c.change_code)),'2024-10-11',h.start_date) as start_date
,case
when h.id is null then
c.change_code
when h.id is not null and c.id is not null and h.change_code <> c.change_code then c.change_code
else
h.change_code
end as change_code
,case when c.id is not null then c.id else h.id end as id
,case when c.id is not null then c.user_id else h.user_id end as user_id
,case when c.id is not null then c.user_name else h.user_name end as user_name
,case when c.id is not null then c.date_of_birth else h.date_of_birth end as date_of_birth
,case when c.id is not null then c.address_of_birth else h.address_of_birth end as address_of_birth
,case when c.id is not null then c.update_time else h.update_time end as update_time
from(select *
from tmp.temp_dw_user_chain
where status = 'active'
and id is not null
) h -- 上次的active数据
full join(select `(dt|rank)?+.+`
from (select id,user_id,user_name,date_of_birth,address_of_birth,update_time,change_code
,row_number(id) as rank
from (select *
,md5(concat_ws('_',id,user_id,user_name,date_of_birth,address_of_birth,update_time)) as change_code
from tmp.temp_ods_user
where dt = '2024-10-11'
and id is not null
distribute by id sort by id desc
) x
) t
where t.rank = 1
) c -- 抽取的增量数据
on h.id = c.id
;
-- 拉链任务可能在一天内手工跑多次,当天第一次跑拉链任务时,EXPIRED分区中是没有数据的,此时会将被更新的旧数据写入EXPIRED分区中。当天第二次手工重跑拉链任务时,EXPIRED分区中已有数据,会直接将EXPIRED分区数据写入EXPIRED分区。
-- 拉链SQL中EXPIRED分区是必须使用的。拉链任务当天第二次重跑时ACTIVE分区数据已经更新,不是昨天的状态,不使用EXPIRED分区中已有的数据会清空EXPIRED分区数据。
标签:code,拉链,01,user,date,id,change
From: https://www.cnblogs.com/shihongpin/p/18458344