首页 > 其他分享 >flink提交yarn 集群模式失败

flink提交yarn 集群模式失败

时间:2024-07-04 15:41:44浏览次数:18  
标签:java ObjectInputStream flink yarn kafka 集群 apache org

flink版本1.14.6
在通过./bin/flink run-application -t yarn-application 模式提交到yarn时失败。
报错信息:

点击查看代码

Caused by: java.lang.ClassCastException: cannot assign instance of org.apache.kafka.clients.consumer.OffsetResetStrategy to field org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer.offsetResetStrategy of type org.apache.kafka.clients.consumer.OffsetResetStrategy in instance of org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer
	at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2287)
	at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1417)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2293)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
	at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:67)
	at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:431)
	at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:211)

堆栈信息都是底层源码,能力有限,看不出来啥问题
但是根据cause by报错提示可以看出来org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer类中的字段offsetResetStrategy不能强转为org.apache.kafka.clients.consumer.OffsetResetStrategy
所以我直接在本地代码中找到org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer,发现字段offsetResetStrategy的类型确实是org.apache.kafka.clients.consumer.OffsetResetStrategy,而且我的代码在本地能运行,怀疑是代码上传到线上环境后,jar引用关系混乱,没有找到我需要的版本的jar包
在本地代码中,可以发现:org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer属于flink-connector-kafka_2.11-1.14.6.jar
org.apache.kafka.clients.consumer.OffsetResetStrategy属于kafka-clients-3.0.2.jar
于是我将这两个包拷贝到线上flink的lib文件夹下,并将项目的pom的依赖scope改为provided,打包上传,运行成功,没有再次报错。
其实flink的一些依赖的scope也是provided,因为我改为compiled后,会报错,我是借用了flink example里的jar的风格。example里的jar就是只有文件,没有打进去外部依赖jar

问题一:
为什么不将依赖包打入生成的fat jar里呢
答:试过了,不管用,还是会报错,按理说应该是本包的依赖包优先。不知道为什么不生效

梳理下整体思路:
1.首先我是将所有的flink相关依赖和其他日志,json等等依赖打成 fat jar,上传线上运行后,报错:Call From hadoop000001.com/10.210.88.99 to 0.0.0.0:8030 .hadoop000001.com/10.210.88.99是yarn集群 某一个节点,是jobmanager所有的节点ip,8030是yarn的端口,不知道为什么会一直连0.0.0.0,网上找原因说是没有指定正确的yarn resourceManager地址后才会连接0.0.0.0,但是日志中确实显示是我指定的正确的resourceManager地址,不知道问题出在哪,解决啦好几天也没成功,先过,以后有机会再解决
2.然后我想通过fllink/examples/streaming/SocketWindowWordCount.jar 测试下,是我本身代码的问题,还是线上flink环境的问题,结果测试通过,正常运行, 由此怀疑是我自己项目的问题
3.重新建立个干净的项目,pom中加入flink的依赖,scope设置为provided,打包上传运行后报错,缺少link-shaded-guava,然后pom中加入link-shaded-guava依赖,并单独将此依赖的scope设置为compile,再次打包上传运行,就开始报Caused by: java.lang.ClassCastException: cannot assign instance of org.apache.kafka.clients.consumer.OffsetResetStrategy to field org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer.offsetResetStrategy of type org.apache.kafka.clients.consumer.OffsetResetStrategy in instance of org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer
4.然后我将flink-connector-kafka_2.11的依赖scope改为compile,flink-connector-kafka_2.11本身包含kafka-clients-3.0.2.jar打包上传还是不行,还是报java.lang.ClassCastException。
5:本地代码依赖看了后,没发现问题,怀疑是线上环境影响了,将flink-connector-kafka_2.11的scope改为provided,并将flink-connector-kafka_2.11-1.14.6.jar和kafka-clients-3.0.2.jar上传到flink/lib/下,打包上传,运行成功

标签:java,ObjectInputStream,flink,yarn,kafka,集群,apache,org
From: https://www.cnblogs.com/datadevelop/p/18283975

