首页 > 其他分享 >RabbitMQ学习笔记05:Routing

RabbitMQ学习笔记05:Routing

时间:2023-01-11 16:34:33浏览次数:80  
标签:Run exchange 05 direct RabbitMQ queue Routing key channel

参考资料:RabbitMQ tutorial - Routing — RabbitMQ 

 

前言

在之前的文章中我们构建了一个简单的日志系统,它可以广播消息到多个消费者中。

在这篇文章中,我们打算实现仅订阅消息的子集(即不是所有的消息,仅仅只是一部分消息。注意,这里不是说一条消息的一部分)。例如我们只会把严重的错误消息写到日志文件中,同时我们会把所有的消息都输出到屏幕上。

 

Bindings

在之前的示例中我们学会了绑定exchange和队列。

channel.queue_bind(exchange=exchange_name,
                   queue=queue_name)

绑定是exchange和队列之间的关系,我们可以简单理解为某个队列对于来自于某个exchange的消息感兴趣。

在绑定的过程当中可以使用额外的routing_key参数。为了避免和basic_publish参数混淆,我们将其称之为binding_key

其实binding_key就是exchange和队列在绑定的时候使用的routing_key

下面是我们在绑定的时候创建了一个binding_key

channel.queue_bind(exchange=exchange_name,
                   queue=queue_name,
                   routing_key='black')

binding_key的含义取决于exchange的类型,我们之前创建的fanout类型的exchange会忽略这个参数的值。

 

Direct exchange

想要完成前言中我们提到的需求的话,我们需要使用direct类型的exchangedirect的路由算法也十分简单,发送给direct类型的exchange的消息一般会带有一个routing_key参数,这条消息会被路由到带有相同的binding_key值的队列中。这种匹配是必须准确无误的。

就像这样:

从图中我们可以看到,direct类型的名为xexchange绑定了2个队列Q1Q2,在与Q1绑定时的binding_keyorange,在与Q2绑定时共绑定了2次,第一个绑定的binding_keyblack,第二个绑定的binding_keygreen

在这种情况下,发给x的消息,如果其routing_keyorange的话,则消息会被发送给队列Q1;如果其routing_keyblack或者green的话,则消息会被发送给队列Q2;如果其routing_key是其他值的话,那么这条消息会被丢弃

 

Multiple bindings

direct类型的exchange在绑定多个队列的时候可以使用相同的binding_key,这是被允许的。这种情况下的路由模式就类似于fanout即消息会被复制并广播到多个匹配的队列中去,只不过区别在于fanout不会丢弃消息,而direct的话,如果routing_keybinding_key没有匹配的话,消息就会被丢弃。

 

Emitting logs

我们将会采用这个新的模型来作为日志系统。日志的严重性我们用routing_key来表示。

让我们先聚焦提供日志的程序。首先我们声明一个exchange

channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

向该exchange发送消息。

channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)

我们假设severity的取值为

  • info
  • warning
  • error

 

Subscribing

接收消息的消费者程序和之前的差不多,区别在于我们会根据我们的兴趣和需要做绑定。

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

 

Putting it all together

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

severity = sys.argv[1] if len(sys.argv) > 2 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
    exchange='direct_logs', routing_key=severity, body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

receive_logs_direct.py

#!/usr/bin/env python
import pika, sys, os

def main():
    connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue

    severities = sys.argv[1:]
    if not severities:
        sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
        sys.exit(1)

    for severity in severities:
        channel.queue_bind(
            exchange='direct_logs', queue=queue_name, routing_key=severity)

    print(' [*] Waiting for logs. To exit press CTRL+C')


    def callback(ch, method, properties, body):
        print(" [x] %r:%r" % (method.routing_key, body.decode()))


    channel.basic_consume(
        queue=queue_name, on_message_callback=callback, auto_ack=True)

    channel.start_consuming()


if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

按照官方的测试的话,其实和上一篇博文一样,都会遇到文件写入未成功的问题。因此我们测试时,只能都直接输出到控制台了,这种测试方式不影响我们希望测试的功能。

第一个终端,启动一个仅接收error的消息的消费者进程。

python receive_logs_direct.py error

第二个终端,启动一个仅接收info或者warning的消息的消费者进程。

python receive_logs_direct.py info warning error

第三个终端用于运行生产者程序,每次使用不同的routing_key

[root@rabbitmq-01 code]# python emit_log_direct.py error "Run. Run. Or it will explode."
 [x] Sent 'error':'Run. Run. Or it will explode.'
[root@rabbitmq-01 code]# python emit_log_direct.py info "Run. Run. Or it will explode."
 [x] Sent 'info':'Run. Run. Or it will explode.'
