Spring Boot操作
Spring Boot集成RabbitMQ是现在主流的操作RabbitMQ的方式。
官方文档:https://docs.spring.io/spring-amqp/docs/current/reference/html/
-
引入依赖。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
-
添加配置。
spring: rabbitmq: addresses: 127.0.0.1 username: admin password: admin virtual-host: /test
-
配置类。
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Exchange; import org.springframework.amqp.core.ExchangeBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.QueueBuilder; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * RabbitMQ配置类 */ @Configuration public class RabbitMqConfig { /** * 定义交换机,可以很多个 * @return 交换机对象 */ @Bean("directExchange") public Exchange exchange(){ return ExchangeBuilder.directExchange("amq.direct").build(); } /** * 定义消息队列 * @return 消息队列对象 */ @Bean("testQueue") public Queue queue(){ return QueueBuilder // 非持久化类型 .nonDurable("test_springboot") .build(); } /** * 定义绑定关系 * @return 绑定关系 */ @Bean public Binding binding(@Qualifier("directExchange") Exchange exchange, @Qualifier("testQueue") Queue queue){ // 将定义的交换机和队列进行绑定 return BindingBuilder // 绑定队列 .bind(queue) // 到交换机 .to(exchange) // 使用自定义的routingKey .with("test_springboot_key") // 不设置参数 .noargs(); } }
普通消费
-
实现生产者。
import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest class RabbitMqSpringBootTests { /** * RabbitTemplate封装了大量的RabbitMQ操作,已经由Starter提供,因此直接注入使用即可 */ @Autowired private RabbitTemplate rabbitTemplate; /** * 生产者 */ @Test void producer() { /* 发送消息 参数 1:指定交换机。 参数 2:指定路由标识。 参数 3:消息内容。 */ rabbitTemplate.convertAndSend("amq.direct", "test_springboot_key", "Hello World!"); } }
运行代码后,查看可视化界面,可以看到创建了一个新的队列:
绑定关系也已经建立:
-
实现消费者。
消费者实际上就是一直等待消息然后进行处理的角色,这里只需要创建一个监听器就行了,它会一直等待消息到来然后再进行处理:
import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 直连队列监听器 * @author CodeSail */ @Component public class DirectListener { /** * 定义此方法为队列test_springboot的监听器,一旦监听到新的消息,就会接受并处理 * @param message 消息内容 */ @RabbitListener(queues = "test_springboot") public void customer(Message message){ System.out.println(new String(message.getBody())); } }
-
启动服务。
可以看到,成功消费了消息。
消费并反馈
如果需要确保消息能够被消费者接受并处理,然后得到消费者的反馈,也是可以的。
-
定义生产者。
import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest class RabbitMqSpringBootTests { /** * RabbitTemplate封装了大量的RabbitMQ操作,已经由Starter提供,因此直接注入使用即可 */ @Autowired private RabbitTemplate rabbitTemplate; /** * 生产者 */ @Test void producer() { // 会等待消费者消费然后返回响应结果 Object res = rabbitTemplate.convertSendAndReceive("amq.direct", "test_springboot_key", "Hello World!"); System.out.println("收到消费者响应:" + res); } }
-
定义生产者。
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 直连队列监听器 * @author CodeSail */ @Component public class DirectListener { /** * 定义此方法为队列test_springboot的监听器,一旦监听到新的消息,就会接受并处理 * @param message 消息内容 */ @RabbitListener(queues = "test_springboot") public String customer(String message){ System.out.println("1号消息队列监听器:" + message); return "收到!"; } }
-
启动生产者发送消息。
可以看到,消费完成后接收到了反馈消息。
Json消息
-
引入依赖。
<dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.14.2</version> </dependency>
-
定义对象。
import lombok.Data; /** * 用户 */ @Data public class User { /** * 姓名 */ private String name; /** * 年龄 */ private Integer age; }
-
定义Bean。
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Exchange; import org.springframework.amqp.core.ExchangeBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.QueueBuilder; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * RabbitMQ配置类 */ @Configuration public class RabbitMqConfig { ... /** * 构建Json转换器 * @return Json转换器 */ @Bean public Jackson2JsonMessageConverter jackson2JsonMessageConverter(){ return new Jackson2JsonMessageConverter(); } }
-
定义消费者。
import cn.codesail.rabbitmq.entity.User; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 直连队列监听器 */ @Component public class DirectListener { /** * 指定messageConverter为创建的Bean名称 * @param user 用户 */ @RabbitListener(queues = "test_springboot", messageConverter = "jackson2JsonMessageConverter") public void receiver(User user) { System.out.println(user); } }
-
定义生产者。
import cn.codesail.rabbitmq.entity.User; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest class RabbitMqSpringBootTests { /** * RabbitTemplate封装了大量的RabbitMQ操作,已经由Starter提供,因此直接注入使用即可 */ @Autowired private RabbitTemplate rabbitTemplate; /** * 生产者 */ @Test void producer() { // 发送Json消息 User user = new User(); user.setName("张三"); user.setAge(18); rabbitTemplate.convertAndSend("amq.direct", "test_springboot_key", user); } }
-
启动生产者发送消息。
可以看到,对象转成了Json,消费者接收到Json转为的对象。
Spring Boot操作RabbitMQ是十分方便的,也是现在的主流,后续都用这种方式演示。
- 环境
- JDK 17.0.6
- Maven 3.6.3
- SpringBoot 3.0.4
- spring-boot-starter-amqp 3.0.4
- jackson-databind 2.14.2