首页 > 其他分享 >MQ技术方案

MQ技术方案

时间:2024-09-12 11:50:41浏览次数:19  
标签:方案 处理 技术 发送 重试 失败 消息 MQ ID

1. 保证MQ消息的可靠性

保证MQ消息的可靠性分两个方面:保证生产消息的可靠性、保证消费消息的可靠性。

1.1 保证生产消息的可靠性

1.1.1 重试机制

首先发送消息的方法如果执行失败会进行重试,这里我们在发送消息的工具类中使用spring提供的@Retryable注解,实现发送失败重试机制,通过注解的backoff属性指定重试等待策略,通过Recover注解指定失败回调方法,失败重试后仍然失败的会走失败回调方法,在回调方法中将失败消息写入一个失效消息表由定时任务进行补偿(重新发送),如果系统无法补偿成功则由人工进行处理,单独开发人工处理失败消息的功能模块。

通过@Retryable注解可以实现消息发送失败时的重试机制,避免因为网络抖动或临时故障导致的发送失败:

  • backoff属性用于定义重试间隔。
  • maxAttempts属性用于定义最大重试次数。
  • Recover注解用于指定重试失败后的回调逻辑,将未成功发送的消息记录到失败表中。
@Retryable(value = Exception.class, maxAttempts = 3, backoff = @Backoff(delay = 2000))
public void sendMessage(String message) {
    // 发送消息逻辑
}

@Recover
public void recover(Exception e, String message) {
    // 发送失败的回调逻辑,如记录失败消息
}

1.1.2 生产者确认机制(Producer Confirm)

RabbitMQ提供了生产者确认机制,通过Publisher Confirm来确保消息从生产者成功发送到Broker,并最终到达队列。这一机制通过ACK(Acknowledgment)和NACK(Negative Acknowledgment)来确认消息的处理结果。

  • ACK: 表示消息已成功到达Broker。
  • NACK: 表示消息没有成功到达,可以根据NACK结果进行处理。

具体的做法:

  1. 消息唯一标识:每个消息发送时指定一个唯一ID,用于标识该消息的状态。
  2. 回调机制:通过设置回调方法(ConfirmCallback),可以监控ACK/NACK的回执状态。
    • 如果返回ACK,则表示消息已成功投递。
    • 如果返回NACK,生产者可根据NACK结果进行重发或记录到失败消息表,交给定时任务处理。
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
    if (ack) {
        // 消息成功投递
        System.out.println("消息成功发送到Broker,ID:" + correlationData.getId());
    } else {
        // 消息投递失败
        System.out.println("消息发送失败,原因:" + cause);
        // 记录到失败表
    }
});

1.1.3 ReturnCallback回调机制

在某些情况下,消息可能会成功到达Broker,但未能路由到队列。这时会触发ReturnCallback回调方法,通过它可以接收到未成功投递的消息,并进行相应的处理或补偿。

rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
    System.out.println("消息无法路由,交换机:" + exchange + ",路由键:" + routingKey);
    // 补偿逻辑,如记录消息
});

1.1.4 定时任务

上述三种方法,将失败的消息写入到一个失败消息记录表,然后由定时任务进行补偿(重新发送),如果系统无法补偿成功则由人工进行处理,单独开发人工处理失败消息的功能模块。

1.2 保证消费消息可靠性

为了防止消息在Broker中丢失,可以将消息设置为持久化,具体需要设置交换机和队列支持持久化,发送消息设置deliveryMode=2。这样即使RabbitMQ重启,也能从磁盘中恢复未处理的消息。

1.2.1 重试机制

消费者在处理消息时,如果出现异常,可以使用重试机制来进行故障恢复。重试机制通常会和死信队列结合使用:

  • 当重试次数超过一定阈值,消息将被投递到失败消息队列,由定时任务或者人工处理。
  • 通过设置队列的x-dead-letter-exchangex-dead-letter-routing-key,可以指定消息在重试失败后进入失败消息队列。

1.2.2 消费确认机制(Consumer Acknowledgment)

RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向RabbitMQ发送ACK回执,表明自己已经处理完成消息,RabbitMQ收到ACK后删除消息。

RabbitMQ提供了三种消费确认模式:

  1. 自动ACK(默认): RabbitMQ自动确认消息,不管消费者是否成功处理了消息,这种模式下容易丢失消息,不建议使用。
  2. 手动ACK: 需要消费者手动确认消息是否被处理成功,如果成功处理则发送ACK回执,RabbitMQ会删除该消息;如果处理失败,可以进行重试或将消息放入死信队列。
  3. 关闭ACK: 消费者不发送任何ACK,不推荐使用。

