首页 > 其他分享 >Apache Doris 整合 FLINK CDC + Iceberg 构建实时湖仓一体的联邦查询

Apache Doris 整合 FLINK CDC + Iceberg 构建实时湖仓一体的联邦查询

时间:2024-02-02 10:44:47浏览次数:29  
标签:iceberg CDC FLINK hive user Iceberg NULL userinfo

1概况

本文展示如何使用 Flink CDC + Iceberg + Doris 构建实时湖仓一体的联邦查询分析,Doris 1.1版本提供了Iceberg的支持,本文主要展示Doris和Iceberg怎么使用,大家按照步骤可以一步步完成。完整体验整个搭建操作的过程。

2系统架构

我们整理架构图如下,

 

 

 

1.首先我们从Mysql数据中使用Flink 通过 Binlog完成数据的实时采集 2.然后再Flink 中创建 Iceberg 表,Iceberg的元数据保存在hive里 3.最后我们在Doris中创建Iceberg外表 4.在通过Doris 统一查询入口完成对Iceberg里的数据进行查询分析,供前端应用调用,这里iceberg外表的数据可以和Doris内部数据或者Doris其他外部数据源的数据进行关联查询分析

Doris湖仓一体的联邦查询架构如下:

 

 

 

 

1.Doris 通过 ODBC 方式支持:MySQL,Postgresql,Oracle ,SQLServer 2.同时支持 Elasticsearch 外表 3.1.0版本支持Hive外表 4.1.1版本支持Iceberg外表 5.1.2版本支持Hudi 外表

3 创建MySQL数据库表并初始化数据

CREATE DATABASE demo;
USE demo;
CREATE TABLE userinfo (
  id int NOT NULL AUTO_INCREMENT,
  name VARCHAR(255) NOT NULL DEFAULT 'flink',
  address VARCHAR(1024),
  phone_number VARCHAR(512),
  email VARCHAR(255),
  PRIMARY KEY (`id`)
)ENGINE=InnoDB ;
INSERT INTO userinfo VALUES (10001,'user_110','Shanghai','13347420870', NULL);
INSERT INTO userinfo VALUES (10002,'user_111','xian','13347420870', NULL);
INSERT INTO userinfo VALUES (10003,'user_112','beijing','13347420870', NULL);
INSERT INTO userinfo VALUES (10004,'user_113','shenzheng','13347420870', NULL);
INSERT INTO userinfo VALUES (10005,'user_114','hangzhou','13347420870', NULL);
INSERT INTO userinfo VALUES (10006,'user_115','guizhou','13347420870', NULL);
INSERT INTO userinfo VALUES (10007,'user_116','chengdu','13347420870', NULL);
INSERT INTO userinfo VALUES (10008,'user_117','guangzhou','13347420870', NULL);
INSERT INTO userinfo VALUES (10009,'user_118','xian','13347420870', NULL);

4 创建Iceberg Catalog

CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://localhost:9083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs://localhost:8020/user/hive/warehouse'
);

5 创建 Mysql CDC 表

CREATE TABLE user_source (
    database_name STRING METADATA VIRTUAL,
    table_name STRING METADATA VIRTUAL,
    `id` DECIMAL(20, 0) NOT NULL,
    name STRING,
    address STRING,
    phone_number STRING,
    email STRING,
    PRIMARY KEY (`id`) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = 'MyNewPass4!',
    'database-name' = 'demo',
    'table-name' = 'userinfo'
  );

6 创建Iceberg表

---查看catalog
show catalogs;
---使用catalog
use catalog hive_catalog;
--创建数据库
CREATE DATABASE iceberg_hive; 
--使用数据库
use iceberg_hive;
​

7 创建表

CREATE TABLE all_users_info (
    database_name STRING,
    table_name    STRING,
    `id`          DECIMAL(20, 0) NOT NULL,
    name          STRING,
    address       STRING,
    phone_number  STRING,
    email         STRING,
    PRIMARY KEY (database_name, table_name, `id`) NOT ENFORCED
  ) WITH (
    'catalog-type'='hive'
  );

