首页 > 其他分享 >RabbitMQ 实现消息队列延迟

RabbitMQ 实现消息队列延迟

时间:2023-04-29 09:57:38浏览次数:31  
标签:amqp 队列 springframework RabbitMQ org import public 延迟

1.概述

要实现RabbitMQ的消息队列延迟功能,一般采用官方提供的 rabbitmq_delayed_message_exchange插件。但RabbitMQ版本必须是3.5.8以上才支持该插件,否则得用其死信队列功能。

2.安装RabbitMQ延迟插件

  • 检查插件
    使用rabbitmq-plugins list命令用于查看RabbitMQ安装的插件。
rabbitmq-plugins list

检查RabbitMQ插件安装情况

  • 下载插件

如果没有安装插件,则直接访问官网进行下载

https://www.rabbitmq.com/community-plugins.html

  • 安装插件

下载后,将其拷贝到RabbitMQ安装目录的plugins目录;并进行解压,如:

E:\software\RabbitMQ Server\rabbitmq_server-3.11.13\plugins

打开cmd命令行窗口,如果系统已经配置RabbitMQ环境变量,则直接执行以下的命令进行安装;否则需要进入到RabbitMQ安装目录的sbin目录。

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

3.实现RabbitMQ消息队列延迟功能

  • pom.xml配置信息文件中,添加相关依赖文件
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.olive</groupId>
	<artifactId>rabbitmq-spring-demo</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.7.7</version>
		<relativePath />
	</parent>
	<dependencies>
		<!--rabbitmq-->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		
	<dependency>
		    <groupId>org.eclipse.paho</groupId>
		    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
		    <version>1.2.5</version>
		</dependency>

	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<configuration>
					<source>1.8</source>
					<target>1.8</target>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>
  • application.yml配置文件中配置RabbitMQ信息
server:
  port: 8080
spring:
  #给项目来个名字
  application:
    name: rabbitmq-spring-demo
  #配置rabbitMq 服务器
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: admin
    password: admin123
    #虚拟host。可以不设置,使用server默认host;不同虚拟路径下的队列是隔离的
    virtual-host: /
  • RabbitMQ配置类
package com.olive.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * RabbitMQ配置类
 **/
@Configuration
public class RabbitMqConfig {
	
	public static final String DELAY_EXCHANGE_NAME = "delayed_exchange";
	
	public static final String DELAY_QUEUE_NAME = "delay_queue_name";
	
	public static final String DELAY_ROUTING_KEY = "delay_routing_key";

	@Bean
	public CustomExchange delayExchange() {
		Map<String, Object> args = new HashMap<>();
		args.put("x-delayed-type", "direct");
		return new CustomExchange(DELAY_EXCHANGE_NAME, "x-delayed-message", true, false, args);
	}

	@Bean
	public Queue queue() {
		Queue queue = new Queue(DELAY_QUEUE_NAME, true);
		return queue;
	}

	@Bean
	public Binding binding(Queue queue, CustomExchange delayExchange) {
		return BindingBuilder.bind(queue).to(delayExchange).with(DELAY_ROUTING_KEY).noargs();
	}
}
  • 发送消息

实现消息发送,设置消息延迟5s。

package com.olive.service;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.olive.config.RabbitMqConfig;

/**
 * 消息发送者
 **/
@Service
public class CustomMessageSender {
	
	@Autowired
	private RabbitTemplate rabbitTemplate;

	public void sendMsg(String msg) {
		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
		System.out.println("消息发送时间:" + sdf.format(new Date()));
		rabbitTemplate.convertAndSend(RabbitMqConfig.DELAY_EXCHANGE_NAME, 
				RabbitMqConfig.DELAY_ROUTING_KEY, 
				msg, 
				new MessagePostProcessor() {
					@Override
					public Message postProcessMessage(Message message) throws AmqpException {
						// 消息延迟5秒
						message.getMessageProperties().setHeader("x-delay", 5000);
						return message;
					}
				});
	}
}
  • 接收消息
package com.olive.service;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.olive.config.RabbitMqConfig;

/**
 * 消息接收者
 **/
@Component
public class CustomMessageReceiver {
	
	@RabbitListener(queues = RabbitMqConfig.DELAY_QUEUE_NAME)
	public void receive(String msg) {
		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
		System.out.println(sdf.format(new Date()) + msg);
		System.out.println("Receiver:执行取消订单");
	}
}
  • 测试验证