相关文章

  • VMware vSphere Tanzu部署_13_创建TKC集群
    1.登录tanzu集群登录语法为:kubectlvspherelogin--server=--vsphere-username--insecure-skip-tls-verify$kubectlvspherelogin--server=192.168.203.194--vsphere-usernameadministrator@vsphere.local--insecure-skip-tls-verify登录示例jianhua@napp:~$k......
  • K8S学习教程(二):在 PetaExpress KubeSphere容器平台部署高可用 Redis 集群
    前言Redis是在开发过程中经常用到的缓存中间件,为了考虑在生产环境中稳定性和高可用,Redis通常采用集群模式的部署方式。在制定Redis集群的部署策略时,常规部署在虚拟机上的方式配置繁琐并且需要手动重启节点,相较之下,使用PetaExpress提供的Kubernetes(k8s)服务进行Redis集......
  • Apache Hadoop完全分布式集群搭建指南
    Hadoop发行版本较多,Cloudera版本(Cloudera’sDistributionIncludingApacheHadoop,简称CDH)收费版本通常用于生产环境,这里用开源免费的ApacheHadoop原始版本。下载:ApacheHadoop版本下载:Indexof/hadoop/commonHadoop基础知识可查看本专栏其它篇章:ApacheHadoop的核心组成......
  • 2.Kubernetes集群架构与组件
    一、Kubernetes组件       1.1控制面板组件(都是在master上面的)               kube-apiserver:对节点以及任务处理的一个相关接口(所有的调用都要经过这个组件调用)               kube-controller-manager:控制器管理器,管理各个类型的......
  • npm/yarn/cnpm 淘宝镜像配置,包版本管理
    一、包管理命令安装为啥要安装这么多,有些情况会安装失败,npm安装最好设置外网代理优先级推荐(个人喜好)cnpm>yarn>pnpm>npm#yarn安装npmiyarn-gyarnconfigsetregistryhttp://registry.npm.taobao.org/#cnpm安装npmicnpm-gcnpmconfigsetregistryht......
  • NoSQL 之 Redis 集群部署
    前言:(1)主从复制:主从复制是高可用Redis的基础,哨兵和集群都是在主从复制基础上实现高可用的。主从复制主要实现了数据的多机备份,以及对于读操作的负载均衡和简单的故障恢复。缺陷:故障恢复无法自动化;写操作无法负载均衡;存储能力受到单机的限制。(2)哨兵:在主从复制的基础上,哨兵实......
  • (必看图文)Hadoop集群安装及MapReduce应用(手把手详解版)
    前言    随着大数据时代的到来,处理和分析海量数据已成为企业和科研机构不可或缺的能力。Hadoop,作为开源的分布式计算平台,因其强大的数据处理能力和良好的可扩展性,成为大数据处理领域的佼佼者。本图文教程旨在帮助读者理解Hadoop集群的安装过程,并通过MapReduce应用实例,......
  • 编译安装Kubernetes 1.29 高可用集群(6)--Cilium网络组件和CoreDNS配置
    1.部署Cilium网络组件1.1在k8s-master节点上,下载安装helmwgethttps://mirrors.huaweicloud.com/helm/v3.15.2/helm-v3.15.2-linux-amd64.tar.gztar-zxvfhelm-v3.15.2-linux-amd64.tar.gzcplinux-amd64/helm/usr/bin/#helmversionversion.BuildInfo{Version:"v3.1......
  • Flink 窗口触发器(Trigger)(一)
    Flink的窗口触发器(Trigger)是流处理中一个非常关键的概念,它定义了窗口何时被触发并决定触发后的行为(如进行窗口数据的计算或清理)。一、基本概念定义:触发器决定了窗口何时被触发以及触发后的行为。在Flink中,窗口的触发是通过设置定时器来实现的。作用:控制窗口数据的聚合时机......
  • 穿梭在Yarn的代理配置迷宫:全面指南
    ......