使用的版本是 4.9.4,下载地址:https://rocketmq.apache.org/zh/release-notes/2022/03/04/4.9.4/。
Apache RocketMQ 使用的是 异步通信 方式和 发布与订阅 的消息传输模型。
使用 MQ 的好处:
- 流量削峰:防止流量突然增加导致应用程序挂了;先将请求写入 MQ,然后再从 MQ 中获取数据。
- 应用解耦:在当前模块中不需要调用其它模块;用户注册的时候需要发短信,用户模块只需要将手机号放到 MQ 中,短信模块消费 MQ 中的数据就行了。
- 异步处理:和应用解耦类似,只是其它模块处理完后会将结果返回。
NameServer
主要就是用来:
- 注册 Broker 并保存 Broker 的集群名称、IP地址、端口号。
- 做健康检查,剔除(2m)不可用的 Broker。
- 管理 Topic 的路由信息,就是保存了哪些 Topic 在哪些 Broker 上。
NameServer 集群之间的节点不通信。
Broker
主要就是用来:
- 持久化消息和主从同步。
- 推送消息,推送方式支持 广播模式 和 集群模式。
- 过滤消息,推送 满足条件 的消息到指定消费者。
- 保证消息顺序性。
发送和接收的基本流程
- 生产者先连接上 NameServer,在发送消息的时候会创建 Topic,然后再返回 Topic 关联的 Broker,最后再连接这个 Broker,把消息发送到这个 Broker 里面。
- 消费者先连接上 NameServer,通过主题从 NameServer 中获取 Broker,然后再连接这个 Broker,等待消息。
生产者
生产者就是用来生产消息的,send 方法本身支持内部重试,重试逻辑如下:
- 至多重试 2 次(同步发送为 2 次,异步发送为 0 次)。
- 如果发送失败,则轮转到下一个 Broker。这个方法的总耗时时间不超过sendMsgTimeout设置的值,默认10s。
- 如果本身向 broker 发送消息超过 sendMsgTimeout,就不会再重试。
普通消息
普通消息就是普通的一条消息,这个消息会发送到不同的消息队列中,但是不会保证消息被顺序消费。
普通消息有三种发送方式:
-
同步发送:发出一条消息后,必须等待服务端返回响应,才能发送下一条。
-
异步发送:就是不需要等待服务端返回响应,就直接发送下一条。
-
单项发送:只是发送消息,不会处理客户端响应。
import org.springframework.messaging.support.MessageBuilder;
Message<String> message = MessageBuilder.withPayload("一个普通消息").build();
// asyncSend: 异步
// syncSend: 同步
rocketMQTemplate.syncSend("greetings-topic:tags", message);
延迟消息
给消息设置一个延迟时间,延迟一段时间后,才将消息投递到消费者。
Message<String> message = MessageBuilder.withPayload("一个普通消息").build();
// 支持的延迟等级:1s 5s 10s 30s 1min 2min 3min 4min 5min 6min 7min 8min 9min 10min 20min 30min 1h 2h
// 这里的等级是 3,也就是延迟 10s。
producer.syncSend("greetings-topic:tags", message, 1000, 3);
批量消息
多条消息同时发送,可以减少 API 和网络调用次数。
List<Message<String>> messageList = new LinkedList<>();
for (int i = 0; i < 10; i++) {
messageList.add(MessageBuilder.withPayload("一个普通消息:" + (i + 1)).build());
}
SendResult result = rocketMQTemplate.syncSend("greetings-topic", messageList);
这些消息必须是同一个主题,而且这些消息会放到同一个消息队列。
过滤消息
通过 tag 将消息交给指定的消费者组处理。
在 订阅关系一致#正确例子 中写了消费者的 tag 例子,下面是生产者的例子:
rocketMQTemplate.syncSend("topic:tag", "任意消息");
顺序消息
就是把发送的消息都放到同一个消息队列里面,消费顺序和发送顺序是一样的。
Message<String> message = MessageBuilder.withPayload("一个有序的消息").build();
rocketMQTemplate.syncSendOrderly("greetings-topic", message, "用来选择消息队列的 Key 可以是订单 ID");
消费者不能使用并发模式,需要使用顺序模式:
// consumer.registerMessageListener(new MessageListenerConcurrently()
consumer.registerMessageListener(new MessageListenerOrderly()
假设现在的线程数是 10,一共有 4 个消息队列:
在顺序模式下,它的并发数就是 4,就是一个线程对应一个消息队列,处理完一个消息才会处理对应队列的下一个消息。
在并发模式下,它的并发数就是 10,只要任何一个消息队列中还有消息,线程就会处理这个消息。
消费者
同一个组内的消费者,必须保证 订阅关系一致。
消费者组有两种消费模式:
-
集群模式: 需要指定一个消息分配策略,然后根据这个策略将消息投递给对应的的消费者。
// 下面指定了平均分配策略(轮询策略)。 // 还有机房优先分配策略、一致性 hash 分配策略等。 consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely());
-
广播模式: 直接发送给组内的所有消费者。
组 A 和组 B 都可以订阅同一个主题 A,这两个组都会收到主题 A 的消息,但是具体怎么分发到消费者,就要看消息模式了。
订阅关系一致
-
订阅关系:消费者组名、Topic、Tag。
-
订阅关系一致:同一个消费者组下的,所有消费者的 Topic、Tag 必须完全一致,如果订阅关系不一致,会导致消费消息紊乱,甚至消息丢失。
正确例子
C1、C2、C3 的订阅关系一致,即 C1、C2、C3 订阅消息的代码必须完全一致,代码示例如下:
@RocketMQMessageListener(topic = "TopicA", selectorExpression = "*")
@RocketMQMessageListener(topic = "TopicB", selectorExpression = "Tag2||Tag3")
错误例子
重平衡(reBalance)
重平衡是针对集群模式的,集群模式也可以叫负载均衡模式。
下面是平均分配算法的重平衡:
一个主题默认有四个队列,当新加入了一个消费者后,两个客户端会平分这四个队列。
以此类推当添加了四个消费者后,这四个队列就平均分配给了四个消费者。
当添加第五个消费者后,第五个消费者永远不会收到消息。
同一个消费者组中的消费者数量,必须小于等于主题队列的数量。
不要随便调整消息队列数量和消费者组内的消费者数量。
消费位点
test_topic 主题有四个消息队列,消费者组 consumer-group 中有两个消费者,分别消费消息队列 0、1 和消息队列 2、3。
- 代理者位点:就是消息队列中的最大消息数量。
- 消费者位点:消费到了这个队列中的第几个数据。
在同一个消息队列中,如果消息 M2 比 M1 先消费成功,那么消费者位点暂时不会后移,只有 M1 消费成功后才会一起后移。 - 差值:还有多少数据没有被消费。
集群模式下,消费者点位是由客户端返回给服务端的。
广播模式下,消费位点是由客户端自己保存的。
重复消费
两种情况会导致,消费者重复消费:
- 生产者发送了多次,比如程序逻辑问题或者生产者没有收到服务端响应然后进行了重试。
- 消费者组发生了重平衡。
假设集群模式下,C1 正在消费 Q2 队列中的消费位点 2,然后添加了一个消费者 C2,消费者组就会进行重平衡将 Q2 分配给 C2,
如果这个 C1 没有返回消费成功,那么服务端是不会将消费位点后移的,所以 C2 还是会消费 Q2 队列中的消费位点 2,这样就造成了重复消费。
解决方法就是:把业务的唯一 ID 添加到数据库中的去重表中,添加失败就是重复消费。解决重复消费就是解决幂等性。
消息重试和死信队列
-
消息重试:消息消费失败后,也会将消费者位点后移,然后将这个消息投递到重试队列。
重试队列的主题格式是:%RETRY% + 消费者组名。
-
死信队列:消息到达重试次数后,会将消息投递到死信队列。
死信队列的主题格式是:%DLQ% + 消费者组名。
// 指定重试次数。
// 并发模式重试:默认 16 次,每次重试间隔和延迟等级一样,第一次重试间隔就对应延迟等级 3,第二次重试就对应延迟等级 4。
// 顺序模式重试:重试次数是 Integer.MAX_VALUE,重试间隔是立即进行下一次重试。
consumer.setMaxReconsumeTimes(2);
// 在消息的回调方法中可以获取重试次数,可以自己判断到达重试次数后的逻辑,这样可以避免写很多死信消费者。
// msgs.get(0).getReconsumeTimes();
到达重试次数后还没有消费成功,就会发送到死信队列。
消息堆积
消费者消费消息的速度太慢了,导致堆积了很多消息没有消费。
有几个解决方案:
- 生产者做限流。
- 增加消费者的并发数量。
- 添加新的消费者。
- 扩容消息队列数量。
如果要缩容,就要等到队列都被消费完成后才去缩容,不然的话队列中的数据都丢失了。
消息补偿
消息补偿就是为了保证消息不丢失,比如保证下面这两种情况不丢:
- 生产者发送消息的时候一直发送失败。
- RocketMQ 有没有刷盘的时候,服务器宕机了。
解决方案:
- 将 RocketMQ 的刷盘机制改为同步刷盘。
- 做主从,消息保存到不同的服务器上。
- 启动消息追踪机制。
- 开发一个消息补偿器。
下面是消息补偿器的流程:
消息补偿器一天执行两三次就行了,没有必要频繁执行;因为消息能不能成功被消费,和消息重发次数是没有任何关系的,消息重发的再快,消息该消费失败还是会消费失败,所以失败的消息太多了要排查业务。
RocketMQ Connect
用来做实时增量同步,比如捕获 MySQL 的 INSERT、UPDATE、DELETE 变化然后同步到 Redis。
每个 Connector 包含:
- 一个 Connector 连接器,就是用来定义数据从哪复制到哪。
- 多个 Task 线程,就是真正干活的,真正复制数据的。
高并发设计
-
DNS 轮询:用来做 nginx 集群。
同一个域名每次请求的 IP 地址不一样,每个 IP 地址有自己的 nginx。
-
发送到 MQ:把一些耗时操作单独提取出来,这样接口可以立即返回和增加并发,另外也可以对数据库削峰。
有些时候还需要使用分布式锁或分布式事务。
下面是一些对于程序的优化:
- 能异步就一步。
- 减少 IO。
- 控制锁的力度。
- 控制事务的力度。
docker 中使用
sudo wget https://archive.apache.org/dist/rocketmq/4.9.4/rocketmq-all-4.9.4-bin-release.zip
sudo unzip rocketmq-all-4.9.4-bin-release.zip
修改文件
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g"
修改配置文件
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
# 注册到 NameServer 时候的 IP 地址
brokerIP1=IP
sudo apt-get -y install unzip
启动
nohup sh bin/mqnamesrv &
nohup sh bin/mqbroker -n localhost:9876 -c $ROCKETMQ_HOME/conf/broker.conf &
rocketmq-dashboard.jar --server.port=8001 --rocketmq.config.namesrvAddrs=127.0.0.1:9876
jps -l
https://blog.csdn.net/zbj18314469395/article/details/86064849
sudo wget https://download.bell-sw.com/java/8u382+6/bellsoft-jdk8u382+6-linux-aarch64.tar.gz
Spring Boot 中使用
依赖:
<!-- rocketmq client version 4.9.3 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
在 application.properties 中添加配置:
# 配置服务地址
# 消费者 只需要配置这个就行了,不用配置别的。
rocketmq.name-server=172.16.70.130:9876;192.168.0.2:9876
############################# 生产者 #############################
# 注意:由数字、字母、下划线、横杠(-)、竖线(|)或百分号组成;不能为空;长度不能超过 255。
rocketmq.producer.group=greetings-producer-group
# 命名空间
rocketmq.producer.namespace=greetings-producer-namespace
# 同步模式下,消息重新发送的最大次数,超过这个次数后会返回失败。
# 默认值:2,即:默认情况下一条消息最多会被投递 3 次;有可能会导致消息重复。
rocketmq.producer.retryTimesWhenSendFailed=2
# 和同步模式的作用一样,只不过这个是在异步模式下有用
rocketmq.producer.retryTimesWhenSendAsyncFailed=2
在 Java 中使用:
// 生产者使用
@Autowired
private RocketMQTemplate rocketMQTemplate;
// 消费者使用
// 下面的 MessageExt 就是 greetings-topic 主题的消息类型。
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Service
@RocketMQMessageListener(topic = "greetings-topic", consumerGroup = "simple-group")
public class SimpleConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {}
}
下面是 RocketMQMessageListener 注解的配置:
@RocketMQMessageListener(
// 命名空间
namespace = "",
// 必须指定一个消费者分组
// 消费者组的消费者实例必须订阅完全相同的 Topic
consumerGroup = "simple-group",
// 主题名称
topic = "greetings-topic",
// 过滤表达式,只接收满足表达式的消息
// 也可以写成 TAGA || TAGB
selectorExpression = "*",
// 消费模式
// ConsumeMode.CONCURRENTLY: 并发处理(默认值)
// ConsumeMode.ORDERLY: 顺序处理
// 注意:ConsumeMode.ORDERLY 不能和 MessageModel.BROADCASTING 一起使用
consumeMode = ConsumeMode.CONCURRENTLY,
// 消息模式
// MessageModel.CLUSTERING: 集群模式, topic 中的一条消息,只会被一个消费组内的 某一个 消费者处理。
// MessageModel.BROADCASTING: 广播模式,topic 中的一条消息,会被一个消费组内的 所有 消费者处理。
messageModel = MessageModel.CLUSTERING,
// 消费线程数
// consumeMode = ConsumeMode.ORDERLY: 最大值是主题中消息队列的数量,
// 也就是说,消息的最大并发消费数量 = 主题中消息队列的数量
// consumeMode = ConsumeMode.CONCURRENTLY: 消息的最大并发消费数量
consumeThreadNumber = 20,
// 消费超时时间(以分钟为单位)
// 超过这个时间后,就会认为消息消费失败,消费者就会重新发送这条消息到服务端
consumeTimeout = 15L,
// 重新投递到服务端的最大次数
// 消费超时(consumeTimeout)后会重新发送到服务端,到达最大次数后会将消息发送到 DLQ 死信队列
// 并发模式:-1 表示 16
// 有序模式:-1 表示 Integer.MAX_VALUE
// 注意: 广播模式下: 消费失败消息会丢弃
// 集群模式下: 消费失败的消息需要回传给 Broker
maxReconsumeTimes = -1,
// 并行模式下的消息重试策略
// -1: 代表不重试,直接扔到死信队列
// 0: 代表由 Broker 控制重试频率
// >0: 代表由客户端控制重试频率
delayLevelWhenNextConsume = 0,
// 有序模式下重试
// 暂停拉取的时间间隔,单位是毫秒
// 最小值为 10,最大值为 30000
suspendCurrentQueueTimeMillis = 1000,
// 投递到服务端的超时时间
replyTimeout = 3000,
// 是否启用消息追踪
enableMsgTrace = false,
// 消息跟踪的主题名
customizedTraceTopic = "RMQ_SYS_TRACE_TOPIC",
)
参考资料
https://rocketmq.apache.org/zh/docs/4.x/consumer/01concept2
https://rocketmq.apache.org/zh/docs/4.x/bestPractice/07subscribe
标签:RocketMQ,消费,消费者,队列,重试,消息,rocketmq From: https://www.cnblogs.com/jnyyxz/p/17623260.html