Hudi with Flink DataStream in Java
Versions
- Flink 1.14
- Hudi 0.12.1
POM dependencies
```xml
<dependencies>
    <dependency>
        <groupId>org.apache.hudi</groupId>
        <artifactId>hudi-flink1.14-bundle</artifactId>
        <version>${hudi.version}</version>
    </dependency>
    <!-- Flink Hudi run extra -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.0</version>
        <scope>provided</scope>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- Flink web -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- Table ecosystem -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- Flink core -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- Flink connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- Flink statebackend -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.25</version>
        <scope>compile</scope>
    </dependency>
</dependencies>
```
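The `${flink.version}` and `${hudi.version}` placeholders are not defined in the snippet above. A minimal `<properties>` block to go with it, assuming a Flink 1.14.x patch release (1.14.6 is an assumption) and the Hudi version listed at the top of this post:

```xml
<properties>
    <!-- assumed versions; align with the cluster you deploy to -->
    <flink.version>1.14.6</flink.version>
    <hudi.version>0.12.1</hudi.version>
</properties>
```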
Write
Consume from Kafka and continuously write to Hudi via the Flink DataStream API.
```java
package com.liangji.hudi0121.flink114;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.HoodiePipeline;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class JavaTestInsert {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String targetTable = "t1";
        String basePath = "/project/ccr_ai_upgrade/ccr_ai_upgrade/aiui/flink/demo/t1";
        String checkpointDir = "/project/ccr_ai_upgrade/ccr_ai_upgrade/aiui/flink/chk";

        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String sourceTopic = parameterTool.get("sourceTopic", "odeon_test_liangji_114_test");
        String cg = parameterTool.get("cg", "CG_odeon_test_liangji_114_test");
        String cgpwd = parameterTool.get("cgPwd", "252287");

        // State backend and checkpointing (Hudi only commits on checkpoints)
        EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);
        env.setStateBackend(backend);
        env.enableCheckpointing(60000);
        CheckpointConfig config = env.getCheckpointConfig();
        config.setCheckpointStorage(checkpointDir);
        config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        config.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
        config.setCheckpointTimeout(60000);

        // Kafka consumer properties (SASL/SCRAM authentication)
        Properties prop = new Properties();
        prop.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.31.162.72:9093,172.31.162.73:9093,172.31.162.74:9093");
        prop.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        prop.setProperty(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
        prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, cg);
        prop.setProperty(SaslConfigs.SASL_JAAS_CONFIG,
                "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"" + cg + "\"\n"
                        + "password=\"" + cgpwd + "\";");
        prop.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        prop.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        prop.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");

        RowType rowType = createJsonRowType();

        KafkaSource<RowData> source = KafkaSource.<RowData>builder()
                .setBootstrapServers("bootstrap server")
                .setTopics(sourceTopic)
                .setGroupId(cg)
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new JsonRowDataDeserializationSchema(
                        rowType, InternalTypeInfo.of(rowType), false, false, TimestampFormat.ISO_8601))
                .setProperties(prop)
                .build();

        DataStream<RowData> dataStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // Hudi sink options
        Map<String, String> options = new HashMap<>();
        options.put(FlinkOptions.PATH.key(), basePath);
        options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
        options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts");

        HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
                .column("uuid VARCHAR(200)")
                .column("name VARCHAR(100)")
                .column("age INT")
                .column("ts TIMESTAMP(0)")
                .column("`partition` VARCHAR(20)")
                .pk("uuid")
                .partition("partition")
                .options(options);

        builder.sink(dataStream, false); // the second parameter indicates whether the input data stream is bounded

        env.execute("Api_Sink");
    }

    private static RowType createJsonRowType() {
        return (RowType) DataTypes.ROW(
                DataTypes.FIELD("uuid", DataTypes.STRING()),
                DataTypes.FIELD("name", DataTypes.STRING()),
                DataTypes.FIELD("age", DataTypes.INT()),
                DataTypes.FIELD("ts", DataTypes.TIMESTAMP(0)),
                DataTypes.FIELD("partition", DataTypes.STRING())
        ).getLogicalType();
    }
}
```
An example Kafka producer that sends test messages is shown below:
```java
package com.liangji.kafka;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;

import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * @author liangji
 * @Date 2021/5/7 19:17
 */
public class KafkaProducerFrom1 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "bootstrap server");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
        props.put(SaslConfigs.SASL_JAAS_CONFIG,
                "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"PG_odeon_test_liangji_114_test\" password=\"889448\";");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000);
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        while (true) {
            String uuid = UUID.randomUUID().toString().replace("-", "").toLowerCase();
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("odeon_test_liangji_114_test", "k",
                    "{"
                            + "\"uuid\":\"" + uuid + "\","
                            + "\"name\":\"liangji-" + uuid + "\","
                            + "\"age\":\"18\","
                            + "\"ts\":\"2022-11-17T14:00:00\","
                            + "\"partition\":\"2022-11-17\""
                            + "}");
            Future<RecordMetadata> future = producer.send(producerRecord);
            future.get();
            System.out.println("success");
            Thread.sleep(1000);
        }
    }
}
```
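Each message produced above is a JSON document whose fields match the RowType declared in JavaTestInsert, for example (uuid value abbreviated; note that the producer encodes age as a JSON string):

```json
{
  "uuid": "0a1b2c...",
  "name": "liangji-0a1b2c...",
  "age": "18",
  "ts": "2022-11-17T14:00:00",
  "partition": "2022-11-17"
}
```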
Query
Continuously read from Hudi as a stream via the Flink DataStream API.
```java
package com.liangji.hudi0121.flink114;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.util.HoodiePipeline;

import java.util.HashMap;
import java.util.Map;

public class JavaTestQuery {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String targetTable = "t1";
        String basePath = "/project/ccr_ai_upgrade/ccr_ai_upgrade/aiui/flink/demo/t1";
        String startCommit = "20221117164814122";

        // Fall back to the earliest completed instant when no start commit is given
        if (StringUtils.isEmpty(startCommit)) {
            HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                    .setConf(HadoopConfigurations.getHadoopConf(new Configuration()))
                    .setBasePath(basePath)
                    .build();
            startCommit = metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants().firstInstant()
                    .map(HoodieInstant::getTimestamp).orElse(null);
        }

        Map<String, String> options = new HashMap<>();
        options.put(FlinkOptions.PATH.key(), basePath);
        options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
        options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
        options.put(FlinkOptions.READ_START_COMMIT.key(), startCommit);

        HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
                .column("uuid VARCHAR(200)")
                .column("name VARCHAR(100)")
                .column("age INT")
                .column("ts TIMESTAMP(0)")
                .column("`partition` VARCHAR(20)")
                .pk("uuid")
                .partition("partition")
                .options(options);

        DataStream<RowData> rowDataDataStream = builder.source(env);
        rowDataDataStream.print();
        env.execute("Api_Source");
    }
}
```
Hudi with Flink SQL & Hudi on Hive Metastore in Scala
POM dependencies
```xml
<dependencies>
    <dependency>
        <groupId>org.apache.hudi</groupId>
        <artifactId>hudi-flink1.14-bundle</artifactId>
        <version>${hudi.version}</version>
    </dependency>
    <!-- Flink Hudi run extra -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.0</version>
        <scope>provided</scope>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-math3</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- Flink web -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- Table ecosystem -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- Flink core -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- Flink connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- Flink statebackend -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.25</version>
        <scope>compile</scope>
    </dependency>
</dependencies>
```
DDL
Create the table with Flink SQL and host the Hudi table metadata in the Hive metastore.
```scala
package com.liangji.hudi0121.flink114.sql.ddl

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object CreateTable {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "hueuser")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    val createCatalogSql =
      s"""
         |CREATE CATALOG hudi_catalog WITH (
         |  'type' = 'hudi',
         |  'mode' = 'hms',
         |  'default-database' = 'default',
         |  'hive.conf.dir' = 'E:\\mine\\hudi0121-demo-maven\\flink114-scala\\src\\main\\resources'
         |)
         |""".stripMargin

    val createDbSql = """create database if not exists hudi_catalog.ccr_ai_upgrade""".stripMargin
    val changeDbSql = """use hudi_catalog.ccr_ai_upgrade""".stripMargin

    val createTbSql =
      """
        |CREATE TABLE if not exists test_hudi_flink_mor (
        |  uuid VARCHAR(200) PRIMARY KEY NOT ENFORCED,
        |  name VARCHAR(100),
        |  age INT,
        |  ts TIMESTAMP(0),
        |  `partition` VARCHAR(20)
        |)
        |PARTITIONED BY (`partition`)
        |WITH (
        |  'connector' = 'hudi',
        |  'path' = '/project/ccr_ai_upgrade/ccr_ai_upgrade/aiui/flink/demo/test_hudi_flink_mor',
        |  'table.type' = 'MERGE_ON_READ',
        |  'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
        |  'hoodie.datasource.write.recordkey.field' = 'uuid',
        |  'hoodie.datasource.write.hive_style_partitioning' = 'true'
        |)
        |""".stripMargin

    tableEnv.executeSql(createCatalogSql)
    tableEnv.executeSql(createDbSql)
    tableEnv.executeSql(changeDbSql)
    tableEnv.executeSql(createTbSql).print()
  }
}
```
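The 'hive.conf.dir' above must point to a directory containing a hive-site.xml that can reach the Hive metastore. A minimal sketch, with the thrift URI as a placeholder for your environment:

```xml
<configuration>
    <property>
        <!-- placeholder; point this at your Hive metastore -->
        <name>hive.metastore.uris</name>
        <value>thrift://your-metastore-host:9083</value>
    </property>
</configuration>
```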
Batch
Write
Write to the Hudi table with Flink SQL; the example below writes in batch mode.
```scala
package com.liangji.hudi0121.flink114.sql.write

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object BatchWrite {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "hueuser")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    val createCatalogSql =
      s"""
         |CREATE CATALOG hudi_catalog WITH (
         |  'type' = 'hudi',
         |  'mode' = 'hms',
         |  'default-database' = 'default',
         |  'hive.conf.dir' = 'E:\\mine\\hudi0121-demo-maven\\flink114-scala\\src\\main\\resources'
         |)
         |""".stripMargin
    val changeDbSql = """use hudi_catalog.ccr_ai_upgrade""".stripMargin

    tableEnv.executeSql(createCatalogSql)
    tableEnv.executeSql(changeDbSql)

    tableEnv.executeSql("insert into test_hudi_flink_mor values ('b','liangji-1',19,TIMESTAMP '2022-11-18 18:00:00','2022-11-18')").print()
  }
}
```
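Because uuid is the primary key, inserting a row with an existing key is handled as an upsert by Hudi's default write operation, so re-running a statement like the one below with the same key 'b' (values here are illustrative) updates the existing record rather than adding a duplicate:

```sql
insert into test_hudi_flink_mor
values ('b', 'liangji-1-updated', 20, TIMESTAMP '2022-11-19 10:00:00', '2022-11-18');
```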
Query
Query the Hudi table with Flink SQL; the example below queries in batch mode.
```scala
package com.liangji.hudi0121.flink114.sql.query

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object BatchQuery {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "hueuser")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    val createCatalogSql =
      s"""
         |CREATE CATALOG hudi_catalog WITH (
         |  'type' = 'hudi',
         |  'mode' = 'hms',
         |  'default-database' = 'default',
         |  'hive.conf.dir' = 'E:\\mine\\hudi0121-demo-maven\\flink114-scala\\src\\main\\resources'
         |)
         |""".stripMargin
    val changeDbSql = """use hudi_catalog.ccr_ai_upgrade""".stripMargin

    tableEnv.executeSql(createCatalogSql)
    tableEnv.executeSql(changeDbSql)

    tableEnv.executeSql("select * from test_hudi_flink_mor").print()
  }
}
```
Streaming
Write
Consume a Hudi table with Flink SQL and write it into another Hudi table as a stream; the example below writes in streaming mode.
```scala
package com.liangji.hudi0121.flink114.sql.streaming.write

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object StreamingWrite {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "hueuser")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    val checkpointDir = "hdfs://ns-tmp/project/ccr_ai_upgrade/ccr_ai_upgrade/aiui/flink/chk"
    val backend = new EmbeddedRocksDBStateBackend(true)
    env.setStateBackend(backend)
    env.enableCheckpointing(60000)
    val config = env.getCheckpointConfig
    config.setCheckpointStorage(checkpointDir)
    config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    config.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
    config.setCheckpointTimeout(60000)

    val createCatalogSql =
      s"""
         |CREATE CATALOG hudi_catalog WITH (
         |  'type' = 'hudi',
         |  'mode' = 'hms',
         |  'default-database' = 'default',
         |  'hive.conf.dir' = 'E:\\mine\\hudi0121-demo-maven\\flink114-scala\\src\\main\\resources'
         |)
         |""".stripMargin
    val changeDbSql = """use hudi_catalog.ccr_ai_upgrade""".stripMargin

    tableEnv.executeSql(createCatalogSql)
    tableEnv.executeSql(changeDbSql)

    val table = tableEnv.sqlQuery("select * from test_hudi_flink_mor /*+ OPTIONS('read.streaming.enabled'='true') */")

    // table => datastream
    val stream = tableEnv.toDataStream(table)
    // do anything u want

    // datastream => table
    val inputTable = tableEnv.fromDataStream(stream)
    // create temp view
    tableEnv.createTemporaryView("InputTable", inputTable)

    tableEnv.executeSql("insert into test_hudi_flink_mor_streaming select * from InputTable")
  }
}
```
Notes:
- Checkpointing must be enabled; otherwise the data is never committed to Hudi.
- You can freely convert between Table and DataStream to apply custom logic (a minimal sketch follows this list).
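As an illustration of the second note, here is a minimal sketch of what could go between the two conversions in StreamingWrite. The filter condition and field access are illustrative only; field names follow the table schema defined earlier:

```scala
// Table => DataStream of Row, apply arbitrary DataStream logic, then convert back to a Table
val stream = tableEnv.toDataStream(table)
val filtered = stream.filter { row =>
  // keep only records whose age is at least 18 (illustrative condition)
  val age = row.getField("age").asInstanceOf[Integer]
  age != null && age >= 18
}
val inputTable = tableEnv.fromDataStream(filtered)
tableEnv.createTemporaryView("InputTable", inputTable)
tableEnv.executeSql("insert into test_hudi_flink_mor_streaming select * from InputTable")
```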
Query
Read a Hudi table with Flink SQL as a stream; the example below reads in streaming mode.
```scala
package com.liangji.hudi0121.flink114.sql.streaming.query

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object StreamingQuery {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "hueuser")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    val createCatalogSql =
      s"""
         |CREATE CATALOG hudi_catalog WITH (
         |  'type' = 'hudi',
         |  'mode' = 'hms',
         |  'default-database' = 'default',
         |  'hive.conf.dir' = 'E:\\mine\\hudi0121-demo-maven\\flink114-scala\\src\\main\\resources'
         |)
         |""".stripMargin
    val changeDbSql = """use hudi_catalog.ccr_ai_upgrade""".stripMargin

    tableEnv.executeSql(createCatalogSql)
    tableEnv.executeSql(changeDbSql)

    tableEnv.executeSql("select * from test_hudi_flink_mor /*+ OPTIONS('read.streaming.enabled'='true') */").print()
  }
}
```
Note: the query must carry the hint 'read.streaming.enabled'='true' (see the variant below for also pinning the start commit).
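Optionally, the same hint block can also carry 'read.start-commit' (the key behind FlinkOptions.READ_START_COMMIT used earlier) to control where the streaming read starts, for example from the earliest commit:

```sql
select * from test_hudi_flink_mor
/*+ OPTIONS('read.streaming.enabled' = 'true', 'read.start-commit' = 'earliest') */
```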
References
- https://github.com/apache/hudi/blob/master/hudi-examples/hudi-examples-flink/src/main/java/org/apache/hudi/examples/quickstart/HoodieFlinkQuickstart.java
- Flink SQL reading and writing Hudi through the Hudi HMS Catalog with Hive table sync (strongly recommended approach)
- Static methods in interface require -target:jvm-1.8 (CSDN blog)
- Using Flink to work with Hudi tables
- Building a streaming data lake with Flink Hudi (Alibaba Cloud developer community)