引言
在我们实际项目中需要对消息消费的高可用
做保证,首先需要做到的就是消息的重试机制,设想一下以下场景:
当库存服务处理上游服务发过来的订单消息时,此时服务宕机了,或者网络不可用了,那我这个消息是应该算消费成功还是消费失败呢?
显然,我们肯定要对处理不成功的消息进行重试,那么如果消费不成功的话,就要无限次数的重试吗?
答案是否,因为当代码逻辑有Bug或者上游的消息内容是错误的时候,无论再重试多少次也不会成功,此时应该将消息抛弃掉。
所以消费的最佳实践
是:消息处理失败需要重试,且重试需要指定最大次数以及重试的时间间隔,当消息重试次数耗尽时,对消息做持久化处理。
那么该如何实现这个功能呢?由此引出今天的主题:针对Spring-RabbitMq,如何实现重试机制
。
功能实现
这一功能其实并不需要我们自己实现,Spring-RabbitMq已经为我们做好了,我们只需要进行一些配置。
重试机制的配置同样依赖RabbitListenerContainerFactory
,对这个不熟悉的可以先阅读上一篇博客
【Spring-RabbitMq配置一个消费端接入多个vhost】。
配置MethodInterceptor
spring-rabbitmq包下的org.springframework.amqp.rabbit.config.RetryInterceptorBuilder
,该类是一个建造者,通过他可以配置重试机制,在我的项目中指定了最多重试5次,并且重试间隔是:重试初始间隔8秒.2倍递增,不超过60秒。
/**
* 重试机制
* 重试初始间隔8秒.2倍递增,不超过60秒
* 最多重试5次
*
* @return 重试模板
*/
public MethodInterceptor createInterceptor() {
return RetryInterceptorBuilder.stateless()
.maxAttempts(8)
.backOffOptions(8000, 2, 180 * 1000)
// 指定消息重试达到最大次数后的回调
.recoverer(messageRecoverer())
.build();
// RejectAndDontRequeueRecoverer: 重试最大次数后消息会一直处于 UnAcked 状态,当断开连接时,消息会转为 Ready,继续等待被消费
}
配置重试次数耗尽后的策略MessageRecoverer
让我们看org.springframework.amqp.rabbit.retry.MessageRecoverer
,这是一个函数式接口,在消息重试次数耗尽后,会我们执行这个方法。
package org.springframework.amqp.rabbit.retry;
import org.springframework.amqp.core.Message;
/**
* @author Dave Syer
* @author Gary Russell
*
*/
@FunctionalInterface
public interface MessageRecoverer {
/**
* Callback for message that was consumed but failed all retry attempts.
*
* @param message the message to recover
* @param cause the cause of the error
*/
void recover(Message message, Throwable cause);
}
在我的项目中,次数耗尽后,我们会把消息内容记录到日志中,并发出企业微信告警。
这里主要是结合具体业务,把消息转发到死信交换机
或者持久化到数据库
都是可以的。
/**
* 消息达到最大重试次数后的回调
*
* @return 已消费但所有重试失败的消息的回调。
*/
private MessageRecoverer messageRecoverer() {
return (message, cause) -> {
String messageContent = new String(message.getBody(), StandardCharsets.UTF_8);
log.error("消息达到最大重试次数,从队列删除,消息:{},message:{}", new String(message.getBody(), StandardCharsets.UTF_8), JSON.toJSONString(message));
log.error("消息达到最大重试次数,异常",cause);
try {
String errorMsg = Optional.ofNullable(cause.getMessage()).map(msg -> msg.length() > 100 ? msg.substring(0, 100) : msg).orElse("");
String notifyContent = weChatWarnMessageUtil.buildMqWarnMessage(StrUtil.format(MESSAGE_RECOVER_MSG, message.getMessageProperties().getConsumerQueue(),
messageContent, errorMsg),
"MQ消费异常");
weChatWarnMessageUtil.send(notifyContent, WeChatNotifyContentType.MARKDOWM);
} catch (Exception e) {
log.info("企业微信告警失败", e);
}
throw new AmqpRejectAndDontRequeueException(null, true, cause);
};
}
最后,像上篇内容说的一样,配置好RabbitListenerContainerFactory,再与队列绑定即可。