SpringBoot 整合RabbitMq 实战
spring-boot-starter-amqp
高级消息队列协议(AMQP)是面向消息中间件的平台中立的有线协议。Spring AMQP项目将核心Spring概念应用于基于AMQP的消息传递解决方案的开发。Spring Boot为通过RabbitMQ与AMQP一起工作提供了一些便利,包括spring-boot-starter-amqp “Starter”。
springboot集成RabbitMQ非常简单,如果只是简单的使用配置非常少,springboot提供了spring-boot-starter-amqp项目对消息各种支持。
添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
RabbitMQ是基于AMQP协议的轻量级,可靠,可扩展,可移植的消息代理。Spring使用RabbitMQ通过AMQP协议进行通信。
属性配置
RabbitMQ配置由外部配置属性控制 spring.rabbitmq.*。例如,您可以在以下部分声明以下部分 application.properties:
spring.rabbitmq.host = localhost
spring.rabbitmq.port = 5672
spring.rabbitmq.username = guest
spring.rabbitmq.password
快速上手
1.队列配置
package com.example.rabbitmqdemo.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitMQ 配置类
*
* @author itguang
* @create
@Configuration
public class RabbitConfig
@Bean
public Queue queue(){
return new Queue("hello");
}
}
2 发送者
rabbitTemplate是springboot 提供的默认实现.
package com.example.rabbitmqdemo.rabbitmq;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Date;
/**
* 消息发送者
*
* @author itguang
* @create
@Component
public class HelloSender
@Autowired
private AmqpTemplate amqpTemplate;
public void send(){
String context = "hello----"+LocalDateTime.now();
System.out.println("send:"+context);
//往名称为 hello 的queue中发送消息
this.amqpTemplate.convertAndSend("hello",context);
}
}
3 接收者
package com.example.rabbitmqdemo.rabbitmq;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 消息接受者
*
* @author itguang
* @create
@Component
@RabbitListener(queues = "hello") //监听 名称为 hello 的queue
public class HelloReceiver
//消息处理器
@RabbitHandler
public void process(String message){
System.out.println("Receiver:"+message);
}
}
测试
package com.example.rabbitmqdemo;
import com.example.rabbitmqdemo.rabbitmq.HelloSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqdemoApplicationTests
@Autowired
HelloSender helloSender;
@Test
public void contextLoads() {
helloSender.send();
}
}
查看控制台输出结果
send:hello----2018-04-21T11:29:47.739
Receiver:hello----2018-04-21T11:29:47.739
一对多发送:一个发送者多个接受者
对上面的代码进行了小改造,接收端注册了两个Receiver,Receiver1和Receiver2,发送端加入参数计数,接收端打印接收到的参数,下面是测试代码,发送一百条消息,来观察两个接收端的执行效果
- 添加一个队列叫 hello2
package com.example.rabbitmqdemo.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitMQ 配置类
*
* @author itguang
* @create
@Configuration
public class RabbitConfig
@Bean
public Queue queue(){
return new Queue("hello");
}
@Bean
public Queue queue2(){
return new Queue("hello2");
}
}
- 给队列 hello2 发送消息,接受一个计数参数
package com.example.rabbitmqdemo.rabbitmq;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Date;
/**
* 消息发送者
*
* @author itguang
* @create
@Component
public class HelloSender
@Autowired
private AmqpTemplate amqpTemplate;
public void send(){
String context = "hello----"+LocalDateTime.now();
System.out.println("send:"+context);
this.amqpTemplate.convertAndSend("hello",context);
}
//给hello2发送消息,并接受一个计数参数
public void send2(int i){
String context = i+"";
System.out.println(context+"--send:");
this.amqpTemplate.convertAndSend("hello2",context);
}
}
- 两个hello2 的接受者
@Component
@RabbitListener(queues = "hello2")
public class HelloReceiver1
@RabbitHandler
public void process(String message){
System.out.println("Receiver1:"+message);
}
}
@Component
@RabbitListener(queues = "hello2")
public class HelloReceiver2
@RabbitHandler
public void process(String message){
System.out.println("Receiver2:"+message);
}
}
测试
@Test
public void manyReceiver(){
for (int i=0;i<100;i++){
helloSender.send2(i);
}
}
查看控制台输出结果:
0--send:
1--send:
2--send:
3--send:
4--send:
...(省略)
58--send:
59--send:
60--send:
61--send:
62--send:
63--send:
Receiver2:1
Receiver1:0
64--send:
65--send:
Receiver1:2
Receiver2:3
66--send:
Receiver1:4
Receiver2:5
...(省略)
可以看到:在消息发送到63时,接受者Receiver已经收到了消息,
结论:
一个发送者,N个接受者,经过测试会均匀的将消息发送到N个接收者中
多对多: 多个发送者对多个接受者
我们可以注入两个发送者,放在循环中,如下:
@Test
public void many2many(){
for (int i=0;i<100;i++){
helloSender.send2(i);
helloSender2.send2(i);
}
}
运行单元测试,查看控制台输出:
0--send:
0--send:
1--send:
1--send:
2--send:
2--send:
3--send:
3--send:
...(省略)
22--send:
22--send:
23--send:
23--send:
24--send:
24--send:
Receiver2:0
25--send:
25--send:
Receiver2:1
26--send:
Receiver2:2
26--send:
Receiver2:3
27--send:
Receiver1:0
27--send:
Receiver2:4
Receiver1:1
28--send:
Receiver2:5
Receiver1:2
28--send:
Receiver2:6
Receiver1:3
29--send:
Receiver2:7
Receiver1:4
29--send:
Receiver2:8
Receiver1:5
30--send:
Receiver2:9
Receiver1:6
30--send:
31--send:
31--send:
32--send:
32--send:
结论:和一对多一样,接收端仍然会均匀接收到消息
发送对象
首先我们创建一个实体类对象 User,注意必须实现 Serializable 接口.
package com.example.rabbitmqdemo.pojo;
import java.io.Serializable;
/**
* @author itguang
* @create
public class User implements Serializable
private String username;
private String password;
public User(String username, String password) {
this.username = username;
this.password = password;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
@Override
public String toString() {
return "User{" +
"username='" + username + '\'' +
", password='" + password + '\'' +
'}';
}
}
然后在配置文件中再创建一个队列,叫 object_queue
@Bean
public Queue queue3(){
return new Queue("object_queue");
}
接下里就是User对象的两个发送者ObjectSender和接受者ObjectReceiver:
@Component
public class ObjectSender
@Autowired
AmqpTemplate amqpTemplate;
public void sendUser(User user){
System.out.println("Send object:"+user.toString());
this.amqpTemplate.convertAndSend("object_queue",user);
}
}
@Component
@RabbitListener(queues = "object_queue")
public class ObjectReceiver
@RabbitHandler
public void objectReceiver(User user){
System.out.println("Receiver object:"+user.toString());
}
}
运行单元测试,查看控制台输出结果:
Send object:User{username='李增光', password='666666'}
Receiver object:User{username='李增光', password='666666'}
Topic Exchange
topic 是RabbitMQ中最灵活的一种方式,可以根据routing_key自由的绑定不同的队列
首先对topic规则配置,这里使用两个队列来测试
package com.example.rabbitmqdemo.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author itguang
* @create
@Configuration
public class TopicRabbitConfig
final static String message = "topic.message";
final static String messages = "topic.messages";
//创建两个 Queue
@Bean
public Queue queueMessage(){
return new Queue(TopicRabbitConfig.message);
}
@Bean
public Queue queueMessages(){
return new Queue(TopicRabbitConfig.messages);
}
//配置 TopicExchange,指定名称为 topicExchange
@Bean
public TopicExchange exchange(){
return new TopicExchange("topicExchange");
}
//给队列绑定 exchange 和 routing_key
@Bean
public Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange){
return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
}
@Bean
public Binding bingingExchangeMessages(Queue queueMessages,TopicExchange exchange){
return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
}
}
消息发送者:都是用topicExchange,并且绑定到不同的 routing_key
package com.example.rabbitmqdemo.topic;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author itguang
* @create
@Component
public class TopicSender
@Autowired
AmqpTemplate amqpTemplate;
public void send1(){
String context = "hi, i am message 1";
System.out.println("Sender : " + context);
amqpTemplate.convertAndSend("topicExchange","topic.message",context);
}
public void send2() {
String context = "hi, i am messages 2";
System.out.println("Sender : " + context);
amqpTemplate.convertAndSend("topicExchange", "topic.messages", context);
}
}
两个消息接受者,分别指定不同的 queue
package com.example.rabbitmqdemo.topic;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author itguang
* @create
@Component
@RabbitListener(queues = "topic.message")
public class TopicReceiver1
@RabbitHandler
public void process(String message){
System.out.println("Receiver topic.message :"+ message);
}
}
package com.example.rabbitmqdemo.topic;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author itguang
* @create
@Component
@RabbitListener(queues = "topic.messages")
public class TopicReceiver2
@RabbitHandler
public void process(String message){
System.out.println("Receiver topic.messages: "+ message);
}
}
测试:
发送send1会匹配到topic.#和topic.message 两个Receiver都可以收到消息,发送send2只有topic.#可以匹配所有只有Receiver2监听到消息
Fanout Exchange
Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。
Fanout 相关配置:
package com.example.rabbitmqdemo.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.security.PublicKey;
/**
* @author itguang
* @create
@Configuration
public class FanOutRabbitMq
//创建三个队列
@Bean
public Queue AMessage() {
return new Queue("fanout.A");
}
@Bean
public Queue BMessage() {
return new Queue("fanout.B");
}
@Bean
public Queue CMessage() {
return new Queue("fanout.C");
}
//创建exchange,指定交换策略
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
//分别给三个队列指定exchange,这里使用了A、B、C三个队列绑定到Fanout交换机上面,发送端的routing_key写任何字符都会被忽略:
@Bean
public Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange){
return BindingBuilder.bind(AMessage).to(fanoutExchange);
}
@Bean
public Binding bindingExchangeB(Queue BMessage,FanoutExchange fanoutExchange){
return BindingBuilder.bind(BMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
return
消息发送者:
这里使用了A、B、C三个队列绑定到Fanout交换机上面,发送端的routing_key写任何字符都会被忽略
package com.example.rabbitmqdemo.fanout;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author itguang
* @create
@Component
public class FanoutSender
@Autowired
AmqpTemplate amqpTemplate;
public void send(){
String context = "hi, fanout msg ";
System.out.println("Sender : " + context);
//这里使用了A、B、C三个队列绑定到Fanout交换机上面,发送端的routing_key写任何字符都会被忽略:
amqpTemplate.convertAndSend("fanoutExchange","", context);
}
}
三个消息接受者:
@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA
@RabbitHandler
public void process(String message){
System.out.println("Receiver form fanout.A: "+message);
}
}
@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB
@RabbitHandler
public void process(String message){
System.out.println("Receiver form fanout.B: "+message);
}
}
@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC
@RabbitHandler
public void process(String message){
System.out.println("Receiver form fanout.C: "+message);
}
}
运行单元测试,查看结果:
Sender : hi, fanout msg
Receiver form fanout.C: hi, fanout msg
Receiver form fanout.A: hi, fanout msg
Receiver form fanout.B: hi, fanout msg
结果说明,绑定到fanout交换机上面的队列都收到了消息.