首页 > 其他分享 >Kafka中,如何配置和使用消费者群组?

Kafka中,如何配置和使用消费者群组?

时间:2024-09-18 14:55:26浏览次数:8  
标签:消费者 群组 Kafka props put consumer

在 Apache Kafka 中,消费者群组(Consumer Group)是一组订阅相同主题的消费者实例。消费者群组的主要目的是实现消息的共享消费,即一个主题的消息会被分发给群组内的不同消费者,而不是所有消费者都接收所有消息。

以下是如何配置和使用消费者群组的基本步骤:

配置消费者群组

  1. 创建消费者实例:首先,你需要创建一个消费者实例,并且为这个实例指定一个群组 ID。群组 ID 是用来区分不同消费者群组的标识符。

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "my-consumer-group");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    
  2. 订阅主题:消费者可以订阅一个或多个主题,一旦订阅后,消费者就可以开始从这些主题拉取消息。

    // 订阅一个主题
    consumer.subscribe(Arrays.asList("my-topic"));
    
    // 或者直接分配分区
    // consumer.assign(Collections.singletonList(new TopicPartition("my-topic", 0)));
    

使用消费者群组

  1. 启动多个消费者实例:为了形成一个群组,你需要启动多个消费者实例,并且确保它们都使用相同的群组 ID。

  2. 消费消息:消费者会自动加入到群组中,并且 Kafka 会根据配置和当前消费者的数量来分配分区给不同的消费者。每个分区只会被群组内的一个消费者消费。

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records)
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
    
  3. 管理偏移量:消费者可以自动提交偏移量(即已读取的消息位置),也可以手动提交。自动提交简化了使用过程,但手动提交提供了更细粒度的控制。

    • 自动提交:

      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000"); // 每隔一秒自动提交一次
      
    • 手动提交:

      consumer.commitSync(); // 同步提交
      consumer.commitAsync(); // 异步提交
      

注意事项

  • 如果消费者群组中有消费者长时间未读取消息,那么 Kafka 可能会重新平衡分区,将该消费者的分区重新分配给其他活跃的消费者。
  • 当消费者群组中的消费者数量发生变化时,Kafka 会自动重新平衡分区,以确保每个消费者都能公平地获得消息。
  • 消费者群组的偏移量信息通常存储在 Kafka 的 _consumer_offsets 主题中,但这可以通过配置进行更改。

通过以上配置,你可以设置和管理 Kafka 消费者群组,以满足不同的应用场景需求。

标签:消费者,群组,Kafka,props,put,consumer
From: https://blog.csdn.net/qq_33240556/article/details/142333150

相关文章

  • Kafka中的事务性消息是如何工作的?
    在ApacheKafka中,事务性消息是指那些在事务上下文中发送的消息。事务性消息保证了消息的ExactlyOnce语义,即消息只能被发送一次,并且只能被处理一次。事务性消息可以确保在生产者和消费者之间传递的数据的完整性和一致性,尤其是在需要处理关键任务数据的应用场景中尤为重......
  • Kafka中的Offset和Consumer Group之间的关系是什么?
    在ApacheKafka中,Offset是用来标记消息的位置标识符,它表示了一个主题分区中的消息序列号。每个消息在分区中都有唯一的Offset。当消费者消费消息时,它会跟踪Offset来记住自己已经消费到哪里了。ConsumerGroup(消费者群组)则是多个消费者实例的逻辑分组,它们共同消费一个......
  • ClickHouse-Kafka Engine 正确的使用方式
    Kafka是大数据领域非常流行的一款分布式消息中间件,是实时计算中必不可少的一环,同时一款OLAP系统能否对接Kafka也算是考量是否具备流批一体的衡量指标之一。ClickHouse的Kafka表引擎能够直接与Kafka系统对接,进而订阅Kafka中的Topic并实时接受消息数据。众所周......
  • Java多种方式实现 有界缓冲区下的多个生产者、消费者模型 (Semaphore、while+wait+noti
    /**@Author:SongyangJi@ProjectName:[email protected]@Description:*/classProducerThreadextendsThread{intrate;MultiProducerConsumermultiProducerConsumer;publicProducerThread(intrate,MultiProducerConsumermultiProducer......
  • 【背时咯】简单记录一下大数据技术的核心组件,包括Hadoop、Spark、Kafka等,并说明它们在
    大数据技术的核心组件包括Hadoop、Spark、Kafka等,它们在大数据生态系统中扮演着不可或缺的角色。以下是对这些核心组件的详细解释及它们在大数据生态系统中的作用:Hadoop核心组件:Hadoop分布式文件系统(HDFS):提供高可靠性的数据存储能力,能够将大规模的数据集分布式存储在多......
  • MQ学习笔记(一)Kafka简介
    什么是MQ?MessageQueue消息队列,在消息的传递过程中保存消息的容器。父亲==》书架《==儿子好处:应用解耦,异步提速,限流削峰使用成本:引入复杂度,最终一致性,高可用性何时使用:生产者不需要从消费者处获得反馈能够容忍短暂的不一致性效果要大于副作用应用场景应用解耦场......
  • 中间件知识点-消息中间件(Kafka)二
    Kafka一、Kafka介绍及基本原理kafka是一个分布式的、支持分区的、多副本、基于zookeeper的分布式消息系统/中间件。kafka一般不会删除消息,不管这些消息有没有被消费。只会根据配置的日志保留时间(log.retention.hours)确认消息多久被删除,默认保留最近一周的日志消息(日志......
  • docker安装运行kafka单机版
    这里我们安装一下kafka的单机版,由于kafka是基于zk进行管理的,如果我们没有安装过zk的话,需要进行安装好zk再安装kafka,当然如果已经安装过了,那就没必要安装了。我们可以执行dockerimages命令查看我们的zk镜像是否已经存在了。执行的主要的流程如下所示:1.docker拉取zookeeper镜像......
  • kafka集群架构设计原理详解
    目录从Zookeeper数据理解Kafka集群工作机制Kafka的Zookeeper元数据梳理1、zookeeper整体数据2、ControllerBroker选举机制3、LeaderPartition选举机制4、LeaderPartition自动平衡机制5、Partition故障恢复机制6、HW一致性保障-Epoch更新机制7、总结从Zookeeper......
  • Kafka 中消息保留策略详解
    个人名片......