首页 > 其他分享 >RabbitMQ高级篇之消费者可靠性 业务幂等处理

RabbitMQ高级篇之消费者可靠性 业务幂等处理

时间:2025-01-08 12:32:15浏览次数:10  
标签:可靠性 消费者 重复 高级 RabbitMQ 业务 处理 消息 ID

文章目录

消息消费中的可靠性问题

当我们处理消息时,尤其是异步消息队列系统(如RabbitMQ),常常面临网络或系统故障导致消息未确认或重复消费的问题。以下是常见的场景:

  • 消费者成功处理业务,但网络故障导致未发送确认(ack)消息,RabbitMQ认为消费者崩溃,将重新投递消息。
  • 消费者已经处理过消息,然而消息再次投递时,导致重复处理相同的消息,从而引发业务逻辑问题(如库存重复扣减、余额重复扣款等)。

业务幂等性

  • 幂等性定义:在编程中,幂等性意味着同一个操作执行一次或多次,结果一致,不会引发额外的副作用。具体而言,对于同一业务,不管是被执行一次还是多次,业务状态的影响应当一致。
  • 幂等性的重要性:如果业务不具备幂等性,重复消费消息可能会引发严重的业务问题。例如,重复扣减库存或重复扣款。



消息重复消费的原因

  • 消息确认机制中,如果消费者处理成功,但因网络问题未能及时发送ack,RabbitMQ可能会重新投递消息。消费者再次接收时,可能已经处理过该消息,导致重复消费。
  • 常见的业务问题包括:
    • 库存扣减:执行两次扣减操作,导致库存数量错误。
    • 余额扣款:重复扣款可能导致余额错误。


如何确保业务幂等性

为了避免消息重复消费带来的问题,确保业务幂等性是关键。具体做法有:

  • 业务本身具备幂等性:如查询操作或删除操作,这些操作天生幂等。查询一次与查询多次结果一致,删除操作只要删除成功,后续重复删除无影响。
  • 通过业务判断实现幂等性:对于非幂等业务,如扣款或库存扣减,我们需要加入额外的业务逻辑来避免重复处理。



解决方案

方案一:唯一消息ID

通过为每条消息生成唯一的ID,可以确保消费者识别消息是否被重复消费。具体步骤如下:

  • 发送方为每条消息生成唯一ID,并将其放入消息头部。
  • 消费者在处理消息时,将消息ID与数据库中的记录比对,如果ID已存在,则认为该消息是重复消费,跳过处理。

实现步骤

  • 在Spring AMQP中,配置MessageConverter来自动为每条消息生成唯一ID。
  • 消费者收到消息时,首先检查消息ID是否已存在于数据库中,如果存在则跳过处理。

@Bean
public MessageConverter messageConverter() {
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jjmc.setCreatteMessageIds(true);
    return jjmc;
}


方案二:业务逻辑判断

基于业务的特点,判断当前操作是否可以执行,从而避免重复执行。例如,在支付业务中,我们可以通过判断订单的状态来决定是否需要执行支付操作。

示例:在处理订单支付时,订单的状态应为“未支付”才能进行支付操作。如果订单状态已经是“已支付”或“已退款”,则认为该消息是重复的,直接跳过处理。



具体步骤

  • 查询订单状态,判断是否为“未支付”。
  • 如果是“未支付”,执行支付操作并更新订单状态;否则,跳过此操作。



异常处理与重试机制

在实际生产环境中,网络问题可能导致消息未确认或处理失败,因此需要设计有效的重试机制。Spring AMQP支持消费者确认机制和失败重试机制,常见的策略有:

  • 发送者确认机制:确保消息成功投递到RabbitMQ。
  • 消费者确认机制:确保消费者成功处理消息。
  • 失败重试策略:如果消息处理失败,可以配置重试次数与延迟,以确保消息最终被成功处理。

消息的持久化与可靠性

为了保证消息不丢失,RabbitMQ支持消息持久化。消费者确认机制(ack)和消息持久化可以保证在系统故障或重启后,消息能够正确传递给消费者。



面试问题回顾

在实际的面试中,可能会问到如何保证微服务间状态一致性(如支付服务与交易服务之间的状态一致性)。回答时可以提到:

  • 使用消息队列(如RabbitMQ)来实现异步通知,避免同步调用的性能瓶颈和耦合。
  • 为了确保消息投递的可靠性,采用生产者确认机制、消费者确认机制以及消息持久化策略。
  • 对于重复消费问题,采用业务逻辑判断或唯一消息ID来保证幂等性。


总结

解决RabbitMQ中的消息重复消费问题,关键是保证业务逻辑的幂等性。通过唯一ID或基于业务的判断逻辑,可以确保消息的重复消费不会导致不一致或错误的业务状态。同时,借助消息确认机制和重试机制,可以确保消息投递和处理的可靠性。



代码示例

配置消息ID生成
@Bean
public MessageConverter messageConverter() {
    Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
    // 自动生成唯一消息ID
    converter.setCreateMessageId(true);
    return converter;
}

