首页 > 数据库 >数据湖Iceberg-FlinkSQL-kafka类型表数据无法成功写入(6)

数据湖Iceberg-FlinkSQL-kafka类型表数据无法成功写入(6)

时间:2023-05-06 15:31:27浏览次数:63  
标签:Iceberg default FlinkSQL kafka catalog 数据 id


数据湖Iceberg-简介(1)数据湖Iceberg-存储结构(2)数据湖Iceberg-Hive集成Iceberg(3)数据湖Iceberg-SparkSQL集成(4)数据湖Iceberg-FlinkSQL集成(5)数据湖Iceberg-FlinkSQL-kafka类型表数据无法成功写入(6)数据湖Iceberg-Flink DataFrame集成(7)

数据湖Iceberg-FlinkSQL-kafka类型表数据无法成功写入

数据湖Iceberg-FlinkSQL-kafka类型表数据无法成功写入版本问题问题原因解决方法

版本

Iceberg:1.1.0

Flink:1.14.3

问题

Kafka类型的Iceberg表创建完成后,通过语句写入其他表中执行成功,但是没数据

问题原因

当前版本的BUG(存疑)

解决方法

Kafka表必须要在default_catalog.default_database下,即catalog名为default_catalog,数据库(命名空间)为default_database下,否则kafka类型的表读取不到数据。

如果都在我们自己创建的catalog下创建,则执行INSERT INTO hadoop_catalog.iceberg_db.sample6 SELECT * FROM default_catalog.default_database.kafka1;后,在Flink任务中看不到一个持续执行的Flink Job,而正常执行该命令Flink会执行一个持续执行的任务,去消费kafka数据写入Iceberg,正常情况如下图:

数据湖Iceberg-FlinkSQL-kafka类型表数据无法成功写入(6)_大数据

所以这里我们kafka表在default_catalog.default_database下,写入数据的表在我们自己创建的hadoop_catalog.iceberg_db

create table default_catalog.default_database.kafka1(
  id int,
  data string
) with (
  'connector' = 'kafka'
  ,'topic' = 'ttt'
  ,'properties.zookeeper.connect' = '172.16.24.194:2181'
  ,'properties.bootstrap.servers' = '172.16.24.194:9092'
  ,'format' = 'json'
  ,'properties.group.id'='iceberg1'
  ,'scan.startup.mode'='earliest-offset'
);

CREATE TABLE `hadoop_catalog`.`iceberg_db`.`sample6` (
  `id`  INT UNIQUE COMMENT 'unique id',
  `data` STRING NOT NULL,
 PRIMARY KEY(`id`) NOT ENFORCED
) with (
'format-version'='2', 
'write.upsert.enabled'='true'
);


INSERT INTO hadoop_catalog.iceberg_db.sample6 SELECT * FROM default_catalog.default_database.kafka1;

此时我们往Kafka发送数据:

{"id":123,"data":"llalalala"}
{"id":1123,"data":"asdfasfds"}

查看表中数据可以看到写入成功

select * from hadoop_catalog.iceberg_db.sample6;

数据湖Iceberg-FlinkSQL-kafka类型表数据无法成功写入(6)_iceberg_02

再次发送数据

{"id":123,"data":"JastData"}

查看表中数据,发现修改成功

数据湖Iceberg-FlinkSQL-kafka类型表数据无法成功写入(6)_大数据_03


标签:Iceberg,default,FlinkSQL,kafka,catalog,数据,id
From: https://blog.51cto.com/u_13721902/6250227

相关文章

  • filebeat+kafka_logstash+es进行日志分析
    filebeat+kafka_logstash+es进行日志分析目录一.将安装包上传至目标服务器(即日志所在的服务器)二.解压安装三.配置filebeat1.配置采集日志到logstash,这种配置适用于日志量较小的场景,Filebeat--->logstash,logstash直接解析filebeat2.配置采集日志至kafka,file......
  • Kafka基础阶段与集群搭建详细教程
    Kafka第一天课堂笔记一.Kafka简介1.1消息队列消息队列——用于存放消息的组件程序员可以将消息放入到队列中,也可以从消息队列中获取消息很多时候消息队列不是一个永久性的存储,是作为临时存储存在的(设定一个期限:设置消息在MQ中保存10天)消息队列中间件:消息队列的组件,例如:Kafk......
  • C#操作kafka
    1.手动设置TopicPartition=>offset//手动设置TopicPartition=>offsetforeach(TopicPartitionpartitioninconsumer.Assignment){if(partition.Partition.Value==13){TopicPartitionOffsetoffset=newTopicPartitionOffset(partition,27060);......
  • Kafka2.4安装与配置
    一、安装zookeeper集群1、安装jdk1.82、下载apache-zookeeper-3.5.7-bin.tar.gz并解压第1台机器:mkdir/usr/local/zookeeper/datamv/usr/local/zookeeper/conf/zoo_sample.cfg/usr/local/zookeeper/conf/zoo.cfgvim/usr/local/zookeeper/conf/zoo.cfgdataDir=/usr/l......
  • JDK导致ActiveMQ、Kafka连接zookeeper失败:Session 0x0 for server 10.1.21.244/<unres
      最近在部署一套ActiveMQ集群时,使用zookeeper来实现,zookeeper启动了,在启动ActiveMQ时,抛出异常:    WARN|Session0x0forserver10.1.21.244/<unresolved>:2181,unexpectederror,closingsocketconnectionandattemptingreconnectjava.lang.IllegalArgu......
  • C# 入门 Kafka
    从C#入门Kafka  目录1,搭建Kafka环境安装docker-compose单节点Kafka的部署Kafka集群的部署2,Kafka概念基本概念关于Kafka脚本工具主题管理使用C#创建分区分区与复制生产者消费者修改配置3,Kafka.NET基础生产者批量生......
  • kafka 不支持读写分离的原因
    前段时间在看kafka相关内容,发现kafka“所有的”读写流量都在主partition上,从partition只负责备份数据。那么为什么kafka从partition不跟其他中间件一样承接读流量?读写分离的初衷读写分离的初衷我觉得是利用读流量&写流量不同的特性做针对性的优化,而这两种流量......
  • 记录一下linux-kafka命令
    使用工具:puTTY下载地址:DownloadPuTTY-afreeSSHandtelnetclientforWindowsloginas:rootroot@*******'spassword:Lastlogin:FriApr2814:54:262023from10.10.16.80[root@kafka272c41~]#cd..[root@kafka272c41/]#ls-a....autorelabelbinboot......
  • Kafka命令行常用命令说明(一)
    基于0.8.0版本。 ##查看topic分布情况kafka-list-topic.shbin/kafka-list-topic.sh--zookeeper192.168.197.170:2181,192.168.197.171:2181(列出所有topic的分区情况)bin/kafka-list-topic.sh--zookeeper192.168.197.170:2181,192.168.197.171:2181--topictest(查......
  • @KafkaListener属性简介
    @KafkaListener从2.2.4版开始,您可以直接在注释上指定Kafka使用者属性,这些属性将覆盖在使用者工厂中配置的具有相同名称的所有属性。可以使用#{…​}或属性占位符(${…​})在SpEL上配置注释上的大多数属性。例如:@KafkaListener(id="consumer-id",......