首页 > 其他分享 >SpringBoot 整合RabbitMq 实战

SpringBoot 整合RabbitMq 实战

时间:2022-09-28 15:09:46浏览次数:90  
标签:实战 SpringBoot -- springframework send RabbitMq org import public


SpringBoot 整合RabbitMq 实战


参考官网: ​​https://docs.spring.io/spring-boot/docs/current/reference/html/boot-features-messaging.html#boot-features-amqp​

spring-boot-starter-amqp

高级消息队列协议(AMQP)是面向消息中间件的平台中立的有线协议。Spring AMQP项目将核心Spring概念应用于基于AMQP的消息传递解决方案的开发。Spring Boot为通过RabbitMQ与AMQP一起工作提供了一些便利,包括spring-boot-starter-amqp “Starter”。

springboot集成RabbitMQ非常简单,如果只是简单的使用配置非常少,springboot提供了spring-boot-starter-amqp项目对消息各种支持。

添加依赖

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

RabbitMQ是基于AMQP协议的轻量级,可靠,可扩展,可移植的消息代理。Spring使用RabbitMQ通过AMQP协议进行通信。

属性配置

RabbitMQ配置由外部配置属性控制 spring.rabbitmq.*。例如,您可以在以下部分声明以下部分 application.properties:

spring.rabbitmq.host = localhost
spring.rabbitmq.port = 5672
spring.rabbitmq.username = guest
spring.rabbitmq.password

快速上手

1.队列配置

package com.example.rabbitmqdemo.config;


import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* RabbitMQ 配置类
*
* @author itguang
* @create
@Configuration
public class RabbitConfig

@Bean
public Queue queue(){
return new Queue("hello");
}
}

2 发送者

rabbitTemplate是springboot 提供的默认实现.

package com.example.rabbitmqdemo.rabbitmq;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Date;