消费者接收消息时获取消息ID
@RabbitListener(queues = "simpleQueue")
public void receiveMessage(Message message) {
    String messageId = message.getMessageProperties().getMessageId();
    // 判断消息是否重复消费
    if (messageAlreadyProcessed(messageId)) {
        return; // 跳过重复消息
    }
    // 处理消息
    processMessage(message);
}

订单支付状态判断
public void processPayment(Order order) {
    // 查询订单状态
    Order currentOrder = orderService.getOrderById(order.getId());

    // 判断订单状态是否为"未支付"
    if (currentOrder.getStatus() != OrderStatus.PENDING) {
        return; // 订单已支付或已退款,不需要重复支付
    }

    // 执行支付操作
    orderService.processPayment(order);
}

标签:可靠性,消费者,重复,高级,RabbitMQ,业务,处理,消息,ID
From: https://blog.csdn.net/2301_80093566/article/details/144829201

相关文章

  • RabbitMQ高级篇之延迟消息 介绍
    文章目录延迟消息的概念支付与交易服务一致性问题解决方案:延迟消息支付订单流程延迟消息的实现延迟消息的业务场景总结关键技术点业务流程总结实践建议延迟消息的概念延迟消息是指发送者在发送消息时,指定一个时间,消费者在指定时间后才能接收到消息,而不是立即接收。......
  • 强引蜘蛛:提升网站SEO效果的高级策略
    在搜索引擎优化(SEO)的世界里,“蜘蛛”是搜索引擎派出的自动程序,它们负责爬行和索引互联网上的网页。为了提升网站的SEO效果,让搜索引擎蜘蛛更有效地抓取和索引你的网站内容,实施一系列被称为“强引蜘蛛”的策略至关重要。本文将深入探讨这些高级策略,帮助网站管理员和内容创作者优......
  • Windows10环境下安装RabbitMq折腾记
            最近有个老项目需要迁移到windows10环境,用的是比较老的rabbitmq安装包,如下所示。经过一番折腾,死活服务起不来,最终果断放弃老版本启用新版本。现在把折腾过程记录下:一、安装erlang 安装完成后的目录结构,我的路径是D:\Apps\EnvSoft\ErlangOPT21。:    ......
  • 了解RabbitMQ中的Exchange:深入解析与实践应用
            在分布式系统设计中,消息队列(MessageQueue)扮演着至关重要的角色,而RabbitMQ作为开源消息代理软件的佼佼者,以其高性能、高可用性和丰富的功能特性,成为了众多开发者的首选。在RabbitMQ的核心组件中,Exchange(交换机)是一个不可或缺的部分,它负责接收生产者发送的消息,......
  • RabbitMQ(二)
    永远的"HelloWorld"一、目标二、具体操作1、创建Java工程2、发送消息①Java代码②查看效果3、接收消息①Java代码②控制台打印③查看后台管理界面一、目标生产者发送消息,消费者接收消息,用最简单的方式实现。官网说明参见下面超链接:RabbitMQtutorial-“Hello......
  • MQTT和传统消息队列(RabbitMQ,RocketMQ,Kafka)的区别
    适用场景选择哪种协议取决于具体的应用需求。如果需要适用于大量传感器和控制设备之间的通信,且网络环境不稳定或需要节省带宽资源,MQTT是一个不错的选择。而如果需要在浏览器和服务端之间建立实时双向通信,且对实时性和双向交互有较高要求,WebSocket可能更加适合。   产......
  • AppDomainManager注入是一种针对.NET应用程序的高级攻击技术,攻击者通过操控AppDomain
    什么是APPDomainManager注入?APPDomainManager注入通常涉及到利用**应用程序域(AppDomain)**来执行恶意操作,特别是在.NET环境下。要理解这个概念,我们需要了解几个关键术语:AppDomain:在.NET应用程序中,AppDomain是一个隔离的执行环境,它允许多个应用程序或应用程序的不同部分在同一进......
  • 招行面试:RocketMQ、Kafka、RabbitMQ,如何选型?
    本文原文链接文章很长,且持续更新,建议收藏起来,慢慢读!疯狂创客圈总目录博客园版为您奉上珍贵的学习资源:免费赠送:《尼恩Java面试宝典》持续更新+史上最全+面试必备2000页+面试必备+大厂必备+涨薪必备免费赠送:《尼恩技术圣经+高并发系列PDF》,帮你实现技术自由,完......
  • 高级java每日一道面试题-2025年01月05日-并发篇-什么是阻塞队列?阻塞队列的实现原理是
    如果有遗漏,评论区告诉我进行补充面试官:什么是阻塞队列?阻塞队列的实现原理是什么?如何使用阻塞队列来实现生产者-消费者模型?我回答:在Java高级面试中,阻塞队列是一个非常重要的概念,它涉及到多线程并发编程的核心知识。以下是对阻塞队列的详细解释,包括其定义、实现原......
  • 高级java每日一道面试题-2025年01月04日-并发篇-说说CyclicBarrier和CountDownLatch的
    如果有遗漏,评论区告诉我进行补充面试官:说说CyclicBarrier和CountDownLatch的区别?我回答:在Java高级面试中,CyclicBarrier和CountDownLatch是两个经常被提及的并发工具类,它们都用于实现线程间的同步,但存在显著的区别。以下是对这两个类的详细比较:一、计数器使用方式的......