一般,我们都是使用默认的auto即可。

spring:
    rabbitmq:
       ....
        listener:
            simple:
                acknowledge-mode: auto #,出现异常时返回nack,消息回滚到mq;没有异常,返回ack
                retry:
                    enabled: true # 开启消费者失败重试
                    initial-interval: 1000 # 初识的失败等待时长为1秒
                    multiplier: 10 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
                    max-attempts: 3 # 最大重试次数
                    stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

1.2.3 定时任务

重试机制达到上限后,将消息投递到失败消息队列,失败消息队列由定时任务程序定时处理,如果系统无法处理成功则由人工进行处理。

1.3 无法百分百保证MQ的消息可靠性

尽管通过生产者确认机制、消费者ACK机制、持久化、消息重试和补偿等措施可以极大提高MQ消息的可靠性,但由于不可控因素(如网络断开、硬件故障、进程异常终止等),无法百分之百保证消息的可靠性。因此,企业在设计使用MQ时,需要充分考虑异常情况的存在,并通过补偿机制、消息幂等性设计、消息重试等手段进一步降低消息丢失的风险。

2. 保证消息幂等性

2.1 幂等性是什么

幂等性(Idempotency)是指一个操作可以重复执行多次,但无论执行多少次,结果都是相同的,不会有副作用。对于分布式系统中使用消息队列的场景,幂等性通常用于保证消息不会因重复消费而导致数据错误。

举个例子:

假设你在电商系统中处理“用户下单”的消息,如果这个操作不是幂等的,用户下单的消息被重复消费时,可能会导致系统生成多笔订单、重复扣款。而如果“用户下单”这个操作是幂等的,不管消息被消费几次,最终都只会有一笔订单生成,避免重复处理。

2.2 为什么会有重复消费

MQ(消息队列)中,可能会发生消费者重复消费的情况。导致重复消费的原因有很多:

  1. 消费者处理失败:消费者处理消息时出现异常或超时,导致MQ认为消息未被成功处理,于是重新投递。
  2. 网络抖动:消费者收到消息后未能及时发送ACK确认,导致消息被再次投递。
  3. 业务逻辑错误:系统没有考虑幂等性设计,在同一条消息重复消费时导致数据出错。

2.3 如何保证MQ消息的幂等性

2.3.1 使用数据库的唯一约束控制幂等性

数据库的唯一约束可以帮助确保一条消息只被处理一次。例如,在插入一条订单数据时,可以为订单ID设置唯一约束。这样即使同一条消息被重复消费,数据库的唯一约束也会保证只有第一次插入成功,后续的重复插入会失败。

  • 适用场景: 比如订单系统,确保同一个订单号不会被重复处理。
CREATE TABLE orders (
    order_id VARCHAR(255) PRIMARY KEY, -- 唯一约束,订单ID
    user_id VARCHAR(255),
    product_id VARCHAR(255),
    quantity INT,
    status VARCHAR(50)
);
try {
    // 尝试插入订单
    insertOrder(order);
} catch (DuplicateKeyException e) {
    // 捕获唯一键冲突异常,表明该订单已经存在,无需重复处理
    System.out.println("重复订单,不再处理");
}

2.3.2 使用Token机制

Token机制是另一种确保幂等性的常见手段。可以在发送消息时为每条消息指定一个唯一的标识符(Token或消息ID),然后在消费时通过Redis等缓存系统去判断该消息是否已经被消费过。

  1. 消息发送时生成唯一Token: 每条消息在生产时会生成一个唯一ID,比如UUID。

  2. 消息ID记录到Redis: 生产者在发送消息时,将该消息的ID存储到Redis,作为消息已经处理的标记。

  3. 消费者消费前先查询Redis: 当消费者接收到消息时,首先在Redis中查找该消息ID,如果发现ID已经存在,说明该消息已经被处理过,则跳过该次处理。

  4. 消费完成后,记录消息ID到Redis: 消费者成功处理完消息后,将该消息ID记录到Redis,表示该消息已成功消费。

String messageId = message.getId(); // 消息唯一ID

// 检查消息是否已处理
if (redisTemplate.hasKey(messageId)) {
    // 如果存在,表明消息已处理过,跳过
    return;
}

// 处理消息
processMessage(message);

// 消费完成后,将消息ID存储到Redis,设置过期时间
redisTemplate.opsForValue().set(messageId, "consumed", 10, TimeUnit.MINUTES);

优点: 这种方式适合需要临时性幂等检查的场景,过期时间可以根据业务场景进行设置。

