在 Apache Kafka 中,消费者群组(Consumer Group)是一组订阅相同主题的消费者实例。消费者群组的主要目的是实现消息的共享消费,即一个主题的消息会被分发给群组内的不同消费者,而不是所有消费者都接收所有消息。
以下是如何配置和使用消费者群组的基本步骤:
配置消费者群组
-
创建消费者实例:首先,你需要创建一个消费者实例,并且为这个实例指定一个群组 ID。群组 ID 是用来区分不同消费者群组的标识符。
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "my-consumer-group"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
-
订阅主题:消费者可以订阅一个或多个主题,一旦订阅后,消费者就可以开始从这些主题拉取消息。
// 订阅一个主题 consumer.subscribe(Arrays.asList("my-topic")); // 或者直接分配分区 // consumer.assign(Collections.singletonList(new TopicPartition("my-topic", 0)));
使用消费者群组
-
启动多个消费者实例:为了形成一个群组,你需要启动多个消费者实例,并且确保它们都使用相同的群组 ID。
-
消费消息:消费者会自动加入到群组中,并且 Kafka 会根据配置和当前消费者的数量来分配分区给不同的消费者。每个分区只会被群组内的一个消费者消费。
while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); }
-
管理偏移量:消费者可以自动提交偏移量(即已读取的消息位置),也可以手动提交。自动提交简化了使用过程,但手动提交提供了更细粒度的控制。
-
自动提交:
props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); // 每隔一秒自动提交一次
-
手动提交:
consumer.commitSync(); // 同步提交 consumer.commitAsync(); // 异步提交
-
注意事项
- 如果消费者群组中有消费者长时间未读取消息,那么 Kafka 可能会重新平衡分区,将该消费者的分区重新分配给其他活跃的消费者。
- 当消费者群组中的消费者数量发生变化时,Kafka 会自动重新平衡分区,以确保每个消费者都能公平地获得消息。
- 消费者群组的偏移量信息通常存储在 Kafka 的
_consumer_offsets
主题中,但这可以通过配置进行更改。
通过以上配置,你可以设置和管理 Kafka 消费者群组,以满足不同的应用场景需求。
标签:消费者,群组,Kafka,props,put,consumer From: https://blog.csdn.net/qq_33240556/article/details/142333150