一、入门
引入依赖
在springboot中引入spring-amqp-starter
<!--amqp的起步依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
编写配置文件
spring:
rabbitmq:
addresses: xxx.xx.x.xxx # rabbitMQ地址
port: 5672 # 连接端口号
username: xxxx #用户名
password: xxxxv #密码
virtual-host: /xxxx #要连接的虚拟机
编写常量类
package com.java.coder.constant;
public class BootMqConstant {
/**
* 交换机名称
*/
public static final String TOPICS_EXCHANGE_NAME="topics_exchange";
/**
* 队列名称
*/
public static final String TOPICS_QUEUE_01="topics_queue_01";
/**
* 队列名称
*/
public static final String TOPICS_QUEUE_02="topics_queue_02";
/**
* 路由键
*/
public static final String TOPICS_ROUTING_INFO_KEY="#.info.#";
/**
* 路由键
*/
public static final String TOPICS_ROUTING_ERROR_KEY="#.error.#";
}
编写配置类
在生产者端编写配置类
package com.java.coder.config;
import com.java.coder.constant.BootMqConstant;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMqConfig {
//声明交换机
@Bean("itemTopicExchange")
public Exchange topicExchange(){
return ExchangeBuilder.topicExchange(BootMqConstant.TOPICS_EXCHANGE_NAME)// 交换机名称
.durable(true)// 是否持久话
.build();
}
//声明队列
@Bean("itemQueue01")
public Queue itemQueue01(){
return QueueBuilder.durable(BootMqConstant.TOPICS_QUEUE_01)// 持久话的队列
.build();
}
//声明队列
@Bean("itemQueue02")
public Queue itemQueue02(){
return QueueBuilder.durable(BootMqConstant.TOPICS_QUEUE_02)// 持久话的队列
.build();
}
//声明队列
@Bean("objectQueue")
public Queue objectQueue(){
return QueueBuilder.durable(BootMqConstant.TOPICS_OBJECT_QUEUE)// 持久话的队列
.build();
}
/**
* 绑定队列 topics_queue_01
* @param queue
* @param exchange
* @return
*/
@Bean
public Binding itemQueueExchange01(@Qualifier("itemQueue01") Queue queue,
@Qualifier("itemTopicExchange") Exchange exchange){
return BindingBuilder.bind(queue)
.to(exchange)
.with(BootMqConstant.TOPICS_ROUTING_INFO_KEY)
.noargs();
}
/**
* 绑定队列 topics_queue_02
* @param queue
* @param exchange
* @return
*/
@Bean
public Binding itemQueueExchange02(@Qualifier("itemQueue02") Queue queue,
@Qualifier("itemTopicExchange") Exchange exchange){
return BindingBuilder.bind(queue)
.to(exchange)
.with(BootMqConstant.TOPICS_ROUTING_ERROR_KEY)
.noargs();
}
/**
* 绑定队列 topics_object_queue
* @param queue
* @param exchange
* @return
*/
@Bean
public Binding objectQueueExchange(@Qualifier("objectQueue") Queue queue,
@Qualifier("itemTopicExchange") Exchange exchange){
return BindingBuilder.bind(queue)
.to(exchange)
.with(BootMqConstant.TOPICS_ROUTING_OBJECT_KEY)
.noargs();
}
}
编写生产者代码
package com.java.coder.controller;
import com.java.coder.constant.BootMqConstant;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
@RestController
@RequestMapping("/produce")
public class RabbitMqProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/sendInfo")
public String sendInfo(){
rabbitTemplate.convertAndSend(BootMqConstant.TOPICS_EXCHANGE_NAME,"info.hahha","这是info级别的日志");
return "ok";
}
@RequestMapping("/sendError")
public String sendError(){
rabbitTemplate.convertAndSend(BootMqConstant.TOPICS_EXCHANGE_NAME,"error.hahha","error级别的日志");
return "ok";
}
}
编写消费者代码
package com.java.coder.rabbitmq.listener;
import com.java.coder.constant.BootMqConstant;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
public class ConsumerListener {
/**
* 监听某个队列的消息
* @param message 接收到的消息
*/
@RabbitListener(queues = BootMqConstant.TOPICS_QUEUE_01)
public void myListener01(String message){
System.out.println("消费者接收到的消息为:" + message);
}
/**
* 监听某个队列的消息
* @param message 接收到的消息
*/
@RabbitListener(queues = BootMqConstant.TOPICS_QUEUE_02)
public void myListener02(String message){
System.out.println("消费者接收到的消息为:" + message);
}
}
注解声明交换机和队列
除了上述在配置文件中声明交换机、队列、绑定交换机和队列 之外,还可以在消费者端通过@RabbitListener 来配置。
/**
* 监听某个队列的消息
* @param map 接收到的消息
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "${队列名称}"),
exchange = @Exchange(name = "${交换机名称}",type = ExchangeTypes.DIRECT),
key = {"red","blue"}
))
public void myListener(Map<String,Object> map ){
System.out.println("消费者接收到的消息为:" + map);
}
二、消息转换器
发送对象消息
@RequestMapping("/sendObject")
public String sendObject(){
Map<String,Object> map=new HashMap<>();
map.put("name","张三");
map.put("address","浙江杭州");
rabbitTemplate.convertAndSend(BootMqConstant.TOPICS_EXCHANGE_NAME,"object.hahha",map);
return "ok";
}
队列存储的结果
只有两个属性的数据,序列化之后消息占用123byte大小。
spring-amqp是采用Message
这个类代表要发送的消息,我们传递给convertAndSend
方法的消息内容最终都要封装成Message
这个类。如果我们没有指定消息转换器,那么默认的消息转换器就是SimpleMessageConverter
。 因为消息内容还需要在网络上传输,因此需要对消息内容进行序列化,序列化方式的代码如下:
protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
byte[] bytes = null;
if (object instanceof byte[]) {
bytes = (byte[])((byte[])object);
messageProperties.setContentType("application/octet-stream");
} else if (object instanceof String) {
try {
bytes = ((String)object).getBytes(this.defaultCharset);
} catch (UnsupportedEncodingException var6) {
throw new MessageConversionException("failed to convert to Message content", var6);
}
messageProperties.setContentType("text/plain");
messageProperties.setContentEncoding(this.defaultCharset);
} else if (object instanceof Serializable) {
try {
bytes = SerializationUtils.serialize(object);
} catch (IllegalArgumentException var5) {
throw new MessageConversionException("failed to convert to serialized Message content", var5);
}
messageProperties.setContentType("application/x-java-serialized-object");
}
if (bytes != null) {
messageProperties.setContentLength((long)bytes.length);
return new Message(bytes, messageProperties);
} else {
throw new IllegalArgumentException(this.getClass().getSimpleName() + " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName());
}
}
如果对象的化,会走到object instanceof Serializable
,此时的序列化方式是jdk ObjectOutputStream方式的序列化,这种方式序列化,存在以下问题:
1、安全风险
2、消息太大
3、可读性差
我们采用json的序列化方式。
json序列化
引入依赖
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
</dependency>
在生产者和消费者中都配置消息转换器。
@Bean
public MessageConverter jacksonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
再次对象消息,RabbitMq内存储的消息内如下:
消费者接收
发送的是什么类型的消息,消费者就接收什么样的消息
/**
* 监听某个队列的消息
* @param map 接收到的消息
*/
@RabbitListener(queues = BootMqConstant.TOPICS_OBJECT_QUEUE)
public void myListenerObject(Map<String,Object> map ){
System.out.println("消费者接收到的消息为:" + map);
}
标签:BootMqConstant,return,springboot,队列,TOPICS,RabbitMQ,连接,import,public From: https://www.cnblogs.com/cplinux/p/17977630