概念
RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产 者甚至都不知道这些消息传递传递到了哪些队列中。
相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来 自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消 息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。
在交换机中衍生出两个概念:
- routingkey(路由键):生产者发送消息给交换机时,需要指定一个routingkey
- 绑定键:通过绑定键将交换机和队列绑定起来,这样mq在发送消息时能正确的发送到指定队列
- 两者中的关系:生产者将消息发送到哪个交换机是由routingkey觉得的,在由交换机通过指定bindingkey指定队列。
交换机的类型
在RabbitMQ中,交换机(Exchange)和队列(Queue)的声明仅需要在生产者或消费者任意一方进行即可。
通常的做法是:
- 生产者负责声明Exchange和绑定队列关系
- 消费者负责声明Queue
扇出交换机(Fanout Exchange)
Fanout 这种类型非常简单。正如从名称中猜到的那样,它是将接收到的所有消息广播到它知道的 所有队列中。
生产者:
/**
* 扇出交换机,生产者
* 生产者负责声明 Exchange和绑定队列关系
*/
public class FanoutProducer {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtil.getChannel();
// 声明交换机参数:1-交换机名称 , 2-交换机类型
channel.exchangeDeclare(FanoutContans.EXCHANGENAME, BuiltinExchangeType.FANOUT);
// 绑定键,绑定队列与交换机的关系,参数:1-队列名称 2-交换机名称 3-路由键
channel.queueBind(FanoutContans.QUEUENAME,FanoutContans.EXCHANGENAME,"");
channel.queueBind(FanoutContans.QUEUENAME1,FanoutContans.EXCHANGENAME,"");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.next();
// 发送消息参数:1-交换机名称 2-路由键 3-其他参数 4-消息
channel.basicPublish(FanoutContans.EXCHANGENAME,"123",null,message.getBytes());
System.out.println("生产者发出消息是:"+message);
}
}
}
消费者:
/**
* 消费者,测试扇出模式
* 消费者负责声明Queue
*/
public class FanoutConsumer {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtil.getChannel();
/**
* 生成一个临时的队列 队列的名称是随机的
* 当消费者断开和该队列的连接时 队列自动删除
*/
// 声明队列
channel.queueDeclare(FanoutContans.QUEUENAME,false,false,false,null);
int prefetchCount = 1;
channel.basicQos(prefetchCount);
// 读取消息
System.out.println("等待接收消息。。。。。把接收到的消息打印在控制台上。。。。。");
// 回调函数 成功后的消息
DeliverCallback deliverCallback = (consumerTag, message)->{
System.out.println("接收到的消息是:"+new String(message.getBody()));
};
channel.basicConsume(FanoutContans.QUEUENAME,true,deliverCallback,null,null);
}
}
经过测试可以实现一条消息发送个多个消费者
绑定关系如下:
然后我想着,如果说routingkey 不同是不是就不会发送消息,于是我修改了生产者的代码:
// 绑定键,绑定队列与交换机的关系,参数:1-队列名称 2-交换机名称 3-路由键
channel.queueBind(FanoutContans.QUEUENAME,FanoutContans.EXCHANGENAME,"aaaa");
channel.queueBind(FanoutContans.QUEUENAME1,FanoutContans.EXCHANGENAME,"bbbb");
// 发送消息
channel.basicPublish(FanoutContans.EXCHANGENAME,"aaaa",null,message.getBytes());
测试发现,还是一样的发送了,就像广播一样,不管你的routingkey是否一致,都会发送都同一个交换机上,由交换机统一发送给消费者。
原因如下:
对于fanout类型交换机,routing key不起任何作用,不会影响消息的传递。任何绑定的队列都会收到消息。
这确实是一个fanout交换机的设计特点,文档中也有明确说明。我们在使用时需要注意这一点。
对于上诉代码还是由缺点的,在mq中必须要先声明在使用,例如我生产者端,只声明了交换机和绑定关系,并没有声明队列,那么此时运行就会报错,只有消费者端(声明了队列)先运行了,在运行生产者才可以运行,解决办法:
// 在生产者中同样声明队列就好了
/**
* 扇出交换机,生产者
* 生产者负责声明 Exchange和绑定队列关系
*/
public class FanoutProducer {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtil.getChannel();
// 声明交换机参数:1-交换机名称 , 2-交换机类型
channel.exchangeDeclare(FanoutContans.EXCHANGENAME, BuiltinExchangeType.FANOUT);
// 绑定键,绑定队列与交换机的关系,参数:1-队列名称 2-交换机名称 3-路由键
channel.queueBind(FanoutContans.QUEUENAME,FanoutContans.EXCHANGENAME,"");
channel.queueBind(FanoutContans.QUEUENAME1,FanoutContans.EXCHANGENAME,"");
// 声明队列
channel.queueDeclare(FanoutContans.QUEUENAME,false,false,false,null);
channel.queueDeclare(FanoutContans.QUEUENAME1,false,false,false,null);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.next();
// 发送消息参数:1-交换机名称 2-路由键 3-其他参数 4-消息
channel.basicPublish(FanoutContans.EXCHANGENAME,"123",null,message.getBytes());
System.out.println("生产者发出消息是:"+message);
}
}
}
直连交换机(Direct Exchange)
直连交换机工作原理:将消息推送到与routingkey相同的队列上
在上面这张图中,我们可以看到 X 绑定了两个队列,绑定类型是 direct。队列 Q1 绑定键为 orange, 队列 Q2 绑定键有两个:一个绑定键为 black,另一个绑定键为 green.
在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列 Q1。绑定键为 blackgreen 和的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃。
那么根据这个原理,我们也可以得出,当指定的routingkey相同时,就变成了扇出交换机了。
生产者代码:
/**
* 直接交换机---路由模式
* 根据不同的routingkey 指定发送到不同的队列
*/
public class DirectProducer {
public static void main(String[] args) throws Exception{
Channel channel = RabbitMQUtil.getChannel();
// 声明交换机-> 指明直连交换机
channel.exchangeDeclare(DirectContians.DIRECT_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 声明队列
channel.queueDeclare(DirectContians.DIRECT_QUEUE_NAME,false,false,false,null);
channel.queueDeclare(DirectContians.DIRECT_QUEUE_NAME1,false,false,false,null);
// 绑定队列关系 -->队列名称,交换机名称,关联的routingkey
channel.queueBind(DirectContians.DIRECT_QUEUE_NAME,DirectContians.DIRECT_EXCHANGE_NAME,DirectContians.ROUTING_KEY_H1);
channel.queueBind(DirectContians.DIRECT_QUEUE_NAME1,DirectContians.DIRECT_EXCHANGE_NAME,DirectContians.ROUTING_KEY_H2);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.next();
// 发送消息参数:1-交换机名称 2-路由键 3-其他参数 4-消息
// 发送消息到 routingkeyH1中,那么另外一个队列由于绑定的是h2,所以h2 接收不到
channel.basicPublish(DirectContians.DIRECT_EXCHANGE_NAME,DirectContians.ROUTING_KEY_H1,null,message.getBytes());
System.out.println("生产者发出消息是:"+message);
}
}
}
消费者代码:
/**
* 消费者代码,
* 生产者通过direct模式指定routingkey发送给消费者1
*/
public class DirectConsumer1 {
public static void main(String[] args) throws Exception{
Channel channel = RabbitMQUtil.getChannel();
/**
* 生成一个临时的队列 队列的名称是随机的
* 当消费者断开和该队列的连接时 队列自动删除
*/
// 声明队列
channel.queueDeclare(DirectContians.DIRECT_QUEUE_NAME,false,false,false,null);
System.out.println("等待接收消息。。。。。把接收到的消息打印在控制台上。。。。。");
// 回调函数 成功后的消息
DeliverCallback deliverCallback = (consumerTag, message)->{
System.out.println("接收到的消息是:"+new String(message.getBody()));
};
// 从队列当中读取消息
channel.basicConsume(DirectContians.DIRECT_QUEUE_NAME,true,deliverCallback,null,null);
}
}
消费者2代码:与1基本一致,唯一不同点是从另外一个队列中获取消息
channel.basicConsume(DirectContians.DIRECT_QUEUE_NAME1,true,deliverCallback,null,null);
测试结果:
在指定了routingkey 为 h1 的前提下,无论发送多少条消息,都由队列1 接收到
绑定关系如下:
主题交换机(Topic Exchange)
场景引入
比方说我们想接收的日志类型有 info.base 和 info.advantage,某个队列只想 info.base 的消息,那这个时候 direct 就办不到了。这个时候 就只能使用 topic 类型
Topic 的要求
发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单 词列表,以点号分隔开。这些单词可以是任意单词,比如说:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit".这种类型的。当然这个单词列表最多不能超过 255 个字节。
在这个规则列表中,其中有两个替换符是大家需要注意的
*(星号)可以代替一个单词
#(井号)可以替代零个或多个单词
下图绑定关系如下:
Q1-->绑定的是
- 中间带 orange 带 3 个单词的字符串(.orange.)
Q2-->绑定的是
- 最后一个单词是 rabbit 的 3 个单词(..rabbit)
- 第一个单词是 lazy 的多个单词(lazy.#)
当队列绑定关系是下列这种情况时需要引起注意
- 当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了
- 如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct 了
生产者代码:
/**
* 主题交换机,routing_key 不能随便写,以点号分隔开
* *号代替一个代词
* #号代替多个单词或者无单词
*/
public class TopicsProducer {
public static void main(String[] args) throws Exception {
//1.创建连接
Channel channel = RabbitMQUtil.getChannel();
//2 声明交换机为topic模式
channel.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
//3.声明三个队列
channel.queueDeclare("Q1",false,false,false,null);
channel.queueDeclare("Q2",false,false,false,null);
channel.queueDeclare("Q3",false,false,false,null);
//4.绑定
channel.queueBind("Q1","topic_logs","*.orange.*");
channel.queueBind("Q2","topic_logs","*.*.rabbit");
channel.queueBind("Q3","topic_logs","lazy.#");
//测试数据
Map<String, String> bindingKeyMap = new HashMap<>();
// key 为 routing-key 值为消息
bindingKeyMap.put("quick.orange.rabbit","被队列 Q1Q2 接收到");
bindingKeyMap.put("lazy.orange.elephant","被队列 Q1Q2 接收到");
bindingKeyMap.put("quick.orange.fox","被队列 Q1 接收到");
bindingKeyMap.put("lazy.brown.fox","被队列 Q2 接收到");
bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2 接收一次");
bindingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");
bindingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");
bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2");
//发送消息
Set<Map.Entry<String, String>> entries = bindingKeyMap.entrySet();
Iterator<Map.Entry<String, String>> iterator = entries.iterator();
while (iterator.hasNext()){
Map.Entry<String, String> next = iterator.next();
String key = next.getKey();
String value = next.getValue();
channel.basicPublish("topic_logs",key,null,value.getBytes(StandardCharsets.UTF_8));
System.out.println("生产者发送消息:"+value);
}
}
}
消费者代码:
public class TopicConsumer {
public static void main(String[] args) throws Exception{
Channel channel = RabbitMQUtil.getChannel();
// 接收消息的回调
DeliverCallback ackCallback = ( consumerTag, message)->{
System.out.println("消费者1接收到的消息为:"+new String(message.getBody()));
};
// 取消消息的回调
CancelCallback nackCallback =(consumerTag)->{
System.out.println("消息被取消了");
};
channel.basicConsume("Q1",true,ackCallback,nackCallback);
}
}
绑定关系: