首页 > 其他分享 >如何保证RabbitMQ消息不重复消费

如何保证RabbitMQ消息不重复消费

时间:2023-04-15 10:33:27浏览次数:38  
标签:消费 处理 重复 确认 RabbitMQ 队列 保证 消息

如何保证RabbitMQ消息不重复消费

消息中间件是无法保证消息重复消费,所以只能从业务上来保证消费不重复消费,在消费端保证接口的幂等性

什么是幂等性

幂等性原本是数学上的概念,用在接口上就可以理解为:同一个接口,多次发出同一个请求,必须保证操作只执行一次。
调用接口发生异常并且重复尝试时,总是会造成系统所无法承受的损失,所以必须阻止这种现象的发生。
比如下面这些情况,如果没有实现接口幂等性会有很严重的后果:
支付接口,重复支付会导致多次扣钱
订单接口,同一个订单可能会多次创建。

接口幂等性,一般是用在订单处理或者消费处理的场景的。使用接口幂等性是为了防止同一请求多次处理的状况。

重复消费的原因

  1. 消费方的业务项目从MQ队列中接收数据;
  2. 接着处理业务;
  3. 业务处理成功后,消费方项目给MQ返回ack进行手动确认;
  4. 返回回调执行结果的过程中,因为网络抖动等原因,回调数据时,MQ没有返回成功。所以MQ队列中的数据会再次发给业务项目,造成重复消费。

如何解决

RabbitMQ提供了消息确认机制和幂等性处理来保证不重复消费:

  • 消息确认机制:消费者在消费消息后向RabbitMQ服务器发送确认消息(ACK),RabbitMQ收到确认消息后才会将该消息从队列中删除,如果由于某种原因消费者未能发送确认消息,RabbitMQ会认为该消息未被正常处理,然后重新将该消息发送给其他消费者或者当前消费者进行重试。
  • 幂等性处理:通过在系统设计中引入唯一标识符,使得同一个消息可以被多次接收并处理,但只有第一次处理会对系统状态产生影响。例如,消费端可以在处理完一个消息后,在数据库中记录下该消息的ID,以便之后查询是否已经处理过该消息,如果已经处理过则直接忽略该消息。

消息确认机制

RabbitMQ的消息确认机制基于消费者向队列发送确认消息来告诉队列已经成功接收并处理了消息。当确认消息被队列接收后,队列就会将该消息从队列中删除,确保下次不会再次投递给同一个消费者。

如果消费者没有确认消息,RabbitMQ会将该消息重新投递给其他可用的消费者或者重新投递给当前消费者。这种方式可以确保消息不会丢失,并且能够在消费者异常退出或宕机时重新分发消息。

因此,通过按照上述机制进行消息确认,RabbitMQ可以避免消息重复消费的情况。

实现:

1.在这里,我们选择手动确认模式,使用acknowledgeMode参数指定消息确认模式为手动:

@Component
@RabbitListener(queues = RabbitMqConfig.ORDER_QUEUE, ackMode = "MANUAL")
public class OrderConsumer {

    @Autowired
    private OrderService orderService;

    @RabbitHandler
    public void handleMessage(OrderMessage orderMessage, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        try {
            orderService.handleOrder(orderMessage);
            // 手动确认消息已被消费
            channel.basicAck(tag, false);
        } catch (Exception e) {
            // 出现异常,拒绝消息并将其返回到队列中重新处理
            channel.basicReject(tag, true);
            e.printStackTrace();
        }
    }

}

在上面的代码中,我们使用了RabbitMQ的@RabbitListener注解来声明消息消费者,并使用@RabbitHandler注解来指定消息处理方法。在handleMessage方法中,我们处理了订单消息,并在处理成功后手动确认该消息已被消费,使用basicAck方法实现确认。如果在处理过程中出现异常,我们将拒绝该消息并将其返回到队列中,使用basicReject方法实现拒绝操作。

2.最后,我们需要编写消息生产者代码。在这里,我们构造一个订单消息对象,使用RabbitTemplate将消息发送到指定的交换机和队列:

@Service
public class OrderService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void placeOrder(Order order) {
        // 构造订单消息
        OrderMessage orderMessage = new OrderMessage();
        orderMessage.setOrderId(order.getId());
        orderMessage.setOrderNo(order.getOrderNo());
        orderMessage.setOrderAmount(order.getOrderAmount());
        // 发送订单消息
        rabbitTemplate.convertAndSend(RabbitMqConfig.ORDER_EXCHANGE, RabbitMqConfig.ORDER_ROUTING_KEY, orderMessage);
    }

    public void handleOrder(OrderMessage orderMessage) {
        // 订单处理逻辑
    }

}

