首页 > 其他分享 >RocketMQ

RocketMQ

时间:2023-08-11 16:47:31浏览次数:43  
标签:RocketMQ 消费 消费者 队列 重试 消息 rocketmq

使用的版本是 4.9.4,下载地址:https://rocketmq.apache.org/zh/release-notes/2022/03/04/4.9.4/

Apache RocketMQ 使用的是 异步通信 方式和 发布与订阅 的消息传输模型。

使用 MQ 的好处:

  • 流量削峰:防止流量突然增加导致应用程序挂了;先将请求写入 MQ,然后再从 MQ 中获取数据。
  • 应用解耦:在当前模块中不需要调用其它模块;用户注册的时候需要发短信,用户模块只需要将手机号放到 MQ 中,短信模块消费 MQ 中的数据就行了。
  • 异步处理:和应用解耦类似,只是其它模块处理完后会将结果返回。

NameServer

主要就是用来:

  1. 注册 Broker 并保存 Broker 的集群名称、IP地址、端口号。
  2. 做健康检查,剔除(2m)不可用的 Broker。
  3. 管理 Topic 的路由信息,就是保存了哪些 Topic 在哪些 Broker 上。

NameServer 集群之间的节点不通信。

Broker

主要就是用来:

  1. 持久化消息和主从同步。
  2. 推送消息,推送方式支持 广播模式 和 集群模式。
  3. 过滤消息,推送 满足条件 的消息到指定消费者。
  4. 保证消息顺序性。

发送和接收的基本流程

  1. 生产者先连接上 NameServer,在发送消息的时候会创建 Topic,然后再返回 Topic 关联的 Broker,最后再连接这个 Broker,把消息发送到这个 Broker 里面。
  2. 消费者先连接上 NameServer,通过主题从 NameServer 中获取 Broker,然后再连接这个 Broker,等待消息。

生产者

生产者就是用来生产消息的,send 方法本身支持内部重试,重试逻辑如下:

  • 至多重试 2 次(同步发送为 2 次,异步发送为 0 次)。
  • 如果发送失败,则轮转到下一个 Broker。这个方法的总耗时时间不超过sendMsgTimeout设置的值,默认10s。
  • 如果本身向 broker 发送消息超过 sendMsgTimeout,就不会再重试。

普通消息

普通消息就是普通的一条消息,这个消息会发送到不同的消息队列中,但是不会保证消息被顺序消费。

普通消息有三种发送方式:

  • 同步发送:发出一条消息后,必须等待服务端返回响应,才能发送下一条。

    同步发送-6a8c78dae434afe4fbd970a2836f740c.png
  • 异步发送:就是不需要等待服务端返回响应,就直接发送下一条。

    异步发送-c05e8e1111d99d8b8b4626e419e9f8e5.png
  • 单项发送:只是发送消息,不会处理客户端响应。

    Oneway发送-bc1379bd3b8f382c23ff7abac1e0ed95.png
import org.springframework.messaging.support.MessageBuilder;
Message<String> message = MessageBuilder.withPayload("一个普通消息").build();
// asyncSend: 异步
// syncSend:  同步
rocketMQTemplate.syncSend("greetings-topic:tags", message);

延迟消息

给消息设置一个延迟时间,延迟一段时间后,才将消息投递到消费者。

Message<String> message = MessageBuilder.withPayload("一个普通消息").build();
// 支持的延迟等级:1s 5s 10s 30s 1min 2min 3min 4min 5min 6min 7min 8min 9min 10min 20min 30min 1h 2h
// 这里的等级是 3,也就是延迟 10s。
producer.syncSend("greetings-topic:tags", message, 1000, 3);

批量消息

多条消息同时发送,可以减少 API 和网络调用次数。

batch-241308ac9ed97b3a1fbf0e5e6417f74d.png
List<Message<String>> messageList = new LinkedList<>();
for (int i = 0; i < 10; i++) {
    messageList.add(MessageBuilder.withPayload("一个普通消息:" + (i + 1)).build());
}
SendResult result = rocketMQTemplate.syncSend("greetings-topic", messageList);

这些消息必须是同一个主题,而且这些消息会放到同一个消息队列。

过滤消息

通过 tag 将消息交给指定的消费者组处理。

订阅关系一致#正确例子 中写了消费者的 tag 例子,下面是生产者的例子:

