一、概述
一个消息从开始到结束会经历这么三个阶段:生产阶段、消息队列Broker存储阶段和消费阶段。一个消息在三个阶段中的任何一个阶段都有可能丢失,知道这个之后,我们只要保证这三个阶段不出现问题,消息自然就不会出现丢失了。接下来我们来细说一下如何保证这三个阶段不出现问题。
二、生产阶段
生产阶段的使命就是将消息发送到队列之中。生产者(Producer)通过网络请求将消息发送给消息队列,消息队列接受到之后返回响应给生产者。RocketMQ有两种常用的消息发送方式:同步发送、异步发送。
2.1 同步发送
DefaultMQProducer producer = new DefaultMQProducer("unique_group_name", true);
producer.setNamesrvAddr("127.0.0.1:9876;127.0.0.1:9870");
SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss");
String content = "测试消息" + format.format(new Date());
Message msg = new Message("TopicTest",
"TagA", UUID.randomUUID().toString(),
content.getBytes(StandardCharsets.UTF_8));
try {
producer.start();
SendResult sendResult = producer.send(msg);
log.info("MsgId= {},结果= {} ", sendResult.getMsgId(), sendResult.getSendStatus());
} catch (MQClientException | RemotingException | InterruptedException | MQBrokerException e) {
log.error("消息发送发生了错误[{}]", msg, e);
}
同步发送时只要send()
方法没有抛出异常,就可以认为消息发送成功,即消息队列Broker成功接受到了消息。
既然是同步发送肯定就比较耗费一些时间,如果你的业务比较注重RT那就可以使用异步发送的方式。
2.2 异步发送
异步发送消息的方式可以降低消息发送的RT,我比较喜欢这种方式。
DefaultMQProducer producer = new DefaultMQProducer("unique_group_name", true);
producer.setNamesrvAddr("127.0.0.1:9876;127.0.0.1:9870");
// 消息自定义的唯一标识
String key = UUID.randomUUID().toString();
String content = "消息发送测试";
Message msg = new Message("TopicTest", "TagA", key, content.getBytes(StandardCharsets.UTF_8));
try {
producer.start();
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("根据消息[{}]的key[{}]更新消息[{}]的发送状态[{}]",
msg.getProperty(MessageConst.PROPERTY_KEYS), key,
sendResult.getMsgId(), sendResult.getSendStatus());
}
@Override
public void onException(Throwable e) {
log.error("发送出现错误[{}]", msg, e);
}
});
} catch (RemotingException | InterruptedException e) {
log.info("消息发送发生异常[{}]", msg, e);
}
使用异步发送方式时记得重写SendCallback
类的两个方法,在onSuccess()
方法中更新消息的发送状态为发送成功,只要不发生异常且回调了onSuccess()
方法也可以认为成功发送到了Broker。
2.3 SendStatus问题
发送消息时,将获得包含SendStatus的SendResult。以下是每个状态的说明列表:
- SEND_OK
SEND_OK并不意味着它是可靠的。要确保不会丢失任何消息,还应启用SYNC_MASTER或SYNC_FLUSH。 - FLUSH_DISK_TIMEOUT
如果Broker设置MessageStoreConfig的FlushDiskType = SYNC_FLUSH(默认为ASYNC_FLUSH),并且Broker没有在MessageStoreConfig的syncFlushTimeout(默认为5秒)内完成刷新磁盘,您将获得此状态。 - FLUSH_SLAVE_TIMEOUT
如果Broker的角色是SYNC_MASTER(默认为ASYNC_MASTER),并且从属Broker未在MessageStoreConfig的syncFlushTimeout(默认为5秒)内完成与主服务器的同步,则您将获得此状态。 - SLAVE_NOT_AVAILABLE
如果Broker的角色是SYNC_MASTER(默认为ASYNC_MASTER),但没有配置slave Broker,您将获得此状态。
对于SendStatus有多种情况的问题,因此无论使用同步还是异步的发送方式,都需要判断SendStatus是不是SEND_OK,如果不是则需要针对不同的情况进行分别处理。
- FLUSH_DISK_TIMEOUT,FLUSH_SLAVE_TIMEOUT
这两种情况说明消息落盘出现了异常,为了不丢失消息,我们可以稍等时间后重发消息。 - SLAVE_NOT_AVAILABLE
这种情况说明集群中的Slave不可用,重新发送是无用的,需要人工介入处理。
其实你查看RocketMQ的源码就会发现,不论是同步发送还是异步发送,都是可以针对不同的场景自定义重试次数的,而且很多方法还有内部重试机制。
Warn: this method has internal retry-mechanism, that is, internal implementation will retry
{@link #retryTimesWhenSendFailed} times before claiming failure. As a result, multiple messages may potentially
delivered to broker(s). It's up to the application developers to resolve potential duplication issue.
源码默认的处理方式
/**
* Maximum number of retry to perform internally before claiming sending failure in synchronous mode. </p>
*
* This may potentially cause message duplication which is up to application developers to resolve.
*/
private int retryTimesWhenSendFailed = 2;
/**
* Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. </p>
*
* This may potentially cause message duplication which is up to application developers to resolve.
*/
private int retryTimesWhenSendAsyncFailed = 2;
/**
* Indicate whether to retry another broker on sending failure internally.
*/
private boolean retryAnotherBrokerWhenNotStoreOK = false;
自定义重试机制:
producer.setRetryTimesWhenSendFailed(5);
producer.setRetryAnotherBrokerWhenNotStoreOK(true);
producer.setRetryTimesWhenSendAsyncFailed(5);
这里就要提下消息投递语义(message delivery semantic),简单的来说就是消息传递过程中的传递保证。主要分为三种:
- at most once:最多一次。消息可能丢失也可能被处理,但最多只会被处理一次。
- at least once:至少一次。消息不会丢失,但可能被处理多次,可能重复,不会丢失。
- exactly once:精确传递一次。消息被处理且只会被处理一次,不丢失不重复就一次。
有些异常情况的出现,可能是因为网络的偶尔波动导致,其实已经发送到了Broker,只不过是返回ACK给生产者的时候出现了超时,这个时候生产者重试就会导致消息重复投递。毕竟生产者为了保证消息一定成功投递到Broker中,就无法保证只进行一次精确投递。为了防止消息重复消费,那就需要消费者自身保证业务处理的幂等性。另外 对于发送状态SendStatus 不是SEND_OK的消息要使用定时任务进行补偿发送 。还要提到一点的就是,重试也是需要做好限制的,设定最大重试次数,也要保证重试的时间间隔,毕竟经验告诉我们,有些异常情况下短时间内的重试是没有意义的。具体的设计可以参考我之前文章中的本地消息表方案,本地事务+定时任务补偿保证消息一定投递成功。
三、消息队列Broker存储阶段
默认的情况下,消息队列为了快速响应,在接受到生产者的请求,将消息保存在内存成功之后,就会立刻返回ACK响应给生产者。
你以为人家的架构是这样的:
3.1 消息刷盘方式
- 同步刷盘
在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘, 然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写 成功的状态。 - 异步刷盘
在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。
将默认的异步刷盘修改成同步刷盘
flushDiskType=SYNC_FLUSH
异步刷盘方式在遇到消息队列宕机、机器异常断电或者内存硬盘损坏的情况,消息就无法成功持久化到硬盘中,那这个消息就永久丢失了。对于这种情况,我们就需要改变RocketMQ的刷盘机制,将默认的异步刷盘,修改成同步刷盘。即消息成功保存到硬盘上时才返回给生产者ACK响应。
其实人家的架构是这样的:
同步刷盘的缺点很明显,那就是降低了吞吐量,加大了消息发送的响应RT时间,但是为了不丢失宝贵的消息这一点损耗是值得的。
3.2 集群部署
上面讲的是单个消息队列Broker对于可靠保存消息的处理方式,但是生产环境肯定是采用的集群部署。目前RocketMQ支持单Master模式、多Master模式、多Master多Slave模式(异步)和多Master多Slave模式(同步)4种集群方式。
我这里申明一下,生产环境下的消息队列一定是采用集群的方式进行部署,不会有单机部署的情况。自己在本地搞搞单机部署玩玩肯定是可以的,生产环境也这么搞,你肯定是在逗我!
3.2.1 单Master模式
这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上环境使用,可以用于本地测试。
3.2.2 多Master模式
一个集群无Slave,全是Master,例如2个Master或者3个Master,这种模式的优缺点如下:
- 优点:配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;
- 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。
3.2.3 多Master多Slave模式
3.2.3.1 异步刷盘多Master多Slave模式
每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下:
- 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样;
- 缺点:Master宕机,磁盘损坏情况下会丢失少量消息。
3.2.3.2 同步多Master多Slave模式
每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:
- 优点:数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;
- 缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。
即使消息成功保存到了Master的硬盘上,然后在Master将消息同步给Slave的时候,这个期间Master挂了,而且是那种硬盘修不好的那种,不要说这种情况不可能,支付宝的专用电缆都能被挖断,还有啥不可能的。哈哈哈,也是够倒霉的!
其实人家真正的架构是这样的:
四种集群方式优缺点都列出来了,很明显为了保证消息一定不会在Broker这个阶段丢失,生产环境一定要使用第四种集群方式:同步复制多Master多Slave模式。具体配置如下:
## master 节点配置
brokerRole=SYNC_MASTER
flushDiskType=SYNC_FLUSH
## slave 节点配置
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
加上前面的同步刷盘的配置,这样生产者发送消息给Broker,Master使用同步刷盘方式将消息保存到硬盘上,保存成功之后使用同步复制的方式将消息复制到Slave上,slave保存成功之后,Broker才返回给生产者ACK。
3.3 消息堆积
对于并发比较高的系统,如果下游的消费者宕机,则会导致大量的消息堆积在消息队列里,这样很容易会把服务器的硬盘撑爆,新的消息发送到消息队列,硬盘拒绝写入,这时消息很容易就会丢失。所以,部署消息队列的机器硬盘空间要比较充裕,且要有一定的监控,防止这种情况发生。
四、消费阶段
终于到了最后一个阶段,但是大家也不能大意。消费者拉取消息进行本地业务处理,业务处理完成才能提交ACK ConsumeConcurrentlyStatus.CONSUME_SUCCESS
,切不可先提交ACK再进行业务处理。如果业务处理出现异常情况,可以先返回ConsumeConcurrentlyStatus.RECONSUME_LATER
等待消息队列的下次重试。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("unique_group_name");
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setNamesrvAddr("127.0.0.1:9876;127.0.0.1:9870");
consumer.setConsumeMessageBatchMaxSize(1);
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
MessageExt messageExt = msgs.get(0);
// 进行业务处理
// 处理失败返回ConsumeConcurrentlyStatus.RECONSUME_LATER
// 处理成功返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
还有一点要注意的是,消息队列RocketMQ
默认允许每条消息最多重试16
次,每次重试的间隔时间如下:
第几次重试 | 与上次重试的间隔时间 | 第几次重试 | 与上次重试的间隔时间 |
---|---|---|---|
1 | 10 秒 | 9 | 7 分钟 |
2 | 30 秒 | 10 | 8 分钟 |
3 | 1 分钟 | 11 | 9 分钟 |
4 | 2 分钟 | 12 | 10 分钟 |
5 | 3 分钟 | 13 | 20 分钟 |
6 | 4 分钟 | 14 | 30 分钟 |
7 | 5 分钟 | 15 | 1 小时 |
8 | 6 分钟 | 16 | 2 小时 |
如果消息重试16
次后仍然失败,消息将不再投递。如果严格按照上述重试时间间隔计算,某条消息在一直消费失败的前提下,将会在接下来的4小时46分钟之内进行16
次重试,超过这个时间范围消息将不再重试投递。此时,消息队列RocketMQ
不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。在消息队列RocketMQ
中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。死信队列里的消息有效期与正常消息相同,均为3天。3天后会被自动删除。针对这种情况,为了不丢失消息我们需要处理死信队列里的消息。
有消息进入死信队列,意味着某些问题导致消费者无法正常消费消息,因此,通常需要人工介入对其进行特殊处理。排查可疑因素并解决问题后,可以在消息队列RocketMQ
控制台重新发送该消息让消费者重新消费一次,或者直接让专门的消费者订阅死信队列进行消费。
死信队列名称一般是
%DLQ% + ConsumerGroupName
组成,还有个重试队列名称一般是%RETRY% + ConsumerGroupName
组成,这些都是RocketMQ
自动创建的。
五、总结
一个消息从新生到终结经历了生产、存储、消费三个阶段,针对不同的阶段可能会出现丢失消息的地方,我们给出不同的解决方案。最终,RocketMQ
丢失消息的概率被大大的降低了。我们将视角拔高一点,你就会发现,解决不同的消息队列不丢失消息,只有消息队列的配置稍有不同,其他地方都是类似的。好像我们已经形成了解决消息不丢失的方法论了,再遇到其他的消息队列我们就不慌了。