首页 > 其他分享 >RabbitMQ - 消息确认

RabbitMQ - 消息确认

时间:2024-09-21 21:52:18浏览次数:3  
标签:消费者 确认 RabbitMQ 消息 import message public

 1. 消息确认机制

生产者发送消息之后, 到达消费端之后, 可能会有以下情况:

        (1)消息处理成功

        (2)消息处理失败

RabbitMQ向消费者发送消息之后, 就会把这条消息删掉, 那么第二种情况, 就会造成消息丢失。

那么如何确保消费端已经成功接收了, 并正确处理了呢?

为了保证消息从队列可靠地到达消费者,RabbitMQ提供了消息确认机制(message acknowledgement)。

消费者在订阅队列时,可以指定autoAck参数,根据这个参数设置,消息确认机制分为以下两种:
  • 自动确认:当autoAck等于true时,RabbitMQ  会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息,自动确认模式适合对于消息可靠性要求不高的场景。
  • 手动确认:当autoAck等于false时,RabbitMO会等待消费者显式地调用Basic.Ack命令,回复确认信号后才从内存(或者磁盘)中移除消息,这种模式适合对消息可靠性要求比较高的场景。

当autoAck参数置为false,对于RabbitMO服务端而言,队列中的消息分成了两个部分:

一是等待投递给消费者的消息。

二是已经投递给消费者,但是还没有收到消费者确认信号的消息。

如果RabbitM一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接。

则RabbitM会安排该消息重新进入队列,等待投递给下一个消费者,当然也有可能还是原来的那个消费。

从RabbitMQ的Web管理平台上, 也可以看到当前队列中Ready状态和Unacked状态的

Ready: 等待投递给消费者的消息数。

Unacked: 已经投递给消费者, 但是未收到消费者确认信号的消息。

2. 手动确认消息

消费者在收到消息之后,可以选择确认,也可以选择直接拒绝或者跳过,RabbitMQ也提供了不同的确认应答的方式,消费者客户端可以调用与其对应的channel的相关方法,共有以下三种:

(1)肯定确认 

Channel.basicAck(long deliveryTag, boolean multiple)

RabbitMo已知道该消息并且成功的处理消息.可以将其丢弃了

参数说明:

deliveryTag:消息的唯一标识,它是一个单调递增的64位的长整型值。

multiple:是否批量确认。

deliveryTag 是RabbitMQ中消息确认机制的一个重要组成部分,它确保了消息传递的可靠性和顺序性。

(2)否定确认

Channel.basicReject(long deliveryTag, boolean requeue)

参数说明:

requeue:表示拒绝后,这条消息如何处理。

如果requeue参数设置为true,则RabbitMQ会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者。

如果requeue参数设置为false,则RabbitMQ会把消息从队列中移除,而不会把它发送给新的消者。

Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)

Basic.Reject命令一次只能拒绝一条消息,如果想要批量拒绝消息,则可以使用Basic.Nack这个命令。

3. 代码示例

Spring-AMQP 对消息确认机制提供了三种策略:
public enum AcknowledgeMode {
    NONE,
    MANUAL,
    AUTO;
}

AcknowledgeMode.NONE

这种模式下,消息一旦投递给消费者,不管消费者是否成功处理了消息,RabbitMO就会自动确认消息,从RabbitMQ队列中移除消息。如果消费者处理消息失败,消息可能会丢失。

AcknowledgeMode.AUTO(默认)

这种模式下,消费者在消息处理成功时会自动确认消息,但如果处理过程中抛出了异常,则不会确认消息。

AcknowledgeMode.MANUAL

手动确认模式下,消费者必须在成功处理消息后显式调用basicAck方法来确认消息。

如果消息未被确认,RabbitMO会认为消息尚未被成功处理,并且会在消费者可用时重新投递该消息。

这种模式提高了消息处理的可靠性,因为即使消费者处理消息后失败,消息也不会丢失,而是可以被重新处理。

(1) AcknowledgeMode.NONE

配置确认机制

spring:
  rabbitmq:
    # 消息监听配置
    listener:
      simple:
        acknowledge-mode: none

发送消息

交换机,队列配置

@Configuration
public class RabbitMQConfig {

    @Bean("ackQueue")
    public Queue ackQueue() {
        return QueueBuilder.durable(Constant.ACK_QUEUE).build();
    }

    @Bean("ackExchange")
    public Exchange ackExchange() {
        return ExchangeBuilder.directExchange(Constant.ACK_EXCHANGE).durable(true).build();
    }

    /**
     * 绑定ack队列和ack交换机
     *
     * @param ackExchange
     * @param ackQueue
     * @return
     */
    @Bean("ackBinding")
    public Binding ackBinding(@Qualifier("ackExchange") Exchange ackExchange, @Qualifier("ackQueue") Queue ackQueue) {
        return BindingBuilder.bind(ackQueue).to(ackExchange).with("ack").noargs();
    }

}

通过接口发送消息

package com.example.orderservice.controller;

import com.example.orderservice.Constant;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class AckController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/ack")
    public String ack() {
        rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE, "ack", "ack test...");
        return "发送成功";
    }

}

消费端逻辑

package com.example.materialflowservice.listener;

import com.example.orderservice.Constant;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

@Slf4j
@Component
public class AckListener {

    /**
     * 监听消息
     *
     * @param message
     */
    @RabbitListener(queues = Constant.ACK_QUEUE)
    public void listener(Message message, Channel channel) {
        log.info("接收到消息: {}, deliverTag: {}", new String(message.getBody(), StandardCharsets.UTF_8), message.getMessageProperties().getDeliveryTag());

        //int num = 3 / 0;

        log.info("业务处理完成");
    }
}

