首页 > 其他分享 >RocketMQ——快速入门

RocketMQ——快速入门

时间:2024-01-14 11:22:41浏览次数:19  
标签:消费 入门 MessageQueue 队列 Topic 消息 快速 RocketMQ

RocketMQ 架构设计

消息队列实现了消息投放和消息消费间的解耦,实现了异步处理消息的功能。RocketMQ 作为消息中间件,在其存储消息的结构上实现了消息均衡投放、消息容灾、高可用(Dledger 主从切换)、自动故障转移特点。

先引入以下几个概念:

  • Broker:实际存储消息的节点,接收来自生产者的消息,提供给消费者消费;一般以一个集群的形式提供服务,防止单点故障;
  • Topic:一类或一大类消息的集合,比如订单相关的消息包括(订单已下单、订单已支付等等....);
  • MessageQueue:消息队列对 Topic 进一步划分,将同一个 Topic 的消息均匀分布在多个消息队列中,其中每个消息队列存在于一个 Broker 上,这样保证了出现故障时不会丢失整个 Topic 消息,并且均衡分布防止了消息数据倾斜的问题。
  • NameServer:存储了整个系统的元数据,就想 Kafka 会采用 Zookeeper 来存储管理集群元数据一样。NameServer 主要存储了 (1)集群中有哪些Topic;(2)这些Topic的 MessageQueue 存放在哪些 Broker上;(3)集群中有哪些活跃的 Broker;NameServer 为了应对单点故障,也是以集群的形式对外提供服务。

为了保证数据的安全完整性以及服务高可用,需要对数据做多备份,因此提供了 Master BrokerSlave Broker 主从节点,主从之间会定期进行数据同步。但是如果 Master 出现宕机,如何重新选主成为一个问题,RocketMQ 4.5之后采用 Dledger 实现高可用集群,类似于 Redis 集群的 Sentinel 能够进行自动的故障转移操作。Dledger 底层使用 Raft 协议算法重新选举新的 Master 对外提供服务。


RocketMQ 特点

1、顺序消费

1.1 分区有序

一个 Topic 对应多个 MessageQueue,那么在一个 MessageQueue 中的消息就是保持有序的,也叫做分区有序。

例如上图一个 Topic 包括了 3个 MessageQueue,一共四条消息投递到了不同的消息队列中,那么 MessageQueue-0 内部的 Message1、Message4 就可以认为是分区有序的。但是需要注意,消费者从消息队列中取数据消费是可以并行消费多个 MessageQueue 中消息的。例如这个例子,第一个消费到的消息不一定是 Message-1。

1.2 全局有序

那么如果现在有这样一组消息:创建订单、更新订单、完成订单。各个消息需要保证按照顺序消费,此时就需要全局有序的概念了。要做到全局有序,只能一个 Topic 分配一个 MessageQueue,这样就变成单分区下的有序消费,全局有序模式下不支持并行消费。

2、消息过滤

一个消息队列中存储了多个 Topic 相关的消息,现在有需求需要消费者只消费某一个 Topic 的消息,如何实现?

  • 业务代码实现,消费者获取 MessageQueue 中所有消息,然后消费时判断 Topic;
  • RocketMQ 给每个消息打上一个标签,消费者在消费时只需要订阅标签为特定 Topic 的消息,消费者就只会拉取到相关 Topic 的消息。

3、事务消息

这里的事务类似于我们熟知的 Mysql 事务,即一组操作要么全部成功,要么全部失败。

RocketMQ 所定义的事务是指把消息发送到那个服务的本地事务和将消息投递到 MQ 这个操作组成一个事务,它涉及到两个操作:(1)本地事务运行且成功;(2)消息投递到MQ成功。

例如上图我们将订单数据持久化到数据库 和 消息投递到MQ 两个操作作为一个事务。

4、延迟队列

RocketMQ 支持的延时队列只有 18个选项,分别是 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

实际上可以组合这些时间实现任意的等待时间,可以利用时间轮算法,参考 Dubbo 的时间轮算法实现。

5、死信队列

死信队列用于处理那些无法被正常消费的消息,这些消息超过了最大重试消费次数(16),会被存放到死信队列中,实际上也是存储在一个 Topic 中,所有死信队列都会带上 %DLQ% 的前缀,然后跟上消费者组的名称,并且这个 Topic 对消费者来说是不可见的。

死信队列中的消息最多只能存放 3 天,并且可以通过 Dashboard 实现对这些消息的重放。


