1、MySQL创建数据库
-- Create the MySQL source database for the demo.
-- IF NOT EXISTS keeps the statement re-runnable, in line with the
-- CREATE ... IF NOT EXISTS statements used for the tables in this walkthrough.
CREATE DATABASE IF NOT EXISTS demo DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;
-- Product source table on the MySQL side of the demo.
CREATE TABLE IF NOT EXISTS `demo`.`t_product_2` (
    goods_id     VARCHAR(50), -- product number
    goods_status VARCHAR(50), -- product status
    createtime   VARCHAR(50), -- product creation time
    modifytime   VARCHAR(50)  -- product last-modified time
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
-- Hive: create the database used by the ODS and DW tables of this walkthrough.
CREATE DATABASE IF NOT EXISTS `demo`;
================== 创建ods层表
-- ODS-layer product table, partitioned by day (dt).
CREATE TABLE IF NOT EXISTS `demo`.`ods_product_2` (
    goods_id     STRING, -- product number
    goods_status STRING, -- product status
    createtime   STRING, -- product creation time
    modifytime   STRING  -- product last-modified time
)
PARTITIONED BY (dt STRING);
-- The table deliberately keeps Hive's default text format rather than declaring
--   ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE
-- Hive's default field delimiter is \001 (Ctrl-A). The "fieldDelimiter" of the
-- DataX hdfswriter job must match the delimiter of the table it writes to
-- ("\u0001" for the default format, "," if the ROW FORMAT clause above is used),
-- otherwise each whole row ends up in the first column.
-- DW-layer zipper (history) table: one row per product version,
-- bounded by an effective date and an expiry date.
CREATE TABLE IF NOT EXISTS `demo`.`dw_product_2` (
    goods_id      STRING, -- product number
    goods_status  STRING, -- product status
    createtime    STRING, -- product creation time
    modifytime    STRING, -- product last-modified time
    dw_start_date STRING, -- date this version takes effect
    dw_end_date   STRING  -- date this version expires
)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
STORED AS TEXTFILE;
-- Load the initial product rows into MySQL.
-- Note: MySQL only recognises "--" as a comment marker when it is followed by
-- whitespace, so the header above is written as "-- " rather than "---".
INSERT INTO `demo`.`t_product_2` (goods_id, goods_status, createtime, modifytime) VALUES
    ('001', '待审核', '2022-10-18', '2022-10-20'),
    ('002', '待售', '2022-10-19', '2022-10-20'),
    ('003', '在售', '2022-10-20', '2022-10-20'),
    ('004', '已删除', '2022-10-15', '2022-10-20');
-- Hive: register the ODS partition that receives the 2022-10-20 load.
-- For scheduled runs the date can be parameterised, e.g. start Hive with
--   --hivevar dt=2022-10-20
-- and write the partition spec as (dt='${hivevar:dt}').
-- The templated form is not executed here: when the variable is undefined the
-- placeholder is presumably left unsubstituted, which would register a
-- partition literally named ${dt} (verify on your Hive version).
ALTER TABLE `demo`.`ods_product_2` ADD IF NOT EXISTS PARTITION (dt = '2022-10-20');
--用datax往ods里面同步数据;
{
  "job": {
    "setting": {
      "speed": {
        "channel": 1,
        "byte": -1
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0.02
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "${reader_username}",
            "password": "${reader_password}",
            "column": [
              "`goods_id`",
              "`goods_status`",
              "`createtime`",
              "`modifytime`"
            ],
            "splitPk": "",
            "connection": [
              {
                "table": [
                  "t_product_2"
                ],
                "jdbcUrl": [
                  "${reader_jdbc_url}"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "defaultFS": "hdfs://casickp",
            "fileType": "text",
            "compress": "",
            "path": "/user/hive/warehouse/demo.db/ods_product_2/dt=2022-10-20",
            "fileName": "ods_product_2",
            "writeMode": "append",
            "fieldDelimiter": "\u0001",
            "column": [
              {
                "name": "goods_id",
                "type": "string"
              },
              {
                "name": "goods_status",
                "type": "string"
              },
              {
                "name": "createtime",
                "type": "string"
              },
              {
                "name": "modifytime",
                "type": "string"
              }
            ],
            "hadoopConfig": {
              "dfs.nameservices": "casickp",
              "dfs.ha.namenodes.casickp": "nn1,nn2",
              "dfs.namenode.rpc-address.casickp.nn1": "node1:9000",
              "dfs.namenode.rpc-address.casickp.nn2": "node2:9000",
              "dfs.client.failover.proxy.provider.casickp": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
            }
          }
        }
      }
    ]
  }
}
-- Initial DW load: every row of the 2022-10-20 ODS partition becomes an open
-- version in the zipper table (effective from its modifytime, expiry 9999-12-31).
INSERT OVERWRITE TABLE `demo`.`dw_product_2`
SELECT
    ods.goods_id,                    -- product number
    ods.goods_status,                -- product status
    ods.createtime,                  -- product creation time
    ods.modifytime,                  -- product last-modified time
    ods.modifytime AS dw_start_date, -- effective date
    '9999-12-31'   AS dw_end_date    -- expiry date (open-ended)
FROM `demo`.`ods_product_2` AS ods
WHERE ods.dt = '2022-10-20';
增量导入MySQL 2022年10月21日数据
-- Simulate the 2022-10-21 changes in MySQL: product 001 changes status and
-- products 005 / 006 are newly created.
-- modifytime of the updated row is the change date 2022-10-21, the same date
-- used for the new rows and for the ODS partition loaded afterwards.
UPDATE `demo`.`t_product_2`
SET goods_status = '待售',
    modifytime   = '2022-10-21'
WHERE goods_id = '001';

INSERT INTO `demo`.`t_product_2` (goods_id, goods_status, createtime, modifytime) VALUES
    ('005', '待审核', '2022-10-21', '2022-10-21'),
    ('006', '待审核', '2022-10-21', '2022-10-21');
-- Hive: register the ODS partition that receives the 2022-10-21 load.
-- For scheduled runs the date can be parameterised, e.g. start Hive with
--   --hivevar dt=2022-10-21
-- and write the partition spec as (dt='${hivevar:dt}').
-- The templated form is not executed here: when the variable is undefined the
-- placeholder is presumably left unsubstituted, which would register a
-- partition literally named ${dt} (verify on your Hive version).
ALTER TABLE `demo`.`ods_product_2` ADD IF NOT EXISTS PARTITION (dt = '2022-10-21');
用datax增量导入2022年10月21日数据
编写SQL处理dw层历史数据,重新计算之前的dw_end_date
-- Preview query: recompute the expiry date of the rows already in the zipper table.
-- A history row whose goods_id appears in the 2022-10-21 ODS partition and whose
-- dw_end_date is later than 2022-10-21 is closed on 2022-10-21.
-- Every other row keeps its current dw_end_date.
SELECT
    t1.goods_id,      -- product number
    t1.goods_status,  -- product status
    t1.createtime,    -- product creation time
    t1.modifytime,    -- product last-modified time
    t1.dw_start_date, -- effective date (never recomputed)
    CASE
        WHEN t2.goods_id IS NOT NULL AND t1.dw_end_date > '2022-10-21' THEN '2022-10-21'
        ELSE t1.dw_end_date
    END AS dw_end_date -- expiry date (recomputed)
FROM `demo`.`dw_product_2` t1
LEFT JOIN (
    -- rows loaded for the day being processed; must be the 2022-10-21 partition
    SELECT * FROM `demo`.`ods_product_2` WHERE dt = '2022-10-21'
) t2
    ON t1.goods_id = t2.goods_id;
合并当天最新的数据和历史数据到dw_product_2
-- Rebuild the zipper table from two parts:
--   1. the existing history rows, with the expiry date recomputed against the
--      2022-10-21 ODS partition, and
--   2. the rows of the 2022-10-21 ODS partition as new open versions.
INSERT OVERWRITE TABLE `demo`.`dw_product_2`
SELECT
    hist.goods_id,      -- product number
    hist.goods_status,  -- product status
    hist.createtime,    -- product creation time
    hist.modifytime,    -- product last-modified time
    hist.dw_start_date, -- effective date (never recomputed)
    CASE
        WHEN (today.goods_id IS NOT NULL AND hist.dw_end_date > '2022-10-21')
            THEN '2022-10-21'
        ELSE hist.dw_end_date
    END AS dw_end_date  -- expiry date (recomputed)
FROM `demo`.`dw_product_2` hist
LEFT JOIN (
    -- only the join key is needed from the day's partition
    SELECT goods_id
    FROM `demo`.`ods_product_2`
    WHERE dt = '2022-10-21'
) today
    ON hist.goods_id = today.goods_id
UNION ALL
SELECT
    fresh.goods_id,                    -- product number
    fresh.goods_status,                -- product status
    fresh.createtime,                  -- product creation time
    fresh.modifytime,                  -- product last-modified time
    fresh.modifytime AS dw_start_date, -- effective date
    '9999-12-31'     AS dw_end_date    -- expiry date (open-ended)
FROM `demo`.`ods_product_2` fresh
WHERE fresh.dt = '2022-10-21'
ORDER BY dw_start_date, goods_id;
标签:10,product,goods,拉链,--,hive,2022,dw
From: https://www.cnblogs.com/dhcc/p/16813055.html