首页 > 其他分享 >Kafka未触发消费异常排查实录

Kafka未触发消费异常排查实录

时间:2023-05-20 16:11:50浏览次数:41  
标签:消费 ConsumerRecords 实录 Kafka 问题 排查 消息 poll

前言:

    最近生产环境系统发现一个疑难杂症,看了很久的问题但是始终无法定位到问题并处理,然后查阅了相关资料也是定位不到问题,不过资料查阅却给了个新的思路,以此为跳板最终解决了问题。

一、问题描述

    功能介绍: “主计划拆分子计划”是APS系统很常见的功能,功能大概意思是用户可选多个主计划一次性进行“展开子计划”生成子计划,因单个主计划生成子计划的逻辑相对复杂,所以单个计划耗时不能算低,故这里的批量操作使用了异步进行,这里使用了Kafka进行生产及消费消息。

    问题起因: 功能完成之后上生产系统,然而偶尔会收到客户提出少量单据卡在中间状态,导致“展开”不了的问题,前前后后查了好久也没能找到具体问题并解决。

二、问题分析

    分析数据: 通过查看用户提供的单据,发现这些数据都是卡在了某一个中间状态,这个状态是作为中转状态使用的,一开始计划的状态为“未展开”,点击执行“展开子计划”的功能之后,将计划标识为“展开中”之后再推送到Kafka消费处理,Kafka消费者接收到生产者的消息之后,将计划进行处理,处理完成之后再将状态标识为“展开完成”。

    所以从数据上分析,问题点应该出现在消息生产到消费过程这段期间,但是纵观代码,发现已经对业务逻辑做了异常处理,如果是消息消费过程发生异常,都会将错误过程记录下来,所以再次定位到问题出现在Kafka的生产消息及消费消息这个过程。

    查看日志: 根据以上分析结合日志监控的方式,确定问题数据实际上并未进行消费,所以猜想有两种情况:

  1. Kafka根本没有生产消息成功;
  2. 生产消息成功,但是Kafka未Poll到需要消费的消息。

三、进展

  1. 加addCallback回调方法
    • 生产端的kafkaTemplate对象中,封装发送消息的方法,将send()方法封装为通用方法,增加addCallback()回调方法,用于消息生产成功之后回调记录日志。以此确认生产消息是否成功.

image.png
2. 参考相关资料:

    查问题过程中,看到大佬写的文章,文章里描述了造成消费不成功的问题是因为“Kafka 内存饱和”造成的,但是实际上内存饱和造成的问题是Kafka消费服务Poll消息时候超时,相应的错误信息在我们系统日志中搜不到,最终也确认不是因为改原因造成的问题。(文章下文参考)

  1. 研究kafka消费原理

    当前确认问题点应该是出现在消费端消费不了消息导致的,那么重新研究一下Kafka消费端的实现原理。

    消费者是通过KafkaConsumer对象的poll方法从Kafka队列中将消息拉取出来进行消费,这个poll方法可传入poll超时时间,超过设置的时间则会报拉取超时的异常“due to consumer poll timeout has expired.”,上文中大佬出现的报错就是提示这种拉取超时的报错,超时时间可通过配置节点【max.poll.interval.ms】进行配置;

    KafkaConsumer对象poll到数据之后取到ConsumerRecords对象,然后就可以对数据进行消费,直到取到的ConsumerRecords对象是空的(isEmpty()为true)才停止消费。

    这里发现有一个隐患的地方,当ConsumerRecords对象取到空数据才停止消费,那么这个ConsumerRecords对象是否会取到多个数据进行消费,是如何进行消费的?!

    查阅相关资料,发现Kafka的消费原理是:KafkaConsumer 对象是实时拉取消息的,但不是实时消费消息的。KafkaConsumer 在 poll() 方法中从 Kafka 集群中批量拉取数据,将多个消息封装在 ConsumerRecords 对象中返回。这些消息可以在消费者应用程序的时间间隔内处理,但poll() 方法返回的消息不是立即消费的。只有在 ConsumerRecords 中的所有消息都被处理后,才会发送下一个拉取请求。如果在处理消息时发生错误,可以根据实际需要重新处理这些消息或跳过这些消息。

    反观项目代码,发现在KafkaListener监听器拉取到数据之后,项目中仅仅只是取第一条数据进行消费,这里是不是有问题呢?(参考下面代码块)

    /**
     * 计划发布SAP-KTWKZ
     *
     * @param records
     * @param consumer
     */
    @KafkaListener(topics = {KafkaTopicConst.SCHEDULE_DAY_PLAN_RELEASE_SAP_KTWKZ_TOPIC},
            id = KafkaTopicConst.SCHEDULE_DAY_PLAN_RELEASE_SAP_KTWKZ_TOPIC, containerFactory = "batchFactory")
    public void dayPlanReleaseToSapKTWKZTopic(ConsumerRecords<String, String> records, Consumer<String, String> consumer) {
        String data = ApsKafkaUtils.getFirstRecordValues();
        log.info("mq消费-KT万颗子计划发布SAP数据:\n{}", data);
        try {
            SystemPostDTO systemPostDTO = JsonUtils.fromJson(data, SystemPostDTO.class);
            String postContent = systemPostDTO.getData();
            if (StrUtil.isEmpty(postContent)) {
                log.warn("mq消费-KT万颗子计划发布SAP数据异常警告,systemPostDTO.data数据不能为空");
                return;
            }
            pushRecordService.consumePushProductionOrderKtWKZ(postContent);
        } catch (Exception e) {
            log.error("mq消费-KT万颗子计划发布SAP数据异常:", e);
        }
    }

 

