首页 > 其他分享 >【Flink 日常踩坑】Could not find ExecutorFactory in classpath

【Flink 日常踩坑】Could not find ExecutorFactory in classpath

时间:2024-05-16 22:45:36浏览次数:15  
标签:flink java ExecutorFactory Could Flink api table apache org

Description

一段简单的 FlinkSQL 程序,在 IDE 中运行没问题,但是 maven 打包后发布到终端启动却报错了。

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkSQL {

	/**
	 * Runs a local MySQL-CDC -> JDBC sync job: changes captured from
	 * cdc_test_source.player_scores are continuously written into
	 * cdc_test_target.player_scores.
	 *
	 * @param args unused command-line arguments
	 * @throws Exception if the job fails to submit or execute
	 */
	public static void main(String[] args) throws Exception {
        // Local environment with the web UI bound to 9091 instead of the default 8081.
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 9091);
        StreamExecutionEnvironment streamEnv =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);

        // Source table: reads the MySQL binlog via the mysql-cdc connector,
        // starting from the latest offset (no initial snapshot replay).
        String sourceDdl = String.join("",
                "CREATE TABLE source_dest (",
                "`id` INT,",
                "`player` STRING,",
                "`team` STRING,",
                "`score` INT,",
                "PRIMARY KEY (`id`) NOT ENFORCED",
                ") WITH (",
                "'connector' = 'mysql-cdc',",
                "'hostname' = '10.4.45.207',",
                "'username' = 'username',",
                "'password' = 'password',",
                "'database-name' = 'cdc_test_source',",
                "'table-name' = 'player_scores',",
                "'scan.startup.mode' = 'latest-offset'",
                ");");
        tableEnv.executeSql(sourceDdl);

        // Sink table: upserts into the target MySQL database through the JDBC
        // connector; the primary key enables upsert semantics.
        String sinkDdl = String.join("",
                "CREATE TABLE sink_dest (",
                "`id` INT,",
                "`player` STRING,",
                "`team` STRING,",
                "`score` INT,",
                "PRIMARY KEY (`id`) NOT ENFORCED",
                ") WITH (",
                "'connector' = 'jdbc',",
                "'url' = 'jdbc:mysql://10.4.45.207:3306/cdc_test_target',",
                "'username' = 'username',",
                "'password' = 'password',",
                "'table-name' = 'player_scores'",
                ");");
        tableEnv.executeSql(sinkDdl);

        // Submit the continuous INSERT pipeline via a statement set.
        StatementSet statements = tableEnv.createStatementSet();
        statements.addInsertSql("INSERT INTO sink_dest SELECT * FROM source_dest;");
        statements.execute();
    }
}

