首页 > 其他分享 >【6.0】RabbitMQ使用之发布订阅

【6.0】RabbitMQ使用之发布订阅

时间:2023-09-10 19:44:21浏览次数:55  
标签:订阅 pika exchange 队列 RabbitMQ queue 交换机 6.0 channel

【一】发布订阅

在这里插入图片描述

【1】发布者

import pika

# 【1】创建连接并设置认证信息
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))

# 【2】创建通道
channel = connection.channel()

# 【3】声明一个队列(创建一个队列)
# 声明一个交换机(exchange)
channel.exchange_declare(exchange='m1',exchange_type='fanout')

# 【4】发布消息
channel.basic_publish(exchange='m1',
                      routing_key='',
                      body='dream nb')

# 【5】关闭连接
connection.close()
  • 使用Pika库创建了一个与RabbitMQ服务器的连接
    • 然后创建了一个通道(channel)。
    • 接下来,我们声明了一个名为'm1'的交换机,并指定其类型为'fanout',这表示消息将被广播到所有订阅者。
  • 调用basic_publish方法,我们将消息发布到名为'm1'的交换机,使用空的routing_key表示该消息将被发送到交换机绑定的所有队列中。

【2】订阅者(启动几次订阅者会生成几个队列)

import pika

# 【1】创建连接并设置认证信息
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))

# 【2】创建通道
channel = connection.channel()


# 声明一个交换机(exchange)
# exchange='m1',exchange(秘书)的名称
# exchange_type='fanout' , 秘书工作方式将消息发送给所有的队列
channel.exchange_declare(exchange='m1',exchange_type='fanout')

# 【3】随机生成一个队列
# 【3.1】
result = channel.queue_declare(queue='',exclusive=True)
queue_name = result.method.queue
print(queue_name)

# 【3.2】将队列与交换机绑定
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='m1',queue=queue_name)

# 【4】定义回调函数
def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)

# 【5】消费消息 
channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)

channel.start_consuming()
  • 使用相同的连接参数创建了一个新的通道。
    • 然后,我们声明了与发布者相同的交换机,并随机生成一个独占的队列。
    • 随后,我们通过调用queue_bind方法将队列与交换机进行绑定,这样队列就能接收到交换机发来的消息。
  • 接下来,我们定义了一个回调函数callback,当订阅者接收到消息时,该函数会被调用。
    • 在本例中,回调函数简单地打印出接收到的消息。
  • 最后,我们通过调用basic_consume方法开始消费消息。
    • 订阅者会一直等待,直到接收到消息,并调用回调函数进行处理。

【3】小结

  • 1 生产者扔给交换机消息
  • 2 交换机根据自身的类型将会把所有消息复制同步到所有与其绑定的队列
  • 3 每个队列可以有一个消费者,接收消息进行消费逻辑

【4】应用场景

  • 邮件群发
  • 广告

【5】场景

  • 有一个商城,我们新添加一个商品后,可能同时需要去更新缓存和数据库

【二】发布订阅高级之Routing(按关键字匹配)

在这里插入图片描述

  • direct路由模式

【1】发布者

import pika

# 【1】创建连接并设置认证信息
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))

# 【2】创建通道
channel = connection.channel()

# 【3】声明一个队列(创建一个队列)
# 声明一个交换机(exchange)
channel.exchange_declare(exchange='m2',exchange_type='direct')

# 【4】发布消息
channel.basic_publish(exchange='m2',
                      routing_key='bnb', # 多个关键字,指定routing_key
                      body='dream nb')

# 【5】关闭连接
connection.close()
  • 首先使用给定的认证信息来建立与消息队列服务器的连接。
  • 接下来,创建一个通道,所有的API调用都是通过通道进行的。
  • 然后,声明一个交换机(exchange),这里使用了direct类型的交换机。
  • 在发布消息时,指定了交换机名称为m2,并将消息发送到路由键bnb
  • 最后,关闭连接。

【2】订阅者1

import pika

# 【1】创建连接并设置认证信息
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))

# 【2】创建通道
channel = connection.channel()


# 【3】声明交换机
# exchange='m1',exchange(秘书)的名称
# exchange_type='direct' , 秘书工作方式将消息发送给不同的关键字
channel.exchange_declare(exchange='m2',exchange_type='direct')

# 【4】随机生成一个队列
result = channel.queue_declare(queue='',exclusive=True)
queue_name = result.method.queue
print(queue_name)

# 【5】将队列与交换机进行绑定
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='m2',queue=queue_name,routing_key='nb')
channel.queue_bind(exchange='m2',queue=queue_name,routing_key='bnb')

# 【6】定义回调函数
def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)

# 【7】消费消息
channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)

channel.start_consuming()
  • 首先建立与消息队列服务器的连接,并创建一个通道。
  • 然后,声明交换机,与发布者使用相同的交换机名称和类型。
  • 接着,随机生成一个队列,并将该队列与交换机通过路由键nbbnb进行绑定。
  • 定义了一个回调函数用于处理接收到的消息,并使用basic_consume函数开始消费消息。

