首页 > 其他分享 >Flink消费kafka数据同步问题排查

Flink消费kafka数据同步问题排查

时间:2023-10-31 17:45:16浏览次数:40  
标签:pre 分区 Flink server 排查 offset kafka pro

Flink消费kafka数据同步问题排查 https://mp.weixin.qq.com/s/EZbCKHBI_JrsF0yJndhe8Q

Flink消费kafka数据同步问题排查

原创 任天兵 哈啰技术 2023-10-31 17:31 发表于上海  

我们有一个flink任务,消费的kafka的数据,写入到es,非常简单的逻辑,但是出现了数据丢失的情况,之前没遇到过,初步猜想是转换逻辑或脏数据的影响,排查了一圈,未发现Exception等相关信息。猜想是写入频率太快,es写入的时候,出现了version conflict,也没找到相关证据。

 

从埋点日志来看,这个消息根本没进入到flink内部,因此也就排除了是转换逻辑出错或者是es写入失败导致。

 

按照猜想,业务能感知到数据同步出现了问题,大概率是丢失了大量的数据,而且根本没进入到flink内部,因此怀疑是单分区不消费的情况,之前因为线程hang死也发生过类似的情况。

 

去统计了一下消费分区,还真发现了问题,我们kafka有5个分区,统计出来的只有3个分区在消费。

 

图片

 

同时我们用同一个程序,起了另外一个任务,发现了消费分区是0-4正常的5个,问题具有随机性,时好时坏?

 

图片

 

发给更多人一起排查,被同事一眼看到bootstrap.server里不仅配置了pro集群,还混入了pre集群地址,bootstrap.servers=[business-s2-002-kafka1.xxxx.com.cn:9092,business-s2-002-kafka2.xxxx.com.cn:9092,business-s2-002-kafka3.xxxx.com.cn:9092,pre-kafka1.xxxx.com.cn:9092,pre-kafka2.xxxx.com.cn:9092,pre-kafka3.xxxx.com.cn:9092],不知道是不是这个问题导致的,因此要从源代码层面寻找一些证据和支持。

 

按照网上谷歌出来的理论讲解,kafka的消费者在连接kafka server的时候,先选bootstrap.server的第一个地址,如果第一个连不上,再连第二个,实际上看起来并非如此...

 

消费组是sp2_group_name_G_2023_10_08_16_58_03,topic是test_topic_A,在pro的kafka集群里有0-4共计5个分区,而在pre集群里有一个同样的topic,他有0-2共3个分区。

 

关键性的日志如下:

 

图片

 

对应到FlinkKafkaConsumerBase.java的源代码,查找线索,主要看open和run两个函数,大概搞清楚了整个流程。

 

程序的第一步,根据时间戳到kafka的server读取到每个分区的offset,作为初始的offset传给kafka,调用的是如下方法。

 

图片

 

在open方法里,因为我们的场景是指定了offset,并且第一次启动的时候,没有状态,因此打日志的地方是这段代码 : (因为这里的TIMESTAMP模式有bug,因此没有直接用这个)。

 

图片

 

LOG.info("Consumer subtask {} will start reading the following {} partitions from the specified startup offsets {}: {}", new Object[]{this.getRuntimeContext().getIndexOfThisSubtask(), this.subscribedPartitionsToStartOffsets.size(), this.specificStartupOffsets, this.subscribedPartitionsToStartOffsets.keySet()});

这里面有几个关键变量subscribedPartitionsToStartOffsets和specificStartupOffsets,因此追寻这2个变量,specificStartupOffsets这个是我们上面的方法传入并且设置的指定offset逻辑,而subscribedPartitionsToStartOffsets则非常关键,他来源于this.partitionDiscoverer.discoverPartitions(),分区发现。

List<KafkaTopicPartition> allPartitions = this.partitionDiscoverer.discoverPartitions();

 

现在我们把整个逻辑串一下。

 

第一步,根据时间戳读取offset,我们从日志里看到,他读到了0-4个分区,并且也读到了他们的offset。

 

第二步,每一个消费线程(subtask 0-3),通过discoverPartitions去读取自己分配到的分区(服务端的协调节点负责分配,有RR等多种算法,网上讲解很多)。

 

我们发现,最后分配到的情况如下:

subtask0 => partition3,

subtask1 => partition0,

subtask2 => partition1,

subtask3 => partition2

 

4分区不见了?是为什么呢?由于去kafka server拿分配分区的时候,每个subtask发送1次请求,连接的server则是从bootstrap.server里随机选取的,因此可能连上pro的kafka集群(因此分配到了分区3),也可能连到了pre集群,因此可能被分配分区0-2(当然也不排除是pro的),这里就非常混乱,你搞不清楚这次subtask请求的到底是pro集群的还是pre集群的,因此领的分区,也不知道是来自哪里,不过可以肯定的是3分区是来自pro,至于4分区没有的原因,可能剩下的subtask都连的是pre集群?(猜想)

 

还有个疑问,这里连上了0-3分内,为什么日志里统计出来,只有0,2,3,其中的1分区一条数据也没有?

 

图片

 

这就涉及第三步。

 

