Kafka的消费者
Kafka采用消费者组的方式来消费消息,一个消费者组中可以包含多个消费者。消费者对象订阅主题并接收Kafka的消息,然后验证消息并保存结果。尽管一个消费者组中可以包含多个消费者,但是它们订阅的都是同一个主题的消息。
1. 消费模式
当生产者将消息发送到Kafka集群后,会转发给消费者进行消费。消息的消费模型有两种,推模式(push)和拉模式(pull)。
1.1 消息的推模式(push)
消息的推模式需要记录消费者的消费状态。当把一条消息推送给消费者后,需要维护消息的状态,如标记这条消息已经被消费。
但这种方式无法很好地保证消息被处理,如果要保证消息被处理,发送完消息后,需要将其状态设置为“已发送”。收到消费者的确认后,才将其状态更新为“已消费”,这就需要我们记录所有消息的消费状态。显然这种方式不可取。这种方式还存在一个明显的缺点,就是消息被标记为“已消费”后,其他消费者就不能再进行消费了。
1.2 消息的拉模式(pull)
由于推模式存在一定的缺点,因此Kafka采用消费拉取的模式来消费消息。由每个消费者维护自己的消费状态,并且每个消费者互相独立地顺序拉取每个分区的消息。消费者通过偏移量的信息来控制从Kafka中消费的消息(这里可以类比数组,每一个消费者都持有一个自己独有的下标,通过下标来访问数组中的数据)。
由消费者通过偏移量进行消费控制的优点在于,消费者可以按照任意的顺序消费消息。例如,消费者可以通过重置偏移量信息,重新处理之前已经消费过的消息;或者直跳转到某一个偏移量位置,并开始消费。
这里需要特别说明的是,当生产者最新写入的消息还没有达到备份数量,即新写入的消息还没有达到冗余度要求时,对消费者是不可见的。消费者只能消费到水位线(Watermark)的位置。
另外,如果消费者已经将消息进行了消费,Kafka并不会立即将消息删除,而是会将所有消息进行保存,即持久化保存到Kafka的消息日志中。无论消息有没有被消费,用户可以通过设置保留时间来清理过期的消息数据。关于Kafka持久化机制与日志的清理策略,将在后续文章进行详细的介绍。
1.3 推模式与拉模式的区别
由于消息的发送速率是由Kafka的Broker(实例) 决定的,Broker 的目标是尽可能以最快的速度传递消息。
在推模式下,很难适应消费速率不同的消费者,从而造成消费者来不及处理消息。消费者来不及处理消息就可能造成消息的阻塞,从而降低系统的处理能力。
在拉模式下,用户可以根据消费者的处理能力调整消息消费的速率,但在这种模式下也存在一定的缺点。如果消息的生产者没有产生消息,就可能造成消费者陷入循环中,一直等待数据到达。为了避免这种情况出现,可以在拉取过程中指定允许消费者在等待数据到达前进行阻塞,并且还可以指定消费的字节数,从而保证传输时的数据量。
2. 消费者与消费者组
消费者是以消费者组的方式工作的,即一个消费者组由一个或多个消费者组成,它们共同消费一个 Topic 中的消息。在同一个时间点上,Topic 中的分区只能由一个组中的一个消费者进行消费,同一个分区可以被不同组中的消费者进行消费。
2.1 消费者与消费者组与分区的关系
在Kafka消息系统中,消费者以消费者组为单位订阅 Topic 中的消息,而 Topic 又包含多个分区,消费者组中有多个消费者实例。那么消费者组中的每一个消费者负责处理消费一些分区中的数据。
前面提到,同一时刻每个分区中的一条消息只能被消费者组中的一个消费者消费。这里的分配原则是,Topic下的每个分区只能被消费者组中的一个消费者消费,也就是说,不会发生同一个消费者组中的两个不同的消费者负责处理同一分区的情况。这里就可能存在几种不同的情况。
-
Topic的分区数大于消费者组中的消费者数。
假设有一个Topic1的主题,该主题有四个分区,并且有一个消费组Group1,这个消费者组只有一个消费者Consumer1,那么消费者Consumer1 将会收到这四个分区的消息。如果增加消费者组中的消费者,例如,增加到两个消费者时。每个消费者将分别接收到两个分区中的消息。
-
Topic的分区数等于消费者组中的消费者数。
如果继续增加到四个消费者,那么每个消费者将分别收到一个分区的消息。 -
Topic的分区数小于消费者组中的消费者数。
我们继续在这个消费组中增加消费者,这时消费者组中有五个消费者,那么将有一个消费者会空闲,它不会接收到任何消息。
通过在消费者组中添加消费者,可以提升系统的水平扩展能力,从而提升消费者的消费能力。Kafka建议创建 Topic 时使用比较多的分区数,一般分区数要大于消费者的个数,这样可以在消费负载高的情况下增加消费者来提升性能。
如果存在多个消费者组,那么又会是什么样的情况呢?
由于Kafka 支持写入一次消息,支持任意多的应用读取这个消息。应用需要包含不同的消费组,这样可以使得每个应用都能读到全量消息。
2.2 分区的重分配
分区的重分配(Rebalance)是Kafka一个很重要的性质,它可以保证系统的高可用和系统的水平扩展。以下几种情况会触发Kafka发生重平衡。
- 新的消费者加入消费者组。新加入的成员会消费一个或多个分区,而这些分区中的消息在新成员加入之前被其他成员消费处理过。
- 消费者离开消费者组,例如,发生了宕机或重启。这种情况会导致之前由该消费者负责的分区分配给其他分区。
重分配的优点是保证高可用性和扩展性,但是它也会带来一些问题,其中最主要的问题是在重分配期间,整个消费组是不可用的,会造成所有消费者都不能消费消息;并且,重分配会导致消费者需要重新更新状态,从而导致原来的消费者状态过期。这些都会导致系统的消费能力降低。
除此之外,在消费者组中维护一个协调者(Coordinator)用于感知消费者的心跳信息。无论是增加新的消费者,还是有消费者退出,都会由这个协调者来感知其心跳信息。如果消费者超过一定时间没有发送心跳信息,那么它的状态就会过期,协调者会认为该消费者已经宕机,然后触发重分配。从消费者宕机到其被协调者感知到,这中间是有一段时间间隔的,这段时间内该消费者是不能进行消息消费的。
3. 偏移量与提交
Kaka的消费者每次拉取服务器端的消息时,总是拉取由生产者写入Kafka但还没有被消费者处理过的数据。因此,需要一种机制来记录哪些消息是被消费者组里的哪个消费者消费过的。与其他消息系统不同的是,Kafka消费者每次拉取完消息后,会记录最新的偏移量地址。下次拉取消息的时候,将会从偏移量往后拉取最新的消息数据。我们把消费者更新当前拉取分区中的位置(即偏移量)的行为称为提交。
3.1 偏移量与重平衡
消费者需要定期提交拉取的偏移量,一方面用于记录最新消费的位置信息,以便下次的拉取操作;另一方面,当消费者退出或有新的消费者加入消费者组的时候,都会触发重分配的操作,完成重分配后,每个消费者可能会分配到新的分区,读取新分区中的数据。
为了能够继续之前的拉取工作,消费者需要读取每个partition最后一次提交的偏移量,然后从偏移量指定的地方继续处理。这里就可能存在几种不同的情况。
- 情况一:如果提交的偏移量小于客户端处理的最后一个消息的偏移量,会导致两个偏移量之间的消息被重复处理。(因为消息并不是拉一条,处理一条,提交一条。而是可能拉取了10条(假设是从0-9),处理了8条,但此时发生了重分配,那么当新的消费者拉取消息时,还是会从第0号消息开始处理)
- 情况二:如果提交的偏移量大于客户端处理的最后一个消息的偏移量,会导致两个偏移量之间的消息丢失。
通过对以上两种情况的介绍,我们会发现如何正确提交消费者的偏移量将会对客户端产生很大的影响。
3.2 偏移量的提交方式
(1)自动提交
这种提交方式是一种简单的提交方式,需要把参数enable.auto.commit设置为true,那么在默认情况下,每隔5秒消费者会自动把从pull( )方法接收到的最大偏移量提交上去。这个时间间隔可以通过参数 auto.commit.interval.ms 进行修改。当然这个自动提交是在每次进行轮询时,即调用pull( )方法时进行的。消费者会检查是否该提交偏移量了,如果已经提交,就会返回上次提交时的偏移量。
自动提交虽然方便,但是也可能存在一些问题,其中最主要的问题就是可能造成消息的重复消费。在前面的内容中介绍过,重分配的发生会有不同的情况。其中的一种情况就是当提交的偏移量小于客户端处理的最后一个消息的偏移量,会造成消息的重复消费。
(2)手动提交(同步)
将enable.auto.commit设置成false,让应用程序决定何时提交偏移量,即使用commitSync( )方法提交偏移量。这种方式非常简单,也很可靠。它可以减少在重分配时重复处理的消息数量,并同时消除丢失消息的可能性。需要注意的是,commitSync( )方法将提交由poll( )方法返回的最新偏移量,所以在处理完所有记录后要确保调用了commitSync( )方法,否则还会有丢失消息的风险。这种提交方式的本质是同步提交偏移量,因此在提交的过程中,其应用程序会被阻塞。
while (true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// do something
}
consumer.commitSync();
}
(3)异步提交。
上面提到的同步提交方式会造成应用程序一直阻塞,这样会限制应用程序的吞吐量,其中的一种解决办法就是降低提交频率;另一种方式可以使用异步提交API。消费者只需要发送提交偏移量的请求,而不需要等待服务器端的响应。
while (true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// do something
}
consumer.commitAsync();
}
与同步提交不同的是,在成功提交偏移量之前,异步提交不会进行重试,只是根据服务器端的响应做出相应的动作。如果在得到服务器端返回响应之前,有另一个较大的偏移量信息被成功提交,就可能造成消息的重复消费。假设我们发出一个异步请求用于提交偏移量100,但网络出现问题,服务器未收到该请求。与此同时,消费者又提交了200的偏移量且成功了,那此时如果偏移为100的提交也到达了服务器,就可能造成偏移量从200变100,出现重复消费。
(4)组合同步提交和异步提交。
既然消费者支持同步提交偏移量和异步提交偏移量的两种方式。我们就可以组合使用commitSync( )方法和commitAsync( )方法来提交偏移量信息。这样针对偶尔出现的提交失败,不必提交偏移量的重试也不会有太大问题。
try {
while (true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// do something
}
consumer.commitAsync();
}
}catch (Exception e){
e.printStackTrace();
}finally {
try {
consumer.commitSync();
}finally {
consumer.close();
}
}
(5) 提交指定的偏移量。
在使用同步提交偏移量和异步提交偏移量时,可以在调用commitSync()方法和commitAsync()方法时,传入希望提交的partition和offset的map,即提交特定的偏移量。
4. 高级特性
4.1 分区策略
Kafka的 Topic 是由分区组成的,并且还可以配置分区的冗余度。一个分区在多个 Broker 中选举出一个Leader,消费者只访问这个 Leader 的分区副本。这里重点介绍一个消费者如何选定一个 Topic 多个分区中的一个分区和Kafka 消费者支持的分区策略。
通过前面内容的介绍可知,一条消息只能被消费者组中的一个消费者消费。消费者组订阅Topic,意味着该Topic 下的所有分区都会被消费者组中的消费者消费,如果按照从属关系来说,Topic下的每个分区只属于消费者组中的一个消费者,不可能出现组中的两个消费者负责同一个分区。
Kafka通过配置消费者分区分配策略来决定分区中的消息被哪一个消费者消费。消费者分区的分配策略都应该实现rg.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor 接口。通过实现这个接口,用户可以自定义分区分配策略。Kafka提供了3种实现的方式,可以通过参数 partition.assignment.strategy 进行指定。
- RangeAssignor.
这是默认的分区分配策略。这种分配策略是根据Kafka Consumer 端的总数和Topic 中的分区总数来获取一个范围的,然后将分区按照范围进行平均分配,以保证分区尽可能均匀地分配给所有消费者。 - RoundRobinAssignor.
这种方式将Consumer Group中的所有消费者及其订阅Topic的分区按照字典序列排序,然后通过轮询的方式逐个将分区分配给每个消费者。 - StickyAssignor.
这种分区分配策略采用黏性分配策略。所谓黏性分配策略,既要保证分区的分配要尽可能均匀,又要保证每次分区的分配尽可能与上次分配的保持相同,就像进行粘贴一样。如果这两点发生冲突,优先考虑第一点,即分区的分配要尽可能均匀。
4.2 重分配监听器
前面介绍到当有新的消费者成员加入消费者组或有消费者退出,都可能触发重分配的操作。但是在重分配期间,消费者组内的消费者是无法读取消息的,即在重分配这一段时间内,消费者组不可用。如果在这段时间内,我们想要执行一些应用程序代码,在调用subscribe()方法时传入一个org.apache.kafka.clients.consumer.ConsumerRebalanceListener 接口的实例就可以了,而这个实例就是重平衡监听器。
4.3 拦截器
消费者的拦截器允许用户在消费者消费消息之前或消费者提交偏移量之后,执行特定的业务逻辑。消费者拦截器都是实现了org.apache.kafka.clients.consumer.ConsumerInterceptor接口的实例。可以通过消费者拦截器配置参数interceptor.classes 进行配置。
该接口有两个主要方法:onConsume( )和onCommit()。
- onConsume( ):该方法在消息返回给Consumer之前被调用。
- onCommit( ):该方法在 Consumer 提交偏移量信息后被调用
4.4 优雅退出
在一般情况下,在一个主线程中循环poll 消息并进行处理。当需要退出poll循环时,可以使用另一个线程调用consumer.wakeup( )方法,调用此方法会使pull( )方法抛出WakeupException。主线程在捕获 WakeUpException后,需要调用consumer.close()方法将消费者关闭。另外,在调用consumer.wakeup( )方法时,主线程正在处理消息,在下一次主线程调用poll 时会抛出异常。
5. 参数配置
Kafka 消费者端的配置参数,除了bootstrap.servers、key.deserializer、value.deserializer三个必需参数以外,还有很多可选的参数。下面列举了消费者的配置参数及它们的含义。
- bootstrap.servers .
该参数表示 Kafka Broker 集群的地址信息,其格式为ip1:port,ip2:port 等,不需要设定全部的集群地址,设置两个或两个以上即可。 - group.id.
该参数表示消费者组名称,如果group.id相同则表示属于一个消费者组中的成员。如果没有指定该参数,会报出异常。 - fetch.min.bytes .
该参数用来配置Kafka消费者在一次拉取请求中能从Kafka中拉取的最小数据量,即调用pull( )方法时,每次拉取的数据量,其默认值为1字节。
消费者在拉取数据时,如果Kafka服务器端返回给消费者的数据量小于这个参数值的设定,那么消费者就需要进行等待,直到数据量满足这个参数的配置大小。因此在实际运行环境中,可以适当调大这个参数的值以提高一定的吞吐量。另外,增大这个参数值也会造成额外的延迟,因此增大该参数不适合敏感的应用, - fetch.max.bytes .
该参数与fetch.min.bytes参数对应,它用来配置Kafka 消费者在一次拉取请求中从Kafka 服务器端中拉取的最大数据量,其默认值为52428800字节,也就是50MB。
该参数并不是绝对的最大值。试想一下,如果该参数设置的值比任何一条由生产者写入Kafka服务器端中的消息字节数小,那么会不会造成无法消费呢?如果在第一个非空分区中拉取的第一条消息字节数大于该值,那么该消息仍然返回,以确保消费者继续工作。Kafka 消息系统中,能够接收的最大消息的字节数是通过服务器端参数message.max.bytes进行设置的。 - fetch.max.wait.ms
该参数也和fetch.min.bytes 参数有关。前面提到,如果Kafka服务器端返回给消费者的数据量小于 fetch.min.bytes 参数值的设定,消费者就需要等待,直到数据量满足这个参数的配置大小。然而有可能会一直等待而无法将消息发送给消费者,显然这是不合理的。fetch.max.wait.ms 参数用于指定 Kafka的等待时间,默认值为 500ms。当 Kafka 满足不了fetch.min.bytes 参数值的设定时,Kafka 集群也会根据 fetch.max.wait.ms 参数值的设定,默认等待 5s,然后将消息数据返回给消费者。综合来看,fetch.min,bytes 和 fetch.max.wail.ms都有可能造成消息的延迟处理。如果业务应用对延迟敏感,那么可以适当调小这些参数 - max.poll.records。
该参数用来配置Kafka消费者在一次拉取请求中拉取的最大消息数,其默认值为500,如果消息数都比较小,则可以适当调大这个参数值来提升消费速度。 - max.partition.fetch.bytes.
该参数用来配置从每个分区里返回给消费者的最大数据量,其默认值为1048576字节,即 1MB。这个参数与fetch.max.bytes 参数相似,只不过 max.partition.fetch.bytes 用来限制一次拉取中每个分区消息的字节数,而fetch.max.bytes用来限制一次拉取中整体消息的字节数。同样,如果这个参数设定的值比消息字节数小,那么也不会造成无法消费。 - connections.max.idle.ms.
该参数用来指定在多长时间之后,关闭闲置的Kafka 消费者连接,默认值是 540000ms,即9min 。 - send.buffer.bytes
该参数用来设置发送消息缓冲区(SO_SNDBUF)的大小,其默认值为131072字节,即128KB。与receive.bufer.bytes 参数一样,如果设置为-1,则使用操作系统的默认值。 - request.timeout.ms.
该参数用来配置 Kafka 消费者等待请求响应的最长时间,其默认值为40s。 - receive.buffer.bytes.
该参数用来设置接收消息缓冲区(SO_RECBUF)的大小,其默认值为65536字节,即64KB。如果将该参数设置为-1,则使用操作系统的默认值。 - metadata.max.age.ms.
该参数用来配置元数据的过期时间,其默认值为300000ms,即5min。如果元数据在此参数限定的时间范围内没有进行更,,即使没有任何分区变化或有新的Kafka Broker 加入,也会被强制更新。 - reconnect.backoff.ms.
该参数用来配置Kafka消费者每次尝试重新连接指定主机之前应该等待的时间,避免频繁地连接主机,其默认值为50s。 - auto.offset.reset.
该参数值为字符串类型,其有效值为以下三个。
。earliest:当各分区下有已提交的偏移量时,从提交的偏移量开始消费;无提交的偏移量时,从头开始消费。
。latest:当各分区下有已提交的偏移量时,从提交的偏移量开始消费;无提交的偏移量时,消费新产生的该分区下的数据。
。none:Topic 各分区都存在已提交的偏移量时,从偏移量后开始消费;只要有一个分区不存在已提交的偏移量,则抛出异常。
注意,除了以上三个有效值以外,设置其他任何值都会抛出错误。 - enable.auto.commit.
该参数值为boolean 类型,配置是否开启自动提交消费位移的功能,默认开启。 - auto.commit.interval.ms.
该参数只有当enable.auto.commit 参数设置为 true 时才生效,表示开启自动提交偏移
量功能时自动提交消费位移的时间间隔,其默认值为5s。 - partition.assignment.strategy.
该参数表示消费者的分区分配策略,支持轮询策略设置和范围策略设置。 - interceptor.class .
该参数用来配置消费者客户端的拦截器,该拦截器必须实现org.apache.kafkaclients.consumer.Consumerlnterceptor接口。使用消费者拦截器可以允许用户截取消费者接收到的消息,从而可以进一步改变消息。在默认情况下,没有拦截器的设置 - exclude.internal.topics
该参数用来指定Kaka中的内部主题是否可以向消费者公开,其默认为true。在Kafka消息系统中有两个内部的主题:__consumer_offsets 和 __transaction_state