RabbitMQ(三)整合SpringBoot
1 整合RabbitMQ
1 导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2 查看容器的自动配置
-
给容器自动配置了
RabbitTemplate
、AmqpAdmin
、CachingConnectionFactory
、RabbitMessagingTemplate
-
属性配置的开头是:
spring.rabbitmq
@ConfigurationProperties(prefix = "spring.rabbitmq") public class RabbitProperties { ... }
3 配置配置文件
spring.rabbitmq.addresses=192.168.60.101
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
账号密码信息均采用默认值了
4 启用RabbitMQ
@SpringBootApplication
@EnableRabbit
public class GrainmallOrderApplication {
public static void main(String[] args) {
SpringApplication.run(GrainmallOrderApplication.class, args);
}
}
2 使用AmqpAdmin
2.1 创建Exchange
@Test
public void createExchange() {
Exchange exchange = new DirectExchange("java-exchange", true, false);
amqpAdmin.declareExchange(exchange);
log.info("java-exchange创建成功");
}
- 第二个参数表示是否持久化交换器(即重启RabbitMQ还能否存在)
- 第三个表示是否自动删除
2.2 创建queue
@Test
public void createQueue() {
Queue queue = new Queue("java-queue", true, false, false);
amqpAdmin.declareQueue(queue);
log.info("java-queue创建成功");
}
- 第二个参数表示是否持久化消息队列(即重启RabbitMQ还能否存在)
- 第三个表示是否进行排他设计,即只允许一个连接通过交换机连接到队列
- 第四个表示是否自动删除
2.3 创建Binding
@Test
public void createBinding() {
Binding binding = new Binding("java-queue", Binding.DestinationType.QUEUE,
"java-exchange", "java-binding", null);
amqpAdmin.declareBinding(binding);
log.info("java-binding创建成功");
}
- 参数分别为:目的地(队列名或类型名、交换器名)、目的地类型(队列或交换器或类型)、交换器名、绑定名、其他参数
3 使用RabbitMQ Template
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void testRabbitMQTemplate() {
rabbitTemplate.convertAndSend("java-exchange", "java-binding", "Hello World!");
}
3.1 传输java对象
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void testRabbitMQTemplate() {
OrderReturnReasonEntity orderReturnReasonEntity = new OrderReturnReasonEntity();
orderReturnReasonEntity.setId(1L);
orderReturnReasonEntity.setName("Utada Hikaru");
rabbitTemplate.convertAndSend("java-exchange",
"java-binding",
orderReturnReasonEntity);
}
然后看到传输的消息是Java序列化后的字符串(这点要求对象的类必须实现序列化接口),在自动配置类中,使用了一种SimpleMessageConvert
,会判段传输对象是字符串、序列化对象还是字符数组从而进行响应转换
if (object instanceof byte[]) {
bytes = (byte[])((byte[])object);
messageProperties.setContentType("application/octet-stream");
} else if (object instanceof String) {
try {
bytes = ((String)object).getBytes(this.defaultCharset);
} catch (UnsupportedEncodingException var6) {
throw new MessageConversionException("failed to convert to Message content", var6);
}
messageProperties.setContentType("text/plain");
messageProperties.setContentEncoding(this.defaultCharset);
} else if (object instanceof Serializable) {
try {
bytes = SerializationUtils.serialize(object);
} catch (IllegalArgumentException var5) {
throw new MessageConversionException("failed to convert to serialized Message content", var5);
}
messageProperties.setContentType("application/x-java-serialized-object");
}
因此在配置类中导入需要的转化器即可,再次执行程序即可看到转化为json后的对象
@Configuration
public class RabbitConfig {
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
4 Rabbit Listener监听消息
Rabbit Listener
- 使用Rabbit Listener必须开启注解
@EnableRabbit
@RabbitListener(queues = {"java-queue"})
public void receiveMessage(Object message) {
System.out.println("接收到消息,内容为:" + message + "类型为" + message);
}
收到的消息:
接收到消息,内容为:(Body:'{"id":1,"name":"Utada Hikaru","sort":null,"status":null,"createTime":null}' MessageProperties [headers={__TypeId__=com.hikaru.grainmall.order.entity.OrderReturnReasonEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=java-exchange, receivedRoutingKey=java-binding, deliveryTag=1, consumerTag=amq.ctag-B_4ckQ10EHPx4EJvxywHQQ, consumerQueue=java-queue])类型为class org.springframework.amqp.core.Message
观察到可以写的参数有:
@RabbitListener(queues = {"java-queue"})
public void receiveMessage(Message message,
OrderReturnReasonEntity returnReasonEntity,
Channel channel) {
byte[] body = message.getBody();
MessageProperties messageProperties = message.getMessageProperties();
System.out.println(body);
System.out.println(messageProperties);
System.out.println(returnReasonEntity);
System.out.println(channel);
}
org.springframework.amqp.core.Message
,包含:- 消息体
body
- 消息头
MessageProperties
- 消息体
- 具体类型的消息体
通道com.rabbitmq.client.Channel
:与MQ进行数据交互使用的通道
[B@1469e150
MessageProperties [headers={__TypeId__=com.hikaru.grainmall.order.entity.OrderReturnReasonEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=java-exchange, receivedRoutingKey=java-binding, deliveryTag=1, consumerTag=amq.ctag-DxTaQHMll66p-V31rDUqew, consumerQueue=java-queue]
OrderReturnReasonEntity(id=1, name=Utada Hikaru, sort=null, status=null, createTime=null)
Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), conn: Proxy@3421debd Shared Rabbit Connection: SimpleConnection@66e17eff [delegate=amqp://[email protected]:5672/, localPort= 49417]
消息队列的特性是:很多人都可以来监听,但是只要收到消息,队列就要删除消息,并且只能有一个能收到此消息,因此:
多个监听的情况,只会有一个客户端能够收到
如果此时启动两个监听的服务,会是下面的场景:
@Test
public void testRabbitMQTemplate() {
for(int i = 0; i < 10; i++) {
OrderReturnReasonEntity orderReturnReasonEntity = new OrderReturnReasonEntity();
orderReturnReasonEntity.setId(1L);
orderReturnReasonEntity.setName("Utada Hikaru" + i);
rabbitTemplate.convertAndSend("java-exchange",
"java-binding",
orderReturnReasonEntity);
}
}
监听1:
OrderReturnReasonEntity(id=1, name=Utada Hikaru0, sort=null, status=null, createTime=null)
OrderReturnReasonEntity(id=1, name=Utada Hikaru3, sort=null, status=null, createTime=null)
OrderReturnReasonEntity(id=1, name=Utada Hikaru6, sort=null, status=null, createTime=null)
OrderReturnReasonEntity(id=1, name=Utada Hikaru9, sort=null, status=null, createTime=null)
监听2:
OrderReturnReasonEntity(id=1, name=Utada Hikaru1, sort=null, status=null, createTime=null)
OrderReturnReasonEntity(id=1, name=Utada Hikaru4, sort=null, status=null, createTime=null)
OrderReturnReasonEntity(id=1, name=Utada Hikaru7, sort=null, status=null, createTime=null)
由于单元测试本身也会启动SpringBoot客户端,因此也会接收消息:
OrderReturnReasonEntity(id=1, name=Utada Hikaru2, sort=null, status=null, createTime=null)
OrderReturnReasonEntity(id=1, name=Utada Hikaru5, sort=null, status=null, createTime=null)
OrderReturnReasonEntity(id=1, name=Utada Hikaru8, sort=null, status=null, createTime=null)
只有前一个消息处理完成,才能再接收下一个消息
@RabbitListener(queues = {"java-queue"})
public void receiveMessage(Message message,
OrderReturnReasonEntity returnReasonEntity,
Channel channel) throws InterruptedException {
System.out.println("收到消息:" + returnReasonEntity.getName());
Thread.sleep(3000);
}
效果就是下面的每句都是三秒才打印下一个:
收到消息:Utada Hikaru0
收到消息:Utada Hikaru2
收到消息:Utada Hikaru4
收到消息:Utada Hikaru6
收到消息:Utada Hikaru8
收到消息:Utada Hikaru3
收到消息:Utada Hikaru5
收到消息:Utada Hikaru7
收到消息:Utada Hikaru9
5 RabbitHandle
RabbitHandle
可以标在方法上,RabbitListener
既可以标在方法上也可以标在类上RabbitHandle
可以来重载区分不同的消息
@Test
public void testRabbitMQTemplate() {
for(int i = 0; i < 10; i++) {
if(i % 2 == 0) {
OrderReturnReasonEntity orderReturnReasonEntity = new OrderReturnReasonEntity();
orderReturnReasonEntity.setId(1L);
orderReturnReasonEntity.setName("Utada Hikaru" + i);
rabbitTemplate.convertAndSend("java-exchange",
"java-binding",
orderReturnReasonEntity);
} else {
OrderEntity orderEntity = new OrderEntity();
orderEntity.setOrderSn(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("java-exchange",
"java-binding",
orderEntity);
}
}
}
@RabbitListener(queues = {"java-queue"})
public void receiveMessage1(Message message,
OrderReturnReasonEntity returnReasonEntity,
Channel channel) throws InterruptedException {
System.out.println("收到消息:" + returnReasonEntity.getName());
Thread.sleep(3000);
}
@RabbitListener(queues = {"java-queue"})
public void receiveMessage2(OrderEntity orderEntity) throws InterruptedException {
System.out.println("收到消息:" + orderEntity.getOrderSn());
Thread.sleep(3000);
}
收到消息:Utada Hikaru0
收到消息:57a79418-1caa-4320-87b9-9d8ba1441dfc
收到消息:Utada Hikaru4
收到消息:ed9f250e-3198-4e3f-bfa8-e8dc132af261
收到消息:Utada Hikaru8
收到消息:42597c05-ac2f-4920-87f9-e86fb349117c
标签:java,SpringBoot,Utada,RabbitMQ,OrderReturnReasonEntity,消息,整合,null,public
From: https://www.cnblogs.com/tod4/p/17574413.html