一、消费者确认机制
消费者的可靠性是靠消费者确认机制来保证。RabbitMQ提供了消费者确认机制(consumer Acknowledgement)。当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己处理状态。回执有三种可选值:
-
ack: 成功处理消息,RabbitMQ从队列中删除该消息
-
nack: 消息处理失败,RabbitMQ需要再次投递消息
-
reject: 消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息。
SpringAMQP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式,有三种方式:
-
none: 不处理。即消息投递给消费者后立刻
ack
,消息会立刻从MQ删除。非常不安全,不建议使用消费者还没有处理完消息,RabbitMQ就直接把消息删除
-
manual: 手动模式。需要自己在业务代码中调用api,发送
ack
或者reject
,存在业务入侵,但更灵活。 -
auto: 自动模式。 SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,但业务正常执行时则自动返回
ack
。当业务出现异常时,根据异常判断返回不同结果。-
如果是业务异常,会自动返回nack
-
如果是消息处理或校验异常,自动返回reject
-
直接抛异常
server:
port: 8081
spring:
rabbitmq:
addresses: xxx.xx.x.x
port: 5672
username: xx
password: xxx
virtual-host: /xx
listener:
simple:
prefetch: 1
acknowledge-mode: auto
/**
* 监听某个队列的消息
* @param message 接收到的消息
*/
@RabbitListener(queues = BootMqConstant.SIMPLE_QUEUE)
public void consumer(String message ){
System.out.println("消费者消息"+message);
throw new RuntimeException("error");
}
当采取auto模式时,
1、如果消费者监听者没有返回,消息会处于未确认状态
2、如果消费者监听器正确返回之后,会删除消息,
3、如果消费者抛出异常,RabbitMQ会再次把消息投递给消费者(可能造成一直死循环)。
当消费者一直处理消息失败时,需要设置重试次数
reject
/**
* 监听某个队列的消息
* @param message 接收到的消息
*/
@RabbitListener(queues = BootMqConstant.SIMPLE_QUEUE)
public void consumer(String message ){
System.out.println("消费者消息"+message);
throw new MessageConversionException("消息异常");
}
当抛出MessageConversionException
,消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息。
二、消费者确认重试机制
当消费者出现异常后,消息会不断requeue(重新入队)到队列,再次重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力。
我们可以利用spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列:
重试配置
消费者重试机制,在消费者端进行配置
spring:
rabbitmq:
addresses: xxx.xx.xx.xx
port: 5672
username: xxxx
password: xxx
virtual-host: /xxx
listener:
simple:
prefetch: 1
acknowledge-mode: auto
retry:
enabled: true #开启消费者失败重试
initial-interval: 1000ms # 初始的失败等待时长为1秒
multiplier: 1 # 下次失败的等待时长倍数,下次等待时长=multiplier*last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态; false有状态。如果业务中包含事务,这里改为false
注意:在消费者端重试次数配置的是listener,在生产者端重试次数配置的是template。
消费者代码:
/**
* 监听某个队列的消息
* @param message 接收到的消息
*/
@RabbitListener(queues = BootMqConstant.SIMPLE_QUEUE)
public void consumer(String message ){
System.out.println("消费者消息:"+message);
throw new RuntimeException("消息异常");
}
实验效果
RabbitMQ尝试3次投递消息给消费者,重试3次之后不再重试,但是此时RabbtiMQ把消息删除。
失败消息处理策略
在开启重启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三个实现:
-
RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。这种是默认方式
-
ImmediateRequeueMessageRecoverer: 重试耗尽后,返回nack,消息重新入队。
-
RepublishMessageRecoverer: 重试耗尽后,将失败消息投递到指定的交换机。
消息失败建议采用RepublishMessageRecoverer,这样可以把消息路由到一个专门处理失败情况的队列,然后把失败情况记录下来,由人工干预处理。
定义一个专门用来接收错误消息的队列和交换机,并配置 RepublishMessageRecoverer
配置类如下:
/**
* 当前配置类需要在开启 rabbitMQ消费者失败重试的情况才需要起作用。
*/
@Configuration
@ConditionalOnProperty(prefix ="spring.rabbitmq" ,name ="listener.simple.retry.enabled" ,havingValue = "true")
public class ErrorConfig {
@Bean
public DirectExchange errorExchange(){
return new DirectExchange("error.exchange");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue");
}
@Bean
public Binding errorBinding(){
return BindingBuilder.bind(errorQueue()).to(errorExchange()).with("error");
}
@Bean
public MessageRecoverer messageConverter(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate,"error.queue","error");
}
}
ErrorConfig
只有在消费者 开启 消息投递失败的情况下才会起作用,所以加上一个ConditionalOnProperty
注解。
ErrorConfig
只有在消费者 开启 消息投递失败的情况下才会起作用,所以加上一个ConditionalOnProperty
注解。
标签:可靠性,消费者,RabbitMQ,重试,失败,消息,message From: https://www.cnblogs.com/cplinux/p/17978066