1. 工作流程
1.1 消费者组概述
Consumer Group(CG):由多个 consumer 组成。形成一个消费者组的条件,是所有消费者的 groupId 相同。
消费者与消费组这种模型可以让整体的消费能力具备横向伸缩性,我们可以增加(或减少)消费者的个数来提高(或降低〕整体的消费能力。
1.2 消费者组初始化
消费组是一个逻辑上的概念,它将旗下的消费者归为一类,每一个消费者只隶属于一个消费组。每一个消费组都会有一个固定的名称,消费者在进行消费前需要指定其所属消费组的名称,这个可以通过消费者客户端参数 group.id
来配置,默认值为空宇符串。
消费者并非逻辑上的概念,它是实际的应用实例,它可以是一个钱程,也可以是一个进程。同一个消费组内的消费者既可以部署在同一台机器上,也可以部署在不同的机器上。
coordinator:辅助实现消费者组的初始化和分区的分配。
coordinator节点选择 = groupId#hashcode % 50(__consumer_offsets的分区数量)
例如: groupid 的 hashcode 值为 1,1% 50 = 1,那么 __consumer_offsets
主题的 1 号分区,在哪个 broker 上,就选择这个节点的 coordinator 作为这个消费者组的管理者。消费者组下的所有的消费者提交 offset 的时候就往这个分区去提交 offset。
1.3 消费者组消费流程
2. 分区分配策略以及再平衡
2.1 分区分配策略
一个 Consumer Group 中有多个 consumer 组成,一个 Topic 有多个 Partition 组成,现在的问题是:到底由哪个 consumer 来消费哪个 Partition 的数据?
Kafka 有 4 种主流的分区分配策略: Range、RoundRobin、Sticky、CooperativeSticky。
可以通过配置参数 partition.assignment.strategy
来修改分区的分配策略。默认策略是 Range + CooperativeSticky。Kafka 可以同时使用多个分区分配策略。
时间参数:
a. RangeAssignor
【分区策略原理】
说明:Kafka 默认的分区分配策略就是 Range + CooperativeSticky,所以不需要修改策略。
【分区分配再平衡案例】
- 停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)
- 1 号消费者:消费到 3、4 号分区数据;
- 2 号消费者:消费到 5、6 号分区数据;
说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。
- 再次重新发送消息观看结果(45s 以后)
- 1 号消费者:消费到 0、1、2、3 号分区数据;
- 2 号消费者:消费到 4、5、6 号分区数据。
说明:消费者 0 已经被踢出消费者组,所以重新按照 range 方式分配。
b. RoundRobinAssignor
c. StickyAssignor
2.2 消费者/组-协调器
2.3 __consumer_offsets
3. 位移 Offset 提交
笔者对 offset 做了一些区分:对于消息在分区中的位置,我们将 offset 称为“偏移量”;对于消费者消费到的位置,将 offset 称为“位移”,有时候也会更明确地称之为“消费位移”。为了是区分开是在说分区存储层面的内容,还是在说消费层面的内容。
3.1 消费位移
消费位移,x 表示某一次拉取操作中此分区消息的最大偏移量,假设当前消费者已经消费了 x 位置的消息,那么我们就可以说消费者的消费位移为 x,图中也用了 lastConsumedOffset 这个单词来标识它。
不过需要非常明确的是,当前消费者需要提交的消费位移并不是 x,而是 x+1,对应于上图中的 position,它表示下一条需要拉取的消息的位置。读者可能看过一些相关资料,里面所讲述的内容可能是提交的消费位移就是当前所消费到的消费位移,即提交的是 x,这明显是错误的。类似的错误还体现在对 LEO(Log End Offset)的解读上。在消费者中还有一个 committed offset 的概念,它表示已经提交过的消费位移。
Kafka Consumer 类提供了 position(TopicPartition)
和 committed(TopicPartition)
两个方法来分别获取上面所说的 position 和 committed offset 的值。这两个方法的定义如下所示。
为了论证 lastConsumedOffset、committedoffset 和 position 之间的关系,我们使用上面的这两个方法来做相关演示。我们向某个主题中分区编号为 0 的分区发送若干消息,之后再创建一个消费者去消费其中的消息,等待消费完这些消息之后就同步提交消费位移(调用 commitSync()
方法,这个方法的细节在下面详细介绍),最后我们观察一下 lastConsumedOffset、committed offset 和 position 的值。
TopicPartition tp = new TopicPartition(topic, 0);
consumer.assign(Arrays.asList(tp));
loηg lastConsumedOffset = -1; // 当前消费到的位移
while (true) {
ConsumerRecords<String, String> records= consumer.poll(l000);
// 判断是否己经消费完分区中的消息,以此来退出while循环
if (records.isEmpty()) {
break;
}
List<ConsumerRecord<String, String>> partitionRecords = records.records(tp);
lastConsumedOffset = partitionRecords.get(partitionRecords.size()-1).offset();
consumer.commitSync(); // 同步提交消费位移
}
System.out.println("comsumedoff set is " + lastConsumedOffset); // 377
OffsetAndMetadata offsetAndMetadata = consumer.committed(tp);
System.out.println("commited offset is " + offsetAndMetadata.offset()); // 378
long posititon = consumer.position(tp);
System.out.println("the offset of the next record is " + posititon); // 378
可以看出,消费者消费到此分区消息的最大偏移量为 377,对应的消费位移 lastConsumedOffset 也就是 377。在消费完之后就执行同步提交,但是最终结果显示所提交的位移 committed offset 为 378,并且下一次所要拉取的消息的起始偏移量 position 也为 378。在本示例中,position= committed offset = lastConsumedOffset + 1,当然 position 和 committedoffset 并不会一直相同,这一点会在下面的示例中有所体现。
__consumer_offsets
主题里面采用 key 和 value 的方式存储数据。key 是 group.id+topic+分区号,value 就是当前 offset 的值。每隔一段时间,Kafka 内部会对这个 topic 进行 compact,也就是每个 group.id+topic+分区号就保留最新数据。
3.2 消息重复/丢失
3.3 提交方式
a. 自动提交
在Kafka中默认的消费位移的提交方式是自动提交,这个由消费者客户端参数 enable.auto.commit
配置,默认值为 true。当然这个默认的自动提交不是每消费一条消息就提交一次,而是定期提交,这个定期的周期时间由客户端参数 auto.commit.interval.ms
配置,默认值为 5s,此参数生效的前提是 enable.auto.commit
参数为 true。
在默认的方式下,消费者每隔 5s 会将拉取到的每个分区中最大的消息位移进行提交。自动位移提交的动作是在 poll()
方法的逻辑里完成的,在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移。
在 Kafka 消费的编程逻辑中位移提交是一大难点,自动提交消费位移的方式非常简便,它免去了复杂的位移提交逻辑,让编码更简洁。但随之而来的是重复消费和消息丢失的问题。
- 假设刚刚提交完一次消费位移,然后拉取一批消息进行消费,在下一次自动提交消费位移之前,消费者崩溃了,那么又得从上一次位移提交的地方重新开始消费,这样便发生了重复消费的现象(对于再均衡的情况同样适用)。我们可以通过减小位移提交的时间间隔来减小重复消息的窗口大小,但这样并不能避免重复消费的发送,而且也会使位移提交更加频繁。
- 按照一般思维逻辑而言,自动提交是延时提交,重复消费可以理解,那么消息丢失又是在什么情形下会发生的呢?我们来看下图中的情形。拉取线程 A 不断地拉取消息并存入本地缓存,比如在 BlockingQueue 中,另一个处理线程 B 从缓存中读取消息并进行相应的逻辑处理。假设目前进行到了第 y+1 次拉取,以及第 m 次位移提交的时候,也就是 x+6 之前的位移已经确认提交了,处理线程 B 却还正在消费 x+3 的消息。此时如果处理线程 B 发生了异常,待其恢复之后会从第 m 此位移提交处,也就是 x+6 的位置开始拉取消息,那么 x+3 至 x+6 之间的消息就没有得到相应的处理,这样便发生消息丢失的现象。
b. 手动提交
开启手动提交功能的前提是消费者客户端参数 enable.auto.commit
配置为 false。
手动提交可以细分为同步提交和异步提交,对应于 KafkaConsumer 中的 commitSync()
和 commitAsync()
两种类型的方法。
4. 消费者 API
4.1 订阅主题
【注】在消费者 API 代码中必须配置消费者组 id,否则会抛出 InvalidGroupIdException。命令行启动消费者不填写消费者组 id 会被自动填写随机的消费者组 id。
public class CustomConsumer {
public static void main(String[] args) {
// 0. 配置
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "custom-consumer-group");
// 1. 创建一个消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 2. 订阅主题
List<String> topics = new ArrayList<>();
topics.add("topic-demo");
kafkaConsumer.subscribe(topics);
// 3. 消费主题
int idx = 0;
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : consumerRecords) {
System.out.println(idx++ + " ===> " + record.value());
}
}
}
}
4.2 订阅分区
直接订阅某些主题的特定分区:
public class CustomConsumerPartition {
public static void main(String[] args) {
// 0. 配置
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "custom-consumer-group");
// 1. 创建一个消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 2. 订阅主题
List<TopicPartition> topics = new ArrayList<>();
topics.add(new TopicPartition("topic-demo", 0));
kafkaConsumer.assign(topics);
// 3. 消费主题
int idx = 0;
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : consumerRecords) {
System.out.println(idx++ + " ===> " + record.value());
}
}
}
}
集合订阅的方式 subscribe(Collection)
、正则表达式订阅的方式 subscribe(Pattem)
和指定分区的订阅方式 assign(Collection)
分别代表了 3 种不同的订阅状态:AUTO_TOPICS、AUTO PATTERN 和 USERASSIGNED (如果没有订阅,那么订阅状态为 NONE)。然而这 3 种状态是互斥的,在一个消费者中只能使用其中的一种,否则会报出 IllegalStateException 异常。
通过 subscribe()
方法订阅主题具有消费者自动再均衡的功能,在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者与分区的关系。当消费组内的消费者增加或减少时,分区分配关系会自动调整,以实现消费负载均衡及故障自动转移。而通过 assign()
方法订阅分区时,是不具备消费者自动均衡的功能的,其实这一点从 assign()
方法的参数中就可以看出端倪,两种类型的 subscribe()
都有 ConsumerRebalanceListener 类型参数的方法,而 assign()
方法却没有。
4.3 自动/手动提交offset
4.4 指定 offset
4.5 指定时间消费
4.6 多线程消费
分区是消费线程的最小划分单位。
(1)一个线程对应一个 KafkaConsumer 实例,它的优点是每个线程可以按顺序消费各个分区中的消息。缺点也很明显,每个消费线程都要维护一个独立的 TCP 连接,如果分区数和 consumerThreadNum 的值都很大,那么会造成不小的系统开销。
(2)一般而言,poll()
拉取消息的速度是相当快的,而整体消费的瓶颈也正是在处理消息这一块,如果我们通过一定的方式来改进这一部分,那么我们就能带动整体消费性能的提升。
(3)引入一个共享变量 offsets 来参与提交。每一个处理消息的 RecordHandler 类在处理完消息之后都将对应的消费位移保存到共享变量 offsets 中,KafkaConsumerThread 在每一次 poll()
方法之后都读取 offsets 中的内容并对其进行位移提交。注意在实现的过程中对 offsets 读写需要加锁处理,防止出现并发问题。