data.csv内容:
1,Tom,15
2,Lily,13
3,Mike,21
4,John,20
5,Emma,18
6,Sophia,19
7,David,22
8,James,16
9,Olivia,17
10,Robert,23
11,Emily,14
12,Daniel,25
13,Amelia,24
代码:
package com.auguigu.demo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
/**
 * Minimal Flink Table/SQL example: registers a CSV file as a table through the
 * {@code filesystem} connector, converts that table to a {@link DataStream} of
 * {@link Row} and prints every row to standard output.
 *
 * <p>Usage: {@code TextSql [csvPath]}. When {@code csvPath} is omitted the
 * hard-coded {@link #DEFAULT_CSV_PATH} is used.
 */
public class TextSql {

    /** CSV location used when no path is passed on the command line. */
    private static final String DEFAULT_CSV_PATH =
            "file:///G:/JetBrains/java_workspace/flink-learning/flink-demo1/input/data.csv";

    /**
     * Builds and runs the job.
     *
     * @param args optional; {@code args[0]} overrides the CSV path (a URI such as
     *             {@code file:///path/to/data.csv})
     * @throws Exception if the table cannot be created or the job fails
     */
    public static void main(String[] args) throws Exception {
        String csvPath = (args != null && args.length > 0) ? args[0] : DEFAULT_CSV_PATH;

        // 1. Create the streaming execution environment and the table environment on top of it.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2. Register the CSV file as a table with columns (uid, name, age).
        //    'csv.ignore-parse-errors' = 'true' asks the CSV format to tolerate malformed
        //    input instead of failing the job.
        tableEnv.executeSql(
                "CREATE TABLE csv_source (" +
                " `uid` INT," +
                " `name` STRING," +
                " `age` INT" +
                ") WITH (" +
                " 'connector' = 'filesystem'," +
                " 'path' = '" + escapeSqlLiteral(csvPath) + "'," +
                " 'format' = 'csv'," +
                " 'csv.field-delimiter' = ','," +
                " 'csv.ignore-parse-errors' = 'true'" +
                ")"
        );

        // 3. Convert the registered table to a DataStream<Row>.
        //    toDataStream replaces the deprecated toAppendStream(table, Row.class).
        DataStream<Row> csvSourceDataStream = tableEnv.toDataStream(tableEnv.from("csv_source"));

        // 4. Print each row.
        csvSourceDataStream.print();

        // 5. Submit the job and wait for it to finish.
        env.execute();
    }

    /** Doubles single quotes so the value can be embedded in a SQL string literal. */
    private static String escapeSqlLiteral(String value) {
        return value.replace("'", "''");
    }
}
输出结果:
1> +I[1, Tom, 15]
1> +I[2, Lily, 13]
1> +I[3, Mike, 21]
1> +I[4, John, 20]
1> +I[5, Emma, 18]
1> +I[6, Sophia, 19]
1> +I[7, David, 22]
1> +I[8, James, 16]
1> +I[9, Olivia, 17]
1> +I[10, Robert, 23]
1> +I[11, Emily, 14]
1> +I[12, Daniel, 25]
1> +I[13, Amelia, 24]
Process finished with exit code 0
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the flink-demo1 project. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>flink-demo1</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<!-- Java level used for both compiler source and target below. -->
<java.version>11</java.version>
<!-- Single Flink version shared by every Flink dependency in this file. -->
<flink.version>1.17.2</flink.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
</properties>
<dependencies>
<!-- DataStream API (StreamExecutionEnvironment, DataStream). -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Client classes; presumably needed to run jobs locally from the IDE. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Bridge between the Table API and the DataStream API (StreamTableEnvironment). -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Table planner; presumably required to plan and execute SQL statements at runtime. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-loader</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Table runtime classes used alongside the planner. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Kafka connector. NOTE(review): not referenced by the CSV example; confirm whether other code needs it. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- File connector; backs 'connector' = 'filesystem' in SQL table definitions. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- CSV format; backs 'format' = 'csv' in SQL table definitions. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
</project>
标签:Flink,Java,java,flink,version,SQL,apache,org,csv
From: https://www.cnblogs.com/FengZeng666/p/18242831