今日内容
1 消息队列Rabbitmq介绍
---------------------------------------------
# 消息队列
也叫消息队列中间件
celery中使用redis做过消息队列来用
换Rabbitmq做消息队列,就只需要把broker的连接地址换成Rabbitmq的连接地址就行了
---------------------------------------------
# 消息队列 MessageQueue 也叫MQ
消息队列就是基础数据结构中的“先进先出”的一种数据结构。
生活中买东西,需要排队,先排的人先买消费,就是典型的“先进先出”
---------------------------------------------
# MQ解决什么问题
最主要就是: 应用解耦(微服务中,服务间数据的传递)
流量削峰
消息分发(发布订阅)
异步
IPC 进程间通信也可以通过消息队列
.
.
.
常见消息队列比较
.
.
.
.
.
.
.
2 rabbitmq安装
rabbitmq官网 https://www.rabbitmq.com/
----------------------------------------------
# windows安装地址 https://www.rabbitmq.com/install-windows-manual.html
先下载 erlang解释器
再下载 rabbitmq的软件
----------------------------------------------
# centos 系统安装
yum -y install erlang
yum -y install rabbitmq-server
----------------------------------------------
# docker安装
docker pull rabbitmq:management
docker run -di --name Myrabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:management
# 默认用户名是admin 密码是admin,做了两个端口映射,
# 一个端口是连接服务的端口5672
# 另一个是web管理界面的端口15672
# 访问虚拟机的15672端口,就可以看到图形化界面(官方提供的),手动点点点操作
http://http://10.0.0.200:15672/#/
windows安装erlang解释器
.
windows安装rabbitmq
.
.
用容器运行rabbitmq后,访问虚拟机的15672端口http://10.0.0.200:15672/ 就能登录了
重点就3个 channels信道 exchanges交换机 queues队列
.
.
看官网的7个案例,学习如何使用
.
.
.
.
.
3 基于queue实现生产者消费者
import Queue
import threading
message = Queue.Queue(10)
def producer(i):
while True:
message.put(i)
def consumer(i):
while True:
msg = message.get()
for i in range(12):
t = threading.Thread(target=producer, args=(i,))
t.start()
for i in range(10):
t = threading.Thread(target=consumer, args=(i,))
t.start()
.
.
.
.
.
.
4 基本使用
4.1 发送基本的 hello world
# 生产者
import pika
# 第一步,连接服务端
# 无密码连接
# connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.200',port=5672))
# 有密码连接
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials))
# 第二步:连接channel信道
channel = connection.channel()
# 第三步:信道中创建一个队列,名字叫hello,也可以在图形化界面手动创建队列
channel.queue_declare(queue='hello')
# 第四步: 向hello队列中,发送Hello World
# routing_key是队列名字 body是发送的内容
channel.basic_publish(exchange='', routing_key='hello', body='Hello World !')
print(" Sent 'Hello World!'")
connection.close()
--------------------------------------------------
# 消费者
import pika
# 连接rabbitmq
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials))
# 连接信道
channel = connection.channel()
# 假设队列不存在,先创建队列,因为生产者与消费者不一定谁先运行
# 一旦该队列没有就创建,该队列已有就不创建
channel.queue_declare(queue='hello')
# 回调函数
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() # 程序会夯在这里,等待从消息队列中取消息
# auto_ack=True 表示消费者从队列里拿出数据后,默认没有异常,就把数据删掉
# 如果回调函数运行过程中报错,队列里数据也没了,所以一般把自动确认设为False
# 在回到函数的最后,所有逻辑的处理完了,再确认队列里该数据可以删掉!!!
--------------------------------------------------
# 生产者每产生一个消息,都会放到队列里面
# 消费者会从队列里一个一个的取出消息来
.
.
.
.
.
.
5 消息安全
# 消费完,确认后,再删除消息
.
.
.
.
6 持久化(详见笔记)
# queue和消息都要持久化
.
.
.
.
7 闲置消费(详见笔记)
# 正常情况如果有多个消费者,是按照顺序第一个消息给第一个消费者,第二个消息给第二个消费者
# 但是可能第一个消息的消费者处理消息很耗时,一直没结束,就可以让第二个消费者优先获得闲置的消息
.
.
.
.
.
8 发布订阅(详见笔记)
.
.
.
.
.
9 发布订阅高级之Routing(按关键字匹配)(详见笔记)
.
.
.