首页 > 其他分享 >Spring Kafka AckMode介绍

Spring Kafka AckMode介绍

时间:2024-02-22 11:12:15浏览次数:34  
标签:调用 Spring MANUAL AckMode 偏移量 Kafka record IMMEDIATE 提交

 


原文链接:https://blog.csdn.net/qq1309664161/article/details/116994341

一:AckMode介绍

kafka消费端在读取数据后,会向Kafka服务端提交偏移量,来记录消费端读取数据的位置。

提交偏移量分为手动提交和自动提交,为了保证数据读取的安全性,我们一般设置成手动提交偏移量。在Springboot集成Kafka后,Springboot为我们提供了AckMode枚举,供我们选择。AckMode有以下几种模式(AckMode在ContainerProperties类的内部定义):

 自动提交设置:

RECORD: 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交;

BATCH(默认):当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交,频率取决于每次poll的调用频率;

TIME:当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交,它并不是一到就马上提交,如果此时正在消费某一条消息,需要等这条消息被消费完成,才能提交消费进度;

COUNT:当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时自动提交,它并不是一到就马上提交,如果此时正在消费某一条消息,需要等这条消息被消费完成,才能提交消费进度;

COUNT_TIME:TIME和COUNT的结合体,满足任一都会自动提交;

手动提交设置:

MANUAL:当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交;

MANUAL_IMMEDIATE:手动调用Acknowledgment.acknowledge()后立即提交;

由此可知,在设置为手动提交时,我们需要设置MANUAL或MANUAL_IMMEDIATE。

二:MANUAL和MANUAL_IMMEDIATE的区别

MANUAL是在批量处理完多条数据后,提交一次偏移量,而MANUAL_IMMEDIATE是随时调用ack.acknowledge()方法随时提交偏移量,它可以实现每条数据提交一次偏移量。

从字面意思上来看,我们可以看出,MANUAL的效率要比MANUAL_IMMEDIATE高。下面,我们通过数据,来体验一下效果。
首先,我们先通过生产者往Topic为demos的主题下存放10W条数据。此处代码忽略,可用多线程创建多个生产者发送数据。
然后,demos主题里已经有10W条数据了,我们创建一个名为demo的消费者群组,来消费demos主题下的这10W条数据,看多长时间可以消费完。
(demos主题分成了64个分区,消费者也用64个消费者去消费。kafka集群有3个broker,但是3个broker都在同一台服务器上。)

下面,我们先测试MANUAL_IMMEDIATE模式下的消费时间,消费者Listener的代码如下:

 

启动项目,查看输出日志,大概需要15秒,能把10W调数据消费完。而采用MANUAL模式,4~5秒就能处理完成。
由此可以知道,使用MANUAL方式进行批量提交,效率要比MANUAL_IMMEDIATE高出很多。

三、MANUAL与MANUAL_IMMEDIATE源码解析
下面我们从源码的角度,分析一下这两种模式是怎么实现的,同样的调用ack.acknowledge()方法,为什么一个是立马提交,一个是批量提交。
ConsumerAcknowledgment类是KafkaMessageListenerContainer类的内部类,查看其acknowledge方法,里面调用了processAck方法,我们看processAck方法的源码:

 可以看到,如果采用的MANUAL_IMMEDIATE模式,调用的ackImmediate方法,立马提交这条数据的偏移量。如果采用MANUAL模式,调用addOffset方法。我们看addOffset方法的源码:

private void addOffset(ConsumerRecord<K, V> record) {
this.offsets.computeIfAbsent(record.topic(), v -> new ConcurrentHashMap<>())
.compute(record.partition(), (k, v) -> v == null ? record.offset() : Math.max(v, record.offset()));
}

这里,将record的偏移量存入了Map集合中,offsets是一个Map集合:

 所以,采用MANUAL模式,调用acknowledge方法时,是将record的偏移量存入了map集合,然后存到一定数量后,再进行批量提交。

