首页 > 其他分享 >Rabbitmq介绍,安装,基于queue实现消费者生产者,基本使用,消息安全,持久化,闲置消费,发布订阅,发布订阅高级routing按关键字,模糊匹配

Rabbitmq介绍,安装,基于queue实现消费者生产者,基本使用,消息安全,持久化,闲置消费,发布订阅,发布订阅高级routing按关键字,模糊匹配

时间:2023-05-06 23:23:19浏览次数:57  
标签:订阅 pika exchange 队列 Rabbitmq queue routing credentials channel

内容详细

Rabbmit介绍

消息队列

中间件概念很大,准确一些叫消息队列中间件

消息队列中间件

使用redis当作消息队列来用,blpop阻塞式弹出,实现队列,先进先出

MQ,消息队列,MessageQueue是什么?

消息队列就是基础数据结构中先进先出(队列)的一种数据机制,类比于生活中,买东西,需要排队,先排队的人先买消费,就是典型的先进先出(队列)

MQ解决的问题

应用解耦

流量削峰

消息分发(发布订阅)

异步消息

IPC进程间通信也可以通过消息队列

Rabbmit安装

windows安装:https://www.rabbitmq.com/install-windows-manual.html

依赖于erlang解释器

rabbmit软件

cemtos

yum -y install erlang
yum -y install rabbitmq-server

docker安装(安装简单便捷)

docker pull rabbitmq:management
docker run -di --name Myrabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:management

安装完毕后,访问15672端口可以来到rabbitmq的图像化管理界面(官方提供),手动点点操作

基于queue实现生产者消费者

import threading
import queue

message = queue.Queue(10)


def product(i):
    while True:
        message.put(i)


def consumer(j):
    while True:
        print(message.get())


if __name__ == '__main__':
    t = threading.Thread(target=product, args=[123, ])
    t1 = threading.Thread(target=consumer, args=[11])
    t.start()
    t1.start()
    for i in range(10):
        t = threading.Thread(target=product, args=[i, ])
    
        t.start()
    for j in range(10):
        t1 = threading.Thread(target=consumer, args=[j])
        t1.start()

基本使用

发送者

import pika

###第一步,连接
# 无密码
# connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.101',port=5672))

# 有密码
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.101',credentials=credentials))

# 第二步:连接channal
channel = connection.channel()
# 第三步:创建一个队列,名字叫hello
channel.queue_declare(queue='hello')
# 第三步:向hello队列中,发送Hello World
# routing_key 队列名字
#body 发送的内容

"""
queue ,routing_key
如果queue为空或则会创建一个队列
channel.queue_declare(queue='hello')
这个控制向哪个队列发送消息
channel.basic_publish(exchange='', routing_key='hello',
                      body="123")
"""

channel.basic_publish(exchange='', routing_key='hello', body='Hello World4441!')
print("  Sent 'Hello World!'")
connection.close()

消费者

import pika

# 连接
credentials = pika.PlainCredentials("admin", "admin")

connecnt = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', 5672, credentials=credentials))
# 假设队列不存在,如果存在不会有任何作用,不存在会创建队列
channle = connecnt.channel()
channle.queue_declare(queue='hello')


# 回调函数

def callback(ch, method, properties, body):
    print(body,'消费正在消费消息')
    # 停止消费循环
    channle.stop_consuming()

# queue从那个队列去取
# on_message_callback获取数据后的回调函数
# auto_ack 自定确认,获取到数据就确认删除,不管是否出错都会删除
channle.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
channle.start_consuming()  # 程序会夯在这里,等待从消息队列中取消息

消费安全

消费完,确认后,在删除消息

import pika

# 连接
credentials = pika.PlainCredentials("admin", "admin")

connecnt = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', 5672, credentials=credentials))
# 假设队列不存在,如果存在不会有任何作用,不存在会创建队列
channle = connecnt.channel()
channle.queue_declare(queue='hello')


# 回调函数

def callback(ch, method, properties, body):
    print(body, '消费正在消费消息')
    # 停止消费循环
    # channle.stop_consuming()

    # raise Exception(123)
    # 使用完毕,手动确认
    ch.basic_ack(delivery_tag=method.delivery_tag)


