一、概述
RabbitMQ 是实现 AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
RabbitMQ 主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。
整体上看其实就是一个生产者消费者模型。只是这个模型更加抽象及精细化了。
RabbitMQ大体可以分为三层,其中第二层又可以细分为两层:
1.生产者
2.Broker
a.交换机(exchange)
b.队列
3.消费者
大体上如下图所示:
其中P代表生产者、X代表交换机、红色的长条代表队列,C代表消费者。
运行过程大致描述:
P生产的消息先放入交换机,交换机通过路由键找到绑定的队列,这样交换机的数据就直接到队列中了。而C作为消费者会监听是否有消息(可以主动去拿,可以被动接受)
生产消费过程:P->exchange->queue->c
二、代码示例
1.在pom.xml中引入RabbitMQ
<!-- 集成rabbitmq--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2.配置application.yml
rabbitmq: host: rabbitmq主机ip port: 5672 username: 用户名 password: 密码 publisher-returns: true #开启发送失败退回 publisher-confirm-type: correlated
3.创建配置文件(简单配置)RabbitConfig.java
public class RabbitConfig { /** * 交换机名称(自定义的,想起什么就起身名字) */ public static final String EXCHANGE = "topic.exchange"; /** * 队列名称(自定义的,想起什么就起身名字) */ public static final String QUEUE_A = "topic_test_queue"; /** * 路由键(自定义的,想起什么就起身名字) */ public static final String ROUTINGKEY_A = "key.#"; }
4.编写生产者
/** * 消息生产者 * * @author Tony * @version 2023 * @date 2023/9/20 11:40 */ @Slf4j @Component public class RabbitMqProducer implements RabbitTemplate.ConfirmCallback { private RabbitTemplate rabbitTemplate; @Autowired public RabbitMqProducer(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; rabbitTemplate.setConfirmCallback(this); } /** * 发送消息 * * @param content 消息内容 */ public void sendMessage(String content) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend( RabbitConfig.EXCHANGE,//交换机 RabbitConfig.ROUTINGKEY_A,//路由键 content, correlationData ); } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("接收到了RabbitMQ的回调id:{}", correlationData); if (ack) { log.info("消息成功消费"); } else { log.info("消息消费失败:{}", cause); } } }
5.编写消费者
/** * 队列消费者 * * @author Tony * @version 2023 * @date 2023/9/20 11:41 */ @Slf4j @Component @RabbitListener(bindings = @QueueBinding( exchange = @Exchange(value =RabbitConfig.EXCHANGE,durable = "false",type = "topic"),//指明交换机及交换机的类型以及是否持久化 value = @Queue(value = RabbitConfig.QUEUE_A,durable = "false"),//指明交换机绑定的队列 key = RabbitConfig.ROUTINGKEY_A))//指明交换机和队列之间的桥梁路由键 public class RabbitMqConsumer { /** * 接收String类型的消息 * @param message */ @RabbitHandler public void onStringHandle(String message) { log.info("RabbitMQ=>这是String类型的消息:{}",message); } /** * 接收Byte数组类型的消息 * @param message */ @RabbitHandler public void onByteHandle(byte[] message) { log.info("RabbitMQ=>这是byte[]类型的消息:{}",message); } }
6.编写一个RabbitMQController.java进行发送消息的测试
@RestController @RequestMapping("/api/v1/pub/mq/") public class RabbitMqController { @Autowired RabbitMqProducer rabbitMqProducer; @GetMapping("send") public String sendMsg() { String msg = "第" + new Random().nextInt(1000) + "个消息," + UUID.randomUUID().toString(); rabbitMqProducer.sendMessage(msg); return msg; } }
7.运行效果
标签:String,RabbitConfig,队列,RabbitMQ,交换机,SpringBoot2,注解,public From: https://www.cnblogs.com/tony-yang-flutter/p/17717523.html