首页 > 其他分享 >openstack nova基础知识——RabbitMQ

openstack nova基础知识——RabbitMQ

时间:2023-08-27 11:05:38浏览次数:38  
标签:pika exchange nova queue connection RabbitMQ openstack channel routing


nova中各个组件之间的交互是通过“消息队列”来实现的,其中一种实现方法就是使用RabbitMQ,对RabbitMQ的使用,官方文档上有一个非常好的Get Started,由浅及深,结合例子,很容易理解。现在对RabbitMQ的理解,就是利用它可以非常灵活的定制自己想要实现的消息收发机制。

其中,有这样几个角色:producer, consumer, exchange, queue

producer是消息发送者,consumer是消息接受者,中间要通过exchange和queue。producer将消息发送给exchange,exchange决定消息的路由,即决定要将消息发送给哪个queue,然后consumer从queue中取出消息,进行处理,大致流程如下图:

openstack nova基础知识——RabbitMQ_ide

这几个角色当中,我觉得最关键的是这个exchange,它有3种类型:direct, topic, fanout。其中,功能最强大的就是topic,用它完全可以实现direct和fanout的功能。

direct是单条件的路由,即在exchange判断要将消息发送给哪个queue时,判断的依据只能是一个条件;

fanout是广播式的路由,即将消息发送给所有的queue;

topic是多条件的路由,转发消息时,依据的条件是多个,所以只使用topic就可以实现direct和fanout的功能。


上面所说的“条件”,反映到程序中,就是routing_key,这个routing_key出现在两个地方:

    1. 每一个发送的消息都有一个routing_key,表示发送的是一个什么样的消息;

    2. 每一个queue要和exchange绑定,绑定的时候要提供一个routing_key,表示这个queue想要接收什么样的消息。

这样,exchange就可以根据routing_key,来将消息发送到合适的queue中。


基本的思路就这些吧,下面来看一下官方文档上的那由浅及深的六个例子:

(我很喜欢这种风格的文档,整体由浅及深,适合初学者,其次文章没有大量的生僻词汇,而且例子+图片,比较容易懂,更好的是文章还带点小小的幽默,不由得让人汇心一笑,感觉老外做事就是认真细腻,希望自己也能养成这样的风格)


1. Hello World

最简单的情况,发一个消息,接收,打印出来这个消息。

openstack nova基础知识——RabbitMQ_ide_02

send.py:

#!/usr/bin/env python
import pika

# 1. Establish a connection with RabbitMQ server.
connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
channel = connection.channel()

# 2. Create a queue to which the message will be delivered, let's name it 'hello'
channel.queue_declare(queue='hello')

# 3. Use a default exchange identified by an empty string, which allows us to specify
#    exactly to which queue the message should go. The queue name needs to be specified
#    in the routing_key parameter:
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print " [x] Sent 'Hello World!'"

# 4. Close the connection
connection.close()

recv.py:

#!/usr/bin/env python
import pika

# 1. Establish a connection with RabbitMQ server
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

# 2. Make sure that the queue exists,run the command as many times as we like, and only one will be created.
channel.queue_declare(queue='hello')

print ' [*] Waiting for messages. To exit press CTRL+C'

# 3. Define a callback function.Whenever we receive a message, 
#    this callback function is called by the Pika library.
def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)

# 4. Subscribe the callback function to a queue.
#    Tell RabbitMQ that this particular callback function should receive messages from our hello queue.
channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

# 5. Enter a never-ending loop that waits for data and runs callbacks whenever necessary.
channel.start_consuming()



2. 多个consumer

这个例子跟第一个例子基本上一样,只是启动了多个consumer,并且模拟真实情况,即发送的消息使得consumer在短时间内不能完成工作。在这种情况下,多个consumer是如何协调工作的呢?其实,这些都是可以在程序中进行控制的。

openstack nova基础知识——RabbitMQ_python_03

send.py

#!/usr/bin/env python
import pika
import sys

# 1. Establish a connection with RabbitMQ server.
connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
channel = connection.channel()

# 2. Create a queue to which the message will be delivered, let's name it 'hello'
#    'durable=True' makes the queue persistent
channel.queue_declare(queue='task_queue',durable=True)

message=' '.join(sys.argv[1:]) or "Hello World!"

# 3. Use a default exchange identified by an empty string, which allows us to specify
#    exactly to which queue the message should go. The queue name needs to be specified
#    in the routing_key parameter:
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))
print " [x] Sent %r" % (message,)

# 4. Close the connection
connection.close()



recv.py:

#!/usr/bin/env python
import pika
import time

# 1. Establish a connection with RabbitMQ server
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

# 2. Make sure that the queue exists,run the command as many times as we like, and only one will be created.
#    'durable=True' makes the queue persistent
channel.queue_declare(queue='task_queue',durable=True)

print ' [*] Waiting for messages. To exit press CTRL+C'

