RabbitMQ笔记
个人学习笔记记录
参考:尚硅谷
1.消息队列
2. 轮训分发消息
3.消息应答
4.发布确认
5.交换机
5.1Exchanges
5.1.1Exchanges概念
RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产 者甚至都不知道这些消息传递传递到了哪些队列中。 相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来 自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消 息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。
5.2临时队列
5.3绑定(bindings)
5.4 Fanout交换机
5.4.1 Fanout介绍
fanout (扇出)交换机模式,它是将所有接受到的消息发送给它知道的工作队列中,广播。
系统默认中也有Fanout
![fanout](D:\Downloads\Typora\repo\mq\rabbitmq photo\fanout.png)
Fanout实操
Logs 和临时队列的绑定关系如下图 PS:因为是随机生成的队列,所以queueName是随机的
![img](file:///C:\Users\Administrator\AppData\Roaming\Tencent\Users\1845472368\QQ\WinTemp\RichOle$@$WS~5EC1{}S$7@QT_FF$G.png)
ReceiveLogs01 和 ReceiveLogs02 将接收到的消息打印在控制台:
package com.cheng.rabbitmq.five;
import com.cheng.rabbitmq.utils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
/**
* 收到logs02
*
* @Description: ReceiveLogs02
* @Author cheng
* @Date: 2022/11/24 20:13
* @Version 1.0
* @date 2022/11/24
*/
public class ReceiveLogs02 {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMQUtils.getChannel();
//声明一个交换机 fanout扇出
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//产生队列 临时队列
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName,EXCHANGE_NAME,"1");
//等待消息
System.out.println("R2等待消息===========");
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("ReceiveLogs02接受到的消息为:" + new String(message.getBody(),"utf-8"));
};
channel.basicConsume(queueName,true,deliverCallback, ConsumerTag->{});
}
}
EmitLogs发送消息到交换机
package com.cheng.rabbitmq.five;
import com.cheng.rabbitmq.utils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import javax.print.attribute.standard.NumberUp;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
/**
* 发出日志
*
* @Description: EmitLogs
* @Author cheng
* @Date: 2022/11/24 20:12
* @Version 1.0
* @date 2022/11/24
*/
public class EmitLogs {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
Scanner scanner = new Scanner(System.in);
System.out.println("输入消息:");
while (scanner.hasNext()){
String msg = scanner.nextLine();
channel.basicPublish(EXCHANGE_NAME,"", null,msg.getBytes("UTF-8"));
}
}
}
结果:当生产者发送消息到交换机时,交换机向绑定此交换机的所有队列发送消息,即使消费者的队列的routingKey不同,仍会接受到交换机广播的消息。
5.5 Direct交换机
介绍
direct 这种类型的工作方式是,消息只去到它绑定的 routingKey 队列中去。(路由模式)
从上图可以看出交换机模式为direct,Q1队列的routingKey为orange,Q2队列有两个routingKey分别为black,green;
在这种绑定情况下,生产者发布消息到 exchange 上,路由键为 orange 的消息会被发布到队列 Q1。路由键为 black 和 green 的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃。
多重绑定
当然如果 exchange 的绑定类型是 direct,但是它绑定的多个队列的 key 如果都相同,在这种情况下虽然绑定类型是 direct 但是它表现的就和 fanout 有点类似了,就跟广播差不多,如上图所示。
实战
生产者:
public class EmitLogs {
private static final String EXCHANGE_NAME = "direct_logs";
private static final String ERROR = "ERROR";
private static final String WARNING = "WARNING";
private static final String INFO = "INFO";
/**
* 主要
*
* @param args arg游戏
* @throws IOException
* @throws TimeoutException 超时异常
*/
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtils.getChannel();
//绑定交换机 direct模式
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
Scanner scanner = new Scanner(System.in);
System.out.println("输入消息:");
Boolean flag = true;
while (scanner.hasNext()) {
String msg = scanner.nextLine();
System.out.println(flag);
channel.basicPublish(EXCHANGE_NAME,INFO,null,msg.getBytes("UTF-8"));
if (flag)
channel.basicPublish(EXCHANGE_NAME, ERROR, null, msg.getBytes("UTF-8"));
else
channel.basicPublish(EXCHANGE_NAME,WARNING,null,msg.getBytes("UTF-8"));
flag = !flag;
}
}
}
消费者1:
public class ReceiveLogs01 {
private static final String EXCHANGE_NAME = "direct_logs";
private static final String ERROR = "ERROR";
private static final String WARNING = "WARNING";
private static final String INFO = "INFO";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtils.getChannel();
//绑定routingKey
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue,EXCHANGE_NAME,ERROR);
channel.queueBind(queue,EXCHANGE_NAME,INFO);
System.out.println("R1等待接受消息");
DeliverCallback deliverCallback = (tag,msg)-> {
System.out.println("消息为:" + new String(msg.getBody(),"UTF-8"));
};
channel.basicConsume(queue,true,deliverCallback,tag->{});
}
}
消费者2:绑定routingKey不同
public class ReceiveLogs02 {
private static final String EXCHANGE_NAME = "direct_logs";
private static final String ERROR = "ERROR";
private static final String WARNING = "WARNING";
private static final String INFO = "INFO";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtils.getChannel();
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue,EXCHANGE_NAME,WARNING);
channel.queueBind(queue,EXCHANGE_NAME,INFO);
System.out.println("R2等待接受消息");
DeliverCallback deliverCallback = (tag, msg)-> {
System.out.println("消息为:" + new String(msg.getBody(),"UTF-8"));
};
channel.basicConsume(queue,true,deliverCallback,tag->{});
}
}
总结:
- 生产者向
ROUTING_KEY_INFO
发送消息时,消费者1、2都可以收到消息 - 生产者向
ROUTING_KEY_WARNING
发送消息时,消费者2可以收到消息 - 生产者向
ROUTING_KEY_ERROR
发送消息时,消费者1可以收到消息 - 如果启动多个消费者1,生产者向
ROUTING_KEY_INFO
或ROUTING_KEY_WARNING
发送消息时,多个消费者1将会轮流公平接收消息,且每个消息只会被消费一次。