首页 > 其他分享 >3、构建实时数据仓库-ods和dim层构建

3、构建实时数据仓库-ods和dim层构建

时间:2022-08-14 20:57:41浏览次数:53  
标签:dim gma ods 数据仓库 kafka 构建 mysql id

3、构建实时数据仓库

项目平台搭建架构及其总体流程

1、flink整合hive的catalog

因为本项目中的对应kafka中的表都存在hive的元数据库中,所以需要创建一个hive的catalog,方便直接操作

-- 进入sql客户端
sql-client.sh 

-- 创建hive catalog
CREATE CATALOG hive_catalog WITH (
    'type' = 'hive',
    'default-database' = 'default',
    'hive-conf-dir' = '/usr/local/soft/hive-1.2.1/conf'
);
-- set the HiveCatalog as the current catalog of the session
USE CATALOG hive_catalog;


-- 创建一个flink_init.sql文件,将hive catalog放进去,后面再启动sql-client时指定sql文件

2、在hviecatalog中创建数据库

离线数据仓库:每一个库对应一个用户,每一个库对应hdfs中一个目录

实时数据仓库每一个层的数据保存在不同位置,ods,dwd,dws保存在kafkadim,ads的数据保存在数据库中

create database gma_ods;
create database gma_dwd;
create database gma_dws;
create database gma_dim;
create database gma_ads;

4、数据采集

必须现在mysql中创建数据库,然后打开canal,实时监控数据库,当我们对于这个库做任何操作,kafka中就会实时同步我们的操作(通过json格式)

# 1、在mysql中创建数据库gma,指定编码格式为utf-8
# 2、修改canal配置文件
cd /usr/local/soft/canal/conf/example
# 修改配置文件
vim instance.properties
# 增加动态topic配置,每个表在kafka中生成一个topic
canal.mq.dynamicTopic=gma\\..*


# 3、开启mysqlbinlog日志
vim /etc/my.cnf 
# 在配置文件中增加二配置
# 需要将配置放在[mysqld]后面
# 打开binlog
log-bin=mysql-bin
# 选择ROW(行)模式
binlog-format=ROW
# 重启mysql
service mysqld restart

# 4、启动canal
cd /usr/local/soft/canal/bin
./restart.sh

# 5、在数据库中创建表
# 执行下面这个sql文件
init_mysql_table.sql

# 6、查看topic是否生成
kafka-topics.sh --list  --zookeeper  master:2181
# 如果topic没有生成,检测前面哪里出了问题,创建表之后必须要有topic

# 7、导入数据到mysql中
# 执行sql文件
load_data.sql

# 8、使用kafka控制台消费者消费数据
kafka-console-consumer.sh --bootstrap-server  master:9092 --from-beginning --topic gma.base_category1

5、ODS层

  • 在flink sql中创建ods的表
# 进入sql-client
sql-client.sh -i flink_init.sql

# 创建ods层所有的表
ods_mysql_kafka_base_category1:商品一级分类表
ods_mysql_kafka_base_category2:商品二级分类表
ods_mysql_kafka_base_category3:商品三级分类表
ods_mysql_kafka_base_province:省份配置报表
ods_mysql_kafka_base_region:地区配置表
ods_mysql_kafka_base_trademark:品牌表
ods_mysql_kafka_date_info:时间配置表
ods_mysql_kafka_holiday_info:节假日表
ods_mysql_kafka_holiday_year:节假日对应时间表
ods_mysql_kafka_order_detail:订单详情表,一个订单中一个商品一条数据
ods_mysql_kafka_order_info:订单表。一个订单一条数据,订单表中有订单状态
ods_mysql_kafka_order_status_log:订单转台变化日志记录表
ods_mysql_kafka_payment_info:支付流水表
ods_mysql_kafka_sku_info:商品信息表
ods_mysql_kafka_user_info:用户信息表

6、DIM层

每个kafka中的数据对应我在ods层中的一张表,然后我们可以通过sql对ods层进行操作,为dim,dwd层提供数据

注意:我们在构建dim层的维表并将维表的数据写入到hbase的时候,发现一个错误。就是写入到hbase的数据会变少。这可能是一个bug。

  • 将维度表单独保存到一个hbae的命名空间(相当于库)中

    # 进入hbase shell
    habse shell
    # 创建命名空间
    create_namespace 'gma_dim'
    

1、地区维度表

将省份表和地区表合并成地区维度表(维度退化)
从kafka中读取数据合并两个表,将合并之后的地区维度表保存到hbae中

-- 1、在hbase中创建表
create 'gma_dim.dim_region','info'

