首页 > 其他分享 >4 . DWD和ADS层

4 . DWD和ADS层

时间:2022-08-14 23:35:49浏览次数:55  
标签:ADS -- gma payment DWD mysql id day

7、DWD层

流表和维表关联,可以使用lookup join ,当存在hbase或者mysql中的表发生改变时,可以动态的发生改变

1、支付事实表

数据仓库建模的方法:

注意:Upsert Kafka 连接器支持以 upsert 方式从 Kafka topic 中读取数据并将数据写入 Kafka topic。从kafka中source并且sink到kafka中去。

学习一个新的函数:listAGG():连接字符串表达式的值并在它们之间放置分隔符值。字符串末尾不添加分隔符时则分隔符的默认值为“,”。

--1、创建kafka sink表

CREATE TABLE gma_dwd.dwd_kafka_payment_info (
id bigint,
user_id BIGINT,
payment_time STRING,
payment_type STRING,
province_id BIGINT,
skus STRING,
payment_price decimal(10,2),
sku_num BIGINT,
proc_time as PROCTIME(),
 PRIMARY KEY (id) NOT ENFORCED-- 设置唯一主键
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'dwd_payment_info',
  'properties.bootstrap.servers' = 'master:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

--2、关联支付流水表,订单表,订单明细表,构建支付事实表

insert into gma_dwd.dwd_kafka_payment_info
select 
id,
cast(user_id as BIGINT) as user_id,
payment_time,
payment_type,
cast(province_id as BIGINT) as province_id,
listAGG(cast(sku_id as STRING)) as skus,
sum(order_price*sku_num) as payment_price,
sum(sku_num) as sku_num
from (
    select a.id,a.user_id,a.payment_time,a.payment_type,b.province_id,c.sku_id,c.order_price,cast(c.sku_num as bigint) as sku_num  from 
    gma_ods.ods_mysql_kafka_payment_info
    /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ as a
    inner join 
    gma_ods.ods_mysql_kafka_order_info
    /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ as b
    on cast(a.order_id as bigint)=b.id
    inner join 
    gma_ods.ods_mysql_kafka_order_detail
    /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ as c
    on b.id=c.order_id
)
as d
group by 
id,user_id,payment_time,payment_type,province_id;


-- 消费kafka
kafka-console-consumer.sh --bootstrap-server  master:9092,node2:9092,node2:9092 --from-beginning --topic dwd_payment_info

--在flink中查询数据
select * from gma_dwd.dwd_kafka_payment_info  /*+ OPTIONS('scan.startup.mode'='earliest-offset') */

2、订单事实表


-- 1、创建kafka sink表

CREATE TABLE gma_dwd.dwd_kafka_order_info (
id BIGINT,
consignee  STRING,
consignee_tel STRING,
delivery_address STRING,
order_status  STRING,
user_id BIGINT,
payment_way  STRING,
create_time  TIMESTAMP(3),
operate_time  TIMESTAMP(3),
expire_time TIMESTAMP(3),
province_id  BIGINT,
skus STRING,
total_amount decimal(10,2),
proc_time as PROCTIME(),
PRIMARY KEY (id) NOT ENFORCED-- 设置唯一主键
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'dwd_order_info',
  'properties.bootstrap.servers' = 'master:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);


-- 执行sql
insert into gma_dwd.dwd_kafka_order_info
select 
a.id,
a.consignee,
a.consignee_tel,
a.delivery_address,
a.order_status,
a.user_id,
a.payment_way,
a.create_time,
a.operate_time,
a.expire_time,
cast(a.province_id as bigint),
b.skus,
a.total_amount
from 
gma_ods.ods_mysql_kafka_order_info
/*+ OPTIONS('scan.startup.mode'='earliest-offset') */ as a
join 
(
    select order_id,listagg(cast(sku_id as STRING)) as skus from 
    gma_ods.ods_mysql_kafka_order_detail
    /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ 
    group by order_id
)
as b
on a.id=b.order_id


8、ADS层

  • 统计指标的方法论

    原子指标:下单金额,支付金额

    派生指标=原子指标+统计周期+业务限定+统计维度

先在mysql中创建数据库gma_ads

1、支付金额

  • 实时计算每个用户每天实时的支付金额
  • 实时计算每个地区每天的支付金额
  • 实时计算每种支付方式每天支付金额
  • 实时统计每个大区每天的支付金额
  • 实时统计不同性别每天支付金额
1、实时计算每个用户每天实时的支付金额
-- 1、创建msyql sink表
-- flink sql  jdbc sink表
CREATE TABLE gma_ads.ads_mysql_user_day_sum_payment_price (
  user_id BIGINT,
  day_id STRING,
  sum_payment_price decimal(10,2),
  PRIMARY KEY (user_id,day_id) NOT ENFORCED -- 按照主键更新数据
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://master:3306/gma_ads?useUnicode=true&characterEncoding=UTF-8',
   'table-name' = 'ads_mysql_user_day_sum_payment_price', -- 需要手动到数据库中创建表
   'username' = 'root',
   'password' = '123456'
);


-- 在mysql中创建表
CREATE TABLE `ads_mysql_user_day_sum_payment_price` (
  `user_id` BIGINT NOT NULL,
  `day_id` varchar(255)  NOT NULL,
  `sum_payment_price` decimal(10,2),
  PRIMARY KEY (`user_id`,`day_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

--实时统计
insert into gma_ads.ads_mysql_user_day_sum_payment_price
select 
user_id,
substr(payment_time,1,10) as day_id,
sum(payment_price) as sum_payment_price
from gma_dwd.dwd_kafka_payment_info
group by user_id,substr(payment_time,1,10);
2、实时计算每个地区每天的支付金额
-- 1、创建msyql sink表
-- flink sql  jdbc sink表
CREATE TABLE gma_ads.ads_mysql_proc_day_sum_payment_price (
  pro_name STRING,
  day_id STRING,
  sum_payment_price decimal(10,2),
  PRIMARY KEY (pro_name,day_id) NOT ENFORCED -- 按照主键更新数据
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://master:3306/gma_ads?useUnicode=true&characterEncoding=UTF-8',
   'table-name' = 'ads_mysql_proc_day_sum_payment_price', -- 需要手动到数据库中创建表
   'username' = 'root',
   'password' = '123456'
);


-- 在mysql中创建表
CREATE TABLE `ads_mysql_proc_day_sum_payment_price` (
  `pro_name`  varchar(255) NOT NULL,
  `day_id` varchar(255)  NOT NULL,
  `sum_payment_price` decimal(10,2),
  PRIMARY KEY (`pro_name`,`day_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

--实时统计
-- 实时关联读取维度表获取省名
insert into gma_ads.ads_mysql_proc_day_sum_payment_price
select 
b.info.pro_name as pro_name,
substr(payment_time,1,10) as day_id,
sum(payment_price) as sum_payment_price
from gma_dwd.dwd_kafka_payment_info as a
LEFT JOIN
gma_dim.dim_hbase_region  FOR SYSTEM_TIME AS OF a.proc_time as b
on a.province_id=b.pro_id
group by b.info.pro_name,substr(payment_time,1,10);

2、下单笔数

  • 每个省份每天实时下单的数量
  • 每个大区每天实时下单的数量
  • 实时统计每个品牌每天下单的数量
  • 实时统计每个用户每天下单的数量
1、每个省份每天实时下单的数量
-- 创建mysql sin表
CREATE TABLE gma_ads.ads_mysql_proc_day_order_num (
  pro_name STRING,
  day_id STRING,
  num bigint,
  PRIMARY KEY (pro_name,day_id) NOT ENFORCED -- 按照主键更新数据
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://master:3306/gma_ads?useUnicode=true&characterEncoding=UTF-8',
   'table-name' = 'ads_mysql_proc_day_order_num', -- 需要手动到数据库中创建表
   'username' = 'root',
   'password' = '123456'
);


-- 在mysql中创建表
CREATE TABLE `ads_mysql_proc_day_order_num` (
  `pro_name`  varchar(255) NOT NULL,
  `day_id` varchar(255)  NOT NULL,
  `num` bigint,
  PRIMARY KEY (`pro_name`,`day_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

2、每个大区每天实时下单的数量
-- 创建mysql sin表
CREATE TABLE gma_ads.ads_mysql_region_day_order_num (
  region_name STRING,
  day_id STRING,
  num bigint,
  PRIMARY KEY (region_name,day_id) NOT ENFORCED -- 按照主键更新数据
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://master:3306/gma_ads?useUnicode=true&characterEncoding=UTF-8',
   'table-name' = 'ads_mysql_region_day_order_num', -- 需要手动到数据库中创建表
   'username' = 'root',
   'password' = '123456'
);


-- 在mysql中创建表
CREATE TABLE `ads_mysql_region_day_order_num` (
  `region_name`  varchar(255) NOT NULL,
  `day_id` varchar(255)  NOT NULL,
  `num` bigint,
  PRIMARY KEY (`region_name`,`day_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;


3、实时统计每个品牌每天下单的数量
LOAD MODULE hive WITH ('hive-version' = '1.2.1');


select c.info.tm_name as tm_name,day_id,count(1) as num from  (
    select 
    sku_id,
    DATE_FORMAT(create_time,'yyyy-MM-dd') as day_id,
    PROCTIME() as proc_time
    from 
    gma_dwd.dwd_kafka_order_info as a,
    LATERAL TABLE(explode(split(skus,','))) t(sku_id)
) as b
left join 
gma_dim.dim_hbase_item_info  FOR SYSTEM_TIME AS OF b.proc_time as c
on cast(b.sku_id as bigint)=c.sku_id
group by c.info.tm_name,day_id
在一个任务中执行多条sql
EXECUTE STATEMENT SET 
BEGIN
-- 每个省份每天实时下单的数量
insert into gma_ads.ads_mysql_proc_day_order_num
select 
b.info.pro_name,
DATE_FORMAT(create_time,'yyyy-MM-dd') as day_id,
count(1) as num
from 
gma_dwd.dwd_kafka_order_info as a
left join 
gma_dim.dim_hbase_region  FOR SYSTEM_TIME AS OF a.proc_time as b
on a.province_id=b.pro_id
group by b.info.pro_name,DATE_FORMAT(create_time,'yyyy-MM-dd');

-- 每个大区每天实时下单的数量
insert into gma_ads.ads_mysql_region_day_order_num
select 
b.info.region_name,
DATE_FORMAT(create_time,'yyyy-MM-dd') as day_id,
count(1) as num
from 
gma_dwd.dwd_kafka_order_info as a
left join 
gma_dim.dim_hbase_region  FOR SYSTEM_TIME AS OF a.proc_time as b
on a.province_id=b.pro_id
group by b.info.region_name,DATE_FORMAT(create_time,'yyyy-MM-dd');

END;

9、问题

1、flink将更新的数据批量写入hbase时数据丢失的问题 (BUG)

  • 解决方法:将批量写入改成条写入
    • 'sink.buffer-flush.max-rows'='0'

标签:ADS,--,gma,payment,DWD,mysql,id,day
From: https://www.cnblogs.com/atao-BigData/p/16586698.html

相关文章