首页 > 其他分享 >使用iceberg-使用Iceberg数据湖需要注意的点

使用iceberg-使用Iceberg数据湖需要注意的点

时间:2024-04-22 12:22:21浏览次数:22  
标签:同步 iceberg ods lake 使用 Iceberg data

一、资料准备

1、mysql地址选择

因为阿里云只读节点binlog保留时间短,需要用读写集群地址。可以登录阿里云控制台查看地址是只读还是读写,不清楚的话可以找dba要读写地址。

二、Iceberg概念

1、Iceberg选择合适的表版本

简述:Iceberg目前有两个表版本(V1和V2),根据数据选择合适的表版本。

    V1表只支持增量数据插入,适合做纯增量写入场景,如埋点数据。

    V2表才支持行级更新,适合做状态变化的更新,如订单表同步。

使用方式:建表语句时指定版本'format-version'='2',不指定参数默认使用V1

2、建表类型转换问题

简述:mysql表create_time大多为timestamp类型,同步到iceberg后使用spark查询会报timezone错误。

原因:源表没有timezone, 下游表需要设置local timezone

使用方式:建iceberg表时指定字段类型为TIMESTAMP_LTZ类型

3、任务全量同步和增量同步配置

简述:全量同步阶段需要速度快,可以把flink checkpoint设置为1分钟,并发可以调高(比如1CU*10)这样可以快速同步数据。同时要考虑你的数据源能支持多快抽取,评估好抽取并发,否则dba会找你。

增量阶段一般数据量比较低了,全量同步完后可以调小资源和并发(比如0.5CU*1)然后重新发布可以节省资源。mysql增量阶段只能单线程跑,设置并发多也没用

三、任务开发

1、建表

可以联系平台在spark客户端建表

(1)Iceberg v2更新表sql(只有v2表才支持更新数据,v1表只能追加数据)
CREATE TABLE data_lake_ods.test3(
    id bigint COMMENT 'unique id',
    `empcode` STRING
)USING iceberg
TBLPROPERTIES(
  'format-version'='2'
  ,'write.upsert.enabled'='true'
  ,'engine.hive.enabled'='true'
  ,'table.drop.base-path.enabled'='true'
);
 
(2)Iceberg v1非更新表sql(不支持数据更新,适合只有insert场景,如埋点数据)
CREATE TABLE data_lake_ods.test3 (
    `id` int ,
    `empcode` STRING
) USING iceberg;
 
(3)修改表format版本或其他属性
alter table data_lake_ods.test3 SET TBLPROPERTIES('format-version'='2');

2、删表-腾讯定制(spark客户端方式删表)

简述:和hive删表不一样,iceberg默认只删除元数据不清理hdfs表目录,删表语句后面需要加purge关键字可删除。腾讯特殊改造过包

功能描述:默认drop table 不会清理hdfs数据,使用官方 DROP TABLE spark_catalog.db.sample PURGE时会清理数据,但是还留存【表/data】、【表/metadata】文件。

修改包:iceberg-spark-runtime-3.2_2.12-1.3.1.jar

社区提交代码:https://github.com/apache/iceberg/pull/1839/files

官方ddl文档:https://iceberg.apache.org/docs/1.3.1/spark-ddl/

使用方式:

        !!!使用优化后的icebrg包

        在建表时需要开启'table.drop.base-path.enabled'='true'

        删表时:DROP TABLE data_lake_ods.test3 PURGE;

3、新增字段

ALTER TABLE data_lake_ods.table_name ADD COLUMN `event_timestamp` timestamp COMMENT 'kafka更新时间戳';

4、插入和更新数据

insert into table data_lake_ods.test3 values (1,"code1");
update data_lake_ods.test3 set empcode='code2' where id=1;

5、数据集成建数据源

6、配置同步任务

1)数据源高级参数

--优化抽数速度,如非均匀分布表,设置此参数可以加快分片速度

scan.incremental.snapshot.chunk.size=50000

--最后一个分片切割,适用于同步时间长的大表,在同步后新增数据很多场景
scan.lastchunk.optimize.enable=true

2)任务高级参数

--由于分布式系统中的 shuffle 会造成 ChalgeLog 数据的乱序,所以 sink 接收到的数据可能在全局的 upsert 中乱序,所以要在 upsert sink 之前添加一个 upsert 物化算子。该参数控制是否添加

table.exec.sink.upsert-materialize=NONE

7、整库同步--同步前设置高级参数,整库同步支持新表自动同步

mysql新增加表后iceberg会自动新加表,mysql删表后iceberg会暂停这个表的同步,任务不会中止,iceberg表不会删除

scan.newly-added-table.enabled=true

8、查询数据

presto引擎: select * from data_lake_ods.test1 limit 200

kyuubi引擎: select * from data_lake_ods.test1 limit 200

9、iceberg表治理

原因:实时程序每个checkpoint写入都会生成多个文件,小文件太多会影响下游查询速度,逐渐变慢直到不可用。所以需要每天定时治理任务小文件

http://127.0.0.1:9090/luoshu/

全量数据同步完成后,修改checkpoint为5分钟以上,在洛书平台设置治理任务。

四、任务运维注意事项