标签:方案,处理,技术,发送,重试,失败,消息,MQ,ID
From: https://blog.csdn.net/qq_46637011/article/details/142170167

相关文章

  • SD入门教程一:Stable Diffusion 基础(技术篇)
    前言在开篇的时候就大致讲了SD和VAE,那么今天我们具象化地再来讲讲StableDiffusion(稳定扩散)。严格说来它是一个由几个组件(模型)构成的系统,而非单独的一个模型。我以最常见的文生图为例,解释下StableDiffusion的整体架构和工作原理。本次教程将使用AI绘画工具StableD......
  • 技术揭秘 DWS 实时数仓解决方案,如何深度融合 Flink 简化数据链路
    摘要:DWS实时数仓解决方案支持数仓模型的分层和增量加工,能够实现数据的实时入库、出库和查询,确保数据的新鲜度。一、实时数仓介绍实时数仓是一种现代化的数据仓库系统,其核心优势在于能够处理和分析实时数据。随着信息技术和数据科学的飞速发展,企业对实时数据分析和决策支持的需求愈......
  • ssh knock 技术解析
    SSH端口敲门技术是一种网络安全措施,用于防止未经授权的访问。通过端口敲门,可以动态地在防火墙上打开指定端口(如SSH端口),仅允许符合特定敲门序列的用户访问。此技术通常用于隐藏重要服务(例如SSH),以防止暴力破解或其他未经授权的攻击 工作原理端口敲门的工作原理基于以下步骤:闭合......
  • 腾讯云升级多个云存储解决方案 以智能化存储助力企业增长
    9月6日,在腾讯数字生态大会腾讯云储存专场上,腾讯云升级多个存储解决方案:DataPlatform数据平台解决方案重磅发布,数据加速器GooseFS、数据处理平台数据万象、日志服务CLS、高性能并行文件存储CFSTurbo等多产品全新升级,能够为企业在AI时代提供更安全、高效的数据基础服务。......
  • 专业解读:USB转串口技术的应用与优势
    在计算机硬件和数据通信领域,USB转串口技术已经成为连接外部设备与计算机的一种常见且实用的方法。这项技术通过将通用串行总线(USB)接口转换为串行端口,极大地扩展了计算机的连接能力和应用范围。本文将对USB转串口技术的应用与优势进行专业解读。应用场景数据采集:在工业自动化和科研......
  • 替换NAS,超省成本的企业网盘搭建方案
    当公司决定将文件管理的“主力”,从NAS转移至企业网盘,就不得不面对NAS何去何从的问题。NAS硬件健康乐观,在搭建企业网盘时,能否将NAS作为企业网盘的存储服务?云盒子企业网盘推出的NAS存储解决方案给出了肯定的答案。NAS做为企业网盘存储,既可以解决企业关心的NAS设备利用问题,同时也......
  • 虚拟机热迁移技术介绍
    本文分享自天翼云开发者社区《虚拟机热迁移技术介绍》,作者:潘****东什么是虚拟机热迁移虚拟机热迁移(VirtualMachineLiveMigration)是一种技术,它允许在不中断虚拟机运行的情况下将虚拟机从一台物理主机迁移到另一台物理主机。传统上,当需要迁移虚拟机时,必须先将虚拟机关机,然后将......
  • Java技术深度探索:高并发场景下的线程安全与性能优化
    Java技术深度探索:高并发场景下的线程安全与性能优化在当今的软件开发领域,随着互联网应用的日益复杂和用户量的激增,高并发成为了一个不可忽视的技术挑战。Java,作为一门广泛应用于企业级开发的编程语言,其内置的并发支持机制如线程(Thread)、锁(Lock)、并发集合(ConcurrentCollect......
  • PPT中的图形与图片:插入、调整与格式设置技术详解
    目录引言一、图形与图片的插入1.插入图形2.插入图片二、图形与图片的调整1.调整大小与位置2.裁剪与旋转3.图形与图片的合并与组合三、图片格式与布局设置1.图片格式设置2.图片布局设置示例案例:制作产品展示PPT四、结论引言在现代商务、教育和学术交流......
  • Ftrans跨域文件传输方案,数据流动无阻的高效路径
    大型集团企业由于其规模庞大、业务广泛且往往将分支机构、办事处分布在多个地域,因此会涉及到跨域文件传输的需求。主要源于以下几个方面:1.业务协同:集团内部的不同部门或子公司可能位于不同的地理位置,但需要进行紧密的业务协同。文件传输是实现这种协同的重要方式之一,包括项目......