首页 > 其他分享 >Iceberg从入门到精通系列之十一:Flink DataStream读取Iceberg表

Iceberg从入门到精通系列之十一:Flink DataStream读取Iceberg表

时间:2023-09-11 10:07:23浏览次数:36  
标签:DataStream flink Iceberg Flink streaming api org apache import



Iceberg从入门到精通系列之十一:Flink DataStream读取Iceberg表

  • 一、完整代码
  • 二、效果如下所示


一、完整代码

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;

public class ReadDemo1 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //1.获取tableLoader
        TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a");
        //2.通过FlinkSource构建数据流
        DataStream<RowData> build = FlinkSource.forRowData()
                .env(env)
                .tableLoader(tableLoader)
                .streaming(false) // false batch方式 true streaming方式
//              .streaming(true)
//                .asOfTimestamp()
//                .startSnapshotId()

                .build();

		//转换为二元组
        build.map(r -> Tuple2.of(r.getInt(0),r.getLong(1)))
                .returns(Types.TUPLE(Types.INT,Types.LONG))
                .print();

        env.execute();

    }
}
  • streaming(false) :false batch方式
  • streaming(true):true streaming方式

二、效果如下所示

11> (2,3)
11> (4,5)


标签:DataStream,flink,Iceberg,Flink,streaming,api,org,apache,import
From: https://blog.51cto.com/u_12080573/7432125

相关文章

  • Iceberg从入门到精通系列之九:flink sql修改Iceberg表和删除Iceberg表
    Iceberg从入门到精通系列之九:flinksql修改Iceberg表一、修改表属性二、修改表名三、删除表一、修改表属性ALTERTABLE`hive_catalog`.`default`.`sample`SET('write.format.default'='avro');二、修改表名ALTERTABLE`hive_catalog`.`default`.`sample`RENAMETO`hive_cat......
  • Iceberg从入门到精通系列之十五:Spark集成Iceberg
    Iceberg从入门到精通系列之十五:Spark集成Iceberg一、下载Spark安装包二、解压Spark安装包三、配置环境变量四、激活环境变量五、下载Sparkiceberg的jar包六、Spark集成Iceberg七、Spark配置Catalog八、配置HiveCatalog九、配置HadoopCatalog十、spark集成hive十、启动Sparkshe......
  • Iceberg从入门到精通系列之八:flink sql 创建Iceberg表
    Iceberg从入门到精通系列之八:flinksql创建Iceberg表一、创建数据库二、创建表三、创建分区表四、使用LIKE语法建表五、创建主键表一、创建数据库createdatabaseiceberg_db;useiceberg_db;二、创建表createtable`hive_catalog`.`default`.`sample`(idbigintcomment'un......
  • Iceberg从入门到精通系列之七:Flink SQL创建Catalog
    Iceberg从入门到精通系列之七:FlinkSQL创建Catalog一、语法说明二、flink集成hivejar包三、放到指定目录四、启动hivemetastore服务五、创建hivecatalog六、查看catalog七、HadoopCatalog八、创建sql-client初始化文件九、启动flinksql指定初始化文件一、语法说明createcat......
  • Iceberg从入门到精通系列之五:Zeppelin集成iceberg,创建iceberg普通表和分区表,并插入数
    Iceberg从入门到精通系列之五:Zeppelin集成iceberg,创建iceberg普通表和分区表,并插入数据一、Zeppelin集成iceberg二、查看catalog三、使用数据库四、查看表五、创建表六、插入数据七、查询数据八、创建分区表九、分区表插入数据十、查询分区表数据一、Zeppelin集成icebergZeppelin......
  • Iceberg从入门到精通系列之六:Flink集成Iceberg
    Iceberg从入门到精通系列之六:Flink集成Iceberg一、下载Flink二、解压Flink安装包三、配置环境变量四、激活环境变量五、下载Icebergflinkjar包六、部署Icebergflinkjar包七、修改flink配置八、启动flink九、启动flinksqlclient一、下载Flink下载Flink:https://www.apache.o......
  • Iceberg从入门到精通系列之四:详细整理出Iceberg支持的字段类型,创建包含所有类型的表,并
    Iceberg从入门到精通系列之四:详细整理出Iceberg支持的字段类型,创建包含所有类型的表,并插入数据一、Iceberg表支持的字段类型二、创建包含所有类型的表三、插入数据一、Iceberg表支持的字段类型BOOLEANTINYINTSMALLINTINTEGERBIGINTFLOATDOUBLEDECIMALDATETIMESTAMPSTRINGUUIDFIXE......
  • 在flink-1.17中测试执行流处理版本的单词计数程序时,出现"Exception in thread "Thread
    场景描述采用单作业模式提交作业后发现报错了 报错内容Exceptioninthread“Thread-5”java.lang.IllegalStateException:Tryingtoaccessclosedclassloader.Pleasecheckifyoustoreclassloadersdirectlyorindirectlyinstaticfields.Ifthestacktrace......
  • Flink 1.17教程:聚合算子(Aggregation)之按键分区(keyBy)
    聚合算子(Aggregation)计算的结果不仅依赖当前数据,还跟之前的数据有关,相当于要把所有数据聚在一起进行汇总合并——这就是所谓的“聚合”(Aggregation),类似于MapReduce中的reduce操作。按键分区(keyBy)对于Flink而言,DataStream是没有直接进行聚合的API的。因为我们对海量数据做聚合肯定要......
  • Flink 1.17教程:输出算子之输出到MySQL(JDBC)
    输出到MySQL(JDBC)写入数据的MySQL的测试步骤如下。(1)添加依赖添加MySQL驱动:mysqlmysql-connector-java8.0.27官方还未提供flink-connector-jdbc的1.17.0的正式依赖,暂时从apachesnapshot仓库下载,pom文件中指定仓库路径:apache-snapshotsapachesnapshotshttps://repository.a......