首页 > 其他分享 >使用RocketMQ实现消息顺序消费

使用RocketMQ实现消息顺序消费

时间:2023-12-19 12:32:56浏览次数:40  
标签:顺序 队列 Broker 消费 消息 NameServer consumer RocketMQ

消息的顺序消费在很多交易型的业务场景中都会被要求实现,而且,消息队列的顺序消费解决方案在很多互联网公司的面试中经常会被问到。

索尔老师在使用了多个消息队列后发现,虽然每个消息队列都有各自的顺序消费解决方案,但是RocketMQ经过了多年电商的洗礼,其功能性的要求,已经设计的非常全面。这样的全面可以通过RocketMQ消息模型的架构设计得以体现。我们看看RocketMQ是怎么解决消息的顺序消费。

一、RocketMQ的消息模型

1.技术架构

使用RocketMQ实现消息顺序消费_长连接

RocketMQ架构上主要分为四部分,如上图所示:

NameServer:NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以动态感知Broker的路由的信息。

Producer:消息的发布者,支持分布式集群方式部署。消息发布者通过消息队列的负载均衡模块选择相应的Broker集群进行消息投递,投递的过程低延迟并且支持快速失败。

Consumer:消息的消费者,支持分布式集群方式部署。RocketMQ同时支持push推和pull拉的两种消息消费模式。消息消费者也支持集群和广播两种消费方式,同时提供了实时消息的订阅机制,满足大多数用户的需求。

BrokerServer:Broker主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker包含了以下几个重要子模块。

  • Remoting Module:整个Broker的实体,负责处理来自clients端的请求。
  • Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息。
  • Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
  • HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。
  • Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。

使用RocketMQ实现消息顺序消费_apache_02

2. 部署架构

使用RocketMQ实现消息顺序消费_长连接_03

RocketMQ 网络部署特点

  • NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
  • Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。注意:当前RocketMQ版本在部署架构上支持一Master多Slave,但只有BrokerId=1的从服务器才会参与消息的读负载。
  • Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送信息。Producer完全无状态,可集群部署。
  • Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送信息。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,消费者在向Master拉取消息时,Master服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读I/O),以及从服务器是否可读等因素建议下一次是从Master还是Slave拉取。

结合部署架构图,描述集群工作流程:

  • 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
  • Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
  • 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
  • Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
  • Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。

二、顺序消息的应用场景

顺序消费的应用场景有很多,无论是在面试,或是实际生产环境中要解决顺序消费问题,都需要对消息队列的应用场景有一定的理解。

1.AIOT中消息的顺序消费

比如有这么一个物联网的应用场景,IOT中的设备在初始化时需要按顺序接收这样的消息:

  • 操作1:设置设备名称
  • 操作2:设置设备的网络
  • 操作3:重启设备使配置生效

如果这个顺序颠倒了,可能就没有办法让设备的配置生效,因为只有重启设备才能让配置生效,但重启的消息却在设置设备消息之前被消费。

2.用户服务中的顺序消费

应用为了提高用户粘合度,往往通过积分制度提供用户活跃度。比如接下来有这一系列的场景:

  • 操作1:新注册用户,将用户积分设置为10分。
  • 操作2:奖励行为,比如用户完善了个人信息,则积分+5分。
  • 操作3:惩罚行为,比如用户发表违规言论,则积分-3分。

如果上述的操作是顺序进行的,则用户积分为12分。如果上述操作的步骤顺序发生了变化,比如消费者先消费了操作2的消息,再消费操作1和3的消息,则分数变为7分,此时就出现了因为乱序消费导致的错误结果。

三、如何实现顺序消息

顺序消息要求消费者消费消息的顺序按照发送者发送消息的顺序执行。RocketMQ中实现的消息顺序有两个维度,分别是局部有序和全局有序。

1.局部有序

使用RocketMQ实现消息顺序消费_长连接_04

局部消息指的是消费者消费某个topic的某个队列中的消息是顺序的。在图上的八个队列中,消费者可以随机的消费这8个队列,也就是说消费者不能保证消费队列的顺序。但是消费者在消费某一个队列的时候,一定可以进行顺序消费。也就是说,在消费者两次访问一个队列中的消息时,一定是按照从左到右的顺序依次消费消息。所以针对于某一个队列来讲,消费是顺序的。但如果把问题定位在整个队列中时,不同的消息在不同的队列中,又不能保证消息的有序性,不如消息A到了队列2,消息B到了队列1,消费者先消费了队列1再消费队列2,就不能保证有序性。但如果消息C随着消息B进入队列1,那么消息C一定是在消息B之后被消费。

消费者使用MessageListenerOrderly类做消息监听,实现局部顺序。

package com.qf.producer.order;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
/**
 * 顺序消息
 * @author Thor
 * @公众号 Java架构栈
 */
public class OrderConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("OrderTopicTest", "*");
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
                                                       ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                for(MessageExt msg:msgs){
                    System.out.println("消息内容:"+new String(msg.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

2.全局有序

消费者消费全部消息都是顺序的,只能通过一个某个topic只有一个队列才能实现,这种应用场景较少,且性能较差。

使用RocketMQ实现消息顺序消费_应用场景_05

3.乱序消费

消费者消费消息不需要关注消息的顺序。消费者使用MessageListenerConcurrently类做消息监听。

package com.qf.producer.order;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
/**
 * 顺序消息
 * @author Thor
 * @公众号 Java架构栈
 */
public class OrderConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("OrderTopicTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for(MessageExt msg:msgs){
                    System.out.println("消息内容:"+new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}


标签:顺序,队列,Broker,消费,消息,NameServer,consumer,RocketMQ
From: https://blog.51cto.com/u_15739596/8886592

相关文章

  • M1 Docker 部署rocketmq
    1、克隆镜像arm64镜像代码编译镜像,docker直接安装会报错所以选择编译gitclonehttps://github.com/apache/rocketmq-docker.git  cdrocketmq-docker #注意这里centos固定不要改镜像版本可以改shbuild-image.sh4.8.0centos 第二步:拉取mqconsole镜像: candice06......
  • 一文讲透消息队列RocketMQ实现消费幂等
    这篇文章,我们聊聊消息队列中非常重要的最佳实践之一:消费幂等。1基础概念消费幂等是指:当出现RocketMQ消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响。例如,在支付场景下,消费者消费扣款消息,对一笔订单......
  • 删除序列相同元素并保持顺序
    问题怎样在一个序列上面保持元素顺序的同时消除重复的值?解决方案如果序列上的值都是hashable类型,那么可以很简单的利用集合或者生成器来解决这个问题。比如:defdedupe(items):seen=set()foriteminitems:ifitemnotinseen:yielditemseen.add(item) 下面是使......
  • Android深入学习之Activity与Fragment之间回调函数的调用顺序
    本文使用的例子是用WelcomeActivity托管WelcomeFragment。先来看Log。1.WelcomeActivityWelcomeActivitycreated!2.WelcomeActivityonCreate2.1.WelcomeFragmentWelcomeFragmentcreated!2.2.FragmentManagerCommit:BackStackEntry{cc......
  • 解锁RocketMQ秘籍:如何保障消息顺序性?
    嗨,小伙伴们!小米在这里啦!今天我们要聊的话题是社招面试中一个经典而又百思不得其解的问题——“RocketMQ如何保证顺序性?”不用担心,小米来给你揭秘RocketMQ的秘密武器,让你轻松过关面试大关!引言:为什么要谈顺序性?首先,我们得明白为什么在消息队列中要讲究消息的顺序性。假设你正在开发一......
  • 关于java的多态方法调用顺序的问题
    使用父类类型的引用指向子类的对象,该引用调用的师父类中定义的方法和变量,变量不能被重写(覆盖);如果子类中重写了父类中的一个方法,那么在调用这个方法的时候,将会调用子类中的这个方法;注意特殊情况,如果该父类引用所调用的方法参数列表未定义,就调用该父类的父类中查找,如果还没找到就强......
  • CompletableFuture + LinkedBlockingDeque 实现生产者消费者案例
    设计要求:1.设计一个生产者生产,消费者消费场景;2.使用线程池 CompletableFuture+队列LinkedBlockingDeque实现;3.生产者生产的数据存储到长度为5的LinkedBlockingDeque队列,消费者消费从LinkedBlockingDeque队列中取数据;4.生产者和消费者均是多线程且不知道谁快谁慢,互......
  • 顺序表
    1.顺序表概念什么是顺序表?顺序表是一种新的数据类型,它使用一段物理地址连续的存储单元依次存储数据元素(数组实现),并具有操作(增删查改)这个数组的方法数组也是使用连续的地址空间存储数据,那么数组和顺序表有什么区别?数组是一个连续地址依次存储数据的简单结构,而顺序表......
  • Golang实现简易的顺序执行协程池
    countable_executor.go//一个可计数的单线程顺序任务执行器typeCountableExecutorstruct{namestring//名称taskQueuechaniCountableTask//任务队列bufferSizeint//缓冲区大小}//一个可计数的单线程任务......
  • 1.5万字 + 25张图盘点RocketMQ 11种消息类型,你知道几种?
    大家好,我是三友~~故事的开头是这样的最近有个兄弟私信了我一张截图我一看截图内容,好家伙,原来是我一年多前立的flag倒不是我忘了这件事,我后来也的确写了一篇的关于RocketMQ运行的原理的文章只不过这篇文章是从上帝的视角去看待RocektMQ一条消息整个生命周期的过程所以就没有......