rocketMQTemplate.syncSend("topic:tag", "任意消息");

顺序消息

就是把发送的消息都放到同一个消息队列里面,消费顺序和发送顺序是一样的。

Message<String> message = MessageBuilder.withPayload("一个有序的消息").build();
rocketMQTemplate.syncSendOrderly("greetings-topic", message, "用来选择消息队列的 Key 可以是订单 ID");

消费者不能使用并发模式,需要使用顺序模式:

// consumer.registerMessageListener(new MessageListenerConcurrently()
consumer.registerMessageListener(new MessageListenerOrderly()

假设现在的线程数是 10,一共有 4 个消息队列:

在顺序模式下,它的并发数就是 4,就是一个线程对应一个消息队列,处理完一个消息才会处理对应队列的下一个消息。

在并发模式下,它的并发数就是 10,只要任何一个消息队列中还有消息,线程就会处理这个消息。

消费者

同一个组内的消费者,必须保证 订阅关系一致

RocketMQ.消费者订阅2.png

消费者组有两种消费模式:

  1. 集群模式: 需要指定一个消息分配策略,然后根据这个策略将消息投递给对应的的消费者。

    集群消费模式-7f4462d200247db35ca90bb67df7c9b1.png
    // 下面指定了平均分配策略(轮询策略)。
    // 还有机房优先分配策略、一致性 hash 分配策略等。
    consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely());
    
  2. 广播模式: 直接发送给组内的所有消费者。
    广播消费模式-59abf13c1dfde37423a4b9ac552dc1f3.png

组 A 和组 B 都可以订阅同一个主题 A,这两个组都会收到主题 A 的消息,但是具体怎么分发到消费者,就要看消息模式了。

RocketMQ_消费者订阅.png

订阅关系一致

  • 订阅关系:消费者组名、Topic、Tag。

  • 订阅关系一致:同一个消费者组下的,所有消费者的 Topic、Tag 必须完全一致,如果订阅关系不一致,会导致消费消息紊乱,甚至消息丢失。

正确例子

C1、C2、C3 的订阅关系一致,即 C1、C2、C3 订阅消息的代码必须完全一致,代码示例如下:

@RocketMQMessageListener(topic = "TopicA", selectorExpression = "*")
@RocketMQMessageListener(topic = "TopicB", selectorExpression = "Tag2||Tag3")

错误例子

重平衡(reBalance)

重平衡是针对集群模式的,集群模式也可以叫负载均衡模式。

下面是平均分配算法的重平衡:

一个主题默认有四个队列,当新加入了一个消费者后,两个客户端会平分这四个队列。

消费者扩容1-2409cbfb4077f47f2e473b18eb78656b.jpeg

以此类推当添加了四个消费者后,这四个队列就平均分配给了四个消费者。

消费者扩容2-7d9c1d1dd8caea665a4a74b91f017560.jpeg

当添加第五个消费者后,第五个消费者永远不会收到消息。

消费者扩容3-65293ca6c2a01bf0a186821ba3432417.jpeg

同一个消费者组中的消费者数量,必须小于等于主题队列的数量。

不要随便调整消息队列数量和消费者组内的消费者数量。

消费位点

test_topic 主题有四个消息队列,消费者组 consumer-group 中有两个消费者,分别消费消息队列 0、1 和消息队列 2、3。

批注2023-07-26103855.png
  • 代理者位点:就是消息队列中的最大消息数量。
  • 消费者位点:消费到了这个队列中的第几个数据。
    在同一个消息队列中,如果消息 M2 比 M1 先消费成功,那么消费者位点暂时不会后移,只有 M1 消费成功后才会一起后移。
  • 差值:还有多少数据没有被消费。

集群模式下,消费者点位是由客户端返回给服务端的。

广播模式下,消费位点是由客户端自己保存的。

重复消费

两种情况会导致,消费者重复消费:

  1. 生产者发送了多次,比如程序逻辑问题或者生产者没有收到服务端响应然后进行了重试。
  2. 消费者组发生了重平衡。
    假设集群模式下,C1 正在消费 Q2 队列中的消费位点 2,然后添加了一个消费者 C2,消费者组就会进行重平衡将 Q2 分配给 C2,
    如果这个 C1 没有返回消费成功,那么服务端是不会将消费位点后移的,所以 C2 还是会消费 Q2 队列中的消费位点 2,这样就造成了重复消费。

