首页 > 其他分享 >RBMQ案例五:主题模式

RBMQ案例五:主题模式

时间:2023-02-05 16:35:35浏览次数:39  
标签:topic pika RBMQ exchange 主题 queue 案例 key channel

之前的教程中,我们改进了日志系统。我们没有使用只能进行虚拟广播的扇出交换器,而是使用了直接交换器,并获得了选择性接收日志的可能性。 虽然使用直接交换改进了我们的系统,但它仍然有局限性——它不能基于多个标准进行路由。 在我们的日志系统中,我们可能不仅希望根据严重性订阅日志,还希望根据发出日志的源订阅日志。您可能从 syslog unix 工具知道这个概念,它根据严重性(info/warn/crit...)和设施(auth/cron/kern...)路由日志。 这会给我们很大的灵活性——我们可能只想听来自“cron”的严重错误,但也想听来自“kern”的所有日志。 要在我们的日志系统中实现它,我们需要了解更复杂的主题交换。   发送到主题交换的消息不能有任意的 routing_key - 它必须是一个单词列表,由点分隔。词可以是任何东西,但通常它们指定与消息相关的一些特征。一些有效的路由键示例:“ stock.usd.nyse ”、“ nyse.vmw ”、“ quick.orange.rabbit ”。路由键中的单词可以有任意多个,最多不超过 255 个字节。 绑定密钥也必须采用相同的形式。主题交换背后的逻辑 类似于直接交换——使用特定路由键发送的消息将被传递到与匹配绑定键绑定的所有队列。然而,绑定键有两个重要的特殊情况:

  • *(星号)只能代替一个词。
  • (hash) 可以替代零个或多个单词。
 

一、创建一个exchange

   

二、消息产生端

#!/usr/bin/env python
import pika
import sys
import json
import datetime

def get_message():
    # 产生消息入口处
    for i in range(100):  # 生成100条消息
        # 生成三种类型的消息
        for str_t in ['quick.orange.rabbit','lazy.orange.elephant','quick.orange.fox','lazy.brown.fox','lazy.pink.rabbit','quick.brown.fox','quick.orange.new.rabbit']: # 生成多种类型的消息
            message = json.dumps({'id': "%s-90000%s" % (str_t, i), "amount": 100 * i, "name": "%s" % str_t,
                                  "createtime": str(datetime.datetime.now())})
            producer(message, str_t)


def producer(message, routing_key):
    # 登陆并创建信道
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(virtual_host='/melon_demo', host='82.156.19.94', port=5672,
                                  credentials=pika.PlainCredentials('guest', 'guest')))
    channel = connection.channel()
    channel.exchange_declare(exchange='topic_logs', exchange_type='topic',durable=True)
    channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message)
    print(" [x] Sent %r:%r" % (routing_key, message))
    connection.close()


if __name__ == "__main__":
    get_message()  # 程序执行入口

 

三、消息接收端【lazy.#】

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(virtual_host='/melon_demo', host='82.156.19.94', port=5672,
                              credentials=pika.PlainCredentials('guest', 'guest')))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic',durable=True)

result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue


# for binding_key in ['lazy.#','lazy.*']:
channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key='lazy.#')
print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()

 

四、消息接收端【.orange.

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(virtual_host='/melon_demo', host='82.156.19.94', port=5672,
                              credentials=pika.PlainCredentials('guest', 'guest')))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic',durable=True)

result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue


# for binding_key in ['*.orange.*','*.*.rabbit','lazy.#','lazy.*']:
channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key='*.orange.*')

print(' [*] Waiting for Orange. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()

五、消息接收端【..rabbit】

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(virtual_host='/melon_demo', host='82.156.19.94', port=5672,
                              credentials=pika.PlainCredentials('guest', 'guest')))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic',durable=True)

result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue


# for binding_key in ['*.orange.*','*.*.rabbit','lazy.#','lazy.*']:
channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key='*.*.rabbit')

print(' [*] Waiting for *.*.rabbit. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()

六、运行截图

 

 

 

 

 

 

 

 

 

 

标签:topic,pika,RBMQ,exchange,主题,queue,案例,key,channel
From: https://www.cnblogs.com/1314520xh/p/17093536.html

相关文章

  • RBMQ案例三:发布/订阅模式
      在上篇教程中,我们搭建了一个工作队列,每个任务只分发给一个工作者(worker)。在本篇教程中,我们要做的跟之前完全不一样——分发一个消息给多个消费者(consumers)。这种......
  • RBMQ中python案例一:简单模式
    一、生产者与消费者模式之简单模式,原理图   二、生产者产生消息importjsonimportpikaimportdatetime#生产者producer.pydefget_message():#......
  • RBMQ案例二:工作队列模式
      工作队列模式工作队列(又名:任务队列)背后的主要思想是避免立即执行资源密集型任务而不得不等待它完成。相反,我们安排任务稍后完成。我们将任务封装为消息并将其发......
  • Spring2 - 入门案例
    Spring基本操作导入依赖在pom.xml中添加依赖添加依赖:<dependencies><!--springcontext依赖--><!--当你引入SpringContext依赖之后,表示将Spring的基础依......
  • 找了几个 Solon 的商业落地项目案例!
    Solon是啥?是一个高效的Java应用开发框架:更快、更小、更简单。(代码仓库:https://gitee.com/noear/solon)提倡:克制、简洁、开放、生态启动快5~10倍;qps高2~3倍;运......
  • idea设置字体大小(换主题后的字体大小设置)
    如果你是默认主题直接这样设置字体大小如果你换了自定义主题如果你换了自定义主题,那么上面的设置方法会没有作用,我们需要像下面这样设置:......
  • Pandas 人口密度案例分析
    fromturtleimportleftimportpandasaspd"""需求:1.导入文件,查看原始数据2.将人口数据和各州简称数据进行合并3.将合并的数据中重复的abbreviation列进行删除......
  • 【八大数据排序法】快速排序法的图形理解和案例实现 | C++
    第十八章快速排序法:::hljs-center目录第十八章快速排序法●前言●认识排序●一、快速排序法是什么?1.简要介绍2.具体情况3.算法分析●二、案例实现1.......
  • wireshark 抓包整理———— 从一个小案例开始 [一]
    前言前面已经有抓包系列了,简单写一下wireshark的抓包系列,共36节,18个理论小栗子,36个实战栗子。正文这个例子是<<wireshark分析就这么简单>>的一个例子。这个例子是这样......
  • python基础:文件内光标移动案例(了解)、计算机硬盘修改数据的原理(了解,为了文件内容修改作
    目录一、文件内光标移动案例(了解)二、计算机硬盘修改数据的原理(了解,为了文件内容修改作解释)三、文件内容修改四、函数1、概念讲解2、语法结构3、函数的定义与调用4、函数的......