RocketMQ——Message

Message 作为消息的载体,其主要包括以下属性:

  • Topic:消息主题,消息类型;
  • Flag
  • Properties:消息属性,内部包含消息的标签 Tag;
  • body:消息体内容;
  • TransactionId:事务ID;

标签:消费,入门,MessageQueue,队列,Topic,消息,快速,RocketMQ
From: https://www.cnblogs.com/istitches/p/17963479

相关文章

  • 极客时间之Serverless入门
    开篇词|说来说去,到底Serverless要解决什么问题?问题一:说来说去,到底Serverless要解决什么问题?拿自己部署一套博客来说吧,常见的Node.jsMVC架构,需要购买云服务商的Linux虚拟机、RDS关系型数据库,做得好的话还要购买Redis缓存、负载均衡、CDN等等。再专业一点,可能还会考虑......
  • VUE快速改造前端高级搜索
    更新说明:支持本地存储,默认1天,7天支持当字段少于2个,自动隐藏“展开”按钮增加时间组件、时间范围组件简化组件代码不影响自动代码生成的搜索条件,不影响其他代码逻辑截图步骤如果部分代码,无法复制,可以参考:src/views/purchase/cgreturn/index.vue每个列表Index界面,都有搜索,将<a-form>(......
  • P9007 [入门赛 #9] 最澄澈的空与海 (Hard Version) 题解
    Updon2023.10.1408:21:修改了推式子和题意的一些小错误。前言一道恐怖的绿题。显然我认为应该是蓝题。(不过在这篇题解写到一半的时候升蓝了,感谢@StudyingFather。)名字挺好的。题意给定\(n\),求出满足以下条件的三元组\((x,y,z)\)的组数:\(x\ge0,z\ge1\)。\(......
  • RUST web框架axum快速入门教程6之测试
    本文主要讨论axum的测试,axum对于测试的支持还是比较完善的,我们可以测试状态码,HTTP头信息,响应体等内容,因为框架实现的原因,其实axum很依赖tower。往期文章:https://youerning.top/post/axum/quickstart-1https://youerning.top/post/axum/quickstart-2https://youerning.top/pos......
  • ★教程4:FPGA/MATLAB/Simulink联合应用开发入门与进阶X例——前言★教程3:simulink学
        专业即算法,算法即数学,数学即万物。从事MATLAB算法仿真工作15年,从事FPGA系统开发工作12多年。擅长解决各种算法仿真、建模、通信、图像处理、AI、智能控制等。 1.无线基带,无线图传,编解码2.机器视觉,图像处理,三维重建3.人工智能,深度学习4.智能控制,智能优化目录1.FPG......
  • ★教程4:FPGA/MATLAB/Simulink联合应用开发入门与进阶X例——目录
    1.订阅本教程用户可以免费获得本博任意1个博文对应代码;2.本课程的所有案例(部分理论知识点除外)均由博主编写而成,供有兴趣的朋友们自己订阅学习使用。未经本人允许,禁止任何形式的商业用途;3.本课程我们更侧重于各种实例的完整设计介绍。更全面的介绍FPGA,MATLAB,Simulink的联合开发应......
  • Python逆向爬虫入门教程: 酷狗音乐加密参数signature逆向解析
    数据来源分析......
  • 如何在SAP GUI中快速执行新的事务代码
    当我们成功登录SAP的某个连接后,在SAPGUI起始页(SAP轻松访问),我们可以通过点击【收藏夹】或者在界面左上角的输入框输入对应的事务代码,直接进入对应事务的界面。但是下面列举的场景,你是否知道如何快速应对? 场景一:当前已处于某个事务代码中,如何进入新的事务代码?场景二:当前SA......
  • ACCESS 快速构建修改数据的窗体
    有个客户表: 现在需要创建一个可以修改客户数据的窗体,我们一般的做法是:1.选中数据表:客户列表2.在菜单中选择"创建"-->"窗体". 3.调整一下格式,和添加一下按钮,就变成了这样,初始状态下,它会自动绑定字段值.此时如果用户对某个值做了修改,会直接修改数据表中的值,......
  • 路由--基础入门
    VueRouter,是一个用于Vue.js的官方路由管理器。它和Vue.js核心深度集成,使得构建单页面应用(SPA)变得简单。在单页面应用中,页面不会重新加载整个页面,而是通过动态加载和替换页面的某些部分来实现与用户的交互 routerindex.js//1.定义路由组件.import{createRoute......