解决重复消费就是解决幂等性。

解决方法就是:把业务的唯一 ID 添加到数据库中的去重表中,添加失败就是重复消费。

消息重试和死信队列

  • 消息重试:消息消费失败后,也会将消费者位点后移,然后将这个消息投递到重试队列。

    重试队列的主题格式是:%RETRY% + 消费者组名。

  • 死信队列:消息到达重试次数后,会将消息投递到死信队列。

    死信队列的主题格式是:%DLQ% + 消费者组名。

// 指定重试次数。
// 并发模式重试:默认 16 次,每次重试间隔和延迟等级一样,第一次重试间隔就对应延迟等级 3,第二次重试就对应延迟等级 4。
// 顺序模式重试:重试次数是 Integer.MAX_VALUE,重试间隔是立即进行下一次重试。
consumer.setMaxReconsumeTimes(2);

// 在消息的回调方法中可以获取重试次数,可以自己判断到达重试次数后的逻辑,这样可以避免写很多死信消费者。
// msgs.get(0).getReconsumeTimes();

到达重试次数后还没有消费成功,就会发送到死信队列。

消息堆积

消费者消费消息的速度太慢了,导致堆积了很多消息没有消费。

有几个解决方案:

  1. 生产者做限流。
  2. 增加消费者的并发数量。
  3. 添加新的消费者。
  4. 扩容消息队列数量。
    如果要缩容,就要等到队列都被消费完成后才去缩容,不然的话队列中的数据都丢失了。

消息补偿

消息补偿就是为了保证消息不丢失,比如保证下面这两种情况不丢:

  1. 生产者发送消息的时候一直发送失败。
  2. RocketMQ 有没有刷盘的时候,服务器宕机了。

解决方案:

  1. 将 RocketMQ 的刷盘机制改为同步刷盘。
  2. 做主从,消息保存到不同的服务器上。
  3. 启动消息追踪机制。
  4. 开发一个消息补偿器。

下面是消息补偿器的流程:

RocketMQ-消息补偿.png

消息补偿器一天执行两三次就行了,没有必要频繁执行;因为消息能不能成功被消费,和消息重发次数是没有任何关系的,消息重发的再快,消息该消费失败还是会消费失败,所以失败的消息太多了要排查业务。

RocketMQ Connect

用来做实时增量同步,比如捕获 MySQL 的 INSERT、UPDATE、DELETE 变化然后同步到 Redis。

overview-195cf6b6249dc8488e721970527cc533.png

每个 Connector 包含:

  • 一个 Connector 连接器,就是用来定义数据从哪复制到哪。
  • 多个 Task 线程,就是真正干活的,真正复制数据的。
Connector-Task-process-deec60b757a7689d932d86e7cfcadfaa.png

高并发设计

RocketMQ-高并发设计.png
  • DNS 轮询:用来做 nginx 集群。

    同一个域名每次请求的 IP 地址不一样,每个 IP 地址有自己的 nginx。

  • 发送到 MQ:把一些耗时操作单独提取出来,这样接口可以立即返回和增加并发,另外也可以对数据库削峰。

有些时候还需要使用分布式锁或分布式事务。

下面是一些对于程序的优化:

  1. 能异步就一步。
  2. 减少 IO。
  3. 控制锁的力度。
  4. 控制事务的力度。

docker 中使用

sudo wget https://archive.apache.org/dist/rocketmq/4.9.4/rocketmq-all-4.9.4-bin-release.zip

sudo unzip rocketmq-all-4.9.4-bin-release.zip

修改文件
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g"


修改配置文件
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
# 注册到 NameServer 时候的 IP 地址
brokerIP1=IP

sudo apt-get -y install unzip





启动
nohup sh bin/mqnamesrv &

nohup sh bin/mqbroker -n localhost:9876 -c $ROCKETMQ_HOME/conf/broker.conf &

rocketmq-dashboard.jar --server.port=8001 --rocketmq.config.namesrvAddrs=127.0.0.1:9876

jps -l


https://blog.csdn.net/zbj18314469395/article/details/86064849


sudo wget https://download.bell-sw.com/java/8u382+6/bellsoft-jdk8u382+6-linux-aarch64.tar.gz

Spring Boot 中使用

依赖:

<!-- rocketmq client version 4.9.3 -->
<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-spring-boot-starter</artifactId>
  <version>2.2.2</version>
