首页 > 其他分享 >Kafka 消费者位置提交方式及适用场景

Kafka 消费者位置提交方式及适用场景

时间:2024-10-11 22:49:17浏览次数:8  
标签:场景 kafka 提交 props apache import Kafka consumer

《Kafka 消费者位置提交方式及适用场景》

在使用 Kafka 进行消息处理时,消费者的位置提交是一个非常重要的环节。它决定了消费者在下次启动时从哪里开始读取消息。今天,我们就来深入探讨一下 Kafka 消费者位置提交方式有哪些,以及在什么场景下使用。

一、Kafka 消费者位置提交的重要性

在 Kafka 中,消费者会不断地从主题(Topic)的分区(Partition)中读取消息。为了保证在消费者崩溃或重新启动后能够继续从上次停止的位置读取消息,消费者需要定期提交自己的位置信息。如果不进行位置提交,消费者在重新启动后可能会从头开始读取消息,导致重复处理已经处理过的消息,或者错过一些新的消息。

二、Kafka 消费者位置提交方式

  1. 自动提交

    • Kafka 消费者可以配置为自动提交位置信息。当消费者拉取一批消息后,经过一定的时间间隔或者消息数量达到一定阈值时,消费者会自动提交当前的位置信息。
    • 自动提交的优点是简单方便,不需要开发者手动干预。但是,它也存在一些缺点。例如,如果在自动提交之前消费者崩溃了,那么可能会导致一些消息被重复处理。
  2. 手动提交

    • 手动提交位置信息需要开发者在代码中显式地调用提交方法。手动提交可以分为同步提交和异步提交两种方式。
    • 同步提交:消费者会等待提交操作完成后才继续处理下一批消息。这种方式可以确保位置信息被正确提交,但是可能会影响消费者的性能,特别是在提交操作比较耗时的情况下。
    • 异步提交:消费者会在后台异步地提交位置信息,不会阻塞当前的消息处理。这种方式可以提高消费者的性能,但是如果在提交操作完成之前消费者崩溃了,那么可能会导致位置信息丢失。

三、不同提交方式的适用场景

  1. 自动提交

    • 适用于对消息处理的准确性要求不高,但是对性能要求较高的场景。例如,一些实时数据分析系统,可能更关注处理的速度,而对消息的重复处理不太敏感。
    • 自动提交也适用于一些简单的应用场景,开发者不想花费太多时间在位置提交的管理上。
  2. 手动提交(同步)

    • 适用于对消息处理的准确性要求非常高的场景。例如,在金融交易系统中,每一笔交易都必须被准确处理,不能出现重复处理或漏处理的情况。
    • 当消费者需要在提交位置信息之前进行一些额外的处理,如数据验证、事务处理等,同步提交可以确保这些处理完成后再提交位置信息。
  3. 手动提交(异步)

    • 适用于对性能要求较高,同时又希望在一定程度上保证消息处理的准确性的场景。例如,一些高并发的 Web 应用,需要快速处理大量的用户请求,同时又要确保消息不会被重复处理。
    • 异步提交可以在不影响消息处理性能的情况下,尽可能地保证位置信息的正确提交。

四、Java 代码示例

自动提交示例:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;

public class AutoCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test-topic"));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}

手动同步提交示例:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class ManualSyncCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test-topic"));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
                // 同步提交
                Map<TopicPartition, OffsetAndMetadata> offsets = consumer.committed(consumer.assignment());
                for (TopicPartition partition : consumer.assignment()) {
                    long lastProcessedOffset = records.endOffsets(Collections.singleton(partition)).get(partition) - 1;
                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastProcessedOffset)));
                }
            }
        } finally {
            consumer.close();
        }
    }
}

手动异步提交示例:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class ManualAsyncCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test-topic"));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
                // 异步提交
                Map<TopicPartition, OffsetAndMetadata> offsets = consumer.committed(consumer.assignment());
                for (TopicPartition partition : consumer.assignment()) {
                    long lastProcessedOffset = records.endOffsets(Collections.singleton(partition)).get(partition) - 1;
                    consumer.commitAsync(Collections.singletonMap(partition, new OffsetAndMetadata(lastProcessedOffset)), (offsets1, exception) -> {
                        if (exception!= null) {
                            System.err.println("Error committing offsets: " + exception.getMessage());
                        }
                    });
                }
            }
        } finally {
            consumer.close();
        }
    }
}

五、总结

Kafka 消费者的位置提交方式有自动提交和手动提交两种,手动提交又分为同步提交和异步提交。不同的提交方式适用于不同的场景,开发者需要根据实际需求选择合适的提交方式。在选择提交方式时,需要考虑消息处理的准确性、性能要求以及应用场景的特点等因素。