[root@rabbitmq-01 code]# python emit_log_direct.py warning "Run. Run. Or it will explode."
 [x] Sent 'warning':'Run. Run. Or it will explode.'
[root@rabbitmq-01 code]# python emit_log_direct.py Disater "Run. Run. Or it will explode."
 [x] Sent 'Disater':'Run. Run. Or it will explode.'

4条消息发好之后,我们来看下两个终端的结果。

# 第一个终端
[root@rabbitmq-01 code]# python receive_logs_direct.py error
 [*] Waiting for logs. To exit press CTRL+C
 [x] 'error':'Run. Run. Or it will explode.'
^CInterrupted

# 第二个终端
[root@rabbitmq-01 code]# python receive_logs_direct.py info warning error
 [*] Waiting for logs. To exit press CTRL+C
 [x] 'error':'Run. Run. Or it will explode.'
 [x] 'info':'Run. Run. Or it will explode.'
 [x] 'warning':'Run. Run. Or it will explode.'
^CInterrupted

我们得到以下信息:

  • error级别的信息在两个终端上都显示了。因为两个终端对应的队列的binding_key都包含error。
  • info和warning级别的信息只会在第二个终端上显示。因为只有第二个终端对应的队列的binding_key包含info和warning。
  • Disater级别的信息没有被显示。因为两个队列的binding_key都没有包含Disater,不匹配消息就会被丢弃,这是和fanout的区别。

 

总结

这篇博文介绍了direct类型的exchange,使得消息可以根据routing_key来路由到不同的队列中。

 

标签:Run,exchange,05,direct,RabbitMQ,queue,Routing,key,channel
From: https://www.cnblogs.com/alongdidi/p/rabbitmq_tutorial_four.html

相关文章

  • 编译打包rabbitmq然后一键部署的简单方法
    摘要之前总结过一版,但是感觉不太全面想着本次能够将使用中遇到的问题总结一下.所以本次是第二版介质下载rabbitmq不区分介质的打包文件rabbitmq-server-generic-......
  • S2-052 CVE-2017-9805 远程代码执行
    漏洞名称S2-052CVE-2017-9805远程代码执行利用条件Struts2.1.6-Struts2.3.33Struts2.5-Struts2.5.12漏洞原理Struts2REST插件的XStream组件存在反序列化......
  • RabbitMQ学习笔记04:Publish/Subscribe
    参考资料:RabbitMQtutorial-Publish/Subscribe—RabbitMQ   前言在RabbitMQ学习笔记03:WorkQueues中,每个进入队列中的消息只会被投递给一个消费者进程。而在......
  • leetcode简单(数组,字符串,链表):[168, 171, 190, 205, 228, 448, 461, 876, 836, 844]
    目录168.Excel表列名称171.Excel表列序号190.颠倒二进制位205.同构字符串228.汇总区间448.找到所有数组中消失的数字461.汉明距离876.链表的中间结点836.矩形重......
  • SIT1050Q可以替代TJA1050(NXP品牌)芯片吗?
    有客户问:“你们东沃电子代理的SIT1050Q高速CAN总线收发器能够pintopin替代NXP品牌TJA1050芯片吗?”回答这个问题之前,我们先来了解下CAN芯片SIT1050Q相关的知识。东沃电子专......
  • 605. 种花问题
    问题描述https://leetcode.cn/problems/can-place-flowers/description/解题思路这题是种左不种右的,我们要求,如果不是边界,则自己本身不是1,而且左右也不能是1.如果是边......
  • docker安装rabbitmq
    启动rabbitmq下载rabbitmq3.9-management的docker镜像:dockerpullrabbitmq:3.9.21-management使用如下命令启动RabbitMQ服务:dockerrun-p5672:5672-p15672:1......
  • 国产电源芯片DP4054 软硬件兼容TP4054 规格书资料
    DP4054是一款完整的采用恒定电流/恒定电压单节锂离子电池充电管理芯片。其SOT小封装和较少的外部元件数目使其成为便携式应用的理想器件,DP4054可以适合USB 电源和适配......
  • SPOJ SP32058 R6PL - Harbinger vs Sciencepal
    链接难度:\(\texttt{17/21}\)\(T\)组数据。有\(n\)组,每组有\(2\)个数\(x,y\),问把每组的一个数分配到一组另一个数分配到另一组两组数字和差的绝对值最小是多少。......
  • Java基础学习05
    通过一道简单的算法题找路问题总结一下一些小结论。问题:在迷宫中找到一条通路并计算长度(2023-01-09)解法:首先利用二维数组创建迷宫,创建一个类,调用类中的方法实现问题解决,方......