首页 > 其他分享 >如何保证RocketMQ消息不丢失

如何保证RocketMQ消息不丢失

时间:2023-12-01 11:55:24浏览次数:37  
标签:重试 队列 发送 Master 丢失 保证 刷盘 RocketMQ 消息

一、概述

一个消息从开始到结束会经历这么三个阶段:生产阶段、消息队列Broker存储阶段和消费阶段。一个消息在三个阶段中的任何一个阶段都有可能丢失,知道这个之后,我们只要保证这三个阶段不出现问题,消息自然就不会出现丢失了。接下来我们来细说一下如何保证这三个阶段不出现问题。

生产阶段、消息队列Broker存储阶段和消费阶段

二、生产阶段

生产阶段的使命就是将消息发送到队列之中。生产者(Producer)通过网络请求将消息发送给消息队列,消息队列接受到之后返回响应给生产者。RocketMQ有两种常用的消息发送方式:同步发送、异步发送。

2.1 同步发送

DefaultMQProducer producer = new DefaultMQProducer("unique_group_name", true);
producer.setNamesrvAddr("127.0.0.1:9876;127.0.0.1:9870");

SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss");
String content = "测试消息" + format.format(new Date());
Message msg = new Message("TopicTest", 
            "TagA", UUID.randomUUID().toString(),
            content.getBytes(StandardCharsets.UTF_8));

try {
	producer.start();
	SendResult sendResult = producer.send(msg);
	log.info("MsgId= {},结果= {} ", sendResult.getMsgId(), sendResult.getSendStatus());
} catch (MQClientException | RemotingException | InterruptedException | MQBrokerException e) {
	log.error("消息发送发生了错误[{}]", msg, e);
}

同步发送时只要send()方法没有抛出异常,就可以认为消息发送成功,即消息队列Broker成功接受到了消息。

既然是同步发送肯定就比较耗费一些时间,如果你的业务比较注重RT那就可以使用异步发送的方式。

2.2 异步发送

异步发送消息的方式可以降低消息发送的RT,我比较喜欢这种方式。

DefaultMQProducer producer = new DefaultMQProducer("unique_group_name", true);
producer.setNamesrvAddr("127.0.0.1:9876;127.0.0.1:9870");

// 消息自定义的唯一标识
String key = UUID.randomUUID().toString();
String content = "消息发送测试";
Message msg = new Message("TopicTest", "TagA", key, content.getBytes(StandardCharsets.UTF_8));
try {
	producer.start();
	producer.send(msg, new SendCallback() {
		@Override
		public void onSuccess(SendResult sendResult) {
			log.info("根据消息[{}]的key[{}]更新消息[{}]的发送状态[{}]", 
                    msg.getProperty(MessageConst.PROPERTY_KEYS), key,
                    sendResult.getMsgId(), sendResult.getSendStatus());
		}

		@Override
		public void onException(Throwable e) {
			log.error("发送出现错误[{}]", msg, e);
		}
	});
} catch (RemotingException | InterruptedException e) {
	log.info("消息发送发生异常[{}]", msg, e);
}

使用异步发送方式时记得重写SendCallback类的两个方法,在onSuccess()方法中更新消息的发送状态为发送成功,只要不发生异常且回调了onSuccess()方法也可以认为成功发送到了Broker。

2.3 SendStatus问题

发送消息时,将获得包含SendStatus的SendResult。以下是每个状态的说明列表:

  • SEND_OK
    SEND_OK并不意味着它是可靠的。要确保不会丢失任何消息,还应启用SYNC_MASTER或SYNC_FLUSH。
  • FLUSH_DISK_TIMEOUT
    如果Broker设置MessageStoreConfig的FlushDiskType = SYNC_FLUSH(默认为ASYNC_FLUSH),并且Broker没有在MessageStoreConfig的syncFlushTimeout(默认为5秒)内完成刷新磁盘,您将获得此状态。
  • FLUSH_SLAVE_TIMEOUT
    如果Broker的角色是SYNC_MASTER(默认为ASYNC_MASTER),并且从属Broker未在MessageStoreConfig的syncFlushTimeout(默认为5秒)内完成与主服务器的同步,则您将获得此状态。
  • SLAVE_NOT_AVAILABLE
    如果Broker的角色是SYNC_MASTER(默认为ASYNC_MASTER),但没有配置slave Broker,您将获得此状态。

