首页 > 其他分享 >十二、消费者类型

十二、消费者类型

时间:2023-07-22 20:11:38浏览次数:29  
标签:消费 消费者 defaultLitePullConsumer 十二 拉取 消息 监听器 DefaultLitePullConsumer 类型

消费者概览

Apache RocketMQ 4.x 支持 PushConsumer 、 PullConsumer 这两种类型的消费者。DefaultMQPushConsumer只需要设置MessageListener,获取消息,消息并发等都有SDK处理。DefaultMQPullConsumer需要用户自己拉取消息,并维护消费进度,同时并发消费消息都由用户控制,比较灵活。

 

RocketMQ 4.x还提供了另一种PullConsumer,是Lite Pull Consumer,它提供了Subscribe和Assign两种方式,使用起来更加方便。

Subscribe模式示例如下:

@Configuration
@Slf4j
public class ConsumerConfig {

    @Value("${rocketmq.namesrv}")
    private String namesrv;

    String topic = "MyTopic";

    @Bean
    public DefaultLitePullConsumer litePullConsumer() throws MQClientException, InterruptedException {
        DefaultLitePullConsumer defaultLitePullConsumer = new DefaultLitePullConsumer("my-lite-consumer");

        defaultLitePullConsumer.setNamesrvAddr(namesrv);

        defaultLitePullConsumer.subscribe(topic,"*");

        defaultLitePullConsumer.setPullBatchSize(20);

        defaultLitePullConsumer.start();

        while (true) {
            List<MessageExt> messageExts = defaultLitePullConsumer.poll(300);
            if (!CollectionUtils.isEmpty(messageExts)) {
                log. info("消费消息:{}", messageExts.stream().map(msg -> new String(msg.getBody(), StandardCharsets.UTF_8)).collect(Collectors.toList()));
            } else {
                TimeUnit.MILLISECONDS.sleep(300);
            }
        }

    }
}

首先还是初始化DefaultLitePullConsumer并设置ConsumerGroupName,调用subscribe方法订阅topic并启动。与Push Consumer不同的是,LitePullConsumer拉取消息调用的是轮询poll接口,如果能拉取到消息则返回对应的消息列表,否则返回null。通过setPullBatchSize可以设置每一次拉取的最大消息数量,此外如果不额外设置,LitePullConsumer默认是自动提交位点在subscribe模式下,同一个消费组下的多个LitePullConsumer会负载均衡消费,与PushConsumer一致

 

再来看Assign模式:

@Bean
public DefaultLitePullConsumer litePullConsumer() throws MQClientException, InterruptedException {
        DefaultLitePullConsumer defaultLitePullConsumer = new DefaultLitePullConsumer("my-lite-consumer");

        defaultLitePullConsumer.setNamesrvAddr(namesrv);

        defaultLitePullConsumer.setAutoCommit(false);
        defaultLitePullConsumer.start();
        Collection<MessageQueue> messageQueues = defaultLitePullConsumer.fetchMessageQueues(topic);
        List<MessageQueue> list = new ArrayList<>(messageQueues);
        List<MessageQueue> messageQueueList = new ArrayList<>(messageQueues.size() / 2);

        for (int i = 0; i < list.size() / 2; i++) {
            messageQueueList.add(list.get(i));
        }

        defaultLitePullConsumer.assign(messageQueueList);

        defaultLitePullConsumer.seek(messageQueueList.get(0), 10);
        

        while (true) {
            List<MessageExt> messageExts = defaultLitePullConsumer.poll(300);
            if (!CollectionUtils.isEmpty(messageExts)) {
                log. info("消费消息:{}", messageExts.stream().map(msg -> new String(msg.getBody(), StandardCharsets.UTF_8)).collect(Collectors.toList()));
                defaultLitePullConsumer.commitSync();
            } else {
                TimeUnit.MILLISECONDS.sleep(300);
            }
        }

    }
}

Assign模式一开始仍然是初始化DefaultLitePullConsumer,这里我们采用手动提交位点的方式,因此设置AutoCommit为false,然后启动consumer。与Subscribe模式不同的是,Assign模式下没有自动的负载均衡机制,需要用户自行指定需要拉取的队列,因此在例子中,先用fetchMessageQueues获取了Topic下的队列,再取前面的一半队列进行拉取,示例中还调用了seek方法,将第一个队列拉取的位点设置从10开始。紧接着进入循环不停地调用poll方法拉取消息,拉取到消息后调用commitSync方法手动提交位点。

 

接下来看Apache RocketMQ 5.0版本的消费者,并做比较。

Apache RocketMQ 支持 PushConsumer 、 SimpleConsumer 以及 PullConsumer 这三种类型的消费者。

对比项 PushConsumer SimpleConsumer PullConsumer
接口方式 使用监听器回调接口返回消费结果,消费者仅允许在监听器范围内处理消费逻辑。 业务方自行实现消息处理,并主动调用接口返回消费结果。 业务方自行按队列拉取消息,并可选择性地提交消费结果
消费并发度管理 由SDK管理消费并发度。 由业务方消费逻辑自行管理消费线程。 由业务方消费逻辑自行管理消费线程。
负载均衡粒度 5.0 SDK是消息粒度,更均衡,早期版本是队列维度 消息粒度,更均衡 队列粒度,吞吐攒批性能更好,但容易不均衡
接口灵活度 高度封装,不够灵活。 原子接口,可灵活自定义。 原子接口,可灵活自定义。
适用场景 适用于无自定义流程的业务消息开发场景。 适用于需要高度自定义业务流程的业务开发场景。 仅推荐在流处理框架场景下集成使用