标签:调用,Spring,MANUAL,AckMode,偏移量,Kafka,record,IMMEDIATE,提交
From: https://www.cnblogs.com/andy1234/p/18026891

相关文章

  • 华为二面:SpringBoot读取配置文件的原理是什么?加载顺序是什么?
    引言SpringBoot以其简化的配置和强大的开箱即用功能而备受欢迎,而配置文件的加载是SpringBoot应用启动过程中的关键步骤之一。深入理解SpringBoot启动时如何加载配置文件的源码,有助于开发者更好地理解其内部工作原理,提高配置管理的灵活性和可维护性。本文将从源码入手,解读Sprin......
  • Kafka监控系统Kafka Eagle
    kafka集群部署完成后需要有一个可视化web页面,便于实时查看和观测kafka集群状态,kafka本身并没有提供可视化页面,但市面上有很多开源的可视化工具,我们以其中的KafkaEagle为例,在安装KafkaEagle之前,至少需要安装JDK、kafka、zookeeper的环境后,再进行后续操作。本文的前置条件:Kafka......
  • 接私活利器!推荐一个基于SpringBoot3的后台管理框架
    大家好,我是Java陈序员。今天,给大家推荐一个后台管理框架,适合二次定制开发、接私活、源码学习等场景。关注微信公众号:【Java陈序员】,获取开源项目分享、AI副业分享、超200本经典计算机电子书籍等。项目介绍Admin3——一个轻巧的后台管理框架,项目后端基于Java17、SpringBo......
  • 1 Spring5 自定义标签开发
    spring5 自定义脚本开发步骤1 定义bean,publicclassUser{privateStringid;privateStringuserName;privateStringemail;privateStringpassword;publicStringgetId(){returnid;}publicvoidsetId(St......
  • idea创建spring项目的时候只有java 21和17
    1.问题我们在用IDEA创建一个spring项目时,发现java版本只能选用java21,java17,导致我们的jdk版本无法选择jdk1.8(我最常用的版本)2.解决参考:idea创建项目的时候只有java21和17原因是spring2在23年11月24日停止维护了,所以通过spring来创建,没有spring2,只有spring3+,最低jdk版本也是1......
  • springboot jar 快速启停脚本
    Window启动@echooffstartjavaw-jar-Dfile.encoding=UTF-8-Dlogging.config=D:\deploytest\logback-spring.xmlD:\deploytest\gateway-server-1.0.0.jarecho"gatewaysuccess----"startjavaw-jar-Dfile.encoding=UTF-8-Dlogging.config=D:\de......
  • Java人力资源管理系统源码(含数据库)-springboot+vue+mysql
    EHR人力资源管理系统是一种利用现代技术,如云计算、大数据等,来实现企业人力资源信息电子化、流程自动化的系统。它覆盖了人力资源管理的各个方面,从招聘、考勤、绩效到薪酬、社保公积金等,为企业提供一站式的解决方案。​1.招聘管理:-职位发布:系统支持在线发布职位信息,吸引候选人......
  • Kafka-批量启动和停止Kafka节点脚本
    1、编辑脚本#!/bin/bashzookeeper_home=/usr/local/zookeeperkafka_home=/usr/local/kafkazookeeper_array=(hadoop01hadoop02hadoop03)kafka_array=(hadoop01hadoop02hadoop03)##启动和停止Zookeeperfunctionzookeeper_operate(){zookeeper_operate=$@......
  • 02-21 记SpringBoot3 打包成exe的过程以及一些问题
    先说说基本流程:(Windows环境,springbootv3.2.1)1.首先mavenpom.xml中加入如下代码<build><plugins><plugin><groupId>org.graalvm.buildtools</groupId><artifactId>native-maven-plugi......
  • SpringBoot整合Quartz实现动态定时任务
    1、增加依赖<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-quartz</artifactId></dependency><!--json工具--><dependency><groupId>com.alibaba</gro......