7. DWD Layer
When joining a stream table with a dimension table, use a lookup join: if the dimension table stored in HBase or MySQL changes, the join picks up the change dynamically.
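A minimal sketch of the lookup-join syntax used throughout this section (the table and column names here are hypothetical):
-- FOR SYSTEM_TIME AS OF <processing time> marks the lookup join: the dimension row
-- is fetched at processing time, so later changes in HBase/MySQL are picked up.
select o.id, d.pro_name
from orders as o
left join dim_province FOR SYSTEM_TIME AS OF o.proc_time as d
on o.province_id = d.pro_id;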
1. Payment fact table
Data warehouse modeling approach:
Note: the Upsert Kafka connector supports reading data from and writing data to a Kafka topic in upsert fashion, i.e. Kafka serves as both source and sink.
A new function to learn: LISTAGG() concatenates the values of string expressions, placing a separator between them; no separator is appended at the end of the string. The default separator is ','.
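A quick illustration of LISTAGG on hypothetical rows:
-- order_detail rows (order_id, sku_id): (1,'a'), (1,'b'), (1,'c')
select order_id, LISTAGG(sku_id) as skus
from order_detail
group by order_id;
-- result: (1, 'a,b,c') -- default separator ','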
-- 1. Create the Kafka sink table
CREATE TABLE gma_dwd.dwd_kafka_payment_info (
id bigint,
user_id BIGINT,
payment_time STRING,
payment_type STRING,
province_id BIGINT,
skus STRING,
payment_price decimal(10,2),
sku_num BIGINT,
proc_time as PROCTIME(),
PRIMARY KEY (id) NOT ENFORCED -- unique primary key
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'dwd_payment_info',
'properties.bootstrap.servers' = 'master:9092',
'key.format' = 'json',
'value.format' = 'json'
);
-- 2. Join the payment, order, and order-detail tables to build the payment fact table
insert into gma_dwd.dwd_kafka_payment_info
select
id,
cast(user_id as BIGINT) as user_id,
payment_time,
payment_type,
cast(province_id as BIGINT) as province_id,
LISTAGG(cast(sku_id as STRING)) as skus,
sum(order_price*sku_num) as payment_price,
sum(sku_num) as sku_num
from (
select
a.id, a.user_id, a.payment_time, a.payment_type,
b.province_id, c.sku_id, c.order_price,
cast(c.sku_num as bigint) as sku_num
from
gma_ods.ods_mysql_kafka_payment_info
/*+ OPTIONS('scan.startup.mode'='earliest-offset') */ as a
inner join
gma_ods.ods_mysql_kafka_order_info
/*+ OPTIONS('scan.startup.mode'='earliest-offset') */ as b
on cast(a.order_id as bigint)=b.id
inner join
gma_ods.ods_mysql_kafka_order_detail
/*+ OPTIONS('scan.startup.mode'='earliest-offset') */ as c
on b.id=c.order_id
) as d
group by
id,user_id,payment_time,payment_type,province_id;
-- Consume from Kafka
kafka-console-consumer.sh --bootstrap-server master:9092,node2:9092 --from-beginning --topic dwd_payment_info
-- Query the data in Flink
select * from gma_dwd.dwd_kafka_payment_info /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;
2. Order fact table
-- 1. Create the Kafka sink table
CREATE TABLE gma_dwd.dwd_kafka_order_info (
id BIGINT,
consignee STRING,
consignee_tel STRING,
delivery_address STRING,
order_status STRING,
user_id BIGINT,
payment_way STRING,
create_time TIMESTAMP(3),
operate_time TIMESTAMP(3),
expire_time TIMESTAMP(3),
province_id BIGINT,
skus STRING,
total_amount decimal(10,2),
proc_time as PROCTIME(),
PRIMARY KEY (id) NOT ENFORCED -- unique primary key
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'dwd_order_info',
'properties.bootstrap.servers' = 'master:9092',
'key.format' = 'json',
'value.format' = 'json'
);
-- 2. Execute the SQL
insert into gma_dwd.dwd_kafka_order_info
select
a.id,
a.consignee,
a.consignee_tel,
a.delivery_address,
a.order_status,
a.user_id,
a.payment_way,
a.create_time,
a.operate_time,
a.expire_time,
cast(a.province_id as bigint),
b.skus,
a.total_amount
from
gma_ods.ods_mysql_kafka_order_info
/*+ OPTIONS('scan.startup.mode'='earliest-offset') */ as a
join
(
select order_id,listagg(cast(sku_id as STRING)) as skus from
gma_ods.ods_mysql_kafka_order_detail
/*+ OPTIONS('scan.startup.mode'='earliest-offset') */
group by order_id
)
as b
on a.id=b.order_id;
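The result can be inspected the same way as the payment fact table:
-- Query the data in Flink
select * from gma_dwd.dwd_kafka_order_info /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;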
8. ADS Layer
Methodology for statistical metrics:
Atomic metrics: order amount, payment amount.
Derived metric = atomic metric + statistical period + business qualifier + statistical dimension. For example, "each user's payment amount per day" combines the atomic metric (payment amount) with a period (day) and a dimension (user).
First, create the gma_ads database in MySQL.
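A minimal statement for this step (the character set is an assumption):
CREATE DATABASE IF NOT EXISTS gma_ads DEFAULT CHARACTER SET utf8;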
1. Payment amount
- Real-time payment amount per user per day
- Real-time payment amount per province per day
- Real-time payment amount per payment type per day
- Real-time payment amount per region per day
- Real-time payment amount per gender per day
1. Real-time payment amount per user per day
-- 1. Create the MySQL sink table
-- Flink SQL JDBC sink table
CREATE TABLE gma_ads.ads_mysql_user_day_sum_payment_price (
user_id BIGINT,
day_id STRING,
sum_payment_price decimal(10,2),
PRIMARY KEY (user_id,day_id) NOT ENFORCED -- update rows by primary key
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://master:3306/gma_ads?useUnicode=true&characterEncoding=UTF-8',
'table-name' = 'ads_mysql_user_day_sum_payment_price', -- the table must be created manually in the database
'username' = 'root',
'password' = '123456'
);
-- Create the table in MySQL
CREATE TABLE `ads_mysql_user_day_sum_payment_price` (
`user_id` BIGINT NOT NULL,
`day_id` varchar(255) NOT NULL,
`sum_payment_price` decimal(10,2),
PRIMARY KEY (`user_id`,`day_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- Real-time aggregation
insert into gma_ads.ads_mysql_user_day_sum_payment_price
select
user_id,
substr(payment_time,1,10) as day_id,
sum(payment_price) as sum_payment_price
from gma_dwd.dwd_kafka_payment_info
group by user_id,substr(payment_time,1,10);
2. Real-time payment amount per province per day
-- 1. Create the MySQL sink table
-- Flink SQL JDBC sink table
CREATE TABLE gma_ads.ads_mysql_proc_day_sum_payment_price (
pro_name STRING,
day_id STRING,
sum_payment_price decimal(10,2),
PRIMARY KEY (pro_name,day_id) NOT ENFORCED -- update rows by primary key
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://master:3306/gma_ads?useUnicode=true&characterEncoding=UTF-8',
'table-name' = 'ads_mysql_proc_day_sum_payment_price', -- the table must be created manually in the database
'username' = 'root',
'password' = '123456'
);
-- Create the table in MySQL
CREATE TABLE `ads_mysql_proc_day_sum_payment_price` (
`pro_name` varchar(255) NOT NULL,
`day_id` varchar(255) NOT NULL,
`sum_payment_price` decimal(10,2),
PRIMARY KEY (`pro_name`,`day_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- Real-time aggregation
-- Join the dimension table in real time to get the province name
insert into gma_ads.ads_mysql_proc_day_sum_payment_price
select
b.info.pro_name as pro_name,
substr(payment_time,1,10) as day_id,
sum(payment_price) as sum_payment_price
from gma_dwd.dwd_kafka_payment_info as a
LEFT JOIN
gma_dim.dim_hbase_region FOR SYSTEM_TIME AS OF a.proc_time as b
on a.province_id=b.pro_id
group by b.info.pro_name,substr(payment_time,1,10);
2. Number of orders
- Real-time order count per province per day
- Real-time order count per region per day
- Real-time order count per brand per day
- Real-time order count per user per day
1. Real-time order count per province per day
-- Create the MySQL sink table
CREATE TABLE gma_ads.ads_mysql_proc_day_order_num (
pro_name STRING,
day_id STRING,
num bigint,
PRIMARY KEY (pro_name,day_id) NOT ENFORCED -- update rows by primary key
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://master:3306/gma_ads?useUnicode=true&characterEncoding=UTF-8',
'table-name' = 'ads_mysql_proc_day_order_num', -- the table must be created manually in the database
'username' = 'root',
'password' = '123456'
);
-- Create the table in MySQL
CREATE TABLE `ads_mysql_proc_day_order_num` (
`pro_name` varchar(255) NOT NULL,
`day_id` varchar(255) NOT NULL,
`num` bigint,
PRIMARY KEY (`pro_name`,`day_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
2. Real-time order count per region per day
-- Create the MySQL sink table
CREATE TABLE gma_ads.ads_mysql_region_day_order_num (
region_name STRING,
day_id STRING,
num bigint,
PRIMARY KEY (region_name,day_id) NOT ENFORCED -- update rows by primary key
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://master:3306/gma_ads?useUnicode=true&characterEncoding=UTF-8',
'table-name' = 'ads_mysql_region_day_order_num', -- the table must be created manually in the database
'username' = 'root',
'password' = '123456'
);
-- Create the table in MySQL
CREATE TABLE `ads_mysql_region_day_order_num` (
`region_name` varchar(255) NOT NULL,
`day_id` varchar(255) NOT NULL,
`num` bigint,
PRIMARY KEY (`region_name`,`day_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
3. Real-time order count per brand per day
-- Load the Hive module so Hive built-in functions (explode, split) are available in Flink SQL
LOAD MODULE hive WITH ('hive-version' = '1.2.1');
select c.info.tm_name as tm_name,day_id,count(1) as num from (
select
sku_id,
DATE_FORMAT(create_time,'yyyy-MM-dd') as day_id,
PROCTIME() as proc_time
from
gma_dwd.dwd_kafka_order_info as a,
LATERAL TABLE(explode(split(skus,','))) t(sku_id)
) as b
left join
gma_dim.dim_hbase_item_info FOR SYSTEM_TIME AS OF b.proc_time as c
on cast(b.sku_id as bigint)=c.sku_id
group by c.info.tm_name,day_id;
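Following the pattern of the other metrics, a matching MySQL sink table for this query could look like this (a sketch; the table name is an assumption):
CREATE TABLE gma_ads.ads_mysql_tm_day_order_num (
tm_name STRING,
day_id STRING,
num bigint,
PRIMARY KEY (tm_name,day_id) NOT ENFORCED -- update rows by primary key
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://master:3306/gma_ads?useUnicode=true&characterEncoding=UTF-8',
'table-name' = 'ads_mysql_tm_day_order_num', -- the table must be created manually in the database
'username' = 'root',
'password' = '123456'
);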
Executing multiple SQL statements in a single job:
EXECUTE STATEMENT SET
BEGIN
-- real-time order count per province per day
insert into gma_ads.ads_mysql_proc_day_order_num
select
b.info.pro_name,
DATE_FORMAT(create_time,'yyyy-MM-dd') as day_id,
count(1) as num
from
gma_dwd.dwd_kafka_order_info as a
left join
gma_dim.dim_hbase_region FOR SYSTEM_TIME AS OF a.proc_time as b
on a.province_id=b.pro_id
group by b.info.pro_name,DATE_FORMAT(create_time,'yyyy-MM-dd');
-- real-time order count per region per day
insert into gma_ads.ads_mysql_region_day_order_num
select
b.info.region_name,
DATE_FORMAT(create_time,'yyyy-MM-dd') as day_id,
count(1) as num
from
gma_dwd.dwd_kafka_order_info as a
left join
gma_dim.dim_hbase_region FOR SYSTEM_TIME AS OF a.proc_time as b
on a.province_id=b.pro_id
group by b.info.region_name,DATE_FORMAT(create_time,'yyyy-MM-dd');
END;
9. Issues
1. Data loss when Flink batch-writes updated rows to HBase (BUG)
- Workaround: switch from batched writes to row-by-row writes, as sketched below
- 'sink.buffer-flush.max-rows'='0'
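A sketch of where this option goes in an HBase sink DDL (the connector version, table name, and columns here are assumptions):
CREATE TABLE dim_hbase_example (
rowkey STRING,
info ROW<pro_name STRING>,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'hbase-2.2',
'table-name' = 'example',
'zookeeper.quorum' = 'master:2181',
'sink.buffer-flush.max-rows' = '0' -- flush each row immediately instead of batching
);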