幂等性处理

给每一个消息携带一个全局唯一的id

image-20230414162745786

可以在消息生产者服务中设置一个消息id,然后在消费者监听到消息后获取该id,再去查询这个id是否存在。如果不存在,则正常消费消息,并将消息的id存入数据库或者Redis中。如果存在,则丢弃此消息。例如:

// 消息生产者服务
public void sendMessage() {
    String messageId = UUID.randomUUID().toString();
    // 将消息id和消息体一起发送
    rabbitTemplate.convertAndSend("exchange", "routingKey", message, new CorrelationData(messageId));
}

// 消息消费者服务
@RabbitListener(queues = "queue")
public void handleMessage(Message message, Channel channel) throws Exception {
    String messageId = message.getMessageProperties().getCorrelationId();
    // 先去查询这个id是否存在
    if (!redisTemplate.opsForValue().setIfAbsent(messageId, "")) {
        // 如果存在则丢弃此消息
        return;
    }
    // 正常消费消息
    // ...
}

标签:消费,处理,重复,确认,RabbitMQ,队列,保证,消息
From: https://www.cnblogs.com/galo/p/17320628.html

相关文章

  • oracle查找重复数据和删除重复数据sql
    查找重复数据sql(思路就是根据需要判断重复数据的字段分组,根据having大于2的就是重复的)--查找某表重复数据selectBUSS_TYPE_ID,BUSS_TYPE,TRADE_VARIETY_ID,TRADE_VARIETY,TRADE_SUBVARIETY_ID,TRADE_SUBVARIETY,......
  • Struts2_防表单重复提交
    一、造成重复提交主要的两个原因:  在平时的开发过程中,经常可以遇到表单重复提交的问题,如做一个注册页面,如果表单重复提交,那么一个用户就会注册多次,重复提交主要由于两种原因。  1、一是,服务器处理时间久。当用户在表单中填完信息,点击“提交”按钮后,由于服务器反应时间过长......
  • RocketMQ是是如何管理消费进度的?又是如何保证消息成功消费的?
    RocketMQ消费者保障消息确认机制consumer的每个实例是靠队列分配来决定如何消费消息的。那么消费进度具体是如何管理的,又是如何保证消息成功消费的?(RocketMQ有保证消息肯定消费成功的特性,失败则重试)什么是ACK消息确认机制在实际使用RocketMQ的时候我们并不能保证每次发......
  • RocketMQ是是如何管理消费进度的?又是如何保证消息成功消费的?
    RocketMQ消费者保障消息确认机制consumer的每个实例是靠队列分配来决定如何消费消息的。那么消费进度具体是如何管理的,又是如何保证消息成功消费的?(RocketMQ有保证消息肯定消费成功的特性,失败则重试)什么是ACK消息确认机制在实际使用RocketMQ的时候我们并不能保证每次发......
  • 4.【RabbitMQ实战】- 发布确认
    生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息......
  • 3.【RabbitMQ实战】- 工作队列(Work Queue)
    工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。轮询分发消息......
  • 10.【RabbitMQ实战】- RabbitMQ集群
    搭建集群镜像队列默认情况下node1创建的队列不会同步到node2上此时如果已经发送到了一条消息到node1上的队列,该队列并不会备份到node2上此时node1宕机并重启,该消息会丢失,配置对应策略可保证集群上队列备份并且消息不丢失负载均衡生产者给node1发消息,此时node1宕机,但是......
  • 9.【RabbitMQ实战】- RabbitMQ其他知识点
    幂等性MQ消费者的幂等性的解决一般使用全局ID或者写个唯一标识比如时间戳或者UUID或者订单消费者消费MQ中的消息也可利用MQ的该id来判断,或者可按自己的规则生成一个全局唯一id,每次消费消息时用该id先判断该消息是否已消费过在海量订单生成的业务高峰期,生产端有可能就会重复发......
  • 哈希表:剑指 Offer 48. 最长不含重复字符的子字符串
    题目描述:请从字符串中找出一个最长的不包含重复字符的子字符串,计算该最长子字符串的长度。   提示:s.length<=40000 思路:双指针(滑动窗口)+哈希表:   复杂度分析:时间复杂度O(N):其中N为字符串长度,动态规划需遍历计算dp列表。空间复杂度O(1......
  • 哈希表:剑指 Offer 03. 数组中重复的数字
    题目描述:找出数组中重复的数字。在一个长度为n的数组nums里的所有数字都在0~n-1的范围内。数组中某些数字是重复的,但不知道有几个数字重复了,也不知道每个数字重复了几次。请找出数组中任意一个重复的数字。   限制:2<=n<=100000 哈希表/Set利用数据......