# queue从那个队列去取
# on_message_callback获取数据后的回调函数
# auto_ack 自定确认,获取到数据就确认删除,不管是否出错都会删除
channle.basic_consume(queue='hello', on_message_callback=callback, auto_ack=False)
channle.start_consuming()  # 程序会夯在这里,等待从消息队列中取消息

image-20230506173911461

image-20230506180204058

消息安全之durable持久化

生产者

import pika

# connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.200', port=5672))
# 连接
credentials = pika.PlainCredentials('admin', 'admin')
connect = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', 5672, credentials=credentials))

# 连接channal
channel = connect.channel()
# 创建队列,名字叫hello
# durable声明队列持久化
channel.queue_declare(queue='test',durable=True)
# 向队列中发送HELLO WORLD
# routing_key 队列名字
# body 发送的内容
# exchange交换机名字

# 这个发消息控制队列名
"""
queue ,routing_key
如果queue为空或则会创建一个队列
channel.queue_declare(queue='hello')
这个控制向哪个队列发送消息
channel.basic_publish(exchange='', routing_key='hello',
                      body="123")
"""
# properties=pika.BasicProperties(delivery_mode=2) 消息持久化
channel.basic_publish(exchange='', routing_key='test',
                      body="1231123",
                      properties=pika.BasicProperties(delivery_mode=2)

                      )
print('发送了HELLO')
connect.close()
"""
队列和消息都做持久化,以外的关闭后,在启动消息队列和消息都会在

"""

消费者(消费者不需要修改代码)

import pika

credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()

# 声明一个队列(创建一个队列)
channel.queue_declare(queue='lqz1')

def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)
    # 通知服务端,消息取走了,如果auto_ack=False,不加下面,消息会一直存在
    # ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='lqz1',on_message_callback=callback,auto_ack=False)

channel.start_consuming()

闲置消费

正常情况如果有多个消费者,是按照顺序第一个消息给第一个第二个给第二个,第三个继续给第一个,只有等第三个消费后,第四个才能消费,如果第一个消耗时间比较久,那么后续都要等待

生产者

import pika
# 无密码
# connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1'))

# 有密码
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()
# 声明一个队列(创建一个队列),durable=True支持持久化,队列必须是新的才可以
channel.queue_declare(queue='lqz123',durable=True)

channel.basic_publish(exchange='',
                      routing_key='lqz123', # 消息队列名称
                      body='111',
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent,消息也持久化
                      )
                      )
connection.close()

消费者(有多个,谁没有任务谁就去执行)

import pika

credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()

# 声明一个队列(创建一个队列)
# channel.queue_declare(queue='lqz123')

def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)
    # 通知服务端,消息取走了,如果auto_ack=False,不加下面,消息会一直存在
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1) #####就只有这一句话 谁闲置谁获取,没必要按照顺序一个一个来
channel.basic_consume(queue='lqz123',on_message_callback=callback,auto_ack=False)

channel.start_consuming()

发布订阅

发布者(fanout模式)

import pika

credentials = pika.PlainCredentials("admin", "admin")

connect = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', 5672, credentials=credentials))

chanle = connect.channel()

chanle.exchange_declare(exchange='f1', exchange_type='fanout')

chanle.basic_qos(prefetch_count=1)

chanle.basic_publish(exchange='f1', routing_key='',
                     body='你好啊1111',
                     properties=pika.BasicProperties(delivery_mode=2)

                     )

chanle.close()

订阅者

import pika

credentials = pika.PlainCredentials("admin", "admin")

connect = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', 5672, credentials=credentials))

chanle = connect.channel()

chanle.exchange_declare(exchange='f1', exchange_type='fanout')
result = chanle.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

chanle.queue_bind(exchange='f1', queue=queue_name)


def task(ch, method, properties, body):
    print('消费者接到了任务', body.decode('utf8'))

    ch.basic_ack(delivery_tag=method.delivery_tag)


chanle.basic_consume(queue=queue_name, on_message_callback=task, auto_ack=False)

chanle.start_consuming()
# 消费者断开后队列自动删除

发布订阅高级Routing(按关键字精准匹配)

发布者

import pika
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()

channel.exchange_declare(exchange='m2',exchange_type='direct')

channel.basic_publish(exchange='m2',
                      routing_key='bnb', # 多个关键字,指定routing_key
                      body='lqz nb')

connection.close()