从CDC表里插入数据到Iceberg表里

use catalog default_catalog;
​
insert into hive_catalog.iceberg_hive.all_users_info select * from user_source;

我们去查询iceberg表

select * from hive_catalog.iceberg_hive.all_users_info

8 Doris 查询 Iceberg

8.1 创建Iceberg外表

CREATE TABLE `all_users_info` 
ENGINE = ICEBERG
PROPERTIES (
"iceberg.database" = "iceberg_hive",
"iceberg.table" = "all_users_info",
"iceberg.hive.metastore.uris"  =  "thrift://localhost:9083",
"iceberg.catalog.type"  =  "HIVE_CATALOG"
);

参数说明

•ENGINE 需要指定为 ICEBERG •PROPERTIES 属性: ◦iceberg.hive.metastore.uris:Hive Metastore 服务地址 ◦iceberg.database:挂载 Iceberg 对应的数据库名 ◦iceberg.table:挂载 Iceberg 对应的表名,挂载 Iceberg database 时无需指定。 ◦iceberg.catalog.type:Iceberg 中使用的 catalog 方式,默认为 HIVE_CATALOG,当前仅支持该方式,后续会支持更多的 Iceberg catalog 接入方式。
mysql> CREATE TABLE `all_users_info`
    -> ENGINE = ICEBERG
    -> PROPERTIES (
    -> "iceberg.database" = "iceberg_hive",
    -> "iceberg.table" = "all_users_info",
    -> "iceberg.hive.metastore.uris"  =  "thrift://localhost:9083",
    -> "iceberg.catalog.type"  =  "HIVE_CATALOG"
    -> );
Query OK, 0 rows affected (0.23 sec)
​
mysql> select * from all_users_info;
+---------------+------------+-------+----------+-----------+--------------+-------+
| database_name | table_name | id    | name     | address   | phone_number | email |
+---------------+------------+-------+----------+-----------+--------------+-------+
| demo          | userinfo   | 10004 | user_113 | shenzheng | 13347420870  | NULL  |
| demo          | userinfo   | 10005 | user_114 | hangzhou  | 13347420870  | NULL  |
| demo          | userinfo   | 10002 | user_111 | xian      | 13347420870  | NULL  |
| demo          | userinfo   | 10003 | user_112 | beijing   | 13347420870  | NULL  |
| demo          | userinfo   | 10001 | user_110 | Shanghai  | 13347420870  | NULL  |
| demo          | userinfo   | 10008 | user_117 | guangzhou | 13347420870  | NULL  |
| demo          | userinfo   | 10009 | user_118 | xian      | 13347420870  | NULL  |
| demo          | userinfo   | 10006 | user_115 | guizhou   | 13347420870  | NULL  |
| demo          | userinfo   | 10007 | user_116 | chengdu   | 13347420870  | NULL  |
+---------------+------------+-------+----------+-----------+--------------+-------+
9 rows in set (0.18 sec)

 

上述Doris On Iceberg我们只演示了Iceberg单表的查询,你还可以联合Doris的表,或者其他的ODBC外表,Hive外表,ES外表等进行联合查询分析,通过Doris对外提供统一的查询分析入口。

自此我们完整从搭建Hadoop,hive、flink 、Mysql、Doris 及Doris On Iceberg的使用全部介绍完了,Doris朝着数据仓库和数据融合的架构演进,支持湖仓一体的联邦查询,给我们的开发带来更多的便利,更高效的开发,省去了很多数据同步的繁琐工作。

作者:京东零售 吴化斌

来源:京东云开发者社区 转载请注明来源

标签:iceberg,CDC,FLINK,hive,user,Iceberg,NULL,userinfo
From: https://www.cnblogs.com/jingdongkeji/p/18002724