对于SendStatus有多种情况的问题,因此无论使用同步还是异步的发送方式,都需要判断SendStatus是不是SEND_OK,如果不是则需要针对不同的情况进行分别处理。

  • FLUSH_DISK_TIMEOUT,FLUSH_SLAVE_TIMEOUT
    这两种情况说明消息落盘出现了异常,为了不丢失消息,我们可以稍等时间后重发消息。
  • SLAVE_NOT_AVAILABLE
    这种情况说明集群中的Slave不可用,重新发送是无用的,需要人工介入处理。

其实你查看RocketMQ的源码就会发现,不论是同步发送还是异步发送,都是可以针对不同的场景自定义重试次数的,而且很多方法还有内部重试机制。

Warn: this method has internal retry-mechanism, that is, internal implementation will retry
{@link #retryTimesWhenSendFailed} times before claiming failure. As a result, multiple messages may potentially
delivered to broker(s). It's up to the application developers to resolve potential duplication issue.

源码默认的处理方式

/**
 * Maximum number of retry to perform internally before claiming sending failure in synchronous mode. </p>
 *
 * This may potentially cause message duplication which is up to application developers to resolve.
 */
private int retryTimesWhenSendFailed = 2;

/**
 * Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. </p>
 *
 * This may potentially cause message duplication which is up to application developers to resolve.
 */
private int retryTimesWhenSendAsyncFailed = 2;

/**
 * Indicate whether to retry another broker on sending failure internally.
 */
private boolean retryAnotherBrokerWhenNotStoreOK = false;

自定义重试机制:

producer.setRetryTimesWhenSendFailed(5);
producer.setRetryAnotherBrokerWhenNotStoreOK(true);
producer.setRetryTimesWhenSendAsyncFailed(5);

这里就要提下消息投递语义(message delivery semantic),简单的来说就是消息传递过程中的传递保证。主要分为三种:

  • at most once:最多一次。消息可能丢失也可能被处理,但最多只会被处理一次。
  • at least once:至少一次。消息不会丢失,但可能被处理多次,可能重复,不会丢失。
  • exactly once:精确传递一次。消息被处理且只会被处理一次,不丢失不重复就一次。

有些异常情况的出现,可能是因为网络的偶尔波动导致,其实已经发送到了Broker,只不过是返回ACK给生产者的时候出现了超时,这个时候生产者重试就会导致消息重复投递。毕竟生产者为了保证消息一定成功投递到Broker中,就无法保证只进行一次精确投递。为了防止消息重复消费,那就需要消费者自身保证业务处理的幂等性。另外 对于发送状态SendStatus 不是SEND_OK的消息要使用定时任务进行补偿发送 。还要提到一点的就是,重试也是需要做好限制的,设定最大重试次数,也要保证重试的时间间隔,毕竟经验告诉我们,有些异常情况下短时间内的重试是没有意义的。具体的设计可以参考我之前文章中的本地消息表方案,本地事务+定时任务补偿保证消息一定投递成功。

三、消息队列Broker存储阶段

默认的情况下,消息队列为了快速响应,在接受到生产者的请求,将消息保存在内存成功之后,就会立刻返回ACK响应给生产者。

你以为人家的架构是这样的:

你以为人家的架构是这样的

3.1 消息刷盘方式

  • 同步刷盘
    在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘, 然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写 成功的状态。
  • 异步刷盘
    在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。

将默认的异步刷盘修改成同步刷盘

flushDiskType=SYNC_FLUSH

异步刷盘方式在遇到消息队列宕机、机器异常断电或者内存硬盘损坏的情况,消息就无法成功持久化到硬盘中,那这个消息就永久丢失了。对于这种情况,我们就需要改变RocketMQ的刷盘机制,将默认的异步刷盘,修改成同步刷盘。即消息成功保存到硬盘上时才返回给生产者ACK响应。

其实人家的架构是这样的:

image-20210312223647464
同步刷盘的缺点很明显,那就是降低了吞吐量,加大了消息发送的响应RT时间,但是为了不丢失宝贵的消息这一点损耗是值得的。

3.2 集群部署

上面讲的是单个消息队列Broker对于可靠保存消息的处理方式,但是生产环境肯定是采用的集群部署。目前RocketMQ支持单Master模式、多Master模式、多Master多Slave模式(异步)和多Master多Slave模式(同步)4种集群方式。

我这里申明一下,生产环境下的消息队列一定是采用集群的方式进行部署,不会有单机部署的情况。自己在本地搞搞单机部署玩玩肯定是可以的,生产环境也这么搞,你肯定是在逗我!

3.2.1 单Master模式

这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上环境使用,可以用于本地测试。

3.2.2 多Master模式

一个集群无Slave,全是Master,例如2个Master或者3个Master,这种模式的优缺点如下:

  • 优点:配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;
  • 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。

3.2.3 多Master多Slave模式

3.2.3.1 异步刷盘多Master多Slave模式

每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下:

  • 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样;
  • 缺点:Master宕机,磁盘损坏情况下会丢失少量消息。

3.2.3.2 同步多Master多Slave模式

每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:

  • 优点:数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;
  • 缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。

即使消息成功保存到了Master的硬盘上,然后在Master将消息同步给Slave的时候,这个期间Master挂了,而且是那种硬盘修不好的那种,不要说这种情况不可能,支付宝的专用电缆都能被挖断,还有啥不可能的。哈哈哈,也是够倒霉的!

其实人家真正的架构是这样的:

其实人家真正的架构是这样的
四种集群方式优缺点都列出来了,很明显为了保证消息一定不会在Broker这个阶段丢失,生产环境一定要使用第四种集群方式:同步复制多Master多Slave模式。具体配置如下:

## master 节点配置
brokerRole=SYNC_MASTER
flushDiskType=SYNC_FLUSH
 
## slave 节点配置
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH

加上前面的同步刷盘的配置,这样生产者发送消息给Broker,Master使用同步刷盘方式将消息保存到硬盘上,保存成功之后使用同步复制的方式将消息复制到Slave上,slave保存成功之后,Broker才返回给生产者ACK。

3.3 消息堆积

对于并发比较高的系统,如果下游的消费者宕机,则会导致大量的消息堆积在消息队列里,这样很容易会把服务器的硬盘撑爆,新的消息发送到消息队列,硬盘拒绝写入,这时消息很容易就会丢失。所以,部署消息队列的机器硬盘空间要比较充裕,且要有一定的监控,防止这种情况发生。

四、消费阶段

终于到了最后一个阶段,但是大家也不能大意。消费者拉取消息进行本地业务处理,业务处理完成才能提交ACK ConsumeConcurrentlyStatus.CONSUME_SUCCESS,切不可先提交ACK再进行业务处理。如果业务处理出现异常情况,可以先返回ConsumeConcurrentlyStatus.RECONSUME_LATER等待消息队列的下次重试。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("unique_group_name");
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setNamesrvAddr("127.0.0.1:9876;127.0.0.1:9870");
consumer.setConsumeMessageBatchMaxSize(1);
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
 
  MessageExt messageExt = msgs.get(0);
  // 进行业务处理
 
  // 处理失败返回ConsumeConcurrentlyStatus.RECONSUME_LATER
 
  // 处理成功返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS
  return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();

还有一点要注意的是,消息队列RocketMQ默认允许每条消息最多重试16次,每次重试的间隔时间如下:

第几次重试 与上次重试的间隔时间 第几次重试 与上次重试的间隔时间
1 10 秒 9 7 分钟
2 30 秒 10 8 分钟
3 1 分钟 11 9 分钟
4 2 分钟 12 10 分钟
5 3 分钟 13 20 分钟
6 4 分钟 14 30 分钟
7 5 分钟 15 1 小时
8 6 分钟 16 2 小时

如果消息重试16次后仍然失败,消息将不再投递。如果严格按照上述重试时间间隔计算,某条消息在一直消费失败的前提下,将会在接下来的4小时46分钟之内进行16次重试,超过这个时间范围消息将不再重试投递。此时,消息队列RocketMQ不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。在消息队列RocketMQ中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。死信队列里的消息有效期与正常消息相同,均为3天。3天后会被自动删除。针对这种情况,为了不丢失消息我们需要处理死信队列里的消息。

有消息进入死信队列,意味着某些问题导致消费者无法正常消费消息,因此,通常需要人工介入对其进行特殊处理。排查可疑因素并解决问题后,可以在消息队列RocketMQ控制台重新发送该消息让消费者重新消费一次,或者直接让专门的消费者订阅死信队列进行消费。

死信队列名称一般是%DLQ% + ConsumerGroupName组成,还有个重试队列名称一般是%RETRY% + ConsumerGroupName组成,这些都是RocketMQ自动创建的。

五、总结

一个消息从新生到终结经历了生产、存储、消费三个阶段,针对不同的阶段可能会出现丢失消息的地方,我们给出不同的解决方案。最终,RocketMQ丢失消息的概率被大大的降低了。我们将视角拔高一点,你就会发现,解决不同的消息队列不丢失消息,只有消息队列的配置稍有不同,其他地方都是类似的。好像我们已经形成了解决消息不丢失的方法论了,再遇到其他的消息队列我们就不慌了。

标签:重试,队列,发送,Master,丢失,保证,刷盘,RocketMQ,消息
From: https://www.cnblogs.com/ciel717/p/17817249.html

相关文章

  • js精度丢失的问题,利用lodash函数库重新封装
    functionroundAndPad(num,decimalPlaces){  varrounded=_.round(num,decimalPlaces); //使用Lodash的_.round函数四舍五入  varstr=rounded.toString();  vardecimalIndex=str.indexOf('.');  console.log("str:",str);  console.lo......
  • logback丢失日志
    问题描述、现象项目正常运行日志文件生成以及文件名都看不出来问题日志里记录的信息也可以记录完整的每次请求记录偶尔随机丢失一次或者多次完整的请求记录,与没有收到用户请求情况相同 问题排查排除AsyncAppender的丢失机制导致,可以将日志记录方式先改为同步方式或者......
  • 解决VS编译C++时,该文件包含不能在当前代码页(936)中表示的字符。请将该文件保存为 Uni
    使用VS编译C++时,报错: warningC4819:该文件包含不能在当前代码页(936)中表示的字符。请将该文件保存为Unicode格式以防止数据丢失。利用VS的高级保存选项,修改合适的编码规则即可解决,最新版VS需要手动添加高级保存选线的命令,方法如下:打开工具-->自定义 选择命令-->选择添......
  • rocketMq安装
    #拉取镜像dockerpullrocketmqinc/rocketmq#创建一个文件夹用于存放相关文件mkdirrocketMqAtHome01&&cdrocketMqAtHome01#创建namesrv数据卷文件夹mkdir-pdata/namesrv/logsdata/namesrv/store#构建namesrv容器dockerrun-d\--restart=always\--namename......
  • 2023-11-29:用go语言,给你一个字符串 s ,请你去除字符串中重复的字母,使得每个字母只出现
    2023-11-29:用go语言,给你一个字符串s,请你去除字符串中重复的字母,使得每个字母只出现一次。需保证返回结果的字典序最小。要求不能打乱其他字符的相对位置)。输入:s="cbacdcbc"。输出:"acdb"。来自左程云。答案2023-11-29:所有的代码用灵捷3.5编写,感觉有点抽风了,生成的代码需要修改......
  • js阻止浏览器刷新,以防误操作丢失数据
    //禁用回退window.history.forward(1);history.pushState(null,null,document.URL);window.addEventListener('popstate',function(){history.pushState(null,null,document.URL);});//按键判断是否点了刷新相关的按......
  • 京东技术面:Redis是如何保证高效查询的?
    大家好,我是pub,马上就到一年中最热闹的金九银十,你是不是要检验一下自己。这篇我们来看看redis。为什么Redis比较快Redis中的查询速度为什么那么快呢?1、因为它是内存数据库;2、归功于它的数据结构;3、Redis中是单线程(引入了多线程,但核心内存读写仍为单线程);4、Redis中使用了多路复......
  • 虾皮一面:如何保证数据双写一致?
    年关将至,又到了准备面试跳槽的季节了。据不完全统计,跳槽是涨薪最快的方式,没有之一。而跳槽成功与否的关键是“面试”,所以认真准备面试=快速涨薪。准备面试,自然就少不了刷面试真题了,而今天这份刚出炉的虾皮Java后端面试题就非常典型,它的难度适中,面试结构分为:半小时八股+半......
  • Kafka 如何保证消息消费的全局顺序性
    哈喽大家好,我是咸鱼今天我们继续来讲一讲Kafka当有消息被生产出来的时候,如果没有指定分区或者指定key,那么消费会按照【轮询】的方式均匀地分配到所有可用分区中,但不一定按照分区顺序来分配我们知道,在Kafka中消费者可以订阅一个或多个主题,并被分配一个或多个分区如果一......
  • 小米二面:Redis 如何保证数据不丢失?
    前段时间表妹收到了小米秋招补录的面试邀请,一面还算顺利,很快就通过了,但在看二面面试录屏的时候,我发现了一个问题,回答的不是很好,也就是我们今天要聊的这个问题:Redis如何保证数据不丢失?很多人看到这个问题的第一反应是,这个问题不难,就是Redis的持久化技术嘛!但如果你这样回答,可能......