首页 > 数据库 >重新编译kyuubi-1.6.1版本使其提交flink sql流式任务时不阻塞

重新编译kyuubi-1.6.1版本使其提交flink sql流式任务时不阻塞

时间:2023-10-07 16:26:38浏览次数:43  
标签:1.6 kyuubi .. scala flink 编译 apache

kyuubi-1.6.1版本对于flink sql的支持不是很好,在提交流式任务时会阻塞进程,为了修复这个缺陷,需要修改源代码并重新编译

待编译的kyuubi版本:kyuubi-1.6.1-incubating

适配的flink版本:flink-1.14.4

1、下载kyuubi-1.6.1-incubating版本的源代码,并导入IDEA中

git clone -b v1.6.1-incubating https://github.com/apache/kyuubi.git

2、在根目录找到总项目的pom文件,修改部分配置属性

将flink.version和flink.module.scala.suffix属性改为下面所示

<flink.version>1.14.4</flink.version>
<flink.module.scala.suffix>_2.12</flink.module.scala.suffix>

3、修改类org.apache.kyuubi.engine.flink.operation.ExecuteStatement

找到runOperation方法,将其按照以下代码修改

private def runOperation(operation: Operation): Unit = {
    // FLINK-24461 executeOperation method changes the return type
    // from TableResult to TableResultInternal
    val executeOperation = DynMethods.builder("executeOperation")
      .impl(executor.getClass, classOf[String], classOf[Operation])
      .build(executor)
    val result = executeOperation.invoke[TableResult](sessionId, operation)
    jobId = result.getJobClient.asScala.map(_.getJobID)
    // return jobId when submit success and table.dml-sync is false
    if (operation.getClass == classOf[CatalogSinkModifyOperation]) {
      resultSet = ResultSet.builder
        .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
        .columns(Column.physical("jobId", DataTypes.STRING()))
        .data(Array(Row.of(jobId.map(_.toHexString).getOrElse("null"))))
        .build
    } else {
      resultSet = ResultSet.fromTableResult(result)
    }
  }

4、修改scalastyle-config.xml文件

scalastyle-config.xml文件在项目的根目录下,找到以下配置

<check level="error" class="org.scalastyle.scalariform.ImportOrderChecker" enabled="true">
    <parameters>
        <parameter name="groups">java,scala,3rdParty,kyuubi</parameter>
        <parameter name="group.java">javax?\..*</parameter>
        <parameter name="group.scala">scala\..*</parameter>
        <parameter name="group.3rdParty">(?!org\.apache\.kyuubi\.).*</parameter>
        <parameter name="group.kyuubi">org\.apache\.kyuubi\..*</parameter>
    </parameters>
</check>

将enabled属性修改为false(如果为true,编译的时候会检查类导入顺序,容易导致编译失败),修改后如下所示

<check level="error" class="org.scalastyle.scalariform.ImportOrderChecker" enabled="false">
    <parameters>
        <parameter name="groups">java,scala,3rdParty,kyuubi</parameter>
        <parameter name="group.java">javax?\..*</parameter>
        <parameter name="group.scala">scala\..*</parameter>
        <parameter name="group.3rdParty">(?!org\.apache\.kyuubi\.).*</parameter>
        <parameter name="group.kyuubi">org\.apache\.kyuubi\..*</parameter>
    </parameters>
</check>

5、重新编译

在linux环境下,执行以下命令进行编译

cd kyuubi && ./build/dist --tgz --spark-provided --flink-provided --hive-provided

编译完成后,在项目的根目录即可找到编译后的包

标签:1.6,kyuubi,..,scala,flink,编译,apache
From: https://www.cnblogs.com/zoufh/p/17746575.html

相关文章

  • flink 与 es 的一些问题
    写入esmaping字段类型冲突error1:org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler.onFailure(NoOpFailureHandler.java:40)-FailedElasticsearchitemrequest:Elasticsearchexception[type=mapper_parsing_exception,reason=failedtop......
  • flink序列化类型验证
    flink支持的序列化类型官方支持javatuplesandscalacaseclassesjavapojosprimitivetypesregularclassesvalueshadoopwritablesspeclalTypes验证代码StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();......
  • Flink 从0到1实战实时风控系统[云盘分享]
    点击下载:Flink从0到1实战实时风控系统提取码:1sqmFlink是一款基于流处置的散布式计算框架,能够完成高性能、低延迟的实时数据处置和剖析。下面是一个示例代码,用于展现如何运用Flink从零开端构建实时风控系统。首先,我们需求在pom.xml文件中添加Flink的依赖:<dependency><g......
  • Flink学习记录
    Flink学习记录1简介1.1梗概ApacheFlink是一个框架和分布式处理引擎,用于对无界和有界数据流进行状态计算。对比Spark来说,FLink是真正的流式计算框架,而不是像Spark的微批处理1.2工程搭建<properties><flink.version>1.13.0</flink.version><slf4j.vers......
  • 44、Flink之module模块介绍及使用示例和Flink SQL使用hive内置函数及自定义函数详细示
    文章目录Flink系列文章一、模块Modules1、模块介绍2、模块类别ModuleTypes1)、CoreModule2)、HiveModule3)、User-DefinedModule3、模块生命周期和解析顺序ModuleLifecycleandResolutionOrder4、模块Modules的使用1)、SQL方式2)、编码方式-java二、HiveFunctions内置函数和自定......
  • 26、Flink 的SQL之概览与入门示例
    文章目录Flink系列文章一、SQL1、数据类型2、保留关键字二、SQL入门1、FlinkSQL环境准备1)、安装Flink及提交任务方式2)、SQL客户端使用介绍3)、简单示例2、Source表介绍及示例3、连续查询介绍及示例4、Sink表介绍及示例本文简单的介绍了SQL和SQL的入门,并以三个简单的示例进行介......
  • Flink 1.17教程:时间和窗口
    在批处理统计中,我们可以等待一批数据都到齐后,统一处理。但是在实时处理统计中,我们是来一条就得处理一条,那么我们怎么统计最近一段时间内的数据呢?引入“窗口”。所谓的“窗口”,一般就是划定的一段时间范围,也就是“时间窗”;对在这范围内的数据进行处理,就是所谓的窗口计算。所以窗口......
  • Flink 1.17教程:时间和窗口
    在批处理统计中,我们可以等待一批数据都到齐后,统一处理。但是在实时处理统计中,我们是来一条就得处理一条,那么我们怎么统计最近一段时间内的数据呢?引入“窗口”。所谓的“窗口”,一般就是划定的一段时间范围,也就是“时间窗”;对在这范围内的数据进行处理,就是所谓的窗口计算。所以窗口......
  • 29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB
    Flink系列文章1、Flink部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接13、Flink的tableapi与sql的基本概念、通用api介绍及入门示例14、Flink的tableapi与sql之数据类型:内置数据类型以及它们的属性15、Flink的tableap......
  • 27、Flink 的SQL之SELECT (Pattern Recognition 模式检测)介绍及详细示例(7)
    Flink系列文章1、Flink部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接13、Flink的tableapi与sql的基本概念、通用api介绍及入门示例14、Flink的tableapi与sql之数据类型:内置数据类型以及它们的属性15、Flink的tableap......