My example program is modeled on the one in the official Flink documentation:
My program:
package com.xxx.demo;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class TextSql {
    public static void main(String[] args) throws Exception {
        // 1. Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2. Register a CSV-backed table via the filesystem connector
        tableEnv.executeSql(
                "CREATE TABLE csv_source (" +
                " `uid` INT," +
                " `name` STRING," +
                " `age` INT" +
                ") WITH (" +
                " 'connector' = 'filesystem'," +
                " 'path' = 'file:///G:/JetBrains/java_workspace/flink-learning/flink-demo1/input/data.csv'," +
                " 'format' = 'csv'," +
                " 'csv.field-delimiter' = ','," +
                " 'csv.ignore-parse-errors' = 'true'" +
                ")"
        );

        // 3. Convert the registered table into a DataStream
        // (toAppendStream is deprecated in newer Flink versions in favor of toDataStream)
        DataStream<Row> csvSourceDataStream = tableEnv.toAppendStream(tableEnv.from("csv_source"), Row.class);

        // 4. Print the output
        csvSourceDataStream.print();

        // 5. Execute the job
        env.execute();
    }
}
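For reference, a data.csv at the path declared above would need to match the schema (uid INT, name STRING, age INT); the rows below are hypothetical sample data, not from the original post:

1,Alice,25
2,Bob,30
3,Charlie,28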
Full error message:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.csv_source'.
Table options are:
'connector'='filesystem'
'csv.field-delimiter'=','
'csv.ignore-parse-errors'='true'
'format'='csv'
'path'='file:///G:/JetBrains/java_workspace/flink-learning/flink-demo1/input/data.csv'
at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:167)
at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:192)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:175)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:115)
at org.apache.calcite.rel.core.RelFactories$TableScanFactoryImpl.createScan(RelFactories.java:481)
at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1757)
at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:366)
at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:158)
at org.apache.flink.table.operations.SourceQueryOperation.accept(SourceQueryOperation.java:86)
at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:155)
at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:135)
at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:92)
at org.apache.flink.table.operations.SourceQueryOperation.accept(SourceQueryOperation.java:86)
at org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.java:261)
at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:294)
at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:194)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:194)
at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224)
at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:324)
at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:314)
at com.auguigu.demo.TextSql.main(TextSql.java:34)
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='filesystem'
at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:736)
at org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:710)
at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:163)
... 30 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'filesystem' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
Available factory identifiers are:
blackhole
datagen
kafka
print
upsert-kafka
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:546)
at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:732)
... 32 more
Solution:
The last "Caused by" gives the root cause: no DynamicTableFactory for the identifier 'filesystem' is on the classpath (only blackhole, datagen, kafka, print, and upsert-kafka were found). Add the filesystem connector dependency, plus flink-csv for the declared 'csv' format, which would otherwise fail the same way:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-files</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>${flink.version}</version>
</dependency>
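These snippets assume that the ${flink.version} property is defined elsewhere in the same pom.xml; a minimal sketch (the version number here is an assumption — use the Flink version the rest of your project targets):

<properties>
    <!-- assumption: replace with the Flink version used by the rest of the project -->
    <flink.version>1.15.0</flink.version>
</properties>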
Solution source:
Read a csv file in Flink 1.15 using Table API - Stack Overflow
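As a side note, since toAppendStream is deprecated in newer Flink releases (1.13+), the conversion in the program above can be written with the replacement API, toDataStream; a minimal sketch of just that line, assuming the same table registration:

DataStream<Row> csvSourceDataStream = tableEnv.toDataStream(tableEnv.from("csv_source"));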
Tags: java, scala, flink, connector, filesystem, apache, org, table
From: https://www.cnblogs.com/FengZeng666/p/18242809