Flink version: 1.14.6
Submitting the job to YARN with ./bin/flink run-application -t yarn-application failed.
Error message:
Caused by: java.lang.ClassCastException: cannot assign instance of org.apache.kafka.clients.consumer.OffsetResetStrategy to field org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer.offsetResetStrategy of type org.apache.kafka.clients.consumer.OffsetResetStrategy in instance of org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2287)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1417)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2293)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:67)
at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:431)
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:211)
The stack trace is all low-level framework code, and with my limited experience I couldn't see the problem from it alone.
But the Caused by message does say something concrete: the field offsetResetStrategy of org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer cannot be assigned an instance of org.apache.kafka.clients.consumer.OffsetResetStrategy.
So I went straight to org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer in my local code and confirmed that offsetResetStrategy is indeed declared as org.apache.kafka.clients.consumer.OffsetResetStrategy, and the job runs fine locally. Since the source and target types here have the same fully qualified name, this kind of ClassCastException can only happen when the same class has been loaded by two different classloaders, so I suspected that once the jar was uploaded to the cluster the jar references got tangled and the class was no longer resolved from the single jar version I intended.
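To make that mechanism concrete, here is a minimal standalone sketch, not Flink code: demo.Widget is a hypothetical trivial class assumed to be compiled into ./classes/demo/Widget.class. The same class file loaded by two unrelated classloaders produces two distinct Class objects, and an instance of one cannot be assigned to a field typed with the other:

import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;

public class TwoLoadersDemo {
    public static void main(String[] args) throws Exception {
        URL dir = new File("classes/").toURI().toURL();
        // parent = null, so the two loaders do not delegate to a common application classloader
        URLClassLoader a = new URLClassLoader(new URL[] {dir}, null);
        URLClassLoader b = new URLClassLoader(new URL[] {dir}, null);
        Class<?> widgetFromA = a.loadClass("demo.Widget");
        Class<?> widgetFromB = b.loadClass("demo.Widget");
        System.out.println(widgetFromA == widgetFromB); // false: two distinct Class objects
        Object instance = widgetFromA.getDeclaredConstructor().newInstance();
        // false: assigning this instance to a field of b's demo.Widget would throw ClassCastException
        System.out.println(widgetFromB.isInstance(instance));
    }
}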
In the local project you can see that org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer lives in flink-connector-kafka_2.11-1.14.6.jar,
and org.apache.kafka.clients.consumer.OffsetResetStrategy lives in kafka-clients-3.0.2.jar.
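A quick way to double-check which jar actually ships a class (assuming the jars are in the current directory; the jar names are the ones from my local dependency tree):

jar tf flink-connector-kafka_2.11-1.14.6.jar | grep ReaderHandledOffsetsInitializer
jar tf kafka-clients-3.0.2.jar | grep OffsetResetStrategy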
So I copied those two jars into the cluster's flink/lib directory, changed the scope of the corresponding dependencies in the project's pom to provided, repackaged, uploaded, and the job ran successfully with no further errors.
In fact some of Flink's own dependencies are meant to stay provided as well: when I changed them to compile, the job failed again. I borrowed the style of the jars under flink/examples, which contain only the job's own class files and no bundled external dependency jars.
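For reference, a minimal sketch of the dependency section that ended up working for me (the artifact list is illustrative and the ${...} property is a placeholder; flink-shaded-guava stays at compile, per step 3 of the recap below):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.14.6</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.14.6</version>
    <scope>provided</scope>
</dependency>
<!-- the one exception: flink-shaded-guava gets packaged into the fat jar -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-guava</artifactId>
    <version>${flink.shaded.guava.version}</version>
    <scope>compile</scope>
</dependency>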
Question 1:
Why not just bundle the dependencies into the generated fat jar?
Answer: I tried; it doesn't work, the same error comes back. In theory the classes in my own jar should win (Flink's default classloader.resolve-order is child-first), but for some reason that didn't take effect here.
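One related knob worth knowing about (I did not end up needing it, and it is only an assumption that it would help in this situation): for duplicate-class ClassCastExceptions, a commonly suggested experiment is to flip Flink's class resolution order in conf/flink-conf.yaml so the jars under flink/lib always win over the fat jar:

classloader.resolve-order: parent-first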
To recap the whole investigation:
1. First I packed all the Flink dependencies plus logging, JSON and other libraries into a fat jar and ran it on the cluster, which failed with: Call From hadoop000001.com/10.210.88.99 to 0.0.0.0:8030. hadoop000001.com/10.210.88.99 is one node of the YARN cluster (the node where the JobManager runs), and 8030 is the default YARN ResourceManager scheduler port. I don't know why it kept connecting to 0.0.0.0; posts online say this happens when the YARN ResourceManager address is not configured correctly, but the logs clearly showed the correct ResourceManager address I had specified. After several days without progress I set this aside; I may come back to it later.
2. Then I used flink/examples/streaming/SocketWindowWordCount.jar to test whether the problem was in my own code or in the cluster's Flink environment. It ran normally, which pointed the suspicion back at my own project.
3. I created a clean new project and added the Flink dependencies to the pom with scope provided. Packaging and running it failed with a missing flink-shaded-guava class, so I added the flink-shaded-guava dependency to the pom with its scope alone set to compile. After repackaging and re-running, it started failing with the Caused by: java.lang.ClassCastException quoted at the top (cannot assign instance of org.apache.kafka.clients.consumer.OffsetResetStrategy to field ReaderHandledOffsetsInitializer.offsetResetStrategy).
4. I then changed the scope of flink-connector-kafka_2.11 to compile (flink-connector-kafka_2.11 itself pulls in kafka-clients-3.0.2.jar), packaged and uploaded again, and it still failed with the same java.lang.ClassCastException.
5. The local dependency tree looked fine, so I suspected the cluster environment: I changed the scope of flink-connector-kafka_2.11 back to provided, uploaded flink-connector-kafka_2.11-1.14.6.jar and kafka-clients-3.0.2.jar into flink/lib/, repackaged and uploaded the job, and it ran successfully (sketched below).
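A sketch of that final fix, assuming FLINK_HOME points at the cluster's Flink installation and my-job.jar is a placeholder for the project's (now dependency-free) jar:

# copy the two jars that were being duplicated into the cluster-wide lib/
cp flink-connector-kafka_2.11-1.14.6.jar $FLINK_HOME/lib/
cp kafka-clients-3.0.2.jar $FLINK_HOME/lib/
# rebuild with those dependencies scoped as provided, then resubmit
$FLINK_HOME/bin/flink run-application -t yarn-application ./my-job.jar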