在之前的教程中,我们改进了日志系统。我们没有使用只能进行虚拟广播的扇出交换器,而是使用了直接交换器,并获得了选择性接收日志的可能性。 虽然使用直接交换改进了我们的系统,但它仍然有局限性——它不能基于多个标准进行路由。 在我们的日志系统中,我们可能不仅希望根据严重性订阅日志,还希望根据发出日志的源订阅日志。您可能从 syslog unix 工具知道这个概念,它根据严重性(info/warn/crit...)和设施(auth/cron/kern...)路由日志。 这会给我们很大的灵活性——我们可能只想听来自“cron”的严重错误,但也想听来自“kern”的所有日志。 要在我们的日志系统中实现它,我们需要了解更复杂的主题交换。 发送到主题交换的消息不能有任意的 routing_key - 它必须是一个单词列表,由点分隔。词可以是任何东西,但通常它们指定与消息相关的一些特征。一些有效的路由键示例:“ stock.usd.nyse ”、“ nyse.vmw ”、“ quick.orange.rabbit ”。路由键中的单词可以有任意多个,最多不超过 255 个字节。 绑定密钥也必须采用相同的形式。主题交换背后的逻辑 类似于直接交换——使用特定路由键发送的消息将被传递到与匹配绑定键绑定的所有队列。然而,绑定键有两个重要的特殊情况:
- *(星号)只能代替一个词。
- (hash) 可以替代零个或多个单词。
一、创建一个exchange
二、消息产生端
#!/usr/bin/env python import pika import sys import json import datetime def get_message(): # 产生消息入口处 for i in range(100): # 生成100条消息 # 生成三种类型的消息 for str_t in ['quick.orange.rabbit','lazy.orange.elephant','quick.orange.fox','lazy.brown.fox','lazy.pink.rabbit','quick.brown.fox','quick.orange.new.rabbit']: # 生成多种类型的消息 message = json.dumps({'id': "%s-90000%s" % (str_t, i), "amount": 100 * i, "name": "%s" % str_t, "createtime": str(datetime.datetime.now())}) producer(message, str_t) def producer(message, routing_key): # 登陆并创建信道 connection = pika.BlockingConnection( pika.ConnectionParameters(virtual_host='/melon_demo', host='82.156.19.94', port=5672, credentials=pika.PlainCredentials('guest', 'guest'))) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', exchange_type='topic',durable=True) channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close() if __name__ == "__main__": get_message() # 程序执行入口
三、消息接收端【lazy.#】
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(virtual_host='/melon_demo', host='82.156.19.94', port=5672, credentials=pika.PlainCredentials('guest', 'guest'))) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', exchange_type='topic',durable=True) result = channel.queue_declare('', exclusive=True) queue_name = result.method.queue # for binding_key in ['lazy.#','lazy.*']: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key='lazy.#') print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True) channel.start_consuming()
四、消息接收端【.orange.】
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(virtual_host='/melon_demo', host='82.156.19.94', port=5672, credentials=pika.PlainCredentials('guest', 'guest'))) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', exchange_type='topic',durable=True) result = channel.queue_declare('', exclusive=True) queue_name = result.method.queue # for binding_key in ['*.orange.*','*.*.rabbit','lazy.#','lazy.*']: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key='*.orange.*') print(' [*] Waiting for Orange. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True) channel.start_consuming()
五、消息接收端【..rabbit】
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(virtual_host='/melon_demo', host='82.156.19.94', port=5672, credentials=pika.PlainCredentials('guest', 'guest'))) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', exchange_type='topic',durable=True) result = channel.queue_declare('', exclusive=True) queue_name = result.method.queue # for binding_key in ['*.orange.*','*.*.rabbit','lazy.#','lazy.*']: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key='*.*.rabbit') print(' [*] Waiting for *.*.rabbit. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True) channel.start_consuming()
六、运行截图
标签:topic,pika,RBMQ,exchange,主题,queue,案例,key,channel From: https://www.cnblogs.com/1314520xh/p/17093536.html