首页 > 其他分享 >Flink同步kafka到iceberg(cos存储)

Flink同步kafka到iceberg(cos存储)

时间:2024-05-16 17:53:59浏览次数:10  
标签:comment cos iceberg timestamp Flink number order table id

一、flink到logger

1、source

create table source_table (
 id                             bigint          comment '唯一编号'
,order_number                   bigint          comment '订单编号'
,update_timestamp               timestamp_ltz metadata from 'timestamp'
,primary key (id, order_number) not enforced
) with (
 'connector' = 'kafka'
,'topic' = 'topic'
,'properties.bootstrap.servers' = '127.0.0.1:9092'
,'scan.startup.mode' = 'latest-offset'
,'key.format' = 'json'
,'key.json.fail-on-missing-field' = 'false'
,'key.fields' = 'id;order_number'
,'key.json.ignore-parse-errors' = 'true'
,'value.format' = 'debezium-json'
,'value.debezium-json.ignore-parse-errors' = 'true'
,'value.debezium-json.encode.decimal-as-plain-number' = 'true'
);

2、sink logger

CREATE TABLE sink_test_wang2(
    id                             bigint          comment '唯一编号'
    ,order_number                   bigint          comment '订单编号'
    ,update_timestamp               timestamp_ltz   comment '更新时间戳'
    ,primary key (id, order_number) not enforced
) WITH (
 'connector' = 'logger',
  'all-changelog-mode' = 'true'
);

3、写入

insert into sink_test_wang2
select  id
       ,order_number
       ,update_timestamp
from source_table /*+ OPTIONS('properties.group.id'='testwang') */;

 二、kafka到iceberg

1、目标源

create table sink_cos_table (
 id                             bigint          comment '唯一编号'
,order_number                   bigint          comment '订单编号'
,update_timestamp               timestamp_ltz   comment '更新时间戳'
,primary key (id, order_number) not enforced
)
with (
 'connector' = 'iceberg'
,'warehouse'='cosn://cos桶名称/test_wang'
,'catalog-type' = 'hadoop'
,'catalog-name'='hadoop'
,'catalog-database' = 'data_lake_ods_test'
,'catalog-table' = 'test_kafka_table'
,'format-version' = '2'
,'write.upsert.enabled' = 'true'
,'table.drop.base-path.enabled' = 'true'
,'engine.hive.enabled' = 'true'
);

2、写入

insert into sink_cos_table
select  id
       ,order_number
       ,pay_number
       ,update_timestamp
from source_table /*+ OPTIONS('properties.group.id'='read-oceanus-wangshida') */;

 

标签:comment,cos,iceberg,timestamp,Flink,number,order,table,id
From: https://www.cnblogs.com/robots2/p/18196401

相关文章

  • Flink的State
      有状态的计算是流式计算框架的一个重要功能,很多复杂的计算场景都需要记录一下相关的状态。FlinkState一种为了满足算子计算时需要历史数据需求的,使用checkpoint机制进行容错,存储在statebackend的数据结构。1.State分类    FlinkState被分为keyedstate、operato......
  • nacos2.3.2部署(鲲鹏arm版)
    1.说明  本次编译是因为公司适配鲲鹏arm系列,业务涉及到了nacos-server,所以就选择最新版本进行了编译,期间也想直接使用官方镜像nacos/nacos-server:v2.1.2-slim、nacos/nacos-server:v2.2.0-slim,无一例外失败了,启动不了,所以最后只能选择源码编译,在制作镜像的方式进行。2.编......
  • Nacos热更新静态变量配置
    Nacos热更新静态变量配置Springboot项目接入nacos,配置文件统一管理,但静态常量无法通过@Value注解实时热更新(如下所示)。GlobalVariables.java@ComponentpublicclassGlobalVariables{//测试热加载配置字段publicstaticStringtestInfo;@Value("${test......
  • MacOS环境变量source生效但重启后又失效
      .bash_profile和.zshrc都是macos系统重环境变量配置的文件,但是两者有不同之处。.bash_profile:在执行source~/.bash_profile,只在当前窗口生效,但关闭当前终端窗口或者mac关机重启后不会再生效。.zshrc:在执行source~/.zshrc,这是永久生效的,mac每次启动会自动执行source......
  • VMware Workstation 17.5.2 Pro Unlocker & OEM BIOS for Windows & Linux - 在 Windo
    VMwareWorkstation17.5.2ProUnlocker&OEMBIOSforWindows&Linux-在Windows和Linux上运行macOSSonoma请访问原文链接:https://sysin.org/blog/vmware-workstation-17-unlocker/,查看最新版。原创作品,转载请保留出处。作者主页:sysin.orgVMwareWorkstationPro......
  • VMware Workstation 17.5.2 Pro macOS Unlocker & OEM BIOS for Windows - 在 Windows
    VMwareWorkstation17.5.2PromacOSUnlocker&OEMBIOSforWindows-在Windows上运行macOSSonoma请访问原文链接:https://sysin.org/blog/vmware-workstation-17-unlocker-windows/,查看最新版。原创作品,转载请保留出处。作者主页:sysin.orgVMwareWorkstationPro是......
  • VMware Workstation 17.5.2 Pro macOS Unlocker & OEM BIOS for Linux - 在 Linux 上
    VMwareWorkstation17.5.2PromacOSUnlocker&OEMBIOSforLinux-在Linux上运行macOSSonoma请访问原文链接:https://sysin.org/blog/vmware-workstation-17-unlocker-linux/,查看最新版。原创作品,转载请保留出处。作者主页:sysin.orgVMwareWorkstationPro是行业标......
  • VMware Fusion 13.5.2 OEM BIOS Version - 在 macOS 中运行 Windows 虚拟机的最佳方式
    VMwareFusion13.5.2OEMBIOSVersion-在macOS中运行Windows虚拟机的最佳方式VMwareFusion13原版App中集成OEMBIOS请访问原文链接:https://sysin.org/blog/vmware-fusion-13-oem/,查看最新版。原创作品,转载请保留出处。作者主页:sysin.org使用VMwareFusion在......
  • 运维-微服务组件nacos(未写完)
    一、基础概念1.Nacos的概念Nacos/nɑ:kəʊs/是DynamicNamingandConfigurationService的首字母简称,一个更易于构建云原生应用的动态服务发现、配置管理和服务管理平台。Nacos致力于帮助您发现、配置和管理微服务。Nacos提供了一组简单易用的特性集,帮助您快速实现动态......
  • MacOS添加,查看,删除用户
    1.添加用户在macOS中,可以通过命令行使用dscl(DirectoryServicecommandlineutility)工具来添加用户。以下是使用dscl添加用户的步骤:打开终端:可以通过在Spotlight搜索中输入"Terminal"或在/Applications/Utilities中找到终端来打开它。使用dscl添加用户:sudodscl.-crea......