首页 > 其他分享 >RabbitMQ实现延迟发送消息

RabbitMQ实现延迟发送消息

时间:2023-03-13 22:36:44浏览次数:35  
标签:队列 RabbitMQ 发送 死信 消息 rabbitmq 交换机 message 延迟

前言

最近在做一个可以根据用户选择的时间,实现微信推送订阅消息的功能,突然想到rabbitmq好像可以实现这个功能,本着试试的心态开始研究,第一个想到的就是使用死信队列

死信队列

何为死信队列,其实rabbitmq本身并不能实现延迟发送消息的功能,不过因为本身有着队列ttl+死信exchange的机制,可以借助这个机制实现延迟发送消息

原理就是,用户发送消息到一个队列上并设置过期时间,但是这个队列没有消费者,到了过期时间,就由该队列绑定的死信exchange根据路由key的方式发送到另一个队列上,并消费,从而实现延迟发送

注:消息不仅只有超时才会成为死信,还有可能消息被消费者reject或者返回nack

理论上很简单,开始实践

导入依赖

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

创建配置类

1.声明死信队列,死信交换机,并绑定

2.声明普通队列,普通交换机,并绑定

3.普通队列需设置死信交换机,以及死信路由key

前两步很简单,就不贴代码了,需要注意第三步

Map<String, Object> args = new HashMap<>();
//设置死信交换机
args.put("x-dead-letter-exchange", "死信交换机名称");
//设置死信 routing_key
args.put("x-dead-letter-routing-key", "死信路由key");
return new Queue("普通队列", true, false, false, args);

创建之后,就需要一个生产者,发送消息

 String id = UUID.randomUUID().toString();
logger.info("传递id消息========",id);
rabbitTemplate.convertAndSend(
"普通交换机名称","绑定普通队列路由key",id,
message -> {
//设置过期时间(毫秒)
message.getMessageProperties().setExpiration("过期时间");
return message;
});

生产消息后,创建消费者配置类

@RabbitListener(queues = "死信队列名称")
public void receiveMsg(Message message) {
String msg = new String(message.getBody());
logger.info("接收到消息:{}", msg);
}

至此死信队列完成,测试一下,果然能实现延迟发送,不出意外的话马上要出意外了,我的最初需求是可以根据客户选择的时间推送消息,死信队列不就只能固定时间吗,我要是有别的时间,还能重新创建一个队列发送消息,不然可能会出现一种情况,比如第一个用户发送了一个10秒的消息,第二个用户发送一个5秒的消息,那么在第一条消息成为死信之前,后面的消息即使过期也不会投递为死信,所以只能一个交换机绑定的普通队列对应一个时间,但是又不想创建那么多,非常麻烦不说,关键是解决不了我的需求;

于是在网上找资料,看到rabbitmq推出的一个插件 rabbitmq-delayed-message-exchange,这时候才明白,死信队列设计的初衷只是为了存储那些没有被正常消费的消息,便于重新发送,不至于出现消息丢失等情况,而 rabbitmq-delayed-message-exchange是专门用于发送延迟消息的,于是开始研究插件

 插件:rabbitmq-delayed-message-exchange

安装

要想使用插件肯定是先下载啦,网上教程有很多,我用的比较方便简单的方式下载,因为我是linux系统,所以直接使用命令下载

wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.8.9/rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez

注意:需要安装自己rabbitmq对应版本的插件

安装完毕启动

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

这时候执行命令就可以看到有这个插件了

rabbitmq-plugins list

RabbitMQ实现延迟发送消息_spring


能看到这个插件,恭喜你安装成功,真聪明,接着往下走

代码配置

1.首先需声明队列,交换机根据路由key绑定

2.使用延迟消息交换器需要声明一个 x-delayed-message 类型的交换器

类似这样

  Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
//自定义交换机
return new CustomExchange("延迟消息交换机", "x-delayed-message", false, false, args);

然后就可以定义生产者发送消息了

rabbitTemplate.convertAndSend("延迟消息交换机", "路由Key", "消息", message -> {
// 设置过期时间
message.getMessageProperties().setDelay("过期时间");
return message;
});