</dependency>

在 application.properties 中添加配置:

# 配置服务地址
# 消费者 只需要配置这个就行了,不用配置别的。
rocketmq.name-server=172.16.70.130:9876;192.168.0.2:9876

############################# 生产者 #############################

# 注意:由数字、字母、下划线、横杠(-)、竖线(|)或百分号组成;不能为空;长度不能超过 255。
rocketmq.producer.group=greetings-producer-group
# 命名空间
rocketmq.producer.namespace=greetings-producer-namespace

# 同步模式下,消息重新发送的最大次数,超过这个次数后会返回失败。
# 默认值:2,即:默认情况下一条消息最多会被投递 3 次;有可能会导致消息重复。
rocketmq.producer.retryTimesWhenSendFailed=2

# 和同步模式的作用一样,只不过这个是在异步模式下有用
rocketmq.producer.retryTimesWhenSendAsyncFailed=2

在 Java 中使用:

// 生产者使用
@Autowired
private RocketMQTemplate rocketMQTemplate;


// 消费者使用
// 下面的 MessageExt 就是 greetings-topic 主题的消息类型。
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(topic = "greetings-topic", consumerGroup = "simple-group")
public class SimpleConsumer implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt message) {}
}

下面是 RocketMQMessageListener 注解的配置:

@RocketMQMessageListener(
  // 命名空间
  namespace = "",
  
  // 必须指定一个消费者分组
  // 消费者组的消费者实例必须订阅完全相同的 Topic
  consumerGroup = "simple-group",
  
  // 主题名称
  topic = "greetings-topic",
  
  // 过滤表达式,只接收满足表达式的消息
  // 也可以写成 TAGA || TAGB
  selectorExpression = "*",

  // 消费模式
  //   ConsumeMode.CONCURRENTLY: 并发处理(默认值)
  //   ConsumeMode.ORDERLY: 顺序处理
  // 注意:ConsumeMode.ORDERLY 不能和 MessageModel.BROADCASTING 一起使用
  consumeMode = ConsumeMode.CONCURRENTLY,
  
  // 消息模式
  // MessageModel.CLUSTERING: 集群模式,  topic 中的一条消息,只会被一个消费组内的 某一个 消费者处理。
  // MessageModel.BROADCASTING: 广播模式,topic 中的一条消息,会被一个消费组内的 所有 消费者处理。
  messageModel = MessageModel.CLUSTERING,

  // 消费线程数
  // consumeMode = ConsumeMode.ORDERLY: 最大值是主题中消息队列的数量,
  //                                    也就是说,消息的最大并发消费数量 = 主题中消息队列的数量
  // consumeMode = ConsumeMode.CONCURRENTLY: 消息的最大并发消费数量
  consumeThreadNumber = 20,
    
  // 消费超时时间(以分钟为单位)
  // 超过这个时间后,就会认为消息消费失败,消费者就会重新发送这条消息到服务端
  consumeTimeout = 15L,

  // 重新投递到服务端的最大次数
  // 消费超时(consumeTimeout)后会重新发送到服务端,到达最大次数后会将消息发送到 DLQ 死信队列
  // 并发模式:-1 表示 16
  // 有序模式:-1 表示 Integer.MAX_VALUE
  // 注意: 广播模式下: 消费失败消息会丢弃
  //      集群模式下: 消费失败的消息需要回传给 Broker
  maxReconsumeTimes = -1,
    
  // 并行模式下的消息重试策略
  // -1: 代表不重试,直接扔到死信队列
  // 0:  代表由 Broker 控制重试频率
  // >0: 代表由客户端控制重试频率
  delayLevelWhenNextConsume = 0,
  // 有序模式下重试
  // 暂停拉取的时间间隔,单位是毫秒
  // 最小值为 10,最大值为 30000
  suspendCurrentQueueTimeMillis = 1000,

  // 投递到服务端的超时时间
  replyTimeout = 3000,

  // 是否启用消息追踪
  enableMsgTrace = false,
  // 消息跟踪的主题名
  customizedTraceTopic = "RMQ_SYS_TRACE_TOPIC",
)

参考资料

rocketmq-dashboard

https://rocketmq.apache.org/zh/docs/4.x/consumer/01concept2

https://rocketmq.apache.org/zh/docs/4.x/bestPractice/07subscribe

