首页 > 数据库 >FlinkSQL 运行官网的 filesystem SQL 连接器例子出错:Cannot discover a connector using option: 'connector'

FlinkSQL 运行官网的 filesystem SQL 连接器例子出错:Cannot discover a connector using option: 'connector'

时间:2024-06-11 21:58:18浏览次数:28  
标签:java scala flink connector 连接器 filesystem apache org table

我的例子程序是仿照官网例子写的:
image

我的程序:

package com.xxx.demo;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class TextSql {

    public static void main(String[] args) throws Exception {

        // 1. 创建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);


        // 注册 CSV 表
        tableEnv.executeSql(
                "CREATE TABLE csv_source (" +
                        "  `uid` INT," +
                        "  `name` STRING," +
                        "  `age` INT" +
                        ") WITH (" +
                        "  'connector' = 'filesystem'," +
                        "  'path' = 'file:///G:/JetBrains/java_workspace/flink-learning/flink-demo1/input/data.csv'," +
                        "  'format' = 'csv'," +
                        "  'csv.field-delimiter' = ','," +
                        "  'csv.ignore-parse-errors' = 'true'" +
                        ")"
        );

        // 将注册的表转换为DataStream
        DataStream<Row> csvSourceDataStream = tableEnv.toAppendStream(tableEnv.from("csv_source"), Row.class);
        // 打印输出
        csvSourceDataStream.print();
        // 执行任务
        env.execute();

    }
}

完整报错信息:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.csv_source'.

Table options are:

'connector'='filesystem'
'csv.field-delimiter'=','
'csv.ignore-parse-errors'='true'
'format'='csv'
'path'='file:///G:/JetBrains/java_workspace/flink-learning/flink-demo1/input/data.csv'
	at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:167)
	at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:192)
	at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:175)
	at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:115)
	at org.apache.calcite.rel.core.RelFactories$TableScanFactoryImpl.createScan(RelFactories.java:481)
	at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1757)
	at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:366)
	at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:158)
	at org.apache.flink.table.operations.SourceQueryOperation.accept(SourceQueryOperation.java:86)
	at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:155)
	at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:135)
	at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:92)
	at org.apache.flink.table.operations.SourceQueryOperation.accept(SourceQueryOperation.java:86)
	at org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.java:261)
	at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:294)
	at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:194)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
	at scala.collection.Iterator.foreach(Iterator.scala:937)
	at scala.collection.Iterator.foreach$(Iterator.scala:937)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
	at scala.collection.IterableLike.foreach(IterableLike.scala:70)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike.map(TraversableLike.scala:233)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:194)
	at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224)
	at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:324)
	at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:314)
	at com.auguigu.demo.TextSql.main(TextSql.java:34)
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='filesystem'
	at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:736)
	at org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:710)
	at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:163)
	... 30 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'filesystem' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

Available factory identifiers are:

blackhole
datagen
kafka
print
upsert-kafka
	at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:546)
	at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:732)
	... 32 more

解决方法:

添加依赖:

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-connector-files</artifactId>
	<version>${flink.version}</version>
</dependency>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-csv</artifactId>
	<version>${flink.version}</version>
</dependency>

解决方案来源

Read a csv file in Flink 1.15 using Table API - Stack Overflow

标签:java,scala,flink,connector,连接器,filesystem,apache,org,table
From: https://www.cnblogs.com/FengZeng666/p/18242809

