首页 > 其他分享 >Apache Hudi 在袋鼠云数据湖平台的设计与实践

Apache Hudi 在袋鼠云数据湖平台的设计与实践

时间:2023-05-24 11:13:01浏览次数:52  
标签:袋鼠 Hudi String Apache hudi 数据 id

在大数据处理中,实时数据分析是一个重要的需求。随着数据量的不断增长,对于实时分析的挑战也在不断加大,传统的批处理方式已经不能满足实时数据处理的需求,需要一种更加高效的技术来解决这个问题。Apache Hudi(Hadoop Upserts Deletes and Incremental Processing)就是这样一种技术,提供了高效的实时数据仓库管理功能。

本文将介绍袋鼠云基于 Hudi 构建数据湖的整体方案架构及其在实时数据仓库处理方面的特点,并且为大家展示一个使用 Apache Hudi 的简单示例,便于新手上路。

Apache Hudi 介绍

Apache Hudi 是一个开源的数据湖存储系统,可以在 Hadoop 生态系统中提供实时数据仓库处理功能。Hudi 最早由 Uber 开发,后来成为 Apache 顶级项目。

Hudi 主要特性

· 支持快速插入和更新操作,以便在数据仓库中实时处理数据;

· 提供增量查询功能,可有效提高数据分析效率;

· 支持时间点查询,以便查看数据在某一时刻的状态;

· 与 Apache Spark、Hive 等大数据分析工具兼容。

Hudi 架构

Apache Hudi 的架构包括以下几个主要组件:

· Hudi 数据存储:Hudi 数据存储是 Hudi 的核心组件,负责存储数据,数据存储有两种类型:Copy-On-Write(COW)和 Merge-On-Read(MOR);

· Copy-On-Write:COW 存储类型会在对数据进行更新时,创建一个新的数据文件副本,将更新的数据写入副本中,之后,新的数据文件副本会替换原始数据文件;

· Merge-On-Read:MOR 存储类型会在查询时,将更新的数据与原始数据进行合并,这种方式可以减少数据存储的写入延迟,但会增加查询的计算量;

· Hudi 索引:Hudi 索引用于维护数据记录的位置信息,索引有两种类型:内置索引(如 Bloom 过滤器)和外部索引(如 HBase 索引);

· Hudi 查询引擎:Hudi 查询引擎负责处理查询请求,Hudi 支持多种查询引擎,如 Spark SQL、Hive、Presto 等。

file

Hudi 的使用场景

Apache Hudi 可以帮助企业和组织实现实时数据处理和分析。实时数据处理需要快速地处理和查询数据,同时还需要保证数据的一致性和可靠性。

Apache Hudi 的增量数据处理ACID 事务性保证、写时合并等技术特性可以帮助企业更好地实现实时数据处理和分析,基于 Hudi 的特性可以在一定程度上在实时数仓的构建过程中承担上下游数据链路的对接(类似 Kafka 的角色)。既能实现增量的数据处理,也能为批流一体的处理提供存储基础。

Hudi 的优势和劣势

● 优势

· 高效处理大规模数据集;

· 支持实时数据更新和查询;

· 实现了增量写入机制,提高了数据访问效率;

· Hudi 可以与流处理管道集成;

· Hudi 提供了时间旅行功能,允许回溯数据的历史版本。

● 劣势

· 在读写数据时需要付出额外的代价;

· 操作比较复杂,需要使用专业的编程语言和工具。

Hudi 在袋鼠云数据湖平台上的实践

Hudi 在袋鼠云数据湖的技术架构

Hudi 在袋鼠云的数据湖平台上主要对数据湖管理提供助力:

· 元数据的接入,让用户可以快速的对表进行管理;

· 数据快速接入,包括对符合条件的原有表数据进行转换,快速搭建数据湖能力;

· 湖表的管理,监控小文件定期进行合并,提升表的查询性能,内在丰富的表操作功能,包括 time travel ,孤儿文件清理,过期快照清理等;

· 索引构建,提供多种索引包括 bloom filter,zorder 等,提升计算引擎的查询性能。

file

Hudi 使用示例

在介绍了 Hudi 的基本信息和袋鼠云数据湖平台的结构之后,我们来看一个使用示例,替换 Flink 在内存中的 join 过程。

在 Flink 中对多流 join 往往是比较头疼的场景,需要考虑 state ttl 时间设置,设置太小数据经常关联不上,设置太大内存又需要很高才能保留,我们通过 Hudi 的方式来换个思路实现。

● 构建 catalog

public String createCatalog(){
    String createCatalog = "CREATE CATALOG hudi_catalog WITH (\n" +
            "    'type' = 'hudi',\n" +
            "    'mode' = 'hms',\n" +
            "    'default-database' = 'default',\n" +
            "    'hive.conf.dir' = '/hive_conf_dir',\n" +
            "    'table.external' = 'true'\n" +
            ")";

    return createCatalog;
}

