RabbitMQ重试机制+死信队列
RabbitMQ重试机制
RabbitMQ的消息重试机制,就是消息消费失败后进行重试,重试机制的触发条件是消费者显式的抛出异常,这个很类似@Transactional,如果没有显式地抛出异常或者try catch起来没有手动回滚,事务是不会回滚的。
if("ACK重试机制".equals(messageBody)){
message.getMessageProperties().getHeaders().put("x-death", count+1);
throw new RuntimeException("手动出发异常,测试重试机制");
}
还有一种情况就是消息被拒绝后重新加入队列,比如basic.reject和basic.nack,并且requeue = true,但是这个是重新进入到了消息队列然后重新被消费,并且也不会触发我们重试机制的配置(如重试间隔、最大重试次数等等)。重试机制是默认开启的,但是如果没有重试机制相关的配置会导致消息一直无间隔的重试,直到消费成功,所以要使用重试机制一定要有相关配置。
死信队列
死信就是消息在特定场景下的一种表现形式,这些场景包括:
- 消息被拒绝(basic.reject / basic.nack),并且requeue = false
- 消息的 TTL 过期时
- 消息队列达到最大长度
- 达到最大重试限制
消息在这些场景中时,被称为死信。
死信队列就是用于储存死信的消息队列,在死信队列中,有且只有死信构成,不会存在其余类型的消息。死信队列也是一个普通队列,也可以被消费者消费,区别在于业务队列需要绑定在死信队列上,才能正常地把死信发送到死信队列上。
业务队列绑定死信队列
@Bean
public Queue directQueue() {
/**
* 绑定死信交换机及路由key
*/
Map<String, Object> args = new HashMap<>();
// x-dead-letter-exchange:这里声明当前业务队列绑定的死信交换机
//消息被拒绝、消息过期,或者队列达到其最大长度。消息会变成死信
args.put("x-dead-letter-exchange", DEAD_TCP_DATA_DIRECT_EXCHANGE);
// x-dead-letter-routing-key:这里声明当前业务队列的死信路由 key
args.put("x-dead-letter-routing-key", DEAD_TCP_DATA_DIRECT_ROUTING);
return QueueBuilder.durable(DIRECT_QUEUE).withArguments(args).build();
}
自动ACK + RabbitMQ重试机制
appliction.properties
# 消息重试机制: 自动ACK+MQ消息重试
spring.rabbitmq.listener.simple.acknowledge-mode=auto
spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.max-attempts=5
spring.rabbitmq.listener.simple.retry.initial-interval=5000
消费者
@RabbitListener(queues = RabbitMqConfig.USER_ADD_QUEUE, concurrency = "3")
public void userAddReceiver(String data, Message message, Channel channel) throws Exception {
UserVo vo = OBJECT_MAPPER.readValue(data, UserVo.class);
boolean success = messageHandle(vo);
// 通过业务控制是否消费成功,消费失败则抛出异常触发重试
if (!success) {
log.error("消费失败");
throw new Exception("消息消费失败");
}
}
一定要开启自动ACK,才会在到达最大重试上限后发送到死信队列,而且在重试过程中会独占当前线程,如果是单线程的消费者会导致其他消息阻塞,直至重试完成,所以可以使用@RabbitListener上的concurrency属性来控制并发数量。
手动ACK + 手动重试机制
appliction.properties
# 手动ACK
spring.rabbitmq.listener.simple.acknowledge-mode=manual
手动ACK配置了重试机制,在抛出异常的时候仍会触发重试,但是达到重试上限之后,会永远处于Unacked状态,不会进入到死信队列,必须要手动拒绝才可以进入死信队列,所以说这里不用配置重试机制而是采用手动重试的方式
消费者
@RabbitHandler
@RabbitListener(queues = DirectExchangeConfig.DIRECT_QUEUE2,concurrency = "3")
public void process3(Message message, Channel channel) throws InterruptedException, IOException {
// 重试次数
int retryCount = 0;
boolean success = false;
// 消费失败并且重试次数<=重试上限次数
while (!success && retryCount < MAX_RETRIES) {
retryCount++;
// 具体业务逻辑
String messageBody = new String(message.getBody(), "UTF-8");
success = !messageBody.equals("ACK重试机制"); //如果消息体等于ACK重试机制
// 如果失败则重试
if (!success) {
String errorTip = "第" + retryCount + "次消费失败" +
((retryCount < 3) ? "," + RETRY_INTERVAL + "s后重试" : ",进入死信队列");
log.error(errorTip);
Thread.sleep(RETRY_INTERVAL * 1000);
}
}
if (success) {
// 消费成功,确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("消费成功");
} else {
// 重试多次之后仍失败,进入死信队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
log.info("消费失败");
}
}
标签:队列,RabbitMQ,重试,ACK,死信,消息,机制
From: https://www.cnblogs.com/aeolian/p/18136388