相关文章

  • 新能源电源连接器插针插孔全自动扭力试验机
      一、产品说明:全自动扭力试验机适用各种产品之相关扭力测试,NB转轴、手机、LCDMonitor脚座、旋转开关之扭力测试及扭力寿命试验,Windows中文视窗软体画面设定,操作简单方便,且所有资料皆可储存,可精确测量待测物的力矩与旋转角度及力矩与转速相对应变化曲线,并可准确控制螺......
  • Could not find artifact com.mysql:mysql-connector-j:pom:8.0.36 in central (https
    遇到修改依赖项的MySQL版本结果说找不到依赖项解决方法确保MySQL版本正确降低依赖项的MySQL版本,修改后更新即可以我的MySQL版本举例,可以降低MySQL版本到依赖项支持的版本<dependency><groupId>com.mysql</groupId><artifactId>m......
  • Caused by: org.apache.catalina.connector.ClientAbortException: java.io.IOExcepti
    错误描述Causedby:org.apache.catalina.connector.ClientAbortException:java.io.IOException:你的主机中的软件中止了一个已建立的连接。发生场景ApiFox发起请求,接口内容是下载Excel文件,数据比较大5w条,在请求完之后发生此错误。但是在线上环境并没有这种情况,后来想了想......
  • pinus老项目启动遇'Property connector does not exist on type UserRpc'报错
    跟示例项目对比过,配置代码并无出入,尝试在示例中新增远程调用connectorRemote可用,证明代码配置正确尝试在示例项目中使用工作项目的配置文件包括引用的模块文件目录列表如下 packagespluginspackage.jsonpackage-lock.jsontsconfig.jsonyarn.lock 示例安装模块后,运......
  • mysql.connector.errors.NotSupportedError: Authentication plugin 'caching_sha2_pa
    今天将程序部署到服务器,遇到mysql.connector.errors.NotSupportedError:Authenticationplugin'caching_sha2_password'isnotsupported问题产生的原因:从MySQL8.0开始,默认的用户认证插件从mysql_native_password变成了caching_sha2_password查看现有的用户mysql>se......
  • [UE 虚幻插件 DTPostgreSQL] PostgreSQL Connector 使用蓝图连接操作 PostgreSQL 数据
    本插件主要是支持在UE蓝图中连接和操作PostgreSQL数据库。下载连接在文章最后。数据库连接【CreatePostgreSQL】输入:Host:数据库IP地址。Port:数据库开放端口。User:数据库用户名。Password:数据库密码。DBName:指定连接的数据库库名。输出:Success:返回数据库是否......
  • 揭露 FileSystem 引起的线上 JVM 内存溢出问题
    作者:来自vivo互联网大数据团队-YeJidong本文主要介绍了由FileSystem类引起的一次线上内存泄漏导致内存溢出的问题分析解决全过程。内存泄漏定义(memoryleak):一个不再被程序使用的对象或变量还在内存中占有存储空间,JVM不能正常回收改对象或者变量。一次内存泄漏似乎不会有大......
  • Ubuntu20文件系统磁盘空间不足low disk space on filesystem root——转载
      Ubuntu20文件系统磁盘空间不足lowdiskspaceonfilesystemroot天然玩家于2022-07-2307:45:00发布阅读量1w 收藏 132点赞数41分类专栏: #Ubuntu 文章标签: filesystem gparted ubuntu版权Ubuntu专栏收录该内容19篇文章1......
  • SeaTunnel JDBC DB2 Sink Connector支持的工作原理,快来学习吧!
    DB2是IBM的一款关系型数据库管理系统,JDBCDB2SourceConnector是一个用于通过JDBC读取外部数据源数据的连接器。ApacheSeaTunnel如何支持JDBCDB2SinkConnector?请参考本文档。支持引擎SparkFlinkSeaTunnelZeta主要功能精确一次性CDC(变更数据捕获)使用Xa事务......
  • 当 mysql-connector-java-5 遇上 MySQL8,终究还是错付了 → 门当户对真的很重要!
    开心一刻今天,老婆给我发消息老婆:老公,儿子从隔壁邻居家回来了老婆:是先打还是先洗?我:先洗吧,万一打错人了呢老婆:先洗脸吧,没错就边打边洗起因在我们的固有认知中, mysql-connector-java-5.x.x 连接的是 MySQL5 ,而 mysql-connector-java-8.x.x 连......