【3】订阅者2

import pika

# 【1】创建连接并设置认证信息
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))

channel = connection.channel()

# 【2】声明交换机
# exchange='m1',exchange(秘书)的名称
# exchange_type='direct' , 秘书工作方式将消息发送给不同的关键字
channel.exchange_declare(exchange='m2',exchange_type='direct')

# 【3】随机生成一个队列
result = channel.queue_declare(queue='',exclusive=True)
queue_name = result.method.queue
print(queue_name)

# 【4】将队列与交换机进行绑定
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='m2',queue=queue_name,routing_key='nb')

# 【5】定义回调函数
def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)

# 【6】消费消息
channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)

# 【7】开始消费消息
channel.start_consuming()
  • 订阅者2的代码与订阅者1类似,它也会建立与消息队列服务器的连接,并创建一个通道。
  • 然后,声明交换机,并生成一个随机队列。
  • 将这个队列与交换机绑定通过路由键nb,定义了一个回调函数用于处理接收到的消息,并开始消费消息。

【4】小结

  • 发布者向交换机m2发送一条消息,路由键为bnb
  • 订阅者1和订阅者2都绑定了交换机m2并监听不同的路由键,订阅者1监听nbbnb,订阅者2只监听nb
  • 当有消息被发送到交换机时,与之匹配的订阅者将接收到消息并执行回调函数。
  • 1 生产者还是将消息发送给交换机,消息携带具体的路由key(routingKey)
  • 2 交换机类型direct,将接收到的消息中的routingKey,比对与之绑定的队列的routingKey
  • 3 消费者监听一个队列,获取消息,执行消费逻辑

【5】应用场景

  • 根据生产者的要求发送给特定的一个或者一批队列;
  • 错误的通报;

【6】场景

  • 还是一样,有一个商城,新添加了一个商品,实时性不是很高,只需要添加到数据库即可,不用刷新缓存

【三】发布订阅高级之Topic(按关键字模糊匹配)

在这里插入图片描述

【1】发布者

import pika

# 【1】连接到RabbitMQ服务器
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()

# 【2】声明一个topic类型的交换机
channel.exchange_declare(exchange='m3',exchange_type='topic')

# 【3】发布消息到交换机
channel.basic_publish(exchange='m3',
                      # routing_key='dream.handsome', #都能收到
                      routing_key='dream.handsome.xx', #只有dream.#能收到
                      body='dream nb')

# 【4】关闭连接
connection.close()
  • 发布者通过连接到RabbitMQ服务器并创建一个通道(channel)。
  • 然后声明了一个名为'm3'的交换机(exchange),类型为'topic'。
  • 交换机的作用是将消息发送给相应的队列。

【2】订阅者1

  • * 只能加一个单词
  • # 可以加任意单词字符
import pika


# 【1】连接到RabbitMQ服务器
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()


# 【2】声明一个topic类型的交换机
# exchange='m1',exchange(秘书)的名称
# exchange_type='direct' , 秘书工作方式将消息发送给不同的关键字
channel.exchange_declare(exchange='m3',exchange_type='topic')

# 【3】随机生成一个队列
result = channel.queue_declare(queue='',exclusive=True)
queue_name = result.method.queue
print(queue_name)

# 【4】将队列绑定到交换机上,指定路由键为'dream.#'
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='m3',queue=queue_name,routing_key='dream.#')


# 【5】设置接收消息的回调函数
def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)

channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)

channel.start_consuming()
  • 订阅者1首先连接到RabbitMQ服务器,并创建一个通道。
  • 然后声明了一个名字为空的队列,该队列是一个随机生成的独占队列,用来接收消息。
  • 接着绑定队列到交换机上,并指定路由键为'dream.#',表示订阅所有以'dream.'开头的消息。
  • 最后定义一个回调函数,在接收到消息时进行处理。

【3】订阅者2

import pika


# 【1】连接到RabbitMQ服务器
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()

# 【2】声明一个topic类型的交换机
# exchange='m1',exchange(秘书)的名称
# exchange_type='topic' , 模糊匹配
channel.exchange_declare(exchange='m3',exchange_type='topic')

# 【3】随机生成一个队列
result = channel.queue_declare(queue='',exclusive=True)
queue_name = result.method.queue
print(queue_name)

# 【4】将队列绑定到交换机上,指定路由键为'dream.*'
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='m3',queue=queue_name,routing_key='dream.*')

# 【5】设置接收消息的回调函数
def callback(ch, method, properties, body):
  	queue_name = result.method.queue # 发送的routing_key是什么
    print("消费者接受到了任务: %r" % body)

channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)

channel.start_consuming()
  • 订阅者2的代码与订阅者1的代码相似,区别在于路由键的设置。
  • 这里使用了通配符'*'来匹配一个单词,表示只订阅以'dream.'开头并且只有一个单词的消息