PushConsumer内部原理

在PushConsumer类型中,消息的实时处理能力是基于SDK内部的典型Reactor线程模型实现的。如下图所示,SDK内置了一个长轮询线程,先将消息异步拉取到SDK内置的缓存队列中,再分别提交到消费线程中,触发监听器执行本地消费逻辑。

PushConsumer 消费者类型中,客户端SDK和消费逻辑的唯一边界是消费监听器接口。客户端SDK严格按照监听器的返回结果判断消息是否消费成功,并做可靠性重试。所有消息必须以同步方式进行消费处理,并在监听器接口结束时返回调用结果,不允许再做异步化分发。如果消费者分组设置了顺序消费模式,则PushConsumer在触发消费监听器时,严格遵循消息的先后顺序。业务处理逻辑无感知即可保证消息的消费顺序。如果业务逻辑自定义实现了异步分发,则Apache RocketMQ 无法保证消息的顺序性。

标签:消费,消费者,defaultLitePullConsumer,十二,拉取,消息,监听器,DefaultLitePullConsumer,类型
From: https://www.cnblogs.com/shigongp/p/17574021.html

相关文章

  • mysql设置时间字段类型
    MySQL设置时间字段类型在MySQL数据库中,时间字段是一种用于存储日期和时间信息的特殊类型。MySQL提供了多种时间字段类型,可以根据不同的需求选择合适的类型。本文将介绍MySQL中常用的时间字段类型,并提供代码示例来演示如何设置时间字段类型。1.DATEDATE类型用于存储日期信息,格式......
  • 元素类型
    1.块级元素 独占一行 可以做容器----盒子 2.内联元素 左右排列  不是盒子----子哦为辅助标签来使用 有很多属性显示不完整  3.行内块级元素  imginput  可以支持宽高 左右排列 也叫行内块级元素都有相应的bug 行内块和行内元素换行都会与有空......
  • JS数据类型
    JavaScript中的数据类型可以分为两类:基本数据类型和引用数据类型。七种基本数据类型类型typeof返回值对象包装器Null"object"N/AUndefined"undefined"N/ABoolean"boolean"BooleanNumber"number"NumberBigInt"bigint"BigIntStrin......
  • redis数据类型及操作命令
    数据类型Redis存储的是key-value结构的数据,其中key是字符串类型,value有5种常用的数据类型:字符串string哈希hash列表list集合set有序集合sortedset/zset解释说明:字符串(string):普通字符串,常用哈希(hash):适合存储对象列表(list):按照插入顺序排序,可以有重复元素......
  • java 声明新的类型
    如何声明新的类型(Java)作为一名经验丰富的开发者,我将向你介绍如何在Java中声明新的类型。这是一个非常基础但又非常重要的概念,对于刚入行的开发者来说尤为重要。下面是一个简单的步骤表格,展示了声明新的类型的过程:步骤描述第一步创建一个新的类第二步添加成员变量(属......
  • kernel源码(二十二)块设备
    操作系统所有设备可分为两类:块设备和字符设备。块设备是一种可以以固定大小的数据块为单位进行寻址和访问的设备,例如硬盘、软盘。字符设备是一种以字符流作为操作对象的设备,不能进行寻址操作,例如打印机、网卡、终端设备。为便于管理,操作系统将这些设备统一的以设备号进行分类。......
  • 什么是虚拟化?六种虚拟化的类型
    虚拟化是一种将计算资源(如服务器、存储、网络等)从物理硬件中抽象出来,以创建虚拟资源的技术。虚拟化可以将一个物理资源分割成多个虚拟资源,每个虚拟资源可以独立运行,并且彼此之间相互隔离,就像是在独立的物理环境中运行一样。虚拟化的主要目标是提高硬件资源的利用率、灵活性和可管理......
  • vue3组合式 API_为 computed() 标注类型
    computed() 会自动从其计算函数的返回值上推导出类型<template><h3>{{doubleCount}}</h3></template><scriptsetuplang="ts">import{ref,computed}from"vue"constcount=ref<number>(100)//推导得到的类型:ComputedRef&l......
  • JAVA中数值类型的类型和类以及使用的选择
    数值类型的分类在JAVA中,数值类型可以分为两大类:基本数据类型和包装类。基本数据类型共有八种,分别是:整型:byte、short、int、long浮点型:float、double字符型:char布尔型:boolean包装类是为了让基本数据类型可以作为对象使用而提供的一种类,它们分别是:Integer:对应int类型Lon......
  • mysql 索引类型 fulltext
    如何实现MySQL索引类型fulltext简介在MySQL中,fulltext是一种特殊的索引类型,它可以提供更高效的全文搜索功能。本文将向你介绍如何使用fulltext索引类型来优化全文搜索的性能。流程图以下是使用fulltext索引类型实现全文搜索的流程图:步骤操作1创建包含ful......