第三步,去拿数据。flink是在内部建了一个createFetcher,直接连接server去做数据拉取。这里也涉及到建立连接,如果这个时候你用的是pro的offset,连接的是pre集群去拉取数据,由于这个offset很大(在pre里根本不存在),是不是就拉不到数据了呢?毕竟pre集群的数据少offset很小,而pro的offset很大。

 

图片

 

由于没有去看拿时间戳、discoverPartitions和createFetcher建立连接的代码,因此对于bootstrap.server的随机选择只是猜想,因此用另外一个任务去证明他。

 

我们新建了第二个任务,相同的bootstrap.server的配置,日志如下:

 

图片

 

从日志中,我们看到拿offset的地方,只拿到了0-2三个分区,也就是拿offset连接的是pre(具有随机性,并且确认bootstrap.server里的机器都是活的),因此第一步请求的server肯定是随机选择的。

 

而在discoverPartitions的时候第二步的结果是:

subtask0 => [partition1,partition4]

subtask1 => [partition2]

subtask2 => [partition3,partition0]

subtask3 => 日志里没出现???

 

所以这里的discoverPartitions又连接的是pro集群的kakfa。

 

整个代码、日志和现象都对上了,因此也就确认了就是server的配置问题,由于有些源码没有细看,因此可能会有一些错误,供参考。

 

把bootstrap.server配置正确之后,分配的分区再也没有混乱了。

图片

 

 

 

翻译

搜索

复制

标签:pre,分区,Flink,server,排查,offset,kafka,pro
From: https://www.cnblogs.com/papering/p/17800829.html

相关文章

  • Kafka-生产者性能调优
    (一)参数调优参数调优相关代码在实际的kafka开发中,我们会发现,无论是生产者还是消费者,都需要构建一个Properties对象,里面设置了很多参数。在这段代码中有很多常用的参数配置,在线上使用时,我们要根据实际的数据量和数据大小来决定这些配置的具体值。Propertiesprops=newProperti......
  • kafka复习:(11)auto.offset.reset的默认值
    在ConsumerConfig这个类中定义了这个属性的默认值,如下图也就是默认值为latest,它的含义是:如果没有客户端提交过offset的话,当新的客户端消费时,把最新的offset设置为当前消费的offset.默认是自动提交位移的,每5秒进行一次提交。可以通过参数配置手动提交。手动提交offset的示例import......
  • kafka复习:(10)按分区获取ConsumerRecord
    packagecom.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;importorg.apache.kafka.clients.consumer.ConsumerConfig;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka......
  • kafka复习:(8)消费某个主题指定分区的消息
    packagecom.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;importorg.apache.kafka.clients.consumer.ConsumerConfig;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka......
  • App支付报错"商家订单参数异常,请重新发起付款"排查流程
     今天在对接支付宝APP支付的时候遇到了一个报错,记录下问题的排查过程~  报错过程APP中弹窗提示的报错“商家订单参数异常,请重新发起付款”,检查了下参数感觉没啥问题,不知道是啥问题导致的。 去官网搜了下,折腾排查了一遍,发现是环境问题,没有切到沙箱环境导致的(*/......
  • Kafka-生产者、broker、消费者的调优参数总结
     生产环境下,为了尽可能提升Kafka的整体吞吐量,可以对Kafka的相关配置参数进行调整,以达到提升整体性能的目的。本文主要从Kafka的不同组件出发,讲解各组件涉及的配置参数和参数含义。一、生产者(producer.properties或者代码中)1、acks:Producer需要Leader确认的Producer请求的应答......
  • 【技术分享】Amazon RDS MySQL常见故障分析和排查
    在亚马逊云的RDS中支持几乎主流的数据库,对于亚马逊云中的数据库的问题排查对于我们的日常业务的稳定运行会很有帮助。本篇将对于Amazon RDSMySQL常见故障问题的分析和排查办法。RDS提供了强大的适配工作负载功能对于RDS的责任共担RDS常见连接问题Check:√客户端IP地址是否在D......
  • 28、Flink 的SQL之DROP 、ALTER 、INSERT 、ANALYZE 语句
    文章目录Flink系列文章一、DROP1、DROPCATALOG2、DROPDATABASE3、DROPTABLE4、DROPVIEW5、DROPFUNCTION6、droptable示例二、alter1、ALTERDATABASE2、ALTERTABLE1)、建表2)、ADD1、增加单列示例2、增加watermark列3)、MODIFY1、修改列2、修改水印4)、DROP5)、RENAME6)、SET7)、......
  • Oracle系列---【数据库连接数超了,导致数据库连接不上,如何排查当前连接数,以及如何修改
    数据库连接数超了,导致数据库连接不上,如何排查当前连接数,以及如何修改最大连接数限制?1.对比当前连接数和最大连接数如果差的比较少,比如相差十几,二十几,连不上的话,很有可能是用连接池连接,一次申请连接数大于剩余的连接数导致的。#查看当前总连接数SELECTcount(*)FROMV$session......
  • k8s1.26.5 安装 flink1.17.1
    标签(空格分隔):kubernetes系列一:系统环境介绍系统:centos7.9x64k8s集群版本:k8s1.26.5采用kubesphere做页面caclico版本:calicov3.26.1containerd版本:containerd://1.6.24hadoop版本:hadoop3.3.6helm版本:helm3.9.0二:编译得到fl......