# 3. Define a callback function.Whenever we receive a message, 
#    this callback function is called by the Pika library.
#
#    Send a ack to tell rabbitmq a task is done, then it can release the message.
#    If a worker dies, rabbitmq fail to receive the ack, it will redeliver the message to another worker.
#    Remember to write the last line code, or rabbitmq will eat more and more memory.
def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep(body.count('.'))
    print "[x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag) 

# Fair dispatch: Tell rabbitmq not give a worker more than one messages at a time
channel.basic_qos(prefetch_count=1)

# 4. Subscribe the callback function to a queue.
#    Tell RabbitMQ that this particular callback function should receive messages from our hello queue.
channel.basic_consume(callback,
                      queue='task_queue',
                      no_ack=False)# turn on the (ack)onwledgment, default is False

# 5. Enter a never-ending loop that waits for data and runs callbacks whenever necessary.
channel.start_consuming()



3. fanout exchange的例子:

openstack nova基础知识——RabbitMQ_python_04

send.py:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

# declare a exchange, type is fanout(means broadcast),named 'logs'.
# exchange is used to receive messages form producer, and send messages to queue.
# there are four exchange types: direct, topic, headers and fanout
channel.exchange_declare(exchange='logs',
                         type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='', #routing_key is '', because 'fanout' exchange will ignore its value.
                      body=message)
print " [x] Sent %r" % (message,)
connection.close()



recv.py:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

# if a exchange named 'logs' have not declared yet, then declare one, 
# or just use the existed exchange.
channel.exchange_declare(exchange='logs',
                         type='fanout')

# declare a temporary queue with a random name
# 'exclusive=True' flag will delete the queue when the consumer dies.
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# bind the queue to the exchange, to tell the exchange to send messages to our queue.
channel.queue_bind(exchange='logs',
                   queue=queue_name)

print ' [*] Waiting for logs. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] %r" % (body,)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()



4. direct exchange的例子:

需要注意,一个queue是可以和同一个exchange多次绑定的,每次绑定要用不同的routing_key

openstack nova基础知识——RabbitMQ_ide_05

send.py:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

# declare a exchange, type is direct, named 'logs'.
channel.exchange_declare(exchange='direct_logs',
                         type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'

# a message is sent to the direct exchange with a routing_key.
# a message is identified by the routing_key.
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print " [x] Sent %r:%r" % (severity, message)
connection.close()



recv.py:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

# declare a direct exchange named 'direct_logs'
channel.exchange_declare(exchange='direct_logs',
                         type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    print >> sys.stderr, "Usage: %s [info] [warning] [error]" % \
                         (sys.argv[0],)
    sys.exit(1)

# Bind the queue to the direct exchange,
# 'routing_key' flag tells the direct exchange which kind of message it wants to receive.
# A queue can bind multiple times to the same direct exchange with different routing_keys,
# which means it wants to receive several kinds of messages.
for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print ' [*] Waiting for logs. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] %r:%r" % (method.routing_key, body,)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()



5. topic exchange的例子

这里的routing_key可以使用一种类似正则表达式的形式,但是特殊字符只能是“*”和“#”,“*”代表一个单词,“#”代表0个或是多个单词。这样发送过来的消息如果符合某个queue的routing_key定义的规则,那么就会转发给这个queue。如下图示例:

openstack nova基础知识——RabbitMQ_python_06

send.py:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

# declare a exchange, type is topic, named 'topic_logs'.
# topic exchange allows to do routing based on multiple criteria.
channel.exchange_declare(exchange='topic_logs',
                         type='topic')

severity = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'

# a message is sent to the topic exchange with a routing_key.
# a message is identified by the routing_key.
# the topic routing_key can be like 'topic.host','topic.topic1.topic3', etc
# also can use '*'(one word) and '#'(zero or more words) to substitute word(s).
channel.basic_publish(exchange='topic_logs',
                      routing_key=severity,
                      body=message)
print " [x] Sent %r:%r" % (severity, message)
connection.close()



recv.py:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

# declare a topic exchange named 'topic_logs'
channel.exchange_declare(exchange='topic_logs',
                         type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]

if not binding_keys:
    print >> sys.stderr, "Usage: %s [binding_key]..." % (sys.argv[0],)
    sys.exit(1)

# Bind the queue to the topic exchange,
# 'routing_key' flag tells the topic exchange which kind of message it wants to receive.
# A queue can bind multiple times to the same direct exchange with different routing_keys,
# which means it wants to receive several kinds of messages.
for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print ' [*] Waiting for logs. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] %r:%r" % (method.routing_key, body,)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()



6. PRC(Remote Procedure Call,远程过程调用)

目前对这个的理解就是发送一个消息,然后还要得到一个结果,即消息要走一个来回。如下图所示:

openstack nova基础知识——RabbitMQ_python_07

client.py:

#!/usr/bin/env python
import pika
import uuid

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                host='localhost'))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         correlation_id = self.corr_id,
                                         ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()