四、问题重现及

重现:

    从第三点的分析中,暂时确定可能是ConsumerRecords对象接收到多条消息,但是消费端仅仅消费了第一条消息导致的问题,那么通过写demo来测试批量生产消息是否会导致ConsumerRecords一次性拉取到多条消息。

生产:

    @ApiOperation("测试Kafka poll消息机制")
    @PostMapping("/v99/schedule/kafka/test")
    public ResponseEntity<String> testKafkaPollMessage(
            @RequestParam("testData") String testData) {
        String topic = "my-test-topic";
        for (int i = 0; i < 100; i++) {
            Thread.sleep(0);//参数有0,10,100
            SubassemblyOpenPlanListInfoRespDTO dto = kafkaDemo.sendMessage(testData);
        }
        return Results.success("Success");
    }

 

消费:

    @KafkaListener(topics = {"my-test-topic"},
            id = "my-test-topic", containerFactory = "batchFactory")
    public void myTestKafka(ConsumerRecords<String, String> records, Consumer<String, String> consumer) {
        List<String> recordValues = ApsKafkaUtils.getRecordValues();
        try {
            log.info("接收到的数据为【{}】{}", recordValues.size(), JsonUtils.toJson(recordValues));
        } catch (Exception e) {
            log.error("接收到的数据为异常:", e);
        }
    }

 

    以上接口,通过调用发现确实出现ConsumerRecords对象poll到多条消息的情况:

image.png

    其中,for循环中执行等待时间越长,出现一个ConsumerRecords对象拉取到多条数据的情况越少:

image.png

image.png

    那么分析为什么在实际使用过程中,【主计划拆分子计划】这个功能是偶然出现消费失败的问题,而不是稳定出现呢?

    再次通过代码ReView的方式去回顾一下这个功能,发现当前代码中,是使用了for循环将一批次计划单循环推送给MQ进行消费,单个循环里执行了一次读库一次写库的操作,一次循环耗时大概几十毫秒,与上述demo的Thread.sleep(10)场景类似,所以基本确定偶发这种问题的原因出现在这里。

解决:

    其实解决该问题很简单,只需要在消费端获取到ConsumerRecords对象之后,将拉取到的所有消息列表循环消费而不是只消费单条消息即可,之前的仅消费单条消息的场景经过沟通确认只存在某些特殊场景才需要使用,暂时不再保证该种场景。
image.png

五、总结

    本案例中,通过日志、业务场景、写Demo使用并发工具等方式来分析及重现问题,将一个生产上的疑难杂症处理掉,其中也通过参考大佬的文章,虽然问题描述和大佬描述的基本一致,也和网络上的Blog描述一致,但是产生的问题却并不一样。

    总的来说,其实解决问题不难,重要的是要了解问题,了解原理以及了解到解决问题的步骤,建议从多个方面一起查看问题。从其他参考文章描述中,可以从业务、日志、内存环境等查看问题,我这里补充一点,也可以多多结合业务来适当写demo去测试问题,可能也会有意外收获。