消费者同上

实验一下发现,不同时间也可以正常消费,这是因为这个插件可以让消息延迟性绑定到消息本身上,使的每个消息有自己的过期时间,原以为问题解决了,没想到又出问题了,他这个时间并不是随意的(网上说不能超过1个月),是有一个限制时间,而且性能要比原生的要差一点,不能忍,于是又想了个办法,定时任务

定时任务

定时任务的原理就很简单了,因为我的发送时间是存在数据库的,所以每过一段时间查询一下是否有需要发送的消息,有则发送,并消费

实现很简单

启动类添加注解

@EnableScheduling

还是老样子,声明队列,绑定交换机

定时任务的语法可以在网上找,需要用到@Scheduled注解,放在需要定时的方法上,可以实现定时执行

总结

至此问题基本解决了,不过可能会造成比用户指定的时间晚几秒推送的情况,不过我的功能并不需要这么精准,所以使用这种方法无非是最好的方式,三种方法都有使用的场景,没有好与不好,根据自己的业务选择就好

标签:队列,RabbitMQ,发送,死信,消息,rabbitmq,交换机,message,延迟
From: https://blog.51cto.com/u_15003109/6116177

相关文章

  • Kafka、RabbitMQ、RocketMQ差异
    消息中间件消息中间件是分布式系统中重要的组件,本质就是一个具有接收消息、存储消息、分发消息的队列,应用程序通过读写队列消息来通信。在电商中,如订单系统处理完订单后,把订......
  • Spring 邮件发送
    Spring邮件发送1.主要内容2.JavaMail概述 JavaMail,顾名思义,提供给开发者处理电子邮件相关的编程接口。JavaMail是由Sun定义的一套收发电子邮件的API,它可以......
  • django使用qq的smtp邮箱服务器,作为第三方发送邮件
    1.qq邮箱设置中,开启pop3/smtp服务2.生成授权码(先复制下来)3.django配置#以下是邮箱配置项EMAIL_BACKEND='django.core.mail.backends.smtp.EmailBackend'EMAIL_HO......
  • 743. 网络延迟时间 (Medium)
    问题描述743.网络延迟时间(Medium)有n个网络节点,标记为1到n。给你一个列表times,表示信号经过有向边的传递时间。times[i]=(uᵢ,vᵢ,wᵢ),其中uᵢ是源节......
  • Unity中使用Timer实现延迟调用函数
    需求背景在Unity中实现延迟调用函数的方法,据我所知有三种使用协程中的yieldreturn使用Invoke使用DoTween中的Sequence但是如果我想一次性添加多个函数,并且在每个函......
  • 路飞:celery 执行异步任务,延迟任务,定时任务、django中使用celery、轮播图接口加缓存、
    目录一、celery执行异步任务,延迟任务,定时任务异步任务延迟任务定时任务二、django中使用celery2.1定时任务推荐使用的框架(了解)2.2秒杀功能2.2.1秒杀功能逻辑分析2.1.2......
  • celery执行异步任务 延迟任务 定时任务 Django中使用celery 双写一致性
    目录回顾补充只做定时任务的简单框架celery执行异步任务、延迟任务、定时任务定时任务:使用步骤:注意点:Django中使用celery使用步骤:秒杀功能:秒杀逻辑分析:django中使用celery......
  • 高级消息中间件rabbitmq
    rabbitmq   是一个在AMQP协议上实现的企业级消息系统。何谓消息系统,就是消息队列系统,消息队列是“”消费-生产者模型“”的一个典型的代表,一端往消息队列中不断写入消......
  • RabbitMQ图解
    之前学习过RabbitMQ,不过忘了,最近来复习下,发现效果不怎么好,于是画了一图来理解RabbitMQ,本文也不想贴代码。废话不多说,直接上图,如果有错误希望老铁指正!1、simple模式和wo......
  • Docker安装RabbitMQ
    Docker安装RabbitMQ1、准备工作准备挂载目录创建/opt/rabbitmq目录,集中管理RabbitMQ信息。特别注意目录权限问题。因为容器内RabbitMQ是以rabbitmq用户启动的,有些......