正常情况下是能接收到消息的

将 int num = 3 / 0 的注释放开

可以看到报异常了

但消息没有保留,依然被处理掉了。

这种情况就可能会导致我们的消息丢失。

(2)AcknowledgeMode.AUTO(默认)

根据deliverTag可以看出,消息是一直在不停重试的

并且还是保留在队列当中的,并没有丢弃

(3)AcknowledgeMode.MANUAL

消费者逻辑

package com.example.materialflowservice.listener;

import com.example.orderservice.Constant;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

@Slf4j
@Component
public class AckListener {

    /**
     * 监听消息
     *
     * @param message
     */
    @RabbitListener(queues = Constant.ACK_QUEUE)
    public void listener(Message message, Channel channel) throws Exception {
        try {
            log.info("接收到消息: {}, deliverTag: {}", new String(message.getBody(), StandardCharsets.UTF_8), message.getMessageProperties().getDeliveryTag());

            //int num = 3 / 0;

            log.info("业务处理完成");

            // 确认消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            // 拒绝消息,重新入队
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, true);
        }
    }
}

此时是正常情况,消息被正常签收。 

 将 int num = 3 / 0 的注释放开之后,再次运行

可以看出一直在进行重新入队操作。

如果我们将确认消息注释掉:

// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

可以看到Unacked变为1了,未确认状态。

当我们把basicNack第三个参数设为false:

// 拒绝消息,禁止重新入队
channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, false);

可以看到消息被丢弃了。

以上就是Spring-AMQP 对消息确认机制提供了三种策略。

标签:消费者,确认,RabbitMQ,消息,import,message,public
From: https://blog.csdn.net/weixin_73916358/article/details/142422592

相关文章

  • 深入剖析RocketMQ消息消费原理
    本文参考转载至《RocketMQ技术内幕第2版》一.消息消费概述消息消费以组的模式开展,一个消费组可以包含多个消费者,每个消费组可以订阅多个主题,消费组之间有集群模式和广播模式两种消费模式。集群模式是当前主题下的同一条消息只允许被其中一个消费者消费。广播模式是当前主题......
  • 消息队列:如何确保消息不会丢失?
    引言对业务系统来说,丢消息意味着数据丢失,这是无法接受的。主流的消息队列产品都提供了非常完善的消息可靠性保证机制,完全可以做到在消息传递过程中,即使发生网络中断或者硬件故障,也能确保消息的可靠传递,不丢消息。绝大部分丢消息的原因都是由于开发者不熟悉消息队列,没有正......
  • 分布式事务一致性:本地消息表设计与实践
    概念本地消息表是一种常见的解决分布式事务问题的方法。其核心思想是将分布式事务拆分成本地事务来处理,通过消息队列来保证各个本地事务的最终一致性。实现步骤创建本地消息表:在数据库中创建一个本地消息表,用于存储待发送的消息以及消息的发送状态和相关信息。表结构通......
  • kafka 消息位移提交几种方式:消息重复消息、消息丢失的关键
    消费位移Kafka中的位移(offset)是用来记录消息在分区中的位置的标志,简单说就是记录消费者的消费进度,每次消息消费后需要更新消费进度,也就是位移提交由此可见一旦位移提交发生异常,会导致消费进度不正确,就必然发生消息丢失或者重复消费消息位移存储内部主题__consumer_off......
  • 观察者模式:如何发送消息变化的通知?
    观察者模式是一种非常流行的设计模式,也常被叫作订阅-发布模式。观察者模式在现代的软件开发中应用非常广泛,比如,商品系统、物流系统、监控系统、运营数据分析系统等。现在我们常说的基于事件驱动的架构,其实也是观察者模式的一种最佳实践。当我们观察某一个对象时,对象传递出的每一个......
  • 备忘录模式:如何在聊天会话中记录历史消息?
    相较于其他的设计模式,备忘录模式不算太常用,但好在这个模式理解、掌握起来并不难,代码实现也比较简单,应用场景就更是比较明确和有限,一般应用于编辑器或会话上下文中防丢失、撤销、恢复等场景中。下面就一起来了解一下吧。一、模式原理分析备忘录模式的原始定义是:捕获并外部化对象的......
  • RabbitMQ教程
    RabbitMQ1.初识MQ1.1.同步和异步通讯服务间通讯有同步和异步两种方式:同步通讯:就像打电话,需要实时响应。异步通讯:就像发邮件,不需要马上回复。两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个人同时通话。发送邮件可以同时与多个人收发邮件,但是往往响应会有......
  • JavaScript 中的交互:“警报”、“提示”和“确认”
    ****欢迎回到我们的javascript世界之旅!在这篇博文中,我们将探讨在javascript中与用户交互的三种基本方法:警报、提示和确认。这些方法分别允许您显示消息、收集用户输入和确认操作。让我们深入研究每种方法,看看它们是如何工作的。1.?警报alert方法用于显示一个简单的对话......
  • Ajv-ts 有什么新消息?
     零食故事:假设您有一篮子零食:constsnacks=['apple','banana','chocolate'];现在,您想与您的朋友分享这些零食。但你不是把整个篮子都给他们,而是把每件零食都拿出来,一一递给他们:console.log(...snacks);//output:applebananachocolate...(摊开)操作符就像是把......
  • RabbitMQ进阶--保证消息的可靠性
    1.使用rabbitmq可能存在的问题在我们使用消息队列时,是否考虑过一个问题,如果在发送消息的时候存在网络波动,会引发哪些问题?无法正确的发送和接收消息重复多次的消费同一条消息举个例子,我们在购物的时候,已经支付完成,但是消息没有正确的被消费,前端发送请求查询支付状态时,肯定......