首页 > 其他分享 >RocketMQ教程-(5)-功能特性-消费者分类

RocketMQ教程-(5)-功能特性-消费者分类

时间:2023-09-13 10:37:04浏览次数:47  
标签:教程 消费 处理 特性 重试 PushConsumer 消息 SimpleConsumer RocketMQ


Apache RocketMQ 支持 PushConsumer 、 SimpleConsumer 以及 PullConsumer 这三种类型的消费者,本文分别从使用方式、实现原理、可靠性重试和适用场景等方面为您介绍这三种类型的消费者。

背景信息

Apache RocketMQ 面向不同的业务场景提供了不同消费者类型,每种消费者类型的集成方式和控制方式都不一样。了解如下问题,可以帮助您选择更匹配业务场景的消费者类型。

  • 如何实现并发消费:消费者如何使用并发的多线程机制处理消息,以此提高消息处理效率?
  • 如何实现同步、异步消息处理:对于不同的集成场景,消费者获取消息后可能会将消息异步分发到业务逻辑中处理,此时,消息异步化处理如何实现?
  • 如何实现消息可靠处理:消费者处理消息时如何返回响应结果?如何在消息异常情况进行重试,保证消息的可靠处理?

以上问题的具体答案,请参考下文。

功能概述

RocketMQ教程-(5)-功能特性-消费者分类_Apache

如上图所示, Apache RocketMQ 的消费者处理消息时主要经过以下阶段:消息获取--->消息处理--->消费状态提交。

针对以上几个阶段,Apache RocketMQ 提供了不同的消费者类型: PushConsumer 、SimpleConsumer 和 PullConsumer。这几种类型的消费者通过不同的实现方式和接口可满足您在不同业务场景下的消费需求。具体差异如下:

在实际使用场景中,PullConsumer 仅推荐在流处理框架中集成使用,大多数消息收发场景使用 PushConsumer 和 SimpleConsumer 就可以满足需求。

若您的业务场景发生变更,或您当前使用的消费者类型不适合当前业务,您可以选择在 PushConsumer 和SimpleConsumer 之间变更消费者类型。变更消费者类型不影响当前Apache RocketMQ 资源的使用和业务处理。

危险

生产环境中相同的 ConsumerGroup 下严禁混用 PullConsumer 和其他两种消费者,否则会导致消息消费异常。

RocketMQ教程-(5)-功能特性-消费者分类_监听器_02

PushConsumer

PushConsumers是一种高度封装的消费者类型,消费消息仅通过消费监听器处理业务并返回消费结果。消息的获取、消费状态提交以及消费重试都通过 Apache RocketMQ 的客户端SDK完成。

使用方式

PushConsumer的使用方式比较固定,在消费者初始化时注册一个消费监听器,并在消费监听器内部实现消息处理逻辑。由 Apache RocketMQ 的SDK在后台完成消息获取、触发监听器调用以及进行消息重试处理。

示例代码如下:

// 消费示例:使用PushConsumer消费普通消息。
ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "YourTopic";
FilterExpression filterExpression = new FilterExpression("YourFilterTag", FilterExpressionType.TAG);
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
    // 设置消费者分组。
    .setConsumerGroup("YourConsumerGroup")
    // 设置接入点。
    .setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("YourEndpoint").build())
    // 设置预绑定的订阅关系。
    .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
    // 设置消费监听器。
    .setMessageListener(new MessageListener() {
        @Override
        public ConsumeResult consume(MessageView messageView) {
            // 消费消息并返回处理结果。
            return ConsumeResult.SUCCESS;
        }
    })
    .build();

PushConsumer的消费监听器执行结果分为以下三种情况:

  • 返回消费成功:以Java SDK为例,返回ConsumeResult.SUCCESS,表示该消息处理成功,服务端按照消费结果更新消费进度。
  • 返回消费失败:以Java SDK为例,返回ConsumeResult.FAILURE,表示该消息处理失败,需要根据消费重试逻辑判断是否进行重试消费。
  • 出现非预期失败:例如抛异常等行为,该结果按照消费失败处理,需要根据消费重试逻辑判断是否进行重试消费。

PushConsumer 消费消息时,若消息处理逻辑出现预期之外的阻塞导致消息处理一直无法执行成功,SDK会按照消费超时处理强制提交消费失败结果,并按照消费重试逻辑进行处理。消息超时,请参见PushConsumer消费重试策略

出现消费超时情况时,SDK虽然提交消费失败结果,但是当前消费线程可能仍然无法响应中断,还会继续处理消息。

内部原理

在PushConsumer类型中,消息的实时处理能力是基于SDK内部的典型Reactor线程模型实现的。如下图所示,SDK内置了一个长轮询线程,先将消息异步拉取到SDK内置的缓存队列中,再分别提交到消费线程中,触发监听器执行本地消费逻辑。