相关文章

  • Flink CDC引起的Mysql元数据锁
    记一次FlinkCDC引起的Mysql元数据锁事故,总结经验教训。后续在编写FlinkCDC任务时,要处理好异常,避免产生长时间的元数据锁。同时出现生产问题时要及时排查,不能抱有侥幸心理。1、事件经过某天上午,收到系统的告警信息,告警提示:同步Mysql的某张表数据到Elasticsearch异常,提示连不......
  • Flink之状态编程 值状态(ValueState)列表状态(ListState)映射状态(MapState)归约状态(Reducin
    Flink之状态编程值状态(ValueState)列表状态(ListState)映射状态(MapState)归约状态(ReducingState)聚合状态(AggregatingState)广播状态(BroadcastState)Flink之状态编程一、按键分区状态(KeyedState)1.1、值状态(ValueState)1.1.1、定义1.1.2、使用案例1.2、列表状态(ListState)1.2.1......
  • java flink(二十六) 实战之电商黑名单过滤 Flink CEP编程实现、什么是CEP、CEP组合模式d
    javaflink(二十六)实战之电商黑名单过滤FlinkCEP编程实现、什么是CEP、CEP组合模式demo、CEP循环模式demo什么是CEP:1、复杂事件处理2、Flink中实现复杂事件处理库3、CEP允许在无休止的事件中检测事件模式,让我们有机会掌握数据中的重要部分4、一个或多个由简单事件构成的事......
  • flink定时器使用问题
    flink定时器使用问题        flink定时器的使用,需要涉及flinktime、watermark、keyStream、keyState等概述,尽管关于flinktime和watermark的文章烂大街,但还是有必要先简单介绍一下,有助于解释下面flink定时器使用遇到的问题。时间模型        flink在stream......
  • flink状态编程
    flink状态编程简单记录一下最近工作中常用的flink状态flink中可以创建不同类型的状态,如键控状态(KeyedState)和操作符状态(OperatorState)等。状态管理是在流处理的整个过程中保持状态的一种能力,它让我们能够在复杂的事件处理和流转换中保留重要的状态信息,例如:聚合结果、过滤条件......
  • AP8851L DCDC降压恒压输出12V 5V2.5A应用资料及BOM清单
    1.方案特性双层PCB板(L42mm×W25mm×H15mm) 输入电压范围:11V~85V(输出5V)18V~85V(输出12V) 输出电流:2.5A 效率:93.8%(输出12V)2.应用领域 扭扭车控制器 平衡车控制器电动车控制器 快充电源 逆变器系统工业控制系统3方案原理图及工作原理描述 4,AP8851-5......
  • 2024/1/30 scala、Flink
    scala的模式匹配:很强大,也很难记住。和java相比,代码简洁了,但是熟练难度却上去了。 各种花里胡哨的模式匹配。还有很多至简原则。无疑是给新手带来了很多的麻烦。 这个scala给人的印象是:java?python?js?的集大成者???......
  • Flink
    FlinkFlink主要特点事件驱动基于流的世界观:在Flink的世界观中,一切都是由流组成的,离线数据是有界的流;实时数据是一个没有界限的流分层API:越顶层越抽象,表达含义越简明,使用越方便;越底层越具体,表达能力越丰富,使用越灵活支持事件时间(event-time)和处理时间(processing-time)语......
  • 深入理解 Flink(六)Flink Job 提交和 Flink Graph 详解
    FlinkProgram编程套路回顾1、获取执行环境对象StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();2、通过执行环境对象,注册数据源Source,得到数据抽象DataStreamds=env.socketTextStream(...)3、调用数据抽象的各种Transformation......
  • Flink 中的容错机制
    在Flink中,有一套完整的容错机制来保证故障后的恢复,其中最重要的就是检查点。1.检查点(Checkpoint)在流处理中,我们可以用存档读档的思路,就是将之前某个时间点所有的状态保存下来,这份“存档”就是所谓的“检查点”(checkpoint)。遇到故障重启的时候,我们可以从检查点中“读档”,恢复......