其他

    大家有没有遇到其他的生产上的疑难杂症呢,大家都是怎么遇到问题,最后怎么解决问题呢,这里大家不妨进行讨论,也可以列出多多的跟进方案或者工具,大家一起学习进步。

   

标签:消费,ConsumerRecords,实录,Kafka,问题,排查,消息,poll
From: https://www.cnblogs.com/evlinjun/p/17417352.html

相关文章

  • Kafka简单使用
    启动zookeeper>bin/zookeeper-server-start.sh-daemonconfig/zookeeper.properties启动KafkaServer>bin/kafka-server-start.shconfig/server.propertiesTopic相关创建Topic新版命令:>bin/kafka-topics.sh--create--bootstrap-serverlocalhost:9092--replicati......
  • 用好kafka,你不得不知的那些工具
    前言工欲善其事,必先利其器。本文主要分享一下消息中间件kafka安装部署的过程,以及我平时在工作中针对kafka用的一些客户端工具和监控工具。kafka部署架构一个kafka集群由多个kafkabroker组成,每个broker将自己的元数据信息注册到zookeeper中,通过zookeeper关联形成一个集群。pr......
  • MS SQL Server 排查阻塞和查找被锁语句
    --方法1SELECT'资源类型'=t1.resource_type,'来源数据库'=CONVERT(CHAR(25),DB_NAME(resource_database_id)),'数据库中与资源相关联的实体的ID'=t1.resource_associated_entity_id,'锁模式'=t1.request_mode, --锁的模式:S-共享锁,U-更新锁,X-排他锁,IS/IU/IX-意向......
  • 记一次排查:接口返回值写入excel后,从单元格copy出来的数据会带有多重引号的问题
    在项目里刚好有3个服务,同一个网关内层的3个服务,两个php的,一个golang的,为了提高负载以及进行分流,部分客户的接口调用会被网关自动分配到go服务。恰好为了测试,我写了一个全量用户的生产、测试环境调用接口返回结果进行对比的脚本,于是发现了题中的问题:两个php服务里的接口返回值写入......
  • 七、部署Kafka
    服务和组件Kafka1.0.0、Zookeeper3.4.8解压:tar-zxvfkafka1.0.0.tgz重命名:mvkafka_2.11-1.0.0kafka切换目录:cd/usr/local/src/kafka/config配置文档:vimserver.properties(21、123)broker.id=0zookeeper.connect=master,slave1,slave2分发:scp-r/usr/local/src/......
  • kafka设置多代理集群
    到目前为止,我们一直在使用单个代理,这并不好玩。对Kafka来说,单个代理只是一个大小为一的集群,除了启动更多的代理实例外,没有什么变化。为了深入了解它,让我们把集群扩展到三个节点(仍然在本地机器上)。首先,为每个代理创建一个配置文件(在Windows上使用copy 命令来代替):12>cpconfig......
  • Kafka-0.10源码解读
    前言其实干程序员这么些年来,一直有一个愿望想写本书,但是一直没实现,一开始是想着是因为自己没时间,后来想想也不是说没时间,可能还是感觉水平有限。为了给写书做准备,最近打算写一个专栏,专栏的要求就稍微低一些,能坚持写完的概率也就会大一些。所以最近挑了从Kafka的源码角度去写一个专......
  • Linux安装Kafka
    1.Kafka简介Kafka也是开源与Apache开源基金会的项目,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统。在百度百科是这样介绍的:Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处......
  • 8张图带你全面了解kafka的核心机制
    前言kafka是目前企业中很常用的消息队列产品,可以用于削峰、解耦、异步通信。特别是在大数据领域中应用尤为广泛,主要得益于它的高吞吐量、低延迟,在我们公司的解决方案中也有用到。既然kafka在企业中如此重要,那么本文就通过几张图带大家全面认识一下kafka,现在我们不妨带入kafka设计......
  • Linux 排查
    一、排查CPU使用率过高1.找出耗CPU的进程使用top命令查看内存、cpu及各进程信息2.找出耗cpu的线程情况top-H-p[进程id]psH-eouser,pid,ppid,tid,time,%cpu,cmd--sort=%cpu|grep[进程id]3.定位线程堆栈信息,找到异常代码printf"%x\n"[线程id]将线程id转换为......