1. Rabbitmq简介
RabbitMQ是一个实现了AMQP(Advanced Message Queuing Protocol)高级消息队列协议的消息队列服务,用Erlang语言。是面向消息的中间件。
你可以把它想像成一个邮局:你把信件放入邮箱,邮递员就会把信件投递到你的收件人处。在这个比喻中,RabbitMQ是一个邮箱、邮局、邮递员。RabbitMQ和邮局的主要区别是,它处理的不是纸,而是接收、存储和发送二进制的数据——消息。
主要流程:生产者(Producer)与消费者(Consumer)和 RabbitMQ 服务(Broker)建立连接, 然后生产者发布消息(Message)同时需要携带交换机(Exchange) 名称以及路由规则(Routing Key),这样消息会到达指定的交换机,然后交换机根据路由规则匹配对应的 Binding,最终将消息发送到匹配的消息队列(Quene),最后 RabbitMQ 服务将队列中的消息投递给订阅了该队列的消费者(消费者也可以主动拉取消息)。
2. Java实现
rabbit工具类
package com.example.studydemo.Rabbit;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
@Slf4j
@Component
public class RabbitUtil {
@Autowired
private AmqpAdmin amqpAdmin;
@Autowired
private AmqpTemplate rabbitTemplate;
/**
* 创建交换机
*
* @param changeName 交换机名称
*/
public void createExchange(String changeName) {
TopicExchange exchange = new TopicExchange(changeName, true, false);
amqpAdmin.declareExchange(exchange);
}
/**
* 创建队列
*
* @param queueName 队列名称
*/
public void createQueue(String queueName) {
Queue queue = new Queue(queueName, true, false, false);
amqpAdmin.declareQueue(queue);
}
/**
* 交换机绑定
*
* @param changeName 交换机名称
* @param routingKey 路由key
* @param queueName 队列名称
*/
public void bindExchange(String changeName, String routingKey, String queueName) {
Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, changeName, routingKey, null);
amqpAdmin.declareBinding(binding);
}
/**
* 发送信息到交换机
*
* @param changeName 交换机名称
* @param routingKey 路由key
* @param message 消息
*/
public void sendMessage(String changeName, String routingKey, String message) {
rabbitTemplate.convertAndSend(changeName, routingKey, message);
}
public String receiveMessage(String queueName) {
return Objects.requireNonNull(rabbitTemplate.receiveAndConvert(queueName)).toString();
}
/**
* 删除交换机
*
* @param changeName 交换机名称
*/
public void deleteExchange(String changeName) {
amqpAdmin.deleteExchange(changeName);
}
/**
* 删除队列
*
* @param queueName 队列名称
*/
public void deleteQueue(String queueName) {
amqpAdmin.deleteQueue(queueName);
}
}
这个类原本是没有接收消息方法receiveMessage,是我加上的,不过经过测试,这个方法只会在队列中取数据,如果队列空了,就会报空指针异常,所以感觉接收这个设计不实用(sendMessage2次消息,队列中有两条数据,请求两次receiveMessage是没问题的,三次就报错了)
Controller类
package com.example.studydemo.Rabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/rabbits")
public class rabbitController {
@Autowired
private RabbitUtil rabbitUtil;
@Value("星月天空")
private String exchangeName;
@Value("hello-queue")
private String queueName;
@Value("#.hello")
private String routingKey;
@GetMapping("/send")
public String sendMessage(@RequestParam("message") String message){
rabbitUtil.createExchange(exchangeName);
rabbitUtil.createQueue(exchangeName);
rabbitUtil.createQueue(queueName);
//#.test采用通配符绑定
// rabbitUtil.bindExchange(exchangeName,"#.test",queueName);
rabbitUtil.bindExchange(exchangeName,routingKey,queueName);
//只允许hello队列接收信息
rabbitUtil.sendMessage(exchangeName,routingKey,message);
return "发送成功";
}
@GetMapping("/receive")
public String RabbitListener() {
//接收test-queue,hello-queue队列信息,如果每个队列都有信息,那么就会执行多次方法
//如果只想接收hello-queue队列信息 如: @RabbitListener(queues={"test-queue"})
return rabbitUtil.receiveMessage(queueName);
}
}
这里的sendMessage与RabbitListener都是一次性的,并非长连接,简而言之就是sendMessage是向队列中放一条数据,RabbitListener是从队列中取一条数据,如果队列为空则报空指针异常,个人觉得这个取数据不好,不过想来正常应用中应该是可以前端订阅,后端推送,后端要想持续给前端订阅接口,就是要走长连接。。。。。
后端订阅类
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "hello-queue")
public class SimpleHelloReceiver {
@RabbitHandler
public void handle(String in) {
System.out.println("我收到了消息:" + in);
}
}
原理应该是通过管道监听器实现的,还是那句话,要想用这个的话,前端要不定时任务,要不长连接。。。。。。。。。。。。
参考:
https://blog.csdn.net/weixin_44905972/article/details/131683234
https://blog.csdn.net/qq_48721706/article/details/125194646