print " [x] Requesting fib(30)"
response = fibonacci_rpc.call(30)
print " [.] Got %r" % (response,)



server.py:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)

def on_request(ch, method, props, body):
    n = int(body)

    print " [.] fib(%s)"  % (n,)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id = \
                                                     props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')

print " [x] Awaiting RPC requests"
channel.start_consuming()




几个RabbitMQ相关的命令:

1. 查看RabbitMQ中有多少个queue,以及每个queue中有多少个消息:
$ sudo rabbitmqctl list_queues

2. 查看RabbitMQ中exchange的情况:
$ sudo rabbitmqctl list_exchanges

3. 查看RabbitMQ中exchange和queue绑定情况:
$  sudo   rabbitmqctl list_bindings

4. 启动/停止RabbitMQ:
$  sudo   invoke-rc.d rabbitmq-server stop/start/etc.




标签:pika,exchange,nova,queue,connection,RabbitMQ,openstack,channel,routing
From: https://blog.51cto.com/u_5173797/7251425

相关文章

  • OpenStack遇到问题收集
    1.AMQPChannelExceptionPROBLEM:在用stable/folsom的devstack安装stable/folsom的openstack时,遇到下面的问题:(nova.api.openstack):TRACE:AMQPChannelException:(406,u"PRECONDITION_FAILED-cannotredeclareexchange'nova'invhost'/nova'withdiff......
  • Openstack Nova Security Group——安全组之架构篇
    哈,又回来了!公司同事说不要只停留在逻辑层,你要对跑在你程序底下的数据流也要非常的清楚。但是这里还是先介绍一下逻辑层,在代码的角度看是如何实现这个功能的,关于底层的数据流,还需要哦酝酿一段时间,之后会再总结一篇底层数据流的文章,真难为我了!一、什么是安全组安全组,翻译成英文是sec......
  • 使用pdb调试openstack (How to debug openstack using pdb )
    终于摸索出了一个调试openstack的简单方法,使用pdb进行单步调试,对于分析代码很有帮助。注意,这个方法只适用于用devstack安装的openstack。调试主要是使用了一个脚本,这个脚本不记得是从哪下载的了,是用来重启使用devstack安装的openstack的各个服务的,后来,我又结合devstack的stack.sh和......
  • centos7下安装rabbitmq3.8
    curl-shttps://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh|sudobashcurl-shttps://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh|sudobashyuminstallrabbitmq-server-3.8.14#以3.8.14版本为例......
  • rabbitMq
     @ServicepublicclassOrderItemServiceImpl{/***https://blog.csdn.net/aetawt/article/details/128957417*监听方式获取消息消息只消费一次,其他消费者消费后,本消费者不再消费*@RabbitHandler:标注在方法上*@RabbitListener:标注在类、......
  • HCIP_OpenStack总结部分
    目录第一章架构介绍1.OpenStack简介1.1OpenStack简述1.2OpenStack工作原理概述1.3开源OpenStack版本介绍1.4设计理念1.5OpenStack与云计算2.OpenStack架构2.1OpenStack架构简介3.OpenStack核心服务4.OpenStack服务交互第二章界面管理1.Horizon简介1.1简介1......
  • 消息队列 & RabbitMQ
    消息队列&RabbitMQ使用总结1.消息队列定义:messagequeue是消息传递过程中一种存储数据的结构。2.特点:先进先出,可以设置优先级用于大客户优先发货,持久化,消息确认,延时队列用于订单30分钟未支付取消。3.作用:应用解耦,肖峰填谷,异步提高响应速度4.RabbitMQ:四种交换机直接交换......
  • 手动搭建开源 OpenStack(Ussur)
    最后编辑时间:2023年8月15日19点26分导语实验概述在VMware®Workstation上基于CentOSStream8虚拟机,手动搭建开源OpenStack(Ussur版)实验要求最佳:3台节点(1台Controller+2台Compute);最少:2台节点(1台Controller+1台Compute)Controller节点内存最好8G最少4G,Compute节点4G......
  • RabbitMQ的学习之快速入门
    快速入门:使用springboot整合springAmqp来创建队列和消息同时创建consumer接收发送的消息第一步:引入依赖,在父类中引入spring-boot-starter-amqp依赖第二步:在yml中建立连接 第三步:创建publisher测试类引入注解,利用 RabbitTemplate生成对象调用方法,这样publisher就创建......
  • RabbitMQ
    初识MQ同步和异步通讯微服务间通讯有同步和异步两种方式:同步通讯:就像打电话,需要实时响应。异步通讯:就像微信,不需要马上回复。同步通讯SpringCloud中Feign调用就属于同步方式,虽然调用可以实时得到结果,但存在下面的问题:优点:时效性较强,可以立即得到结果缺点:耦合......