● 创建 hudi 表

public String createHudiTable(){

    String createTable = "CREATE TABLE if not exists hudi_catalog.flink_db.test_hudi_flink_join_2 (\n" +
            "  id int ,\n" +
            "  name VARCHAR(10),\n" +
            "  age int ,\n" +
            "  address VARCHAR(10),\n" +
            "  dt VARCHAR(10),\n" +
            "  primary key(id) not enforced\n" +
            ")\n" +
            "PARTITIONED BY (dt)\n" +
            "WITH (\n" +
            "  'connector' = 'hudi',\n" +
            "  'table.type' = 'MERGE_ON_READ',\n" +
            "  'changelog.enabled' = 'true',\n" +
            "  'index.type' = 'BUCKET',\n" +
            "  'hoodie.bucket.index.num.buckets' = '2',\n" +
            String.format("  '%s' = '%s',\n", FlinkOptions.PRECOMBINE_FIELD.key(), FlinkOptions.NO_PRE_COMBINE) +
            "  'write.payload.class' = '" + PartialUpdateAvroPayload.class.getName() + "'\n" +

    ");";

    return createTable;
}

● 更新 hudi 表的 flink_db.test_hudi_flink_join_2 的 id, name, age, dt 列

01 从 kafka 中读取 topic1

public String createKafkaTable1(){
    String kafkaSource1 = "CREATE TABLE source1\n" +
            "(\n" +
            "    id        INT,\n" +
            "    name      STRING,\n" +
            "    age        INT,\n" +
            "    dt        String,\n" +
            "    PROCTIME AS PROCTIME()\n" +
            ") WITH (\n" +
            "      'connector' = 'kafka'\n" +
            "      ,'topic' = 'join_topic1'\n" +
            "      ,'properties.bootstrap.servers' = 'localhost:9092'\n" +
            "      ,'scan.startup.mode' = 'earliest-offset'\n" +
            "      ,'format' = 'json'\n" +
            "      ,'json.timestamp-format.standard' = 'SQL'\n" +
            "      )";

    return kafkaSource1;
}

02 从 kafka 中读取 topic2

public String createKafkaTable2(){
    String kafkaSource2 = "CREATE TABLE source2\n" +
            "(\n" +
            "    id        INT,\n" +
            "    name      STRING,\n" +
            "    address        string,\n" +
            "    dt        String,\n" +
            "    PROCTIME AS PROCTIME()\n" +
            ") WITH (\n" +
            "      'connector' = 'kafka'\n" +
            "      ,'topic' = 'join_topic2'\n" +
            "      ,'properties.bootstrap.servers' = 'localhost:9092'\n" +
            "      ,'scan.startup.mode' = 'earliest-offset'\n" +
            "      ,'format' = 'json'\n" +
            "      ,'json.timestamp-format.standard' = 'SQL'\n" +
            "      )";

    return kafkaSource2;
}

● 执行插入逻辑1

String insertSQL = "insert into hudi_catalog.flink_db.test_hudi_flink_join_2(id,name,age,dt) " +
        "select id, name,age,dt from source1";

● 通过 spark 查询数据

20230323090605515 20230323090605515_1_186 45 1 c990a618-896c-4627-8243-baace65c7ad6-0_0-21-26_20230331101342388.parquet 45 xc 45 NULL 1

20230323090605515 20230323090605515_1_179 30 1 c990a618-896c-4627-8243-baace65c7ad6-0_0-21-26_20230331101342388.parquet 30 xc 30 NULL 1

● 执行插入逻辑2

String insertSQL = "insert into hudi_catalog.flink_db.test_hudi_flink_join_2(id,name,address,dt) " +
        "select id, name, address,dt from source2";

● 运行成功

运行成功后在 spark 中查询对应的表数据:

20230323090605515 20230323090605515_1_186 45 1 c990a618-896c-4627-8243-baace65c7ad6-0_0-21-26_20230331101342388.parquet 45 xc 45 xc:address45 1

20230323090605515 20230323090605515_1_179 30 1 c990a618-896c-4627-8243-baace65c7ad6-0_0-21-26_20230331101342388.parquet 30 xc 30 xc:address30 1

可以发现在第二次数据运行之后,表数据的对应字段 address 已经更新,达到了类似在 Flink 中直接执行 join 的效果。