因每次插入数据都会生成一个快照和很多小文件,为预防快照和小文件过多影响查询速度,需要定时清理。请全量同步完成时,配置治理任务开始治理

治理平台:http://127.0.0.1:9090/luoshu/

1、小文件合并

简述:全量同步后因为每分钟一次checkpoint会生成大量的小文件,全量同步结束后需要做一下小文件合并,提高查询速度。

参考信息:Iceberg小文件合并测试

2、过期快照清理

全量同步后因为每分钟一次checkpoint会产生大量快照,全量同步结束后需要做一下快照清理,提高查询速度。

参考信息:Iceberg过期快照清理

五、附录

1、建表优化

建表时设置metadata.json保留版本个数

功能描述:每次插入数据都会生成一个metadata文件,插入次数太多会影响查询,所以设置保留版本个数

详细介绍和测试文档:Iceberg元数据合并-metadata.json文件

CREATE TABLE data_lake_ods.test3 (
`id` int ,
`empcode` STRING
) USING iceberg
TBLPROPERTIES(
'format-version'='2'
,'write.metadata.delete-after-commit.enabled'='true'
,'write.metadata.previous-versions-max'='3'
);
--插入和更新数据
insert into table iceberg_test.test3 values (1,"code1");
update iceberg_test.test3 set empcode='code2' where id=1;

 

标签:同步,iceberg,ods,lake,使用,Iceberg,data
From: https://www.cnblogs.com/robots2/p/18150399

相关文章

  • python使用scap构建镜像流量
    importpytestimportrequestsimporttimefromscapy.allimport*@pytest.mark.usefixtures("get_config_instance")deftest_76_vlan(get_config_instance):#DefineethernetandIP/TCPlayerseth=Ether()ip=IP(src='10.0.0.2&......
  • MySQL如何使用字符集配置选项
    MySQL中与character_set有关的配置选项有8个,分别是:mysql>showvariableslike'character_set%';+--------------------------+-------------------------------------+|Variable_name      |Value               |+------......
  • 腾讯EMR表治理工具安装使用
    一、安装1、root用户上传文件cdwangrz-beyluoshu-1.0-bin.tar.gz2、解压文件到服务目录重新安装洛书需执行:rm-rf /usr/local/service/luoshumkdir /usr/local/service/luoshutar-zxf luoshu-1.0-bin.tar.gz-C/usr/local/service/luoshu3、(仅初次安装洛书执行)连接......
  • 表治理-iceberg表手动治理常用命令
    一、登录spark客户端spark-sql--masteryarn\--deploy-modeclient\--queuedefault\--namewang\--driver-memory12G\--num-executors10\--executor-cores4\--executor-memory20G二、sql查询表信息1、查询表快照信息SELECT*FROMspark_catalog.data_lak......
  • MYSQL explain的使用
     #1.table:表名#查询的每一行记录都对应着一个单表EXPLAINSELECT*FROMs1;#s1:驱动表s2:被驱动表EXPLAINSELECT*FROMs1INNERJOINs2;#2.id:在一个大的查询语句中每个SELECT关键字都对应一个唯一的idSELECT*FROMs1WHEREkey1='a';SELECT*FROMs1INNER......
  • JPA使用问题总结记录
    1.jpa使用@OneToMany和@ManyToOne注解映射两个实体类的关系时报栈溢出的错误:>实体代码片段:①主表(一)@OneToMany(fetch=FetchType.EAGER,mappedBy="crewManagement",cascade=CascadeType.REMOVE)privateList<CrewMember>crewMemberList;②关联表(多)@ManyToOne@......
  • 【PLM踩坑记】新建SpringBoot项目,无法使用Java8
    概述今天开始学SpringBoot,需要使用IDEA新建SpringBoot项目。公司使用的Java版本为jdk1.8,这里我选择了这个版本的jdk之后,下面的Java选项不提供Java8。解决方法如下:首先将jdk版本选择为较新的jdk22,然后下面的Java版本随便选择一个。在正式进入项目之后,修改IDEA中的项目设置。点......
  • 见鬼了!我家的 WiFi 只有下雨天才能正常使用...
    这是作者大学时期在家里遇到的一个非常奇怪的网络问题,作者的父亲是一名经验丰富的网络工程师,他们家里使用了一个复杂的网络设置,通过Wi-Fi桥接的方式,将父亲公司的高速商业网络连接到家中。但是有一天,作者发现家里的Wi-Fi只有在下雨时才能正常工作。。。事情发生在十多年前,那......
  • 使用ThreadPool.SetMinThreads方法提升API服务响应性能
     使用该方法的背景?某个API服务在每日请求量40W的情况下,流量增多时会产生大量请求异常:Theoperationwascanceled,从实际情况来看,并不是外部依赖接口或者服务实例不足导致,于是设置线程池数量后,服务性能提升效果显著。方法定义:设置线程池在新请求预测中维护的空闲线程数。pu......
  • homebrew的安装和使用
    目录安装xcode安装homebrew有关报错解决卸载脚本安装xcode自己百度安装homebrew这可以说是网上所有brew教程中安装最快最省事的教程安装命令如下:/bin/zsh-c"$(curl-fsSLhttps://gitee.com/cunkai/HomebrewCN/raw/master/Homebrew.sh)"复制终端回车即可参考教程......