一、导入Maven依赖
<!--Springboot父依赖-->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.0</version>
</parent>
<!--RabbitMQ依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
二、配置application.yml
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
#确认消息已发送到交换机(Exchange)
publisher-confirm-type: correlated
#确认消息已发送到队列(Queue)
publisher-returns: true
#消费方消息确认,手动确认
listener:
simple:
acknowledge-mode: manual
default-requeue-rejected: false
retry:
enabled: true #监听重试是否可用
max-attempts: 5 #最大重试次数,默认为3
retryTime: 60000 # 重试间隔时间(毫秒) 1000毫秒=等于1秒
三、配置交换机、队列
RabbitConfig类可以绑定交换机,队列或者RoutingKey。
Fanout交换机配置:
package com.hhsp.gmt.backend.conf.mq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
/**
* @Author : coolsheep
* @CreateTime : 2022/10/10
* @Description :fanout交换机配置信息
**/
@Configuration
public class FanoutRabbitConfig {
@Resource
private MyProperties myProperties;
/**
* 创建fanout队列并命名
* @return
*/
@Bean
public Queue fanoutMessageQueue() {
return new Queue(myProperties.getFanoutMessageQueue(),true,false,false);
}
/**
* 创建fanout交换机并命名
* @return
*/
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange(myProperties.getFanoutExchange(),true,false);
}
/**
* 绑定fanout队列和交换机。
* @return
*/
@Bean
Binding bindingFanoutExchange() {
return BindingBuilder.bind(fanoutMessageQueue()).to(fanoutExchange());
}
}
MyProperties类可以配置各个交换机的名称队列名称等操作:
package com.hhsp.gmt.backend.conf.mq;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
//标明为配置类将其注入spring容器中
@Configuration
//从application.yml文件中前缀为spring.rabbitmq中获取配置信息
@ConfigurationProperties(prefix = "spring.rabbitmq")
//lombok注解,属性可以自动get,set
@Data
public class MyProperties {
//fanout交换机名称
private String fanoutExchange;
//direct交换机名称
private String directExchange;
//fanout队列名称
private String fanoutMessageQueue;
//direct队列名称
private String directMessageQueue;
//direct的路由key
private String directRoutingKey;
//死信交换机名称
private String deadLetterDirectExchange;
//延时队列名称
private String delayLetterQueue;
//延时交换机的路由key
private String delayLetterRoutingKey;
//死信队列名称
private String deadLetterQueue;
//死信交换机的路由key
private String deadLetterRoutingKey;
//异常消息队列
private String exceptionMsgQueue;
//异常消息路由key
private String exceptionMsgRoutingKey;
//消息重试间隔时间
private Integer retryTime;
}
Direct交换机配置:
package com.hhsp.gmt.backend.conf.mq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
/**
* @Author : coolsheep
* @CreateTime : 2022/10/10
* @Description : direct交换机配置信息
**/
@Configuration
public class DirectRabbitConfig {
@Resource
private MyProperties myProperties;
/**
* 创建direct交换机并命名
* @return
*/
@Bean
public Queue directMessageQueue() {
return new Queue(myProperties.getDirectMessageQueue(),true,false,false);
}
/**
* 创建direct交换机并命名
* @return
*/
@Bean
DirectExchange directExchange() {
return new DirectExchange(myProperties.getDirectExchange(),true,false);
}
/**
* 将队列和交换机绑定, 并设置用于匹配键
* @return
*/
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(directMessageQueue()).to(directExchange()).with(myProperties.getDirectRoutingKey());
}
/**
* 创建异常队列
* @return
*/
@Bean
public Queue exceptionMsgQueue() {
return new Queue(myProperties.getExceptionMsgQueue(),true,false,false);
}
/**
* 将异常队列和交换机绑定, 并设置用于匹配键
* @return
*/
@Bean
Binding bindingExceptionDirect() {
return BindingBuilder.bind(exceptionMsgQueue()).to(directExchange()).with(myProperties.getExceptionMsgRoutingKey());
}
}
四、编写消息的生产者
@Resource
private RabbitTemplate rabbitTemplate;
//发送消息
//第一个参数:交换机名称,第二个参数:绑定的路由Key,第三个参数:消息内容(泛型,可以为任意类型,一般用Json字符串)
//注意:如果交换机为fanout类型,则路由Key为null
rabbitTemplate.convertAndSend(myProperties.getDirectExchange(), myProperties.getDirectRoutingKey(), msg);
五、编写消息的消费者
//标注在方法上的RabbitListener,监听到队列名称则进入该方法进行消费
@RabbitListener(queues = "${spring.rabbitmq.fanoutMessageQueue}")
public void fanoutMessageReceive(Channel channel, Message message) throws Exception {
String msg = new String(message.getBody());
log.info("接收消息成功,准备消费:{}", msg);
//将消息应答
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
system.out.println("消息消费了");
}
channel应答参数
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
// return new Queue("TestDirectQueue",true,true,false);
六、参考教程地址
springboot整合rabbitmq,用心看完这一篇就够了:
https://blog.csdn.net/qq_35387940/article/details/100514134
爱分享的板栗哥:
https://blog.csdn.net/qq_36972826/category_11275315.html
标签:return,SpringBoot,配置,RabbitMQ,private,交换机,springframework,org,import From: https://www.cnblogs.com/coolsheep/p/16829031.html