订阅者

import pika

credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()

# exchange='m1',exchange(秘书)的名称
# exchange_type='direct' , 秘书工作方式将消息发送给不同的关键字
channel.exchange_declare(exchange='m2',exchange_type='direct')

# 随机生成一个队列
result = channel.queue_declare(queue='',exclusive=True)
queue_name = result.method.queue
print(queue_name)
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='m2',queue=queue_name,routing_key='nb')
channel.queue_bind(exchange='m2',queue=queue_name,routing_key='bnb')


def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)

channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)

channel.start_consuming()

订阅者2

import pika

credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()

# exchange='m1',exchange(秘书)的名称
# exchange_type='direct' , 秘书工作方式将消息发送给不同的关键字
channel.exchange_declare(exchange='m2',exchange_type='direct')

# 随机生成一个队列
result = channel.queue_declare(queue='',exclusive=True)
queue_name = result.method.queue
print(queue_name)
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='m2',queue=queue_name,routing_key='nb')



def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)

channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)

channel.start_consuming()

发布订阅高级Topic(按关键字模糊匹配)

发布者

import pika

credentials = pika.PlainCredentials("admin", "admin")
connect = pika.BlockingConnection(parameters=pika.ConnectionParameters('10.0.0.200', 5672, credentials=credentials))
channel = connect.channel()

channel.exchange_declare(exchange='f3', exchange_type='topic')

channel.basic_publish(exchange='f3', routing_key='lqz.hand', body='一个*或#可收到消息',
                      properties=pika.BasicProperties(delivery_mode=2)
                      )
# channel.basic_publish(exchange='m3',
#                       # routing_key='lqz.handsome', #都能收到
#                       routing_key='lqz.handsome.xx', #只有lqz.#能收到
#                       body='lqz nb')

connect.close()

发布者2

import pika

credentials = pika.PlainCredentials("admin", "admin")
connect = pika.BlockingConnection(parameters=pika.ConnectionParameters('10.0.0.200', 5672, credentials=credentials))
channel = connect.channel()

channel.exchange_declare(exchange='f3', exchange_type='topic')

# channel.basic_publish(exchange='f3', routing_key='lqz.xx.xx', body='一个#可以收到消息',
#                       properties=pika.BasicProperties(delivery_mode=2)
#                       )
channel.basic_publish(exchange='f3', routing_key='zzz.xx', body='一个#可以收到消息',
                      properties=pika.BasicProperties(delivery_mode=2)
                      )

connect.close()

订阅者1

*只能加一个单词例如:lqz.123 lqz.*可以匹配

#可以加任意单词字符例如 lqz.xx.qq.zz lqz.#可以匹配

一个匹配只能使用一种而且只能有一个

import pika

credentials = pika.PlainCredentials("admin", "admin")
connect = pika.BlockingConnection(parameters=pika.ConnectionParameters('10.0.0.200', 5672, credentials=credentials))
channel = connect.channel()

channel.exchange_declare(exchange='f3', exchange_type='topic')

result = channel.queue_declare(queue='', exclusive=True, durable=True)
queue_name = result.method.queue
channel.queue_bind(queue=queue_name, exchange='f3', routing_key='lqz.xx.*')
channel.queue_bind(queue=queue_name, exchange='f3', routing_key='zzz.*')


# channel.queue_bind(queue=queue_name, exchange='f3', routing_key='lqz.xx.*')
# 一个匹配里面只能使用一个*或一个#
# 可以监听多个


def task(ch, method, pro, body):
    print('*消费消费者', body.decode('utf-8'))


channel.basic_consume(queue=queue_name, on_message_callback=task, auto_ack=True)

channel.start_consuming()

订阅者2

import pika

credentials = pika.PlainCredentials("admin", "admin")
connect = pika.BlockingConnection(parameters=pika.ConnectionParameters('10.0.0.200', 5672, credentials=credentials))
channel = connect.channel()

channel.exchange_declare(exchange='f3', exchange_type='topic')

result = channel.queue_declare(queue='', exclusive=True, durable=True)
queue_name = result.method.queue
channel.queue_bind(queue=queue_name, exchange='f3', routing_key='lqz.#')

def task(ch, method, pro, body):
    print('#消费消费者', body.decode('utf-8'))