insert into hudi_catalog.flink_db.test_hudi_flink_join_2
        select a.id, a.name, a.age,b.address a.dt from source1 a left join source2 b on a.id = b.id 
				```

《数栈产品白皮书》:https://www.dtstack.com/resources/1004?src=szsm

《数据治理行业实践白皮书》下载地址:https://www.dtstack.com/resources/1001?src=szsm

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=szbky

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术qun」,交流最新开源技术信息,qun号码:30537511,项目地址:https://github.com/DTStack

标签:袋鼠,Hudi,String,Apache,hudi,数据,id
From: https://www.cnblogs.com/DTinsight/p/17427402.html

相关文章

  • hudi学习
    1.背景想要对自己的各种数据(非结构化)进行统一管理,突然想到数据湖,看看是否符合我的需求。2.Hudi简介2.1hudi的特性mutabilitysupportforalldatalakeworkoadsQuicklyupdate&deletedatawithHudi'sfast,pluggableindexing.Thisincludesstreamingworkloads,w......
  • 袋鼠云数栈UI5.0焕新升级,全新设计语言DT Design,更懂视觉更懂你!
    数栈作为袋鼠云打造的一站式数据开发与治理平台,从2016年发布第⼀个版本开始,就始终坚持着以技术为核⼼、安全为底线、提效为⽬标、中台为战略的思想,坚定不移地⾛国产化信创路线,不断推进产品功能迭代、技术创新、服务细化和性能升级。伴随业务的⻜速发展,数栈设计团队也启动了针对数栈......
  • Apache maven 安装与配置 (Mac)
    一、准备文件Maven3.5.4version下载 https://maven.apache.org/download.cgi  二、安装与配置(1)安装将下载好的文件解压到/usr/local目录下,如果找不到/usr/local目录,可以打开苹果访达,按住shift+command+G,则会弹出一个搜索框,在里面输入/usr/local即可出现。也可以使......
  • Apache-maven的安装与配置(IDEA)
    https://mbd.baidu.com/ug_share/mbox/4a83aa9e65/share?product=smartapp&tk=8b0406b2ed7c5e71944c9c31062c5007&share_url=https%3A%2F%2Fyebd1h.smartapps.cn%2Fpages%2Fblog%2Findex%3FblogId%3D122922148%26_swebfr%3D1%26_swebFromHost%3Dbaiduboxapp&domai......
  • org.apache.jasper.JasperException: /pages/role-list.jsp (行.: [145], 列: [8]) 根
    org.apache.jasper.JasperException:/pages/role-list.jsp(行.:[145],列:[8])根据标记文件中的TLD或attribute指令,attribute[items]不接受任何表达式 web.xml中版本号不兼容产生的问题;解决方法:<%@taglibprefix=“c”uri=“http://java.sun.com/jstl/core”%>改为<%@t......
  • 开源之夏 2023|欢迎报名 Apache RocketMQ 社区项目!
    开源之夏是由中科院软件所“开源软件供应链点亮计划”发起并长期支持的一项暑期开源活动,旨在鼓励在校学生积极参与开源软件的开发维护,培养和发掘更多优秀的开发者,促进优秀开源软件社区的蓬勃发展,助力开源软件供应链建设。参与学生通过远程线上协作方式,配有资深导师指导,参与到开源......
  • Hudi写语义保证
    Hudi为HadoopUpsertandIncremental的缩写,Incremental即Incrementalpull,也就是增加拉取,是一种类似于消息队列的流式消费。单写保证upsert保证不重复。insert如果开启了去重(hoodie.datasource.write.insert.drop.duplicates为true,默认为false),保证不重复。bul......
  • Apache Log4j2远程代码执行漏洞(CVE-2021-44228)修复
    1、演示说明:最近在项目中遇到用户扫描es时发现ApacheLog4j2远程代码执行漏洞(CVE-2021-44228),该漏洞具体原理不再赘述,此处分享解决过程。2、演示环境:(1)系统版本:#cat/etc/redhat-release(2)JDK版本:#java-version(3)es版本:#./elasticsearch-V3、下载命令行扫描工具:下载地址:https://gi......
  • Apache、Tomcat、IIS(PHP、JSP、ASP)共存及安装Tomcat
    1.安装Tomcatapt-getinstalltomcat7apt-getinstalltomcat7-adminapt-getinstalltomcat7-docsapt-getinstalltomcat7-examples 2.开启,中止和重启/etc/init.d/tomcat7start/etc/init.d/tomcat7stop/etc/init.d/tomcat7restart 3.根目录链接cd/var/l......
  • Apache MINA 初
    3。编写一个ApacheMINA时间服务器该程序的功能非常简单,就是当客户端连接到服务器的9123端口后,程序将服务器当前的时间信息以字符串的形式发送给客户端。我们可以用Eclipse来创建编写这个程序。(1)在Eclipse中创建一个Java项目,例如TimeServerProject,然后将mina-core-2.0.0-M1.jar、......