文章(专栏)将持续更新,欢迎关注公众号:服务端技术精选。欢迎点赞、关注、转发

个人小工具程序上线啦,通过公众号(服务端技术精选)菜单【个人工具】即可体验,欢迎大家体验后提出优化意见!500 个访问欢迎大家踊跃体验哦~

标签:场景,kafka,提交,props,apache,import,Kafka,consumer
From: https://blog.51cto.com/jiangyi/12221870

相关文章

  • Kafka 的 Producer 如何实现幂等性
    在分布式系统中,消息队列Kafka扮演着重要的角色。而确保Kafka的Producer(生产者)的消息发送具有幂等性,可以极大地提高系统的可靠性和稳定性。那么,Kafka的Producer是如何实现幂等性的呢?让我们一起来深入探讨。一、什么是幂等性?在数学中,幂等性是指一个操作执行多次与执......
  • Kafka 消费者位置提交方式及适用场景
    在使用Kafka进行消息处理时,消费者的位置提交是一个非常重要的环节。它决定了消费者在下次启动时从哪里开始读取消息。今天,我们就来深入探讨一下Kafka消费者位置提交方式有哪些,以及在什么场景下使用。一、Kafka消费者位置提交的重要性在Kafka中,消费者会不断地从主题......
  • 用四个场景案例,分析使用大模型对程序员工作的帮助提升
    引言随着人工智能技术的不断发展,大模型在软件开发中的应用越来越广泛。这些大模型,如GPT、文心一言、讯飞星火、盘古大模型等,可以帮助程序员提高工作效率,加快开发速度,并提供更好的用户体验。本文将介绍我在实际工作中经常使用大模型的三个场景,展示如何在程序员的工作中使......
  • 米尔新唐MA35D1核心板搭载原生17路UART和4路CAN FD,适用多种应用场景
    米尔电子发布了基于新唐MA35D1处理器设计的MYC-LMA35核心板,MA35D1处理器集成了双核Cortex-A35和Cortex-M4,原生17路UART和4路CANFD接口,可实现多种设备的高效互联并满足通信需求,此外,MYC-LMA35核心板还提供了丰富的外设资源:RGMII/USB/SDIO/I2S/I2C/EADC/EPWM/SPI等,丰富的外设资源使......
  • 计算机视觉之YOLO算法基本原理和应用场景
     YOLO算法基本原理整体流程YOLO将目标检测问题转化为一个回归问题。它将输入图像划分成多个网格单元,每个网格单元负责预测中心点落在该网格内的目标。对于每个网格单元,YOLO预测多个边界框以及这些边界框中包含目标的类别概率。边界框通常由中心点坐标(x,y)、宽度(w)和高度(h)来表示。......
  • 消息队列详细介绍、工作原理,kafka与RocketMQ的比对
    消息队列:当一个服务处理量为100,而另一个服务发送量为200,这时候多余的消息会被丢弃,如果想要全部处理,我们必须加入队列,这个队列用来存储消息的信息,通过offset表示当前处理的位置。注意此时队列还位于进程中,也就是服务进程,我们的进程一旦挂掉,未被处理的消息会直接丢失,我们不希望......
  • 解锁京东店铺潜力:15大场景揭秘商品列表API接口
    随着电子商务的蓬勃发展,API接口成为连接商家与平台的重要桥梁。京东作为中国领先的电商平台,提供了丰富的API接口,帮助商家更高效地管理店铺和商品。主要用作于一下场景商品展示:在商家自己的网站或移动应用上展示京东店铺的商品列表,方便用户浏览和购买。库存管理:实时获取商......
  • Stylized Far East 古代国风建筑城镇宫殿场景模型
    下载:​​Unity资源商店链接资源下载链接效果图:......
  • Chromium 前端form表单提交过程分析c++
    一、本文以一个简单的HTML表单,包含两个文本输入框和一个提交按钮:<formaction="demo_form.php">Firstname:<inputtype="text"name="fname"><br>Lastname:<inputtype="text"name="lname"><br><i......
  • kafka集群升级新策略,Cloudera运维专家来揭秘:助你轻松应对大数据挑战
    项目背景我们团队负责维护的Kafka集群承载了公司大部分实时数据的收集与传输任务。然而,目前存在一些问题,严重影响了集群的稳定性、用户体验以及管理员的运维效率:当前集群版本较低,且低版本的bug频繁出现,导致集群稳定性受到威胁。例如,violet集群最近因触发bug而出现不可......