【4】小结

  • 这个发布订阅模式可以用于在分布式系统中实现解耦合的消息传递。
  • 通过使用交换机和不同的路由键,发布者可以将消息发送到不同的队列,而订阅者只需要关注自己感兴趣的消息,提高了系统的可扩展性和灵活性。
  • 1 生产端发送消息,消息携带具体的路由key
  • 2 交换机的类型topic
  • 3 队列绑定交换机不在使用具体的路由key而是一个范围值
  • .orange. : haha.orange.haha,haha.haha.orange.haha
  • lazy.# : haha.lazy.haha.haha,layz.alsdhfsh(sh9ou)N0
  • *表示一个字符串(不能携带特殊符号) 例如 *表示 haha,item,update
  • #表示任意字符串

【5】场景

  • 还是一样,有一个商城,新添加了一个商品,实时性不是很高,只需要添加到数据库即可,数据库包含了主数据库mysql1和从数据库mysql2的内容,不用刷新缓存

标签:订阅,pika,exchange,队列,RabbitMQ,queue,交换机,6.0,channel
From: https://www.cnblogs.com/dream-ze/p/17691740.html

相关文章

  • 【RabbitMQ】服务启动成功,无法访问localhost_15672(RabbitMQ Management)
    问题描述RabbitMQ服务已经启动成功,已经安装rabbitmq_management插件,无法访问RabbitMQManagement(http://localhost:15672/)。原因分析15672端口被MicrosoftEdge占用。解决方案打开cmd终端,输入指令:netstat-ano|findstr15672TCP127.0.0.1:8323127.0.0.1:15672......
  • php 安装rabbit如何使用 PHP 安装 RabbitMQ?
    示例示例安装Erlang要在PHP环境中使用,需要先安装Erlang,它是的运行环境。1、安装Erlang首先,要在PHP环境中使用RabbitMQ,需要先安装Erlang,它是RabbitMQ的运行环境。可以使用下面的命令来安装Erlang:sudoapt-getinstallerlang2、安装RabbitMQ接下来,可以使用下面的命令来安装RabbitMQ:s......
  • RabbitMQ 笔记一
    1.RabbitMQ-如何保证消息不丢失1.生产者确认机制RabbitMQ提供了publisherconfirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功消息失败之后如何处理:1回调方法即时重发2记录日志3保存到数据库然后定时重发,成功发送后即刻删除......
  • 【RabbitMQ】RabbitMQ 服务无法启动。系统出错。发生系统错误 1067。进程意外终止。
    问题描述RabbitMQ服务无法启动。rabbitmq-service.batstartRabbitMQ服务正在启动.RabbitMQ服务无法启动。系统出错。发生系统错误1067。进程意外终止。原因分析RabbitMQ和Erlang版本不匹配。解决方案查询并安装RabbitMQ版本对应Erlang版本https://www.rabbitmq.com......
  • Windows 安装 RabbitMq
    Windows上安装RabbitMQ的步骤RabbitMQ是一个强大的开源消息队列系统,广泛用于构建分布式、可扩展的应用程序。本教程将带您一步一步完成在Windows系统上安装RabbitMQ的过程。无需担心,即使您是初学者,也能够轻松跟随这些简单的步骤来完成安装。步骤1:安装ErlangRabbitMQ是使......
  • 【ROS2机器人入门到实战】话题订阅-控制LED
    1.话题订阅-控制LED写在前面当前平台文章汇总地址:ROS2机器人从入门到实战获取完整教程及配套资料代码,请关注公众号<鱼香ROS>获取教程配套机器人开发平台:两驱版|四驱版为方便交流,搭建了机器人技术问答社区:地址fishros.org.cn你好,我是爱吃鱼香ROS的小鱼。本节我们正式进入到MicroRO......
  • Ubentu 16.04.2 LTS安装mysql,jdk1.8
    一、网络设置1、网络设置sudovim/etc/network/interfaces文件中写入以下内容,写完后wq保存退出。#设置网卡名称autoeth0#设置静态IP,如果是使用自动IP用dhcp,后面的不用设置ifaceeth0inetstatic#设置IP地址addressxxx.xxx.xxx.xxx#设置子网掩码netmaskxxx.xxx.xxx.......
  • RabbitMq
    RabbitMq消息延迟消费message.getMessageProperties().setDelay(5000)死信就是消息在特定场景下的一种表现形式,这些场景包括:消息被拒绝(basic.reject/basic.nack),并且requeue=false消息的TTL过期时消息队列达到最大长度达到最大重试限制消息在这些场景中时,被称为死......
  • rabbitmqctl
    #查看队列rabbitmqctllist_queues#清空队列中消息rabbitmqctl-p/purge_queueQUEUE_TEMP_GS_PUSHrabbitmqctlstatus:节点状态rabbitmqctladd_userusernamepassword:添加用户rabbitmqctllist_users:列出所有用户rabbitmqctllist_user_permissionsusername:列出用......
  • 远程访问RabbitMQ服务
    @[TOC]转载自cpolar内网穿透的文章:无公网IP,在外公网远程访问RabbitMQ服务「内网穿透」前言RabbitMQ是一个在AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。由erlang开发的AMQP(AdvancedMessageQueue高级消息队列协议)的开源实现,由于er......