Who can explain in plain language what an RPC framework is?
RPC Demystified - the basics
RPC Demystified - the deep dive
Write a basic distributed-system framework in one hour (a Java implementation that helps in understanding RPC)
RabbitMQ message queue
- Install: http://www.rabbitmq.com/install-standalone-mac.html
- Install the Python RabbitMQ module (pika):
pip install pika
or
easy_install pika
or
Source:
https://pypi.python.org/pypi/pika
Implementing the simplest queue communication
Sender:
#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))  # establish a socket connection
channel = connection.channel()  # open a channel

# declare the queue
channel.queue_declare(queue='hello')

# In RabbitMQ a message can never be sent directly to the queue;
# it always needs to go through an exchange.
channel.basic_publish(exchange='',
                      routing_key='hello',  # the queue name
                      body='Hello World!')  # the message body
print(" [x] Sent 'Hello World!'")
connection.close()
Receiver:
# _*_ coding: utf-8 _*_
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))  # establish a socket connection
channel = connection.channel()  # open a channel

# declare the queue again, in case it does not exist yet
channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    # ch: the channel object; method: delivery info (exchange, routing key, etc.)
    print(" [x] Received %r" % body)  # body arrives as bytes

channel.basic_consume(callback,       # invoke callback whenever a message arrives
                      queue='hello',  # which queue to consume from
                      no_ack=True)    # no acknowledgement: auto-confirm on delivery
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
To connect to a rabbitmq server remotely, you need to configure permissions first.
1. Create a user on the rabbitmq server:
sudo rabbitmqctl add_user root admin
2. Grant that user permissions so the server can be accessed from outside:
sudo rabbitmqctl set_permissions -p / root ".*" ".*" ".*"
set_permissions [-p vhost] {user} {conf} {write} {read}
  vhost: name of the virtual host the user is granted access to (default: /)
  user: name of the user being granted access to that virtual host
  conf: regular expression granting configure permissions
  write: regular expression granting write permissions
  read: regular expression granting read permissions
The client then needs to pass credentials when connecting:
credentials = pika.PlainCredentials('root', 'admin')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    '10.211.55.5', 5672, '/', credentials))
channel = connection.channel()
Round-robin message dispatch
In this mode, RabbitMQ by default dispatches the producer's (P) messages to the consumers (C) one after another, in turn, much like load balancing.
Producer code
import pika
import sys
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()

# declare the queue
channel.queue_declare(queue='task_queue')

# In RabbitMQ a message can never be sent directly to the queue;
# it always needs to go through an exchange.
message = ' '.join(sys.argv[1:]) or "Hello World! %s" % time.time()
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      ))
print(" [x] Sent %r" % message)
connection.close()
Consumer code
# _*_ coding: utf-8 _*_
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(20)
    print(" [x] Done")
    print("method.delivery_tag", method.delivery_tag)
    ch.basic_ack(delivery_tag=method.delivery_tag)

# no_ack is left at its default (False): the callback acknowledges manually,
# and combining no_ack=True with basic_ack would be an error.
channel.basic_consume(callback,
                      queue='task_queue')
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
Now start the producer first, then start three consumers separately. Have the producer send several messages, and you will see them distributed to the consumers one by one, in turn.
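The round-robin behaviour itself is easy to picture without a broker at all. The sketch below (plain Python, no RabbitMQ involved; the names are made up for illustration) deals messages out to consumers in turn, the way RabbitMQ's default dispatch does:

```python
from itertools import cycle

def round_robin_dispatch(messages, consumers):
    """Deal each message to the next consumer in turn,
    mimicking RabbitMQ's default dispatch strategy."""
    received = {c: [] for c in consumers}
    turn = cycle(consumers)
    for msg in messages:
        received[next(turn)].append(msg)
    return received

# 6 messages over 3 consumers: each consumer gets every 3rd message
result = round_robin_dispatch(['m1', 'm2', 'm3', 'm4', 'm5', 'm6'],
                              ['c1', 'c2', 'c3'])
print(result)  # {'c1': ['m1', 'm4'], 'c2': ['m2', 'm5'], 'c3': ['m3', 'm6']}
```

Note that this strategy ignores how busy each consumer is; that weakness is what the fair-dispatch section below addresses.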
Message acknowledgements
To make sure a message is never lost, RabbitMQ supports message acknowledgements. An ack is sent back by the consumer to tell RabbitMQ that a particular message has been received and processed, and that RabbitMQ is free to delete it.
If a consumer dies (its channel is closed, its connection is closed, or the TCP connection is lost) without sending an ack, RabbitMQ will understand that the message wasn't fully processed and will re-queue it. If other consumers are online at the same time, it will quickly redeliver the message to another consumer. That way you can be sure no message is lost, even if a worker occasionally dies.
There are no message timeouts; RabbitMQ redelivers the message only when the consumer dies. It is fine even if processing a message takes a very, very long time.
Message acknowledgements are turned on by default. In the previous examples we explicitly turned them off with the no_ack=True flag. Remove this flag, and have the worker send a proper acknowledgement once the task is done:
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))  # body is bytes in Python 3
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(callback,
                      queue='hello')
With this code we can be sure that even if you kill a worker with CTRL+C while it is processing a message, nothing is lost. Soon after the worker dies, all unacknowledged messages will be redelivered.
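The requeue-on-death semantics can likewise be modelled in plain Python. The ToyQueue class below is a made-up illustration, not RabbitMQ's implementation: a delivered message sits in an "unacked" set until it is acked, and goes back to the front of the queue if the worker dies first:

```python
from collections import deque

class ToyQueue:
    """Minimal model of ack/requeue semantics (not real RabbitMQ)."""
    def __init__(self, messages):
        self.pending = deque(messages)
        self.unacked = {}   # delivery_tag -> message awaiting ack
        self._tag = 0

    def consume(self):
        self._tag += 1
        msg = self.pending.popleft()
        self.unacked[self._tag] = msg
        return self._tag, msg

    def ack(self, tag):
        del self.unacked[tag]  # the broker may now forget the message

    def worker_died(self, tag):
        # no ack was received: put the message back at the front of the queue
        self.pending.appendleft(self.unacked.pop(tag))

q = ToyQueue(['task-1'])
tag, msg = q.consume()    # worker 1 picks up 'task-1' ...
q.worker_died(tag)        # ... and dies before acking
tag2, msg2 = q.consume()  # a surviving worker receives the same message
q.ack(tag2)
print(msg2)  # task-1
```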
Message persistence
We have learned how to make sure the task is not lost even if the consumer dies (acknowledgements are on by default; pass no_ack=True if you want to disable them). But our tasks will still be lost if the RabbitMQ server itself stops.
When RabbitMQ quits or crashes, it forgets the queues and the messages unless you tell it not to. Two things are required to make sure messages aren't lost: we need to mark both the queue and the messages as durable.
First, we need to make sure RabbitMQ will never lose our queue. To do so, we declare it as durable:
channel.queue_declare(queue='hello', durable=True)
Although this command is correct by itself, it won't work in our setup. That is because we have already defined a queue called hello which is not durable. RabbitMQ doesn't allow you to redefine an existing queue with different parameters and will return an error to any program that tries. But there is a quick workaround: declare a queue with a different name, for example task_queue:
channel.queue_declare(queue='task_queue', durable=True)  # durable queue
This queue_declare change needs to be applied to both the producer and the consumer code. At this point we can be sure that the task_queue queue won't be lost even if RabbitMQ restarts. Now we need to mark our messages as persistent, by supplying a delivery_mode property with the value 2:
channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      ))
Fair dispatch
If RabbitMQ simply hands messages to the consumers in order, without considering their load, a consumer on a low-spec machine can easily pile up a backlog it cannot work through while a high-spec consumer stays idle. To solve this, set prefetch_count=1 on each consumer, which tells RabbitMQ: don't send me a new message until I have finished processing the current one.
channel.basic_qos(prefetch_count=1)
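The effect of prefetch_count=1 can also be sketched offline. In this toy model (plain Python, names invented for illustration), each worker takes the next message only when it becomes free, so a fast worker ends up handling more messages than a slow one, instead of the rigid 50/50 split round-robin would give:

```python
import heapq

def fair_dispatch(num_messages, worker_speeds):
    """Each worker takes the next message only when idle (prefetch=1).
    worker_speeds maps a worker name to its time-per-message."""
    done = {w: 0 for w in worker_speeds}
    # (time_when_free, worker) -- every worker starts free at t=0
    free = [(0, w) for w in worker_speeds]
    heapq.heapify(free)
    for _ in range(num_messages):
        t, w = heapq.heappop(free)  # the next worker to become free
        done[w] += 1
        heapq.heappush(free, (t + worker_speeds[w], w))
    return done

# 'fast' needs 1 time unit per message, 'slow' needs 3
counts = fair_dispatch(8, {'fast': 1, 'slow': 3})
print(counts)  # {'fast': 6, 'slow': 2}
```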
Complete code with message persistence + fair dispatch
Producer:
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      ))
print(" [x] Sent %r" % message)
connection.close()
Consumer:
#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)  # fair dispatch
channel.basic_consume(callback,
                      queue='task_queue')
channel.start_consuming()
Broadcast
The previous examples were basically one-to-one sending and receiving: a message could only be delivered to the specified queue. But sometimes you want a message to be received by all queues, similar to a broadcast. That is what exchanges are for.
Exchange
1. It receives messages from producers;
2. It pushes the messages into queues.
An exchange is given a type when it is declared, and the type decides which queues qualify to receive a message:
- fanout: every queue bound to this exchange receives the message;
- direct: only the queue(s) whose binding key equals the message's routing key receive the message;
- topic: every queue whose binding key pattern matches the routing key receives the message;
- headers: the message headers, rather than the routing key, decide which queues receive the message.
fanout broadcast mode
Message publisher
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')  # 1. declare a fanout exchange

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',  # 2. fanout ignores the routing key
                      body=message)
print(" [x] Sent %r" % message)
connection.close()
Message subscriber
# _*_ coding: utf-8 _*_
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

# Don't give the queue a name: RabbitMQ assigns a random one.
# exclusive=True deletes the queue automatically once this consumer disconnects.
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs',
                   queue=queue_name)  # bind the queue to the exchange
print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()
direct broadcast mode
RabbitMQ also supports routing by keyword: queues are bound to the exchange with a keyword, the sender publishes its message to the exchange together with a routing key, and the exchange uses that key to decide which queue(s) the message should be delivered to.
Message publisher
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
subscriber
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()
topic broadcast mode
Instead of subscribing only by severity, as in the previous example, you can also subscribe based on the source that emitted the log. This is more flexible: for instance, you can listen only for critical errors coming from 'cron' while still receiving all logs from 'kern'.
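The matching rules for topic binding keys (words separated by '.', where '*' matches exactly one word and '#' matches zero or more words) can be sketched in plain Python to see which bindings a given routing key would hit. This is only an illustration of the rules, not RabbitMQ's actual implementation:

```python
def topic_match(binding_key, routing_key):
    """'*' matches exactly one dot-separated word; '#' matches zero or more."""
    def match(pattern, words):
        if not pattern:
            return not words
        head, rest = pattern[0], pattern[1:]
        if head == '#':
            # '#' may absorb any number of words, including none
            return any(match(rest, words[i:]) for i in range(len(words) + 1))
        if words and (head == '*' or head == words[0]):
            return match(rest, words[1:])
        return False
    return match(binding_key.split('.'), routing_key.split('.'))

print(topic_match('#', 'kern.critical'))           # True: '#' matches everything
print(topic_match('kern.*', 'kern.critical'))      # True
print(topic_match('*.critical', 'kern.critical'))  # True
print(topic_match('kern.*', 'kern.disk.full'))     # False: '*' is one word only
```

This mirrors the binding patterns used in the receive_logs_topic.py examples further below.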
publisher
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
subscriber
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()
To receive all the logs:
python receive_logs_topic.py "#"
To receive all logs from the facility "kern":
python receive_logs_topic.py "kern.*"
Or to hear only about "critical" logs:
python receive_logs_topic.py "*.critical"
You can create multiple bindings:
python receive_logs_topic.py "kern.*" "*.critical"
And to emit a log with the routing key "kern.critical":
python emit_log_topic.py "kern.critical" "A critical kernel error"
Remote procedure call (RPC)
fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print("fib(4) is %r" % result)
RPC server
# _*_ coding: utf-8 _*_
__author__ = 'Alex Li'
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)

def on_request(ch, method, props, body):
    n = int(body)
    print(" [.] fib(%s)" % n)
    response = fib(n)
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,  # reply on the client's callback queue
                     properties=pika.BasicProperties(
                         correlation_id=props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')
print(" [x] Awaiting RPC requests")
channel.start_consuming()
RPC client
import pika
import uuid

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
        self.channel = self.connection.channel()
        # anonymous, exclusive callback queue for the replies
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue
        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        # only accept the reply whose correlation_id matches our request
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                       correlation_id=self.corr_id,
                                   ),
                                   body=str(n))
        while self.response is None:
            # non-blocking check for incoming replies
            self.connection.process_data_events()
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)
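The role of correlation_id in the client above can be demonstrated without a broker: when several replies arrive on the same callback queue, only the one carrying the id of the current request is accepted, and stale replies from abandoned requests are dropped. A minimal sketch of that matching logic (class and variable names are invented for illustration):

```python
import uuid

class ToyRpcClient:
    """Sketch of the correlation_id check used in on_response above."""
    def __init__(self):
        self.corr_id = None
        self.response = None

    def send_request(self):
        # a fresh id per request; the old one becomes stale
        self.corr_id = str(uuid.uuid4())
        self.response = None
        return self.corr_id

    def on_response(self, correlation_id, body):
        # ignore replies belonging to earlier, abandoned requests
        if self.corr_id == correlation_id:
            self.response = body

client = ToyRpcClient()
stale_id = client.send_request()            # first request, never answered in time
current_id = client.send_request()          # second request supersedes it
client.on_response(stale_id, b'832040')     # late reply: dropped
client.on_response(current_id, b'1346269')  # matching reply: accepted
print(client.response)  # b'1346269'
```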