Background: After starting Flink's sql-client.sh and creating a Kafka source table, querying the Kafka data fails.

Error message:

2024-06-18 16:10:12
org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: kafka_rmc_cust_analog_u[1]' (operator bc764cd8ddf7a0cff126f51c16239658).
	at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:600)
	at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:237)
	at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:374)
	at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.handleUncaughtExceptionFromAsyncCall(SourceCoordinatorContext.java:387)
	at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to list subscribed topic partitions due to
	at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.checkPartitionChanges(KafkaSourceEnumerator.java:234)
	at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:83)
	at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
	... 7 more
Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients.admin.DescribeTopicsResult.allTopicNames()Lorg/apache/kafka/common/KafkaFuture;
	at org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:44)
	at org.apache.flink.connector.kafka.source.subscriber.TopicListSubscriber.getSubscribedTopicPartitions(TopicListSubscriber.java:52)
	at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getSubscribedTopicPartitions(KafkaSourceEnumerator.java:219)
	at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80)
	... 7 more

Root cause: When integrating with Kafka, Flink calls org.apache.kafka.clients.admin.DescribeTopicsResult.allTopicNames(), but the kafka-clients jar currently on the classpath does not contain that method. In other words, the Flink version and the Kafka client dependency are incompatible.
To confirm, you can download the Kafka source package from the official site and check whether the class DescribeTopicsResult under ./org/apache/kafka/clients/admin contains the allTopicNames() method.

Solution: Download a Kafka distribution whose client provides org.apache.kafka.clients.admin.DescribeTopicsResult.allTopicNames(), take the kafka-clients-X.X.X.jar from its lib directory, upload it into Flink's lib directory, and restart the cluster. Any version 3.5.0 or later will do.
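The verification step above (checking whether DescribeTopicsResult exposes allTopicNames()) can also be done at runtime with reflection instead of unpacking the Kafka source. A minimal sketch: the MethodProbe class name and the String/trim control case are illustrative; only the DescribeTopicsResult class and method names come from the stack trace.

```java
import java.lang.reflect.Method;

// Minimal sketch: probe the classpath for a class/method pair, to confirm
// whether the kafka-clients jar that Flink sees actually provides
// DescribeTopicsResult.allTopicNames() (the method from the stack trace).
public class MethodProbe {

    // Returns true only if the class is on the classpath AND exposes a
    // public method with the given name (declared or inherited).
    static boolean hasMethod(String className, String methodName) {
        try {
            for (Method m : Class.forName(className).getMethods()) {
                if (m.getName().equals(methodName)) {
                    return true;
                }
            }
        } catch (ClassNotFoundException e) {
            // Class not on the classpath at all.
        }
        return false;
    }

    public static void main(String[] args) {
        // Control case with a JDK class, so the probe itself is verifiable.
        System.out.println(hasMethod("java.lang.String", "trim")); // true
        // The call Flink's KafkaSourceEnumerator needs; false here means the
        // kafka-clients jar on the classpath is too old (or missing).
        System.out.println(hasMethod(
                "org.apache.kafka.clients.admin.DescribeTopicsResult",
                "allTopicNames"));
    }
}
```

Run it against the same jar Flink loads, e.g. `java -cp .:$FLINK_HOME/lib/kafka-clients-3.5.0.jar MethodProbe` (paths are illustrative); after replacing the jar as described above and restarting, the second line should print true.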
Tags: lang, allTopicNames, java, flink, kafka, util, error, apache, org. From: https://www.cnblogs.com/yeyuzhuanjia/p/18254652