原文链接:https://blog.csdn.net/qq1309664161/article/details/116994341
一:AckMode介绍
kafka消费端在读取数据后,会向Kafka服务端提交偏移量,来记录消费端读取数据的位置。
提交偏移量分为手动提交和自动提交,为了保证数据读取的安全性,我们一般设置成手动提交偏移量。在Springboot集成Kafka后,Springboot为我们提供了AckMode枚举,供我们选择。AckMode有以下几种模式(AckMode在ContainerProperties类的内部定义):
自动提交设置:
RECORD: 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交;
BATCH(默认):当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交,频率取决于每次poll的调用频率;
TIME:当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交,它并不是一到就马上提交,如果此时正在消费某一条消息,需要等这条消息被消费完成,才能提交消费进度;
COUNT:当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时自动提交,它并不是一到就马上提交,如果此时正在消费某一条消息,需要等这条消息被消费完成,才能提交消费进度;
COUNT_TIME:TIME和COUNT的结合体,满足任一都会自动提交;
手动提交设置:
MANUAL:当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交;
MANUAL_IMMEDIATE:手动调用Acknowledgment.acknowledge()后立即提交;
由此可知,在设置为手动提交时,我们需要设置MANUAL或MANUAL_IMMEDIATE。
二:MANUAL和MANUAL_IMMEDIATE的区别
MANUAL是在批量处理完多条数据后,提交一次偏移量,而MANUAL_IMMEDIATE是随时调用ack.acknowledge()方法随时提交偏移量,它可以实现每条数据提交一次偏移量。
从字面意思上来看,我们可以看出,MANUAL的效率要比MANUAL_IMMEDIATE高。下面,我们通过数据,来体验一下效果。
首先,我们先通过生产者往Topic为demos的主题下存放10W条数据。此处代码忽略,可用多线程创建多个生产者发送数据。
然后,demos主题里已经有10W条数据了,我们创建一个名为demo的消费者群组,来消费demos主题下的这10W条数据,看多长时间可以消费完。
(demos主题分成了64个分区,消费者也用64个消费者去消费。kafka集群有3个broker,但是3个broker都在同一台服务器上。)
下面,我们先测试MANUAL_IMMEDIATE模式下的消费时间,消费者Listener的代码如下:
启动项目,查看输出日志,大概需要15秒,能把10W调数据消费完。而采用MANUAL模式,4~5秒就能处理完成。
由此可以知道,使用MANUAL方式进行批量提交,效率要比MANUAL_IMMEDIATE高出很多。
三、MANUAL与MANUAL_IMMEDIATE源码解析
下面我们从源码的角度,分析一下这两种模式是怎么实现的,同样的调用ack.acknowledge()方法,为什么一个是立马提交,一个是批量提交。
ConsumerAcknowledgment类是KafkaMessageListenerContainer类的内部类,查看其acknowledge方法,里面调用了processAck方法,我们看processAck方法的源码:
可以看到,如果采用的MANUAL_IMMEDIATE模式,调用的ackImmediate方法,立马提交这条数据的偏移量。如果采用MANUAL模式,调用addOffset方法。我们看addOffset方法的源码:
private void addOffset(ConsumerRecord<K, V> record) {
this.offsets.computeIfAbsent(record.topic(), v -> new ConcurrentHashMap<>())
.compute(record.partition(), (k, v) -> v == null ? record.offset() : Math.max(v, record.offset()));
}
这里,将record的偏移量存入了Map集合中,offsets是一个Map集合:
所以,采用MANUAL模式,调用acknowledge方法时,是将record的偏移量存入了map集合,然后存到一定数量后,再进行批量提交。
标签:调用,Spring,MANUAL,AckMode,偏移量,Kafka,record,IMMEDIATE,提交 From: https://www.cnblogs.com/andy1234/p/18026891