RocketMQ教程-(5)-功能特性-消费者分类_消息处理_03

 可靠性重试

PushConsumer 消费者类型中,客户端SDK和消费逻辑的唯一边界是消费监听器接口。客户端SDK严格按照监听器的返回结果判断消息是否消费成功,并做可靠性重试。所有消息必须以同步方式进行消费处理,并在监听器接口结束时返回调用结果,不允许再做异步化分发。消息重试具体信息,请参见PushConsumer消费重试策略

使用PushConsumer消费者消费时,不允许使用以下方式处理消息,否则 Apache RocketMQ 无法保证消息的可靠性。

  • 错误方式一:消息还未处理完成,就提前返回消费成功结果。此时如果消息消费失败,Apache RocketMQ 服务端是无法感知的,因此不会进行消费重试。
  • 错误方式二:在消费监听器内将消息再次分发到自定义的其他线程,消费监听器提前返回消费结果。此时如果消息消费失败,Apache RocketMQ 服务端同样无法感知,因此也不会进行消费重试。

顺序性保障

基于 Apache RocketMQ 顺序消息的定义,如果消费者分组设置了顺序消费模式,则PushConsumer在触发消费监听器时,严格遵循消息的先后顺序。业务处理逻辑无感知即可保证消息的消费顺序。

适用场景

PushConsumer严格限制了消息同步处理及每条消息的处理超时时间,适用于以下场景:

  • 消息处理时间可预估:如果不确定消息处理耗时,经常有预期之外的长时间耗时的消息,PushConsumer的可靠性保证会频繁触发消息重试机制造成大量重复消息。
  • 无异步化、高级定制场景:PushConsumer限制了消费逻辑的线程模型,由客户端SDK内部按最大吞吐量触发消息处理。该模型开发逻辑简单,但是不允许使用异步化和自定义处理流程。

SimpleConsumer

SimpleConsumer 是一种接口原子型的消费者类型,消息的获取、消费状态提交以及消费重试都是通过消费者业务逻辑主动发起调用完成

使用方式

SimpleConsumer 的使用涉及多个接口调用,由业务逻辑按需调用接口获取消息,然后分发给业务线程处理消息,最后按照处理的结果调用提交接口,返回服务端当前消息的处理结果。示例如下:

// 消费示例:使用 SimpleConsumer 消费普通消息,主动获取消息处理并提交。 
ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "YourTopic";
FilterExpression filterExpression = new FilterExpression("YourFilterTag", FilterExpressionType.TAG);
SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder()
        // 设置消费者分组。
        .setConsumerGroup("YourConsumerGroup")
        // 设置接入点。
        .setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("YourEndpoint").build())
        // 设置预绑定的订阅关系。
        .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
        // 设置从服务端接受消息的最大等待时间
        .setAwaitDuration(Duration.ofSeconds(1))
        .build();
try {
    // SimpleConsumer 需要主动获取消息,并处理。
    List<MessageView> messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
    messageViewList.forEach(messageView -> {
        System.out.println(messageView);
        // 消费处理完成后,需要主动调用 ACK 提交消费结果。
        try {
            simpleConsumer.ack(messageView);
        } catch (ClientException e) {
            logger.error("Failed to ack message, messageId={}", messageView.getMessageId(), e);
        }
    });
} catch (ClientException e) {
    // 如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。
    logger.error("Failed to receive message", e);
}

SimpleConsumer主要涉及以下几个接口行为:

RocketMQ教程-(5)-功能特性-消费者分类_rocketmq_04

 可靠性重试

SimpleConsumer消费者类型中,客户端SDK和服务端通过ReceiveMessageAckMessage接口通信。客户端SDK如果处理消息成功则调用AckMessage接口;如果处理失败只需要不回复ACK响应,即可在定义的消费不可见时间到达后触发消费重试流程。更多信息,请参见SimpleConsumer消费重试策略

顺序性保障

基于 Apache RocketMQ 顺序消息的定义,SimpleConsumer在处理顺序消息时,会按照消息存储的先后顺序获取消息。即需要保持顺序的一组消息中,如果前面的消息未处理完成,则无法获取到后面的消息。

适用场景

SimpleConsumer提供原子接口,用于消息获取和提交消费结果,相对于PushConsumer方式更加灵活。SimpleConsumer适用于以下场景:

  • 消息处理时长不可控:如果消息处理时长无法预估,经常有长时间耗时的消息处理情况。建议使用SimpleConsumer消费类型,可以在消费时自定义消息的预估处理时长,若实际业务中预估的消息处理时长不符合预期,也可以通过接口提前修改。
  • 需要异步化、批量消费等高级定制场景:SimpleConsumer在SDK内部没有复杂的线程封装,完全由业务逻辑自由定制,可以实现异步分发、批量消费等高级定制场景。
  • 需要自定义消费速率:SimpleConsumer是由业务逻辑主动调用接口获取消息,因此可以自由调整获取消息的频率,自定义控制消费速率。