标签:RocketMQ,消费,消费者,队列,重试,消息,rocketmq
From: https://www.cnblogs.com/jnyyxz/p/17623260.html

相关文章

  • M1版本Mac通过Docker安装RocketMQ
    前言上一篇我通过docker安装到mac上的rocketmq,三个容器都启动成功,却无法通过ip:port进行访问,考虑到我的Mac版本是M1的原因,这次我们通过官网的项目重新编译来解决这个问题步骤一gitclonehttps://github.com/apache/rocketmq-docker.git 步骤二cdimage-buildshbuild-ima......
  • Centos安装rocketmq
    一、创建文件夹        进入我们创建好的文件夹中: //创建文件加mkdirrockmq//进入文件夹cdrockmq二、下载安装包         下载自己所需的版本,我这里是rocketmq4.4.0 //下载安装包wgethttps://archive.apache.org/dist/rocketmq/4.4.0/rocke......
  • 阿里云中“间“力量!RocketMQ
    视频演示讲解视频讲解直达!已更新!!!视频讲解直达!已更新!!!视频讲解直达!已更新!!!切入正题介绍完活动后,咱们言归正传,首先说说什么是"消息队列"我想很多朋友应该是没有接触过"消息队列",笔者有一个5年开发经验的朋友,期间多次问到过他,他都回答笔者,在其日常开发工作中并没有使用到"消息队列"。......
  • RocketMQ Linux单机测试:简易快速部署指南及Dashboard控制台部署
    目录简介开始下载增加环境变量修改启动文件jvm大小修改rocketmq配置文件启动快速测试关闭Dashboard下载Dashboard已编译jar包网盘下载启动命令可能遇到的问题写在最后简介请注意,本博客仅供初期测试时快速部署之用,以节省时间避免不必要的问题。如需在生产环境部署,请参考其他可靠......
  • RocketMQ - 顺序消息/事务消息/延迟消息
    顺序消息生产端顺序生产消费端顺序消费一般都是局部顺序消息。生产端根据shardingkey对队列数量取模,把同一个shardingkey的消息发送到同一个队列而消费端也要确保消费这个队列时是一个线程消费的首先是consumer中注册的Listener来指定是顺序消息消费还是并发消费pu......
  • 聊聊 RocketMQ 名字服务
    NameServer是专为RocketMQ设计的轻量级名字服务,它的源码非常精简,八个类,少于1000行代码。这篇文章,笔者会从基础概念、Broker发送心跳包、NameServer维护路由、ZookeepervsNameServer四个模块揭秘名字服务的设计精髓。1基础概念NameServer是一个非常简单的Topic路......
  • Apache RocketMQ 远程代码执行漏洞(CVE-2023-33246)
    漏洞简介RocketMQ5.1.0及以下版本,在一定条件下,存在远程命令执行风险。RocketMQ的NameServer、Broker、Controller等多个组件外网泄露,缺乏权限验证,攻击者可以利用该漏洞利用更新配置功能以RocketMQ运行的系统用户身份执行命令。此外,攻击者可以通过伪造RocketMQ协议内容来达到......
  • RocketMQ 在业务消息场景的优势详解
    作者:隆基01消息场景RocketMQ5.0是消息事件流一体的实时数据处理平台,是业务消息领域的事实标准,很多互联网公司在业务消息场景会使用RocketMQ。我们反复提到的“消息、业务消息”,指的是分布式应用解耦,是RocketMQ的业务基本盘。通过本文,我们将深入了解RocketMQ5.0在业务......
  • RocketMq消费原理及源码解析
    消费原理概览 先简单说下常见的rocketMq的部署方式,上图中broker为真正计算和存储消息的地方,而nameServer负责维护broker地 图中右侧consumemessage部分即是本文重点描述的部分,主要分为ConsumerGroup和Consumer,consumerGroup可以参考https://rocketmq.apache.org/docs/do......
  • Apache RocketMQ 远程代码执行漏洞(CVE-2023-37582)
    ​漏洞简介ApacheRocketMQ是一款低延迟、高并发、高可用、高可靠的分布式消息中间件。CVE-2023-37582中,由于对CVE-2023-33246修复不完善,导致在ApacheRocketMQNameServer存在未授权访问的情况下,攻击者可构造恶意请求以RocketMQ运行的系统用户身份执行命令。影响版本Apac......