-- 2、在flink实时数据仓库的dim层创建地区维度表
-- 创建hbase sink表
CREATE TABLE gma_dim.dim_hbase_region (
 pro_id BIGINT,
 info ROW<pro_name STRING,region_id BIGINT,region_name STRING,area_code STRING>,
 PRIMARY KEY (pro_id) NOT ENFORCED
) WITH (
 'connector' = 'hbase-1.4',
 'table-name' = 'gma_dim.dim_region',
 'zookeeper.quorum' = 'master:2181'
);


-- 3、编写flinksql 合并地区表和省份表,将数据保存到地区维度表中
insert into gma_dim.dim_hbase_region
select pro_id,ROW(pro_name,region_id,region_name,area_code) from (
select b.id as pro_id,b.name as pro_name,region_id,region_name,area_code from
gma_ods.ods_mysql_kafka_base_region /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ as a
inner join 
gma_ods.ods_mysql_kafka_base_province /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ as b
on
a.id=b.region_id
) as c

--4、到hbase中查看是否有数据
scan 'gma_dim.dim_region'

2、商品维度表

商品表、品类表、spu表、商品三级分类、商品二级分类、商品一级分类表退化为商品维度表

维度退化:将多个维度表退化成一个维度表

-- 1、在hbsae中创建商品维度表
create 'gma_dim.dim_item_info','info'

-- 2、在flink实时数据仓库中创建商品维度表
CREATE TABLE gma_dim.dim_hbase_item_info (
 sku_id bigint,
 info ROW<spu_id bigint, price decimal(10,0) , sku_name STRING, sku_desc STRING, weight  decimal(10,2), tm_id bigint, tm_name STRING, category3_id bigint, category3_name STRING, category2_id bigint, category2_name STRING, category1_id bigint, category1_name STRING, sku_default_img STRING, create_time TIMESTAMP(3) >,
 PRIMARY KEY (sku_id) NOT ENFORCED
) WITH (
 'connector' = 'hbase-1.4',
 'table-name' = 'gma_dim.dim_item_info',
 'zookeeper.quorum' = 'master:2181',
 'sink.buffer-flush.max-rows'='0'
);


--3、关联多张表得到商品维度表
insert into  gma_dim.dim_hbase_item_info 
select 
sku_id,ROW(spu_id ,  price,  sku_name,  sku_desc,  weight,  tm_id,  tm_name,  category3_id,  category3_name,  category2_id,  category2_name,  category1_id,  category1_name,  sku_default_img,  create_time) as info
from (
select 
a.id as sku_id,
a.spu_id,
a.price,
a.sku_name,
a.sku_desc,
a.weight,
a.tm_id,
b.tm_name,
a.category3_id,
c.name as category3_name,
d.id as category2_id,
d.name as category2_name,
e.id as category1_id,
e.name as category1_name,
a.sku_default_img,
a.create_time
from 
gma_ods.ods_mysql_kafka_sku_info 
/*+ OPTIONS('scan.startup.mode'='earliest-offset') */ as a
inner join 
gma_ods.ods_mysql_kafka_base_trademark 
/*+ OPTIONS('scan.startup.mode'='earliest-offset') */ as b
on a.tm_id=cast(b.tm_id as bigint)
inner join 
gma_ods.ods_mysql_kafka_base_category3
/*+ OPTIONS('scan.startup.mode'='earliest-offset') */ as c
on a.category3_id=c.id
inner join 
gma_ods.ods_mysql_kafka_base_category2
/*+ OPTIONS('scan.startup.mode'='earliest-offset') */ as d
on c.category2_id=d.id
inner join 
gma_ods.ods_mysql_kafka_base_category1
/*+ OPTIONS('scan.startup.mode'='earliest-offset') */ as e
on d.category1_id=e.id
) as f;

3、用户维度表


--1、在hbase中创建用户维度表
create 'gma_dim.dim_user_info','info'

--2、在flink实时数据仓库中创建用户维度表
CREATE TABLE gma_dim.dim_hbase_user_info (
 user_id BIGINT,
 info ROW<login_name STRING , nick_name STRING , passwd STRING , name STRING , phone_num STRING , email STRING , head_img STRING , user_level STRING , birthday DATE , gender STRING , create_time TIMESTAMP(3)>,
 PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
 'connector' = 'hbase-1.4',
 'table-name' = 'gma_dim.dim_user_info',
 'zookeeper.quorum' = 'master:2181'
);

--3、编写flink sql读取ods层用户信息表将数据保存到hbae中构建用户维度表
insert into gma_dim.dim_hbase_user_info
select id as user_id,ROW(login_name,nick_name,passwd,name,phone_num,email,head_img,user_level,birthday,gender,create_time) from 
gma_ods.ods_mysql_kafka_user_info
/*+ OPTIONS('scan.startup.mode'='earliest-offset') */ as a
;

标签:dim,gma,ods,数据仓库,kafka,构建,mysql,id
From: https://www.cnblogs.com/atao-BigData/p/16586279.html

相关文章