PullConsumer

使用建议

PushConsumer合理控制消费耗时,避免无限阻塞

对于PushConsumer消费类型,需要严格控制消息的消费耗时,尽量避免出现消息处理超时导致消息重复。如果业务经常会出现一些预期外的长时间耗时的消息,建议使用SimpleConsumer,并设置好消费不可见时间。

标签:教程,消费,处理,特性,重试,PushConsumer,消息,SimpleConsumer,RocketMQ
From: https://blog.51cto.com/ratelcloud/7452260

相关文章

  • RocketMQ-(8-1)-EventBridge-EventBridge 核心概念
    RocketMQEventBridge核心概念理解EventBridge中的核心概念,能帮助我们更好的分析和使用EventBridge。本文重点介绍下EventBridge中包含的术语:EventSource:事件源。用于管理发送到EventBridge的事件,所有发送到EventBridge中的事件都必须标注事件源名称信息,对应CloudEvent事件体中的s......
  • RocketMQ-(9-1)-MQTT-EventBridge概述
    RocketMQMQTT概览传统的消息队列MQ主要应用于服务(端)之间的消息通信,比如电商领域的交易消息、支付消息、物流消息等等。然而在消息这个大类下,还有一个非常重要且常见的消息领域,即IoT类终端设备消息。近些年,我们看到随着智能家居、工业互联而兴起的面向IoT设备类的消息正在呈爆炸式......
  • RocketMQ教程-(6-5)-运维部署-Promethus Exporter
    介绍Rocketmq-exporter 是用于监控RocketMQbroker端和客户端所有相关指标的系统,通过 mqAdmin 从broker端获取指标值后封装成87个cache。警告过去版本曾是87个concurrentHashMap,由于Map不会删除过期指标,所以一旦有label变动就会生成一个新的指标,旧的无用指标无法......
  • VUE2教程-基础-Class 与 Style 绑定
    Class与Style绑定操作元素的class列表和内联样式是数据绑定的一个常见需求。因为它们都是attribute,所以我们可以用 v-bind 处理它们:只需要通过表达式计算出字符串结果即可。不过,字符串拼接麻烦且易错。因此,在将 v-bind 用于 class 和 style 时,Vue.js做了专门的增强......
  • RocketMQ教程-(5)-功能特性-事务消息
    事务消息为ApacheRocketMQ中的高级特性消息,本文为您介绍事务消息的应用场景、功能原理、使用限制、使用方法和使用建议。事务消息为ApacheRocketMQ中的高级特性消息,本文为您介绍事务消息的应用场景、功能原理、使用限制、使用方法和使用建议。以电商交易场景为例,用户支付订单......
  • RocketMQ教程-(5)-功能特性-顺序消息
    顺序消息为ApacheRocketMQ中的高级特性消息,本文为您介绍顺序消息的应用场景、功能原理、使用限制、使用方法和使用建议。应用场景在有序事件处理、撮合交易、数据实时增量同步等场景下,异构系统间需要维持强一致的状态同步,上游的事件变更需要按照顺序传递到下游进行处理。在这类......
  • RocketMQ教程-(4)-领域模型-消费者(Consumer)
    本文介绍ApacheRocketMQ中消费者(Consumer)的定义、模型关系、内部属性、行为约束、版本兼容性及使用建议。定义消费者是ApacheRocketMQ中用来接收并处理消息的运行实体。消费者通常被集成在业务系统中,从ApacheRocketMQ服务端获取消息,并将消息转化成业务可理解的信息,供业务......
  • RocketMQ教程-(4)-领域模型概述
    ApacheRocketMQ是一款典型的分布式架构下的中间件产品,使用异步通信方式和发布订阅的消息传输模型。通信方式和传输模型的具体说明,请参见下文通信方式介绍和消息传输模型介绍。ApacheRocketMQ产品具备异步通信的优势,系统拓扑简单、上下游耦合较弱,主要应用于异步解耦,流量削峰填......
  • RocketMQ教程-安装和配置
    Linux系统安装配置64位操作系统,推荐Linux/Unix/macOS64位JDK1.8+Maven3.0yum安装jdk8yum安装maven1.下载安装ApacheRocketMQRocketMQ的安装包分为两种,二进制包和源码包。点击这里 下载ApacheRocketMQ5.1.3的源码包。你也可以从这里 下载到二进制包。二进制包是已......
  • RocketMQ教程-(5)-功能特性-消息发送重试和流控机制
    本文为您介绍ApacheRocketMQ的消息发送重试机制和消息流控机制。背景信息消息发送重试ApacheRocketMQ的消息发送重试机制主要为您解答如下问题:部分节点异常是否影响消息发送?请求重试是否会阻塞业务调用?请求重试会带来什么不足?消息流控ApacheRocketMQ的流控机制主要为您解答......