kyuubi-1.6.1版本对于flink sql的支持不是很好,在提交流式任务时会阻塞进程,为了修复这个缺陷,需要修改源代码并重新编译
待编译的kyuubi版本:kyuubi-1.6.1-incubating
适配的flink版本:flink-1.14.4
1、下载kyuubi-1.6.1-incubating版本的源代码,并导入IDEA中
git clone -b v1.6.1-incubating https://github.com/apache/kyuubi.git
2、在根目录找到总项目的pom文件,修改部分配置属性
将flink.version和flink.module.scala.suffix属性改为下面所示
<flink.version>1.14.4</flink.version>
<flink.module.scala.suffix>_2.12</flink.module.scala.suffix>
3、修改类org.apache.kyuubi.engine.flink.operation.ExecuteStatement
找到runOperation方法,将其按照以下代码修改
private def runOperation(operation: Operation): Unit = {
// FLINK-24461 executeOperation method changes the return type
// from TableResult to TableResultInternal
val executeOperation = DynMethods.builder("executeOperation")
.impl(executor.getClass, classOf[String], classOf[Operation])
.build(executor)
val result = executeOperation.invoke[TableResult](sessionId, operation)
jobId = result.getJobClient.asScala.map(_.getJobID)
// return jobId when submit success and table.dml-sync is false
if (operation.getClass == classOf[CatalogSinkModifyOperation]) {
resultSet = ResultSet.builder
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.columns(Column.physical("jobId", DataTypes.STRING()))
.data(Array(Row.of(jobId.map(_.toHexString).getOrElse("null"))))
.build
} else {
resultSet = ResultSet.fromTableResult(result)
}
}
4、修改scalastyle-config.xml文件
scalastyle-config.xml文件在项目的根目录下,找到以下配置
<check level="error" class="org.scalastyle.scalariform.ImportOrderChecker" enabled="true">
<parameters>
<parameter name="groups">java,scala,3rdParty,kyuubi</parameter>
<parameter name="group.java">javax?\..*</parameter>
<parameter name="group.scala">scala\..*</parameter>
<parameter name="group.3rdParty">(?!org\.apache\.kyuubi\.).*</parameter>
<parameter name="group.kyuubi">org\.apache\.kyuubi\..*</parameter>
</parameters>
</check>
将enabled属性修改为false(如果为true,编译的时候会检查类导入顺序,容易导致编译失败),修改后如下所示
<check level="error" class="org.scalastyle.scalariform.ImportOrderChecker" enabled="false">
<parameters>
<parameter name="groups">java,scala,3rdParty,kyuubi</parameter>
<parameter name="group.java">javax?\..*</parameter>
<parameter name="group.scala">scala\..*</parameter>
<parameter name="group.3rdParty">(?!org\.apache\.kyuubi\.).*</parameter>
<parameter name="group.kyuubi">org\.apache\.kyuubi\..*</parameter>
</parameters>
</check>
5、重新编译
在linux环境下,执行以下命令进行编译
cd kyuubi && ./build/dist --tgz --spark-provided --flink-provided --hive-provided
编译完成后,在项目的根目录即可找到编译后的包
标签:1.6,kyuubi,..,scala,flink,编译,apache From: https://www.cnblogs.com/zoufh/p/17746575.html