3、构建实时数据仓库
项目平台搭建架构及其总体流程
1、flink整合hive的catalog
因为本项目中的对应kafka中的表都存在hive的元数据库中,所以需要创建一个hive的catalog,方便直接操作
-- 进入sql客户端
sql-client.sh
-- 创建hive catalog
CREATE CATALOG hive_catalog WITH (
'type' = 'hive',
'default-database' = 'default',
'hive-conf-dir' = '/usr/local/soft/hive-1.2.1/conf'
);
-- set the HiveCatalog as the current catalog of the session
USE CATALOG hive_catalog;
-- 创建一个flink_init.sql文件,将hive catalog放进去,后面再启动sql-client时指定sql文件
2、在hviecatalog中创建数据库
离线数据仓库:每一个库对应一个用户,每一个库对应hdfs中一个目录
实时数据仓库每一个层的数据保存在不同位置,ods,dwd,dws保存在kafka,dim,ads的数据保存在数据库中
create database gma_ods;
create database gma_dwd;
create database gma_dws;
create database gma_dim;
create database gma_ads;
4、数据采集
必须现在mysql中创建数据库,然后打开canal,实时监控数据库,当我们对于这个库做任何操作,kafka中就会实时同步我们的操作(通过json格式)
# 1、在mysql中创建数据库gma,指定编码格式为utf-8
# 2、修改canal配置文件
cd /usr/local/soft/canal/conf/example
# 修改配置文件
vim instance.properties
# 增加动态topic配置,每个表在kafka中生成一个topic
canal.mq.dynamicTopic=gma\\..*
# 3、开启mysqlbinlog日志
vim /etc/my.cnf
# 在配置文件中增加二配置
# 需要将配置放在[mysqld]后面
# 打开binlog
log-bin=mysql-bin
# 选择ROW(行)模式
binlog-format=ROW
# 重启mysql
service mysqld restart
# 4、启动canal
cd /usr/local/soft/canal/bin
./restart.sh
# 5、在数据库中创建表
# 执行下面这个sql文件
init_mysql_table.sql
# 6、查看topic是否生成
kafka-topics.sh --list --zookeeper master:2181
# 如果topic没有生成,检测前面哪里出了问题,创建表之后必须要有topic
# 7、导入数据到mysql中
# 执行sql文件
load_data.sql
# 8、使用kafka控制台消费者消费数据
kafka-console-consumer.sh --bootstrap-server master:9092 --from-beginning --topic gma.base_category1
5、ODS层
- 在flink sql中创建ods的表
# 进入sql-client
sql-client.sh -i flink_init.sql
# 创建ods层所有的表
ods_mysql_kafka_base_category1:商品一级分类表
ods_mysql_kafka_base_category2:商品二级分类表
ods_mysql_kafka_base_category3:商品三级分类表
ods_mysql_kafka_base_province:省份配置报表
ods_mysql_kafka_base_region:地区配置表
ods_mysql_kafka_base_trademark:品牌表
ods_mysql_kafka_date_info:时间配置表
ods_mysql_kafka_holiday_info:节假日表
ods_mysql_kafka_holiday_year:节假日对应时间表
ods_mysql_kafka_order_detail:订单详情表,一个订单中一个商品一条数据
ods_mysql_kafka_order_info:订单表。一个订单一条数据,订单表中有订单状态
ods_mysql_kafka_order_status_log:订单转台变化日志记录表
ods_mysql_kafka_payment_info:支付流水表
ods_mysql_kafka_sku_info:商品信息表
ods_mysql_kafka_user_info:用户信息表
6、DIM层
每个kafka中的数据对应我在ods层中的一张表,然后我们可以通过sql对ods层进行操作,为dim,dwd层提供数据
注意:我们在构建dim层的维表并将维表的数据写入到hbase的时候,发现一个错误。就是写入到hbase的数据会变少。这可能是一个bug。
-
将维度表单独保存到一个hbae的命名空间(相当于库)中
# 进入hbase shell habse shell # 创建命名空间 create_namespace 'gma_dim'
1、地区维度表
将省份表和地区表合并成地区维度表(维度退化)
从kafka中读取数据合并两个表,将合并之后的地区维度表保存到hbae中
-- 1、在hbase中创建表
create 'gma_dim.dim_region','info'
-- 2、在flink实时数据仓库的dim层创建地区维度表
-- 创建hbase sink表
CREATE TABLE gma_dim.dim_hbase_region (
pro_id BIGINT,
info ROW<pro_name STRING,region_id BIGINT,region_name STRING,area_code STRING>,
PRIMARY KEY (pro_id) NOT ENFORCED
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'gma_dim.dim_region',
'zookeeper.quorum' = 'master:2181'
);
-- 3、编写flinksql 合并地区表和省份表,将数据保存到地区维度表中
insert into gma_dim.dim_hbase_region
select pro_id,ROW(pro_name,region_id,region_name,area_code) from (
select b.id as pro_id,b.name as pro_name,region_id,region_name,area_code from
gma_ods.ods_mysql_kafka_base_region /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ as a
inner join
gma_ods.ods_mysql_kafka_base_province /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ as b
on
a.id=b.region_id
) as c
--4、到hbase中查看是否有数据
scan 'gma_dim.dim_region'
2、商品维度表
商品表、品类表、spu表、商品三级分类、商品二级分类、商品一级分类表退化为商品维度表
维度退化:将多个维度表退化成一个维度表
-- 1、在hbsae中创建商品维度表
create 'gma_dim.dim_item_info','info'
-- 2、在flink实时数据仓库中创建商品维度表
CREATE TABLE gma_dim.dim_hbase_item_info (
sku_id bigint,
info ROW<spu_id bigint, price decimal(10,0) , sku_name STRING, sku_desc STRING, weight decimal(10,2), tm_id bigint, tm_name STRING, category3_id bigint, category3_name STRING, category2_id bigint, category2_name STRING, category1_id bigint, category1_name STRING, sku_default_img STRING, create_time TIMESTAMP(3) >,
PRIMARY KEY (sku_id) NOT ENFORCED
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'gma_dim.dim_item_info',
'zookeeper.quorum' = 'master:2181',
'sink.buffer-flush.max-rows'='0'
);
--3、关联多张表得到商品维度表
insert into gma_dim.dim_hbase_item_info
select
sku_id,ROW(spu_id , price, sku_name, sku_desc, weight, tm_id, tm_name, category3_id, category3_name, category2_id, category2_name, category1_id, category1_name, sku_default_img, create_time) as info
from (
select
a.id as sku_id,
a.spu_id,
a.price,
a.sku_name,
a.sku_desc,
a.weight,
a.tm_id,
b.tm_name,
a.category3_id,
c.name as category3_name,
d.id as category2_id,
d.name as category2_name,
e.id as category1_id,
e.name as category1_name,
a.sku_default_img,
a.create_time
from
gma_ods.ods_mysql_kafka_sku_info
/*+ OPTIONS('scan.startup.mode'='earliest-offset') */ as a
inner join
gma_ods.ods_mysql_kafka_base_trademark
/*+ OPTIONS('scan.startup.mode'='earliest-offset') */ as b
on a.tm_id=cast(b.tm_id as bigint)
inner join
gma_ods.ods_mysql_kafka_base_category3
/*+ OPTIONS('scan.startup.mode'='earliest-offset') */ as c
on a.category3_id=c.id
inner join
gma_ods.ods_mysql_kafka_base_category2
/*+ OPTIONS('scan.startup.mode'='earliest-offset') */ as d
on c.category2_id=d.id
inner join
gma_ods.ods_mysql_kafka_base_category1
/*+ OPTIONS('scan.startup.mode'='earliest-offset') */ as e
on d.category1_id=e.id
) as f;
3、用户维度表
--1、在hbase中创建用户维度表
create 'gma_dim.dim_user_info','info'
--2、在flink实时数据仓库中创建用户维度表
CREATE TABLE gma_dim.dim_hbase_user_info (
user_id BIGINT,
info ROW<login_name STRING , nick_name STRING , passwd STRING , name STRING , phone_num STRING , email STRING , head_img STRING , user_level STRING , birthday DATE , gender STRING , create_time TIMESTAMP(3)>,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'gma_dim.dim_user_info',
'zookeeper.quorum' = 'master:2181'
);
--3、编写flink sql读取ods层用户信息表将数据保存到hbae中构建用户维度表
insert into gma_dim.dim_hbase_user_info
select id as user_id,ROW(login_name,nick_name,passwd,name,phone_num,email,head_img,user_level,birthday,gender,create_time) from
gma_ods.ods_mysql_kafka_user_info
/*+ OPTIONS('scan.startup.mode'='earliest-offset') */ as a
;
标签:dim,gma,ods,数据仓库,kafka,构建,mysql,id
From: https://www.cnblogs.com/atao-BigData/p/16586279.html