
Hudi with Flink 1.14 verification


hudi with flink datastream by java

Versions

  • Flink 1.14
  • Hudi 0.12.1

pom dependencies

<dependencies>
        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-flink1.14-bundle</artifactId>
            <version>${hudi.version}</version>
        </dependency>

        <!-- Flink Hudi run extra -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- Flink web -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Table ecosystem -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Flink core -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Flink connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Flink statebackend -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>
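Both pom fragments in this post reference ${hudi.version} and ${flink.version} without defining them. A minimal <properties> block consistent with the version list above might look like this (the exact Flink patch release is an assumption; use the one matching your cluster):

<properties>
    <!-- assumed values: Hudi 0.12.1 per the version list; any Flink 1.14.x patch release should work -->
    <hudi.version>0.12.1</hudi.version>
    <flink.version>1.14.4</flink.version>
</properties>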

write

Consume from Kafka and continuously write into Hudi with the Flink DataStream API:

package com.liangji.hudi0121.flink114;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.HoodiePipeline;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class JavaTestInsert {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String targetTable = "t1";
        String basePath = "/project/ccr_ai_upgrade/ccr_ai_upgrade/aiui/flink/demo/t1";
        String checkpointDir= "/project/ccr_ai_upgrade/ccr_ai_upgrade/aiui/flink/chk";

        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        // job parameters: source topic and consumer-group credentials
        String sourceTopic = parameterTool.get("sourceTopic","odeon_test_liangji_114_test");
        String cg = parameterTool.get("cg","CG_odeon_test_liangji_114_test");
        String cgpwd = parameterTool.get("cgPwd","252287");

        EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);
        env.setStateBackend(backend);
        env.enableCheckpointing(60000);
        CheckpointConfig config = env.getCheckpointConfig();
        config.setCheckpointStorage(checkpointDir);
        config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        config.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
        config.setCheckpointTimeout(60000);

        Properties prop = new Properties();
        prop.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.31.162.72:9093,172.31.162.73:9093,172.31.162.74:9093");
        prop.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,"SASL_PLAINTEXT");
        prop.setProperty(SaslConfigs.SASL_MECHANISM,"SCRAM-SHA-256");
        prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG,cg);
        prop.setProperty(SaslConfigs.SASL_JAAS_CONFIG,"org.apache.kafka.common.security.scram.ScramLoginModule required username=\""+cg+"\"\n" +"password=\""+cgpwd+"\";");
        prop.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
        prop.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
        prop.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"30000");

        RowType rowType = createJsonRowType();
        KafkaSource<RowData> source = KafkaSource.<RowData>builder()
                // "bootstrap server" here is a placeholder; setProperties(prop) below supplies the real bootstrap.servers
                .setBootstrapServers("bootstrap server")
                .setTopics(sourceTopic)
                .setGroupId(cg)
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new JsonRowDataDeserializationSchema(rowType, InternalTypeInfo.of(rowType), false, false, TimestampFormat.ISO_8601))
                .setProperties(prop)
                .build();

        DataStream<RowData> dataStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        Map<String, String> options = new HashMap<>();
        options.put(FlinkOptions.PATH.key(), basePath);
        options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
        options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts");

        HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
                .column("uuid VARCHAR(200)")
                .column("name VARCHAR(100)")
                .column("age INT")
                .column("ts TIMESTAMP(0)")
                .column("`partition` VARCHAR(20)")
                .pk("uuid")
                .partition("partition")
                .options(options);

        builder.sink(dataStream, false); // the second parameter indicates whether the input data stream is bounded
        env.execute("Api_Sink");
    }

    private static RowType createJsonRowType() {
        return (RowType) DataTypes.ROW(
                DataTypes.FIELD("uuid", DataTypes.STRING()),
                DataTypes.FIELD("name", DataTypes.STRING()),
                DataTypes.FIELD("age", DataTypes.INT()),
                DataTypes.FIELD("ts", DataTypes.TIMESTAMP(0)),
                DataTypes.FIELD("partition", DataTypes.STRING())
                ).getLogicalType();
    }
}

An example Kafka producer that sends test messages:

package com.liangji.kafka;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;

import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * @author liangji
 * @Description TODO
 * @Date 2021/5/7 19:17
 */
public class KafkaProducerFrom1 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "bootstrap server");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,"SASL_PLAINTEXT");
        props.put(SaslConfigs.SASL_MECHANISM,"SCRAM-SHA-256");
        props.put(SaslConfigs.SASL_JAAS_CONFIG,"org.apache.kafka.common.security.scram.ScramLoginModule required username=\"PG_odeon_test_liangji_114_test\" password=\"889448\";");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000);
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);

        while (true) {
            String uuid = UUID.randomUUID().toString().replace("-", "").toLowerCase();
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("odeon_test_liangji_114_test","k", "{" +
                    "\"uuid\":" + "\"" + uuid + "\"," +
                    "\"name\":\"liangji-" + uuid + "\"," +
                    "\"age\":\"18\"" + "," +
                    "\"ts\":\"2022-11-17T14:00:00\"" + "," +
                    "\"partition\":\"2022-11-17\"" +
                    "}");
            Future<RecordMetadata> future = producer.send(producerRecord);
            future.get();
            System.out.println("success");
            Thread.sleep(1000);
        }
    }
}

query

Continuously read from Hudi with the Flink DataStream API:

package com.liangji.hudi0121.flink114;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.util.HoodiePipeline;

import java.util.HashMap;
import java.util.Map;

public class JavaTestQuery {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String targetTable = "t1";
        String basePath = "/project/ccr_ai_upgrade/ccr_ai_upgrade/aiui/flink/demo/t1";

        String startCommit = "20221117164814122";
        if (StringUtils.isEmpty(startCommit)) {
            HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                    .setConf(HadoopConfigurations.getHadoopConf(new Configuration())).setBasePath(basePath).build();
            startCommit = metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants().firstInstant()
                    .map(HoodieInstant::getTimestamp).orElse(null);
        }

        Map<String, String> options = new HashMap<>();
        options.put(FlinkOptions.PATH.key(), basePath);
        options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
        options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
        options.put(FlinkOptions.READ_START_COMMIT.key(), startCommit);

        HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
                .column("uuid VARCHAR(200)")
                .column("name VARCHAR(100)")
                .column("age INT")
                .column("ts TIMESTAMP(0)")
                .column("`partition` VARCHAR(20)")
                .pk("uuid")
                .partition("partition")
                .options(options);

        DataStream<RowData> rowDataDataStream = builder.source(env);
        rowDataDataStream.print();
        env.execute("Api_Source");
    }
}

hudi with flink sql & hudi on metastore by scala

pom dependencies

<dependencies>
        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-flink1.14-bundle</artifactId>
            <version>${hudi.version}</version>
        </dependency>

        <!-- Flink Hudi run extra -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.commons</groupId>
                    <artifactId>commons-math3</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- Flink web -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Table ecosystem -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Flink core -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Flink connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Flink statebackend -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>

ddl

Create the table with Flink SQL and keep the Hudi table metadata in the Hive metastore:

package com.liangji.hudi0121.flink114.sql.ddl

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object CreateTable {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME","hueuser")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    val createCatalogSql =
      s"""
        |CREATE CATALOG hudi_catalog WITH (
        |    'type' = 'hudi',
        |    'mode' = 'hms',
        |    'default-database' = 'default',
        |    'hive.conf.dir' = 'E:\\mine\\hudi0121-demo-maven\\flink114-scala\\src\\main\\resources'
        |)
        |""".stripMargin

    val createDbSql = """create database if not exists hudi_catalog.ccr_ai_upgrade""".stripMargin

    val changeDbSql = """use hudi_catalog.ccr_ai_upgrade""".stripMargin

    val createTbSql =
      """
        |CREATE TABLE if not exists test_hudi_flink_mor (
        |  uuid VARCHAR(200) PRIMARY KEY NOT ENFORCED,
        |  name VARCHAR(100),
        |  age INT,
        |  ts TIMESTAMP(0),
        |  `partition` VARCHAR(20)
        |)
        |PARTITIONED BY (`partition`
        |)
        |WITH (
        |  'connector' = 'hudi',
        |  'path' = '/project/ccr_ai_upgrade/ccr_ai_upgrade/aiui/flink/demo/test_hudi_flink_mor',
        |  'table.type' = 'MERGE_ON_READ',
        |  'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
        |  'hoodie.datasource.write.recordkey.field' = 'uuid',
        |  'hoodie.datasource.write.hive_style_partitioning' = 'true'
        |)
        |""".stripMargin

    tableEnv.executeSql(createCatalogSql)
    tableEnv.executeSql(createDbSql)
    tableEnv.executeSql(changeDbSql)
    tableEnv.executeSql(createTbSql).print()
  }
}
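To confirm that the table actually landed in the metastore-backed catalog, a quick check can be appended after the DDL statements (a minimal sketch; it simply lists the tables of the current database):

// optional sanity check: list the tables of the current database in hudi_catalog
tableEnv.executeSql("show tables").print()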

batch

write

Write to the Hudi table with Flink SQL; the following writes in batch mode:

package com.liangji.hudi0121.flink114.sql.write

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object BatchWrite {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME","hueuser")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    val createCatalogSql =
      s"""
        |CREATE CATALOG hudi_catalog WITH (
        |    'type' = 'hudi',
        |    'mode' = 'hms',
        |    'default-database' = 'default',
        |    'hive.conf.dir' = 'E:\\mine\\hudi0121-demo-maven\\flink114-scala\\src\\main\\resources'
        |)
        |""".stripMargin

    val changeDbSql = """use hudi_catalog.ccr_ai_upgrade""".stripMargin

    tableEnv.executeSql(createCatalogSql)
    tableEnv.executeSql(changeDbSql)
    tableEnv.executeSql("insert into test_hudi_flink_mor values ('b','liangji-1',19,TIMESTAMP '2022-11-18 18:00:00','2022-11-18')").print()
  }
}

query

Query the Hudi table with Flink SQL; the following queries in batch mode:

package com.liangji.hudi0121.flink114.sql.query

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object BatchQuery {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME","hueuser")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    val createCatalogSql =
      s"""
        |CREATE CATALOG hudi_catalog WITH (
        |    'type' = 'hudi',
        |    'mode' = 'hms',
        |    'default-database' = 'default',
        |    'hive.conf.dir' = 'E:\\mine\\hudi0121-demo-maven\\flink114-scala\\src\\main\\resources'
        |)
        |""".stripMargin

    val changeDbSql = """use hudi_catalog.ccr_ai_upgrade""".stripMargin

    tableEnv.executeSql(createCatalogSql)
    tableEnv.executeSql(changeDbSql)
    tableEnv.executeSql("select * from test_hudi_flink_mor").print()
  }
}

streaming

write

Consume a Hudi table with Flink SQL and write the stream into another Hudi table; the following writes in streaming mode:

package com.liangji.hudi0121.flink114.sql.streaming.write

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object StreamingWrite {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME","hueuser")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    val checkpointDir = "hdfs://ns-tmp/project/ccr_ai_upgrade/ccr_ai_upgrade/aiui/flink/chk"
    val backend = new EmbeddedRocksDBStateBackend(true)
    env.setStateBackend(backend)
    env.enableCheckpointing(60000)
    val config = env.getCheckpointConfig
    config.setCheckpointStorage(checkpointDir)
    config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    config.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
    config.setCheckpointTimeout(60000)

    val createCatalogSql =
      s"""
        |CREATE CATALOG hudi_catalog WITH (
        |    'type' = 'hudi',
        |    'mode' = 'hms',
        |    'default-database' = 'default',
        |    'hive.conf.dir' = 'E:\\mine\\hudi0121-demo-maven\\flink114-scala\\src\\main\\resources'
        |)
        |""".stripMargin

    val changeDbSql = """use hudi_catalog.ccr_ai_upgrade""".stripMargin

    tableEnv.executeSql(createCatalogSql)
    tableEnv.executeSql(changeDbSql)

    val table = tableEnv.sqlQuery("select * from test_hudi_flink_mor/*+ OPTIONS('read.streaming.enabled'='true')*/")

    // table=>datastream
    val stream = tableEnv.toDataStream(table)

    // do anything u want

    // datastream=>table
    val inputTable = tableEnv.fromDataStream(stream)
    // create temp view
    tableEnv.createTemporaryView("InputTable", inputTable)

    tableEnv.executeSql("insert into test_hudi_flink_mor_streaming select * from InputTable")
  }
}

Notes:

  1. Checkpointing must be enabled, otherwise the data will not be committed to Hudi.
  2. The job can freely convert between Table and DataStream for custom processing logic (see the sketch below).
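To make note 2 concrete, here is a minimal sketch of what could replace the `// do anything u want` placeholder in StreamingWrite above. The filter predicate and the use of Row.getFieldAs are illustrative assumptions, not part of the original job:

// extra import needed at the top of the file
import org.apache.flink.types.Row

// hypothetical transformation: keep only rows whose age column is >= 18
val filtered = stream.filter { row: Row =>
  val age = row.getFieldAs[Integer]("age") // column name comes from the table schema
  age != null && age >= 18
}

// then hand `filtered` (instead of `stream`) to tableEnv.fromDataStream(...) as in the original code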

query

Read the Hudi table with Flink SQL in streaming mode:

package com.liangji.hudi0121.flink114.sql.streaming.query

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object StreamingQuery {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME","hueuser")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    val createCatalogSql =
      s"""
        |CREATE CATALOG hudi_catalog WITH (
        |    'type' = 'hudi',
        |    'mode' = 'hms',
        |    'default-database' = 'default',
        |    'hive.conf.dir' = 'E:\\mine\\hudi0121-demo-maven\\flink114-scala\\src\\main\\resources'
        |)
        |""".stripMargin

    val changeDbSql = """use hudi_catalog.ccr_ai_upgrade""".stripMargin

    tableEnv.executeSql(createCatalogSql)
    tableEnv.executeSql(changeDbSql)

    tableEnv.executeSql("select * from test_hudi_flink_mor/*+ OPTIONS('read.streaming.enabled'='true')*/").print()
  }
}

 

Note: the streaming read requires the SQL hint 'read.streaming.enabled'='true'.
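If the streaming read should start from a specific instant rather than the latest data, the start commit can be supplied through the same hint. The option key below corresponds to FlinkOptions.READ_START_COMMIT used in the DataStream query earlier; the instant value is taken from that example and is only illustrative:

// stream-read the table starting from a given commit instant (value is illustrative)
tableEnv.executeSql(
  "select * from test_hudi_flink_mor" +
    "/*+ OPTIONS('read.streaming.enabled'='true', 'read.start-commit'='20221117164814122')*/"
).print()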

References

  1. https://github.com/apache/hudi/blob/master/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/HoodieFlinkQuickstart.java
  2. Flink SQL reading and writing Hudi through the Hudi HMS Catalog with Hive table sync (strongly recommended approach)
  3. Static methods in interface require -target:jvm-1.8 (CSDN blog)
  4. Operating Hudi tables with Flink
  5. Building a streaming data lake with Flink and Hudi (Alibaba Cloud developer community)

From: https://www.cnblogs.com/just-JL/p/16915935.html