maven 的 pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" 
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.example</groupId>
  <artifactId>flink-learning</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <description>flink</description>
  
  <properties>
  	<scala.binary.version>2.12</scala.binary.version>
    <flink.version>1.15.4</flink.version>
  </properties>
  
  <dependencies>
	<dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
	<dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
    	<groupId>org.apache.flink</groupId>
	  	<artifactId>flink-table-planner-loader</artifactId>
	  	<version>${flink.version}</version>
	</dependency>
    <dependency>
		<groupId>org.apache.flink</groupId>
	    <artifactId>flink-connector-base</artifactId>
	    <version>${flink.version}</version>
	</dependency>
    <dependency>
    	<groupId>org.apache.flink</groupId>
	    <artifactId>flink-connector-jdbc</artifactId>
	    <version>${flink.version}</version>
	</dependency>
	<dependency>
      	<groupId>com.ververica</groupId>
      	<artifactId>flink-connector-mysql-cdc</artifactId>
      	<version>2.4.0</version>
    </dependency>
  </dependencies>
  
  <build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-shade-plugin</artifactId>
				<version>3.1.1</version>
				<configuration>
			    	<createDependencyReducedPom>false</createDependencyReducedPom>
				</configuration>
				<executions>
					<execution>
						<phase>package</phase>
						<goals>
							<goal>shade</goal>
						</goals>
						<configuration>
							<filters>
								<filter>
									<!-- Do not copy the signatures in the META-INF folder. Otherwise, 
										this might cause SecurityExceptions when using the JAR. -->
									<artifact>*:*</artifact>
									<excludes>
										<exclude>META-INF/*.SF</exclude>
										<exclude>META-INF/*.DSA</exclude>
										<exclude>META-INF/*.RSA</exclude>
									</excludes>
								</filter>
							</filters>
							<transformers>
								<!-- Merge META-INF/services SPI files instead of letting the
									first dependency's copy win. Without this, the multiple
									org.apache.flink.table.factories.Factory files from the
									table/connector jars override each other, and the planner's
									ExecutorFactory entry is lost, causing:
									"Could not find any factories that implement
									'org.apache.flink.table.delegation.ExecutorFactory'". -->
								<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
								</transformer>
								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
									<mainClass>org.example.test.FlinkSQL</mainClass>
								</transformer>
							</transformers>
						</configuration>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>
</project>

使用 maven-shade-plugin 打包成 jar 后,直接使用 java -jar 命令启动,遇到报错:

Exception in thread "main" org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath
        at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.lookupExecutor(AbstractStreamTableEnvironmentImpl.java:108)
        at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:100)
        at org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:122)
        at org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:94)
        at org.example.test.FlinkSQL.main(FlinkSQL.java:19)
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factories that implement 'org.apache.flink.table.delegation.ExecutorFactory' in the classpath.
        at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:526)
        at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.lookupExecutor(AbstractStreamTableEnvironmentImpl.java:105)
        ... 4 more

Locate

找不到 org.apache.flink.table.delegation.ExecutorFactory 的实现类?

跟踪源码查看一下:

step into 继续跟进

step into 继续跟进

到这里就很明确了,原来是找不到 SPI 的实现类。SPI 的加载路径是 META-INF/services, 打开 jar 包查看一下:

org.apache.flink.table.factories.Factory 中只有 3 个注册的 class, 其中确实没有 org.apache.flink.table.delegation.ExecutorFactory 的实现类。

IDE 中可以运行,打包后出现异常,说明是打包过程出现了问题。

排查后发现,项目中的依赖中存在不止一个 org.apache.flink.table.factories.Factory 的 SPI:

而使用 maven-shade-plugin 打包时,默认将第一个依赖 (flink-table-api-java-bridge.jar)中的 META-INF/services 打包进来,并忽略了其他依赖中存在的同名 SPI。

Fixed

我们希望将所有依赖中存在的同名 SPI 文件中的内容进行 merge,Flink 官方文档给出了解决办法:

Flink uses Java's Service Provider Interfaces (SPI) to load the table connector/format factories by their identifiers. Since the SPI resource file named org.apache.flink.table.factories.Factory for every table connector/format is under the same directory META-INF/services, these resource files will override each other when build the uber-jar of the project which uses more than one table connector/format, which will cause Flink to fail to load table connector/format factories.

实际上只需要给 maven-shade-plugin 添加一项配置:

<!-- The service transformer is needed to merge META-INF/services files -->
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
</transformer>

重新打包,验证一下:

重新启动程序:

java -jar flink-learning-0.0.1-SNAPSHOT.jar

运行成功:

标签:flink,java,ExecutorFactory,Could,Flink,api,table,apache,org
From: https://www.cnblogs.com/myownswordsman/p/18196872/flinkerror-could-not-find-executorfactor

相关文章

  • 【Flink 日常踩坑】Could not find ExecutorFactory in classpath
    Description一段简单的FlinkSQL程序,在IDE中运行没问题,但是maven打包后发布到终端启动却报错了。importorg.apache.flink.configuration.Configuration;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.table.......
  • Flink同步kafka到iceberg(cos存储)
    一、flink到logger1、sourcecreatetablesource_table(idbigintcomment'唯一编号',order_numberbigintcomment'订单编号',update_timestamptimestamp_ltzmetadatafr......
  • Flink的State
      有状态的计算是流式计算框架的一个重要功能,很多复杂的计算场景都需要记录一下相关的状态。FlinkState一种为了满足算子计算时需要历史数据需求的,使用checkpoint机制进行容错,存储在statebackend的数据结构。1.State分类    FlinkState被分为keyedstate、operato......
  • Flink执行图
    Flink的代码编写流程为env->source->transform->sink,基本所有的代码都是大致按照图1的流程进行代码编写,当然中间也会有一些封装之类的。  Flink代码写好后,它的任务调度执行图按照生成顺序分为:逻辑流图(StreamGraph)->作业图(JobGraph)->执行图(ExecutionGraph)->物理图(Physica......
  • flink监控数据库表
    背景在日常服务运行中可能会遇到很多数据上的问题,一些我们可以通过日志查询,但是一些修改等操作日志无法查询到,binlog日志不方便查询而且不是所有表都需要日志,增加了查询的难度,我们考虑使用canal或者flink对binlog进行记录,这里flink,flink程序和客户端版本1.17.1pom.xml<?xm......
  • Flink同步mysql到iceberg
    一、如何做一致性保障1、全量数据分片读取,增量数据单并发读取,保证增量阶段不会乱序2、全量阶段写入失败会清空表后重新写入,避免重复数据。3、全量阶段多task并行读取,把每个task开始结束时间提交给FlinkCoordinator,由Coordinator做时间合并后,仅读取一次全量同步区间内变化的binlo......
  • flink sql
    【案例1】Flink01_Table_BaseUsepublicclassFlink01_Table_BaseUse{publicstaticvoidmain(String[]args)throwsException{StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);......
  • pyFlink 入门总结
    一整体流程1.初始化pyFlink执行环境2.加载数据集3.执行数据分析4.导出分析结果 二初始化执行环境2.1初始化参考代码如下frompyflink.tableimportEnvironmentSettings,StreamTableEnvironmentes=EnvironmentSettings.new_instance().in_batch_mode().bui......
  • Flink Batch Hash Aggregate
    数据类型要求BatchPhysicalHashAggRulematch条件会判断isAggBufferFixedLength(agg)为什么要求aggCall的类型是FixedLength的才可以使用HashAggregate?因为在HashAggregate中,依赖于BytesHashMap数据结构来存储keyValue数据.而ByteHashMap不支持变长的val......
  • 10分钟了解Flink SQL使用
    Flink是一个流处理和批处理统一的大数据框架,专门为高吞吐量和低延迟而设计。开发者可以使用SQL进行流批统一处理,大大简化了数据处理的复杂性。本文将介绍FlinkSQL的基本原理、使用方法、流批统一,并通过几个例子进行实践。1、FlinkSQL基本原理FlinkSQL建立在ApacheFlink之上......