package com.olive.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import com.olive.service.CustomMessageSender;

@RestController
public class DelayMessageController {
	
	@Autowired
	private CustomMessageSender customMessageSender;
	
	@GetMapping("/sendMessage")
	public String sendMessage() {
		// 发送消息
		customMessageSender.sendMsg("你已经支付超时,取消订单通知!");
		return "success";
	}

}

发送消息,访问

http://127.0.0.1:8080/sendMessage

查看控制台打印的信息

标签:amqp,队列,springframework,RabbitMQ,org,import,public,延迟
From: https://www.cnblogs.com/happyhuangjinjin/p/17363598.html

相关文章

  • SpringBoot RabbitMQ死信队列
    1.死信定义无法被消费的消息,称为死信。如果死信一直留在队列中,会导致一直被消费,却从不消费成功,专门有一个存放死信的队列,称为死信队列(DDX,dead-letter-exchange)。死信队列DLX,DeadLetterExchange的缩写,又死信邮箱、死信交换机。其实DLX就是一个普通的交换机,和一般的交换......
  • 深入学习RabbitMQ五种模式(二)
    #1.工作模式工作模式也被称为任务模型(TaskQueues)。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消......
  • 深入学习RabbitMQ五种模式(三)
    1.路由模式(精确匹配)路由模式(Routing)的特点:该模式的交换机为direct,意思为定向发送,精准匹配。队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)消息的发送方在向Exchange发送消息时,也必须指定消息的RoutingKey。Exchange不再把消息交给每一个绑定的队列......
  • 深入学习RabbitMQ五种模式(一)
    1.安装erlang下载otp_win64_25.3.exehttps://www.erlang.org/downloadserlang安装完成,需要配置erlang环境变量ERLANG_HOME=E:\software\ErlangOTPPATH=%PATH%;%ERLANG_HOME%\bin;2.安装RabbitMQ下载rabbitmq-server-3.11.13.exehttps://www.rabbitmq.com/download.htm......
  • SpringCloud Stream集成RabbitMQ
    1.概述SpringCloudStream框架抽象出了三个最基础的概念来对各种消息中间件提供统一调用:DestinationBinders:负责集成外部消息系统的组件。DestinationBinding:由Binders创建的,负责沟通外部消息系统、消息发送者和消息消费者的桥梁。Message:消息发送者与消息消费......
  • rabbitMQ--类型
    1.五种消息模型1.1基本消息模型 1.2work消息模型 1.3订阅模型1.3.1Fanout,也称为广播。流程说明流程图: 在广播模式下,消息发送流程是这样的:1)可以有多个消费者2)每个消费者有自己的queue(队列)3)每个队列都要绑定到Exchange(交换机)4)生产者发送的消息,只能......
  • RabbitMQ _ How to Close a Channel
    https://low-orbit.net/rabbitmq-how-to-close-a-channel RabbitMQHowtoCloseaChannelIfyouhavefoundyourwaytothispageyouareprobablywonderinghowtocloseachannelinRabbitMQ.Channelsshouldbeclosedwhentheyarenolongerinuse.There......
  • Unity的Animator中Transition有延迟的问题
    洪流学堂,让你快人几步。洪流学堂公众号回复space,下载本文用到的卡通太空主题资源。小新:“智哥,我遇到一个动画系统中问题,不知道是Unity的BUG,还是我的使用方式不对。”大智:“说来听听什么问题?”小新:“我的需求是这样的,我有一个模型,做了3段路径动画,想要每次点击模型的时候能够切换到下......
  • Provisional heads are shown、NullPointerException空指针异常?堆栈与队列的区别?Java
    Provisionalheadsareshown排查是否插件拦截,我的以前没有这种,所以排除本地网络节点问题,连接不到图片服务器,以下是解决方法:1.进入到C盘Windows文件夹System32/drivers/etc目录下,打开hosts文件,绑定下2.改下本地dns为公共dns网络节点导致的问题,一般为运营商导致,产生问题的原因为......
  • Linux安装RabbitMQ
    前言:还是和以前一样,linux安装软件的目录都是data目录 1.这次稍微不一样,不过还是进入data目录,创建RabbitMq目录并进入该目录cd/datamkdirrabbitMqcdrabbitMq 2.上传"erlang-21.1-1.el7.x86_64.rpm"文件和 "rabbitmq-server-3.7.7-1.el7.noarch.rpm"文件到当前......