channel.basic_consume(queue=queue_name, on_message_callback=task, auto_ack=True)

channel.start_consuming()

标签:订阅,pika,exchange,队列,Rabbitmq,queue,routing,credentials,channel
From: https://www.cnblogs.com/clever-cat/p/17378694.html

相关文章

  • RabbitMQ
    RabbitMQRabbitMQ使用安装RabbitMQRabbitMQ官网:https://www.rabbitmq.com/RabbitMQ是使用Erlang开发,需要先安装Erlang。RabbitMQ与Erlang的版本对照表如下:启动RabbitMQ在..\RabbitMQServer\rabbitmq_server-3.11.13\sbin目录下打开CMD,输入rabbitmq-server.bat,如下所示即......
  • RabbitMQ - 消息中间件
    RabbitMQ-消息中间件目录RabbitMQ-消息中间件1消息队列Rabbitmq介绍1.0什么是消息队列1.1rabbitmq介绍1.2MQ解决的问题1.3常见的消息队列及比较2RabbitMQ介绍安装2.1下载2.2安装(1)window安装(2)linux下安装rabbitmq(3)docker安装2.3配置web管理插件(1)windows配置(2)centos7......
  • 消息队列Rabbitmq介绍、rabbitmq安装、基于queue实现生产者消费者、基本使用、消息安
    目录1消息队列Rabbitmq介绍2rabbitmq安装3基于queue实现生产者消费者4基本使用4.1发送者4.2消费者5消息安全(详见笔记)6持久化(详见笔记)7闲置消费(详见笔记)8发布订阅(详见笔记)9发布订阅高级之Routing(按关键字匹配)(详见笔记)1消息队列Rabbitmq介绍#消息队列 -......
  • Linux安装rabbitMQ常用命令
    1.拉取最新的rabbitMQdockerpullrabbitmq:management2.容器启动rabbitMQdockerrun-d--hostnamemy-rabbit--namerabbit-p15672:15672-p5672:5672rabbitmq:management其中:     --hostname:指定容器主机名称     --name:        指定容器名称  ......
  • Rabbitmq 介绍 、安装、基于Queue实现生产者消费者模型、基本使用、消息安全之ack、du
    师承老刘llnb一、消息队列介绍1.1介绍消息队列就是基础数据结构中的“先进先出”的一种数据机构。想一下,生活中买东西,需要排队,先排的人先买消费,就是典型的“先进先出”1.2MQ解决什么问题MQ是一直存在,不过随着微服务架构的流行,成了解决微服务之间问题的常用工具。应用解耦......
  • rabbitmq 使用
    今日内容1消息队列Rabbitmq介绍---------------------------------------------#消息队列也叫消息队列中间件celery中使用redis做过消息队列来用换Rabbitmq做消息队列,就只需要把broker的连接地址换成Rabbitmq的连接地址就行了---------------------------......
  • RabbitMq
    1.消息队列1.1MQ的相关概念1.1.1什么是MQMQ(messagequeue),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使......
  • Docker 安装 RabbitMQ
    系统环境系统:Debian-10.2内核:Linux4.19.0-21-amd64x86_64(通过uname-r命令查看内核版本)RabbitMQ版本:rabbitmq:3.11-management安装教程访问RabbitMQ官方文档,查看官方安装教程,直接运行:dockerrun-it--rm--namerabbitmq-p5672:5672-p15672:15672rabbitmq......
  • RabbitMQ安装Delayed Message 插件
    在官网:https://www.rabbitmq.com/community-plugins.html点击:下载好之后就是一个解压好的文件:然后在将这个文件复制到rabiitmq/plugins里面:cp/Users/sixcandy/Downloads/rabbitmq_delayed_message_exchange-3.10.2.ez/opt/homebrew/Cellar/rabbitmq/3.10.2/plugins1进入rabi......
  • rabbitmq 延迟队列_Delayed Message 插件实现 RabbitMQ 延迟队列
    延迟队列是为了存放那些延迟执行的消息,待消息过期之后消费端从队列里拿出来执行。作者简介:五月君,NodejsDeveloper,慕课网认证作者,热爱技术、喜欢分享的90后青年,欢迎关注Nodejs技术栈(id:NodejsRoadmap)和Github开源项目 https://www.nodejs.redDLX+TTL方式存在的时序问......