首页 > 其他分享 >使用RabbitMQ实现延迟消息的完整指南

使用RabbitMQ实现延迟消息的完整指南

时间:2024-10-22 23:16:42浏览次数:3  
标签:指南 String 队列 STROKE RabbitMQ 死信 消息 import 延迟

在分布式系统中,消息队列通常用于解耦服务,RabbitMQ是一个广泛使用的消息队列服务。延迟消息(也称为延时队列或TTL消息)是一种常见的场景应用,特别适合处理某些任务在一段时间后执行的需求,如订单超时处理、延时通知等。

本文将以具体代码为例,展示如何使用RabbitMQ来实现延迟消息处理,涵盖队列和交换机的配置、消息的发送与接收以及死信队列的处理。

什么是延迟消息?

延迟消息是指消息在发送到队列后,经过设定的时间延迟再被消费。RabbitMQ 本身没有直接支持延迟队列的功能,但可以通过 TTL(Time To Live)+ 死信队列(Dead Letter Queue, DLQ) 的组合来实现。当消息超过TTL(消息存活时间)后,不会被立即消费,而是会被转发到绑定的死信队列,从而实现延迟处理。

RabbitMQ中的延迟消息原理

在RabbitMQ中,我们可以通过以下几个概念来实现延迟消息:

  1. TTL(Time To Live):可以为队列设置TTL,消息超过该时间后会被标记为“死信”。
  2. 死信队列(Dead Letter Queue):当消息在正常队列中过期或处理失败时,RabbitMQ可以将它们路由到一个死信队列,死信队列可以用来处理这些过期或未处理的消息。
  3. x-dead-letter-exchangex-dead-letter-routing-key:可以通过配置队列的参数,将过期消息发送到一个专门的死信交换器,并根据指定的路由键转发到死信队列。

 

 消息来到ttl.queue消息队列,过期时间内无人消费,消息来到死信交换机hmall.direct,在direct.queue消息队列无需等待。

1. RabbitMQ的配置

首先,我们需要配置两个队列和两个交换机:一个用于存放延时消息,另一个用于处理超时的死信消息。

package com.heima.stroke.configuration;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitConfig {
    // 延迟时间 单位:毫秒 (这里设为30秒)
    private static final long DELAY_TIME = 1000 * 30;

    // 行程超时队列
    public static final String STROKE_OVER_QUEUE = "STROKE_OVER_QUEUE";
    // 行程死信队列
    public static final String STROKE_DEAD_QUEUE = "STROKE_DEAD_QUEUE";

    // 行程超时队列交换机
    public static final String STROKE_OVER_QUEUE_EXCHANGE = "STROKE_OVER_QUEUE_EXCHANGE";
    // 行程死信队列交换机
    public static final String STROKE_DEAD_QUEUE_EXCHANGE = "STROKE_DEAD_QUEUE_EXCHANGE";

    // 行程超时交换机 Routing Key
    public static final String STROKE_OVER_KEY = "STROKE_OVER_KEY";
    // 行程死信交换机 Routing Key
    public static final String STROKE_DEAD_KEY = "STROKE_DEAD_KEY";

    /**
     * 声明行程超时队列,并设置其参数
     * x-dead-letter-exchange:绑定的死信交换机
     * x-dead-letter-routing-key:死信路由Key
     * x-message-ttl:消息的过期时间
     */
    @Bean
    public Queue strokeOverQueue() {
        Map<String, Object> args = new HashMap<>(3);
        args.put("x-dead-letter-exchange", STROKE_DEAD_QUEUE_EXCHANGE);
        args.put("x-dead-letter-routing-key", STROKE_DEAD_KEY);
        args.put("x-message-ttl", DELAY_TIME); // 设置TTL为30秒
        return QueueBuilder.durable(STROKE_OVER_QUEUE).withArguments(args).build();
    }

    @Bean
    public DirectExchange strokeOverQueueExchange() {
        return new DirectExchange(STROKE_OVER_QUEUE_EXCHANGE);
    }

    @Bean
    public Binding bindingStrokeOverDirect() {
        return BindingBuilder.bind(strokeOverQueue()).to(strokeOverQueueExchange()).with(STROKE_OVER_KEY);
    }
}

解释:

TTL设置:我们通过x-message-ttl设置消息的过期时间为30秒。

死信队列绑定:通过x-dead-letter-exchangex-dead-letter-routing-key设置,当消息过期时,它会被转发到死信交换机,再路由到死信队列。

2. 生产者发送延迟消息

接下来,我们通过生产者向超时队列发送消息,这些消息将在TTL过期后转发到死信队列。

package com.heima.stroke.rabbitmq;

import com.alibaba.fastjson.JSON;
import com.heima.modules.vo.StrokeVO;
import com.heima.stroke.configuration.RabbitConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MQProducer {
    private final static Logger logger = LoggerFactory.getLogger(MQProducer.class);

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * 发送延时消息到行程超时队列
     *
     * @param strokeVO 消息体
     */
    public void sendOver(StrokeVO strokeVO) {
        String mqMessage = JSON.toJSONString(strokeVO);
        logger.info("send timeout msg:{}", mqMessage);

        rabbitTemplate.convertAndSend(RabbitConfig.STROKE_OVER_QUEUE_EXCHANGE, RabbitConfig.STROKE_OVER_KEY, mqMessage);
    }
}

解释:

sendOver 方法将消息发送到超时队列,消息将在超时后进入死信队列。生产者不需要额外处理TTL或死信的配置,只需发送消息即可。

3. 消费者监听死信队列

当消息超过TTL后,将会被转发到死信队列。消费者需要监听死信队列并处理这些消息。

j

package com.heima.stroke.rabbitmq;

import com.alibaba.fastjson.JSON;
import com.heima.modules.vo.StrokeVO;
import com.heima.stroke.configuration.RabbitConfig;
import com.heima.stroke.handler.StrokeHandler;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
public class MQConsumer {
    private final static Logger logger = LoggerFactory.getLogger(MQConsumer.class);

    @Autowired
    private StrokeHandler strokeHandler;

    /**
     * 监听死信队列
     *
     * @param message 消息体
     * @param channel RabbitMQ的Channel
     * @param tag 消息的Delivery Tag
     */
    @RabbitListener(
            bindings = {
                    @QueueBinding(
                            value = @Queue(value = RabbitConfig.STROKE_DEAD_QUEUE, durable = "true"),
                            exchange = @Exchange(value = RabbitConfig.STROKE_DEAD_QUEUE_EXCHANGE),
                            key = RabbitConfig.STROKE_DEAD_KEY)
            })
    @RabbitHandler
    public void processStroke(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        StrokeVO strokeVO = JSON.parseObject(message.getBody(), StrokeVO.class);
        logger.info("get dead msg:{}", message.getBody());
        
        if (strokeVO == null) {
            return;
        }

        try {
            // 处理超时的行程消息
            strokeHandler.timeoutHandel(strokeVO);
            // 手动确认消息
            channel.basicAck(tag, false);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

解释:

@RabbitListener 注解绑定了死信队列的监听器。当消息被转发到死信队列时,该消费者会接收到消息。

使用 channel.basicAck(tag, false) 手动确认消息处理成功,确保消息不会重复消费。

4. 处理超时业务逻辑

在我们的业务中,当消息超时未处理时,将其状态设置为超时。

public void timeoutHandel(StrokeVO strokeVO) {
    // 获取司机行程ID和乘客行程ID
    String inviterTripId = strokeVO.getInviterTripId();
    String inviteeTripId = strokeVO.getInviteeTripId();

    // 检查邀请状态是否为未确认
    String inviteeStatus = redisHelper.getHash(HtichConstants.STROKE_INVITE_PREFIX, inviteeTripId, inviterTripId);
    String inviterStatus = redisHelper.getHash(HtichConstants.STROKE_INVITE_PREFIX, inviterTripId, inviteeTripId);

    if (String.valueOf(InviteState.UNCONFIRMED.getCode()).equals(inviteeStatus) &&
        String.valueOf(InviteState.UNCONFIRMED.getCode()).equals(inviterStatus)) {
        // 更新为超时状态
        redisHelper.addHash(HtichConstants.STROKE_INVITE_PREFIX, inviteeTripId, inviterTripId, String.valueOf(InviteState.TIMEOUT.getCode()));
        redisHelper.addHash(HtichConstants.STROKE_INVITE_PREFIX, inviterTripId, inviteeTripId, String.valueOf(InviteState.TIMEOUT.getCode()));
    }
}

标签:指南,String,队列,STROKE,RabbitMQ,死信,消息,import,延迟
From: https://blog.csdn.net/sdg_advance/article/details/143118796

相关文章

  • 【C++指南】类和对象(四):类的默认成员函数——全面剖析 : 拷贝构造函数
     引言拷贝构造函数是C++中一个重要的特性,它允许一个对象通过另一个已创建好的同类型对象来初始化。了解拷贝构造函数的概念、作用、特点、规则、默认行为以及如何自定义实现,对于编写健壮和高效的C++程序至关重要。 C++类和对象系列文章,可点击下方链接阅读:【C++指南......
  • 《只狼:影逝二度》二十四项风灵月影修改器使用详解,只狼影逝二度3DM修改器完整使用指南
    当你沉浸在《只狼:影逝二度》那充满挑战与刺激的游戏世界中,风灵月影修改器的出现或许能为你的游戏体验带来全新的变化。这款拥有二十四项功能的修改器究竟该如何使用呢?下面就为你详细展开其使用方法的介绍。只狼影逝二度3DM修改器-只狼二十四项风灵月影修改器https://yz3l.com......
  • 用友client.dll故障应对:用友client.dll出错后的自救与修复指南
    用友client.dll是一个与用友财务软件紧密相关的动态链接库(DLL)文件,它对于确保用友软件的正常运行至关重要。当这个文件出现故障时,可能会导致用友软件无法启动、功能异常或系统崩溃等问题。因此,掌握一些自救与修复的方法显得尤为重要。一、了解client.dll文件的重要性client.d......
  • 计算机毕业设计价格以及选题指南
    第一点,关于计算机毕业设计的大致的价位,需要看需求的不同进行分类。下面我会给出大致价格供大家参考:一、程序部分1.成品的程序:说白了就是别人已经写好的程序,已经开发完成,可以直接拿去进行使用的程序,这种的价格会比较便宜,大概会在150-300之间。这种程序一般没有办法给到比较ok......
  • 计算机毕业设计选题及选代做避免踩坑指南
    每一年即将毕业的计算机专业的大学生都会因为毕业设计而头痛,跟着学校没有学到技术,但是毕设又会要求的你尽量完美。大多数同学都会选择代做毕设,去tb,pdd去找,但是往往到最后会不善而终,不仅浪费了钱,也浪费了时间和精力。现在这个行业全包质量也要求高的基本上都是在2000左右,程序一......
  • 鸿蒙Flutter实战:02-Windows环境搭建踩坑指南
    鸿蒙Flutter实战:02-Windows环境搭建踩坑指南环境搭建1.下载FlutterSDK,配置环境变量鸿蒙FlutterSDK需要在Gitee下载。目前建议下载dev分支代码。需要配置以下用户变量注意鸿蒙开发需要安装Java和配置相关变量#fluttersdk镜像FLUTTER_STORAGE_BASE_URL=https://s......
  • 金融交易系统延迟,NTP时间同步服务器为其保驾护航
    金融交易系统延迟,NTP时间同步服务器为其保驾护航金融交易系统延迟,NTP时间同步服务器为其保驾护航京准电子科技官微——ahjzsz“2024年9月27日,上海证券交易所(上交所)的交易系统出现了延迟现象,导致投资者在关键时刻无法及时进行操作。这一现象不仅影响了多家交易平台,还引发了投资......
  • 【北京迅为】i.mx8mm嵌入式linux开发指南第四篇 嵌入式Linux系统移植篇第六十九章uboo
      迅为i.mx8mm开发板特点: 性能强:i.MX8MM处理器采用了先进的14LPCFinFET工艺,提供更快的速度和更高的电源效率;四核Cortex-A53,单核Cortex-M4,多达五个内核,主频高达1.8GHz,2GDDR4内存、8GEMMC存储。 PMIC:采用PCA9450A电源管理,是NXP全新研制配套iMX.8M的电源管理芯片,有六个......
  • Linux环境下Tomcat的安装与配置详细指南
    ApacheTomcat是一个广泛使用的开源JavaServlet容器和Web服务器,适用于运行JavaWeb应用程序。本指南将详细介绍如何在Linux环境中安装和配置Tomcat,包括必要的前提条件、下载安装、配置环境变量、设置为系统服务以及基本的安全配置。目录前提条件安装Java环境创建Tomcat用户......
  • 《DNK210使用指南 -CanMV版 V1.0》第三十二章 音频FFT实验
    第三十二章音频FFT实验1)实验平台:正点原子DNK210开发板2)章节摘自【正点原子】DNK210使用指南-CanMV版V1.03)购买链接:https://detail.tmall.com/item.htm?&id=7828013987504)全套实验源码+手册+视频下载地址:http://www.openedv.com/docs/boards/k210/ATK-DNK210.html5)正点原......