/**
* 消息发送者
*
* @author itguang
* @create

@Component
public class HelloSender


@Autowired
private AmqpTemplate amqpTemplate;


public void send(){
String context = "hello----"+LocalDateTime.now();
System.out.println("send:"+context);
//往名称为 hello 的queue中发送消息
this.amqpTemplate.convertAndSend("hello",context);
}

}

3 接收者

package com.example.rabbitmqdemo.rabbitmq;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* 消息接受者
*
* @author itguang
* @create

@Component
@RabbitListener(queues = "hello") //监听 名称为 hello 的queue
public class HelloReceiver

//消息处理器
@RabbitHandler
public void process(String message){
System.out.println("Receiver:"+message);

}


}

测试

package com.example.rabbitmqdemo;

import com.example.rabbitmqdemo.rabbitmq.HelloSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqdemoApplicationTests

@Autowired
HelloSender helloSender;

@Test
public void contextLoads() {
helloSender.send();

}
}

查看控制台输出结果

send:hello----2018-04-21T11:29:47.739
Receiver:hello----2018-04-21T11:29:47.739

一对多发送:一个发送者多个接受者

对上面的代码进行了小改造,接收端注册了两个Receiver,Receiver1和Receiver2,发送端加入参数计数,接收端打印接收到的参数,下面是测试代码,发送一百条消息,来观察两个接收端的执行效果

  • 添加一个队列叫 hello2
package com.example.rabbitmqdemo.config;


import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* RabbitMQ 配置类
*
* @author itguang
* @create
@Configuration
public class RabbitConfig



@Bean
public Queue queue(){
return new Queue("hello");
}

@Bean
public Queue queue2(){
return new Queue("hello2");
}



}
  • 给队列 hello2 发送消息,接受一个计数参数
package com.example.rabbitmqdemo.rabbitmq;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Date;

/**
* 消息发送者
*
* @author itguang
* @create

@Component
public class HelloSender


@Autowired
private AmqpTemplate amqpTemplate;


public void send(){
String context = "hello----"+LocalDateTime.now();
System.out.println("send:"+context);
this.amqpTemplate.convertAndSend("hello",context);
}

//给hello2发送消息,并接受一个计数参数
public void send2(int i){
String context = i+"";
System.out.println(context+"--send:");
this.amqpTemplate.convertAndSend("hello2",context);
}
}
  • 两个hello2 的接受者
@Component
@RabbitListener(queues = "hello2")
public class HelloReceiver1


@RabbitHandler
public void process(String message){

System.out.println("Receiver1:"+message);
}


}
@Component
@RabbitListener(queues = "hello2")
public class HelloReceiver2

@RabbitHandler
public void process(String message){
System.out.println("Receiver2:"+message);
}
}

测试

@Test
public void manyReceiver(){
for (int i=0;i<100;i++){
helloSender.send2(i);
}

}

查看控制台输出结果:

0--send:
1--send:
2--send:
3--send:
4--send:

...(省略)

58--send:
59--send:
60--send:
61--send:
62--send:
63--send:
Receiver2:1
Receiver1:0
64--send:
65--send:
Receiver1:2
Receiver2:3
66--send:
Receiver1:4
Receiver2:5
...(省略)

可以看到:在消息发送到63时,接受者Receiver已经收到了消息,
结论:

一个发送者,N个接受者,经过测试会均匀的将消息发送到N个接收者中

多对多: 多个发送者对多个接受者

我们可以注入两个发送者,放在循环中,如下:

@Test
public void many2many(){
for (int i=0;i<100;i++){
helloSender.send2(i);
helloSender2.send2(i);

}
}

运行单元测试,查看控制台输出:

0--send:
0--send:
1--send:
1--send:
2--send:
2--send:
3--send:
3--send:

...(省略)

22--send:
22--send:
23--send:
23--send:
24--send:
24--send:
Receiver2:0
25--send:
25--send:
Receiver2:1
26--send:
Receiver2:2
26--send:
Receiver2:3
27--send:
Receiver1:0
27--send:
Receiver2:4
Receiver1:1
28--send:
Receiver2:5
Receiver1:2
28--send:
Receiver2:6
Receiver1:3
29--send:
Receiver2:7
Receiver1:4
29--send:
Receiver2:8
Receiver1:5
30--send:
Receiver2:9
Receiver1:6
30--send:
31--send:
31--send:
32--send:
32--send:

结论:和一对多一样,接收端仍然会均匀接收到消息

发送对象

首先我们创建一个实体类对象 User,注意必须实现 Serializable 接口.

package com.example.rabbitmqdemo.pojo;

import java.io.Serializable;

/**
* @author itguang
* @create
public class User implements Serializable


private String username;
private String password;

public User(String username, String password) {
this.username = username;
this.password = password;
}

public String getUsername() {
return username;
}

public void setUsername(String username) {
this.username = username;
}

public String getPassword() {
return password;
}

public void setPassword(String password) {
this.password = password;
}

@Override
public String toString() {
return "User{" +
"username='" + username + '\'' +
", password='" + password + '\'' +
'}';
}
}

然后在配置文件中再创建一个队列,叫 ​​object_queue​

@Bean
public Queue queue3(){
return new Queue("object_queue");
}

接下里就是User对象的两个发送者ObjectSender和接受者ObjectReceiver:

@Component
public class ObjectSender

@Autowired
AmqpTemplate amqpTemplate;

public void sendUser(User user){

System.out.println("Send object:"+user.toString());
this.amqpTemplate.convertAndSend("object_queue",user);

}
}
@Component
@RabbitListener(queues = "object_queue")
public class ObjectReceiver

@RabbitHandler
public void objectReceiver(User user){

System.out.println("Receiver object:"+user.toString());

}
}

运行单元测试,查看控制台输出结果:

Send object:User{username='李增光', password='666666'}
Receiver object:User{username='李增光', password='666666'}

Topic Exchange

topic 是RabbitMQ中最灵活的一种方式,可以根据routing_key自由的绑定不同的队列

首先对topic规则配置,这里使用两个队列来测试

package com.example.rabbitmqdemo.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @author itguang
* @create
@Configuration
public class TopicRabbitConfig


final static String message = "topic.message";
final static String messages = "topic.messages";


//创建两个 Queue
@Bean
public Queue queueMessage(){
return new Queue(TopicRabbitConfig.message);
}

@Bean
public Queue queueMessages(){
return new Queue(TopicRabbitConfig.messages);
}

//配置 TopicExchange,指定名称为 topicExchange
@Bean
public TopicExchange exchange(){
return new TopicExchange("topicExchange");
}

//给队列绑定 exchange 和 routing_key

@Bean
public Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange){
return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
}

@Bean
public Binding bingingExchangeMessages(Queue queueMessages,TopicExchange exchange){
return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
}


}

消息发送者:都是用topicExchange,并且绑定到不同的 routing_key

package com.example.rabbitmqdemo.topic;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
* @author itguang
* @create
@Component
public class TopicSender

@Autowired
AmqpTemplate amqpTemplate;

public void send1(){
String context = "hi, i am message 1";
System.out.println("Sender : " + context);
amqpTemplate.convertAndSend("topicExchange","topic.message",context);
}

public void send2() {
String context = "hi, i am messages 2";
System.out.println("Sender : " + context);
amqpTemplate.convertAndSend("topicExchange", "topic.messages", context);
}
}

两个消息接受者,分别指定不同的 queue

package com.example.rabbitmqdemo.topic;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* @author itguang
* @create
@Component
@RabbitListener(queues = "topic.message")
public class TopicReceiver1

@RabbitHandler
public void process(String message){

System.out.println("Receiver topic.message :"+ message);

}

}
package com.example.rabbitmqdemo.topic;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* @author itguang
* @create
@Component
@RabbitListener(queues = "topic.messages")
public class TopicReceiver2

@RabbitHandler
public void process(String message){

System.out.println("Receiver topic.messages: "+ message);

}

}

测试:

发送send1会匹配到topic.#和topic.message 两个Receiver都可以收到消息,发送send2只有topic.#可以匹配所有只有Receiver2监听到消息

Fanout Exchange

Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。

Fanout 相关配置:

package com.example.rabbitmqdemo.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.security.PublicKey;

/**
* @author itguang
* @create
@Configuration
public class FanOutRabbitMq


//创建三个队列

@Bean
public Queue AMessage() {
return new Queue("fanout.A");
}

@Bean
public Queue BMessage() {
return new Queue("fanout.B");
}

@Bean
public Queue CMessage() {
return new Queue("fanout.C");
}


//创建exchange,指定交换策略

@Bean
public FanoutExchange fanoutExchange() {

return new FanoutExchange("fanoutExchange");
}


//分别给三个队列指定exchange,这里使用了A、B、C三个队列绑定到Fanout交换机上面,发送端的routing_key写任何字符都会被忽略:

@Bean
public Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange){
return BindingBuilder.bind(AMessage).to(fanoutExchange);

}

@Bean
public Binding bindingExchangeB(Queue BMessage,FanoutExchange fanoutExchange){
return BindingBuilder.bind(BMessage).to(fanoutExchange);

}

@Bean
Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
return

消息发送者:

这里使用了A、B、C三个队列绑定到Fanout交换机上面,发送端的routing_key写任何字符都会被忽略

package com.example.rabbitmqdemo.fanout;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
* @author itguang
* @create
@Component
public class FanoutSender


@Autowired
AmqpTemplate amqpTemplate;


public void send(){

String context = "hi, fanout msg ";
System.out.println("Sender : " + context);
//这里使用了A、B、C三个队列绑定到Fanout交换机上面,发送端的routing_key写任何字符都会被忽略:
amqpTemplate.convertAndSend("fanoutExchange","", context);

}

}

三个消息接受者:

@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA


@RabbitHandler
public void process(String message){

System.out.println("Receiver form fanout.A: "+message);

}

}

@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB


@RabbitHandler
public void process(String message){

System.out.println("Receiver form fanout.B: "+message);

}

}

@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC


@RabbitHandler
public void process(String message){

System.out.println("Receiver form fanout.C: "+message);

}

}

运行单元测试,查看结果:

Sender : hi, fanout msg 

Receiver form fanout.C: hi, fanout msg
Receiver form fanout.A: hi, fanout msg
Receiver form fanout.B: hi, fanout msg

结果说明,绑定到fanout交换机上面的队列都收到了消息.


标签:实战,SpringBoot,--,springframework,send,RabbitMq,org,import,public
From: https://blog.51cto.com/u_13866611/5719398

相关文章

  • synchronized实战
    synchronizedsynchronized是最常用的实现同步的手段,在JavaSE1.6以及之后的版本,对synchronized进行了优化,使synchronized整体的性能得到了很大的提升,下面看下synch......
  • java8实战五:用流收集数据
    用流收集数据我们已经在前面两篇文章中用过collect终端操作了,当时主要是用来把Stream中所有的元素结合成一个List。在本章中,你会发现collect是一个归约操作,就像r......
  • 实战指南 | Serverless 架构下的应用开发
    简介: 基于Serverless架构的应用开发流程将会比基于传统架构的应用开发更简单。在Serverless架构下进行应用开发,用户通常只需要按照规范编写代码、构建产物,然后部署到......
  • java8实战十一: java8----新的日期时间API
    java8中新的时间和日期APIJava的API提供了很多有用的组件,能帮助你构建复杂的应用。不过,JavaAPI也不总是完美的。我们相信大多数有经验的程序员都会赞同Java8之前的库对......
  • java8实战三:函数式数据处理--流
    函数式数据处理(一)–流引入流集合是Java中使用最多的API。要是没有集合,还能做什么呢?几乎每个Java应用程序都会制造和处理集合。集合对于很多编程任务来说都是非常基本的:......
  • Springboot 外置配置详解
    Springboot外置配置springBoot自动配置的bean提供了300多个用于微调的属性.当调整设置时,只需要在环境变量,Java系统属性,JNDI,命令行参数,属性文件进行配置就好了.举例......
  • java8实战一:通过行为参数化传递代码
    通过行为参数化传递代码如何对你的代码加以改进,从而更灵活地适应不断变化的需求?行为参数化就是可以帮你处理频繁变更的需求的一种软件开发模式.一言以蔽之,它意味着拿出一......
  • java8实战二:Lambda 表达式
    Lambda表达式前片文章讲到,使用匿名类来表示不同的行为并不令人满意:代码十分啰嗦,这会影响程序员在实践中使用行为参数化的积极性。在本章中,我们会教给你Java8中解决这个......
  • Netty实战:Netty优雅的创建高性能TCP服务器(附源码)
    文章目录前言1.前置准备2.消息处理器3.重写通道初始化类4.核心服务5.效果预览6.添加通道管理,给指定的客户端发送消息7.源码分享 前言Springbo......
  • 玩转SpringBoot之定时任务
    玩转SpringBoot之定时任务使用SpringBoot创建定时任务非常简单,目前主要有以下三种创建方式:一、基于注解(@Scheduled)二、基于接口(SchedulingConfigurer)前者相信大家......