首页 > 其他分享 >RabbitMQ学习笔记07:RPC

RabbitMQ学习笔记07:RPC

时间:2023-01-17 17:26:35浏览次数:61  
标签:07 self rpc RabbitMQ queue RPC id 客户端

参考资料:RabbitMQ tutorial - Remote procedure call (RPC) — RabbitMQ 

 

Remote Procedure Call

What this tutorial focuses on

RabbitMQ学习笔记03:Work Queues 中我们学会了如何使用work queue在多个worker进程中分发耗时的任务。

但是如果我们需要在远程的电脑上执行一个函数并等待其结果呢?那就是完全不同的情况了。这种模式我们称之为远程过程调用 Remote Procedure Call,简称RPC

在本博文中,我们将会使用RabbitMQ去构建一个RPC系统:一个客户端和一个可扩展的服务器。实际上由于我们并没有任何值得分发的耗时任务,因此我们将会创建一个假的RPC服务用于返回斐波那契数列。

 

Client interface

为了描述一个RPC服务是如何被使用的,我们将会创建一个简单的客户端类。它会暴露一个名叫call的方法,该方法会发送一个RPC请求并阻塞直到收到回答:

fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print("fib(4) is %r" % result)

A note on RPC

尽管RPC在计算机技术中是一种非常常见的模型,但是它经常备受批评。当一个程序员没有办法知道是否函数的调用是本地的或者如果它是一个慢RPC,那么问题就会产生。这样的困惑会导致系统变得不稳定以及增加非必要的排错复杂度。滥用RPC不仅不会简化软件,反而会导致出现无法维护的面条式代码 spaghetti code

永远将以下内容记在心里:

  • 哪些函数调用是本地的,哪些函数调用是远程的,这些一定要确保是清晰的。
  • 文档要做好,确保不同的组件之间的依赖关系是清晰的。
  • 处理错误案例。当一个RPC服务器挂了很长一段时间以后,客户端会是什么样的反应?

如果不确定自己是否要使用RPC。如果可以的话,可以尝试使用异步管道,结果会被异步地推送到下一个计算阶段。

 

Callback queue

一般来说,通过RabbitMQ来实现RPC是简单的。一个客户端发送请求消息,然后一个服务器回复响应消息。为了接收响应,客户端需要在请求的时候发出callback队列的地址。

result = channel.queue_declare(queue='', exclusive=True)
callback_queue = result.method.queue

channel.basic_publish(exchange='',
                      routing_key='rpc_queue',
                      properties=pika.BasicProperties(
                            reply_to = callback_queue,
                            ),
                      body=request)

# ... and some code to read a response message from the callback_queue ...

Message properties

AMQP 0-9-1 预先定义了14种消息属性可以伴随消息一起发送出去。大多数属性很少使用到,除了以下即可:

  • delivery_mode: 标记一个消息是永久或者临时存在的。我们在消息的持久化时有使用到。
  • content_type: 用于描述编码时的MIME类型。比如常见的JSON编码application/json
  • reply_to: 一般用于命名一个callback队列。
  • correlation_id: 用于关联RPC的请求和响应。

 

Correlation id

在上面列出的方法种我们建议每次RPC请求都创建一个callback队列。这样是非常低效的,更好的办法是基于客户端来创建callback队列而不是基于每次RPC请求。

这会带来一个新的问题,在callback队列中接收到的响应消息,我们不知道对应的是哪条请求消息。因此我们需要使用到correlation_id属性,我们会针对每条消息单独设一个该属性的唯一的取值,然后当我们在回调队列中收到消息的时候,我们会关注消息中的这个属性的值,如果correlation_id的值相同就表示它们匹配的是同一条消息的请求和响应。如果我们发现correlation_id属性的值是我们所不知道的,那么它就不属于我们的请求,我们就可以安全丢弃它们。

你可能会问,为什么我们要丢弃回调队列中未知的消息而不是报错?这是因为在服务器端可能会出现系统错乱的情况(race condition)。尽管可能性很小,存在一种可能在RPC服务器发送给我们回答之后至在发送请求的确认消息之前这段时间,RPC刚好宕机了。这种情况下,当RPC服务器重启之后,它会重新处理一次请求。这就是为什么在客户端我们必须优雅地处理重复的响应,而且RPC理想情况下应该是幂等的。

 

Summary

我们的RPC的工作流程:

  1. 当客户端启动的时候,它会创建一个匿名的独有的回调队列。
  2. 对于一次RPC请求,客户端会发送一条消息并伴随2个消息属性。reply_to用于指向回调队列的地址,correlation_id用于标志每次请求消息。
  3. 请求会被发往rpc_queue队列。
  4. RPC的服务器端会作为消费者在该队列上等待请求(消息)。一旦收到请求就会对其处理,然后将返回的消息发往reply_to所指向的回调对类中。返回的消息会包含correlation_id,其值和请求时保持一致。
  5. 客户端将会在回调队列上等待数据。如果有新的数据抵达,并且correlation_id和之前请求时发出的消息中的correlation_id值一致的话,就会处理这条响应消息;否则若不一致,则丢弃消息。

有意思的一点,在RPC的C/S架构中,客户端和服务器端均作为消息队列的生产者和消费者。

 

 

Putting it all together

rpc_server.py

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')


def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id = \
                                                         props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

print(" [x] Awaiting RPC requests")
channel.start_consuming()

服务器端的代码很直接:

  1. 像之前一样,我们建立connection、channel,并创建名为rpc_queue的队列。
  2. 我们声明了一个斐波那契数列函数,它只能接受正整数。我们不要传递太大的数,否则程序会变得很慢。
  3. 我们定义了回调函数on_requestbasic_consume使用,这是RPC服务器的核心部分。当rpc_queue队列中有消息的时候它就会被执行,用于发送响应给客户端。
  4. 在服务器端我们可能会想要运行多个rpc_server.py进程,为了让空闲的进程可以立即处理请求,我们使用了prefetch_count

rpc_client.py

#!/usr/bin/env python
import pika
import uuid


class FibonacciRpcClient(object):

    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost'))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True)

        self.response = None
        self.corr_id = None

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=str(n))
        self.connection.process_data_events(time_limit=None)
        return int(self.response)


fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)

客户端的代码则会稍微有点复杂:

  1. 建立connection、channel,并创建名为随机独有的回调队列,将该队列的名字保存到callback_queue
  2. 我们订阅callback_queue,这样我们就可以接收RPC响应。
  3. 每次有响应消息回来的时候,on_response回调函数就会被执行,完成一个很简单的工作,对于每条响应的消息它会检查是否correlation_id是我们在找的那个。如果是的话,它会将响应保存在self.response并且离开消费循环。
  4. 接下来我们定义我们的主方法call——它来执行实际的RPC请求。
  5. call方法中我们生成了correlation_id并保存到self.corr_id中。on_response回调函数会基于这个值获取合适的响应。
  6. 同样在call方法中发布消息的时候,我们包含了2个属性reply_tocorrelation_id
  7. 最后我们等待合适的响应到达,然后将该消息返回给用户。

接下来就可以执行代码了,打开第一个终端运行python rpc_server.py,打开第二个终端运行python rpc_client.py

两个终端最终结果如下

# 第一个终端
[root@rabbitmq-01 code]# python rpc_server.py 
 [x] Awaiting RPC requests
 [.] fib(30)

# 第二个终端
[root@rabbitmq-01 code]# python rpc_client.py 
 [x] Requesting fib(30)
 [.] Got 832040

代码中所呈现出来的设计不是唯一可能的RPC服务的实现,但是它有一些重要的优点:

  • 如果RPC服务太慢的话我们可以通过横向扩展的方式,在新的console窗口中再运行一个rpc_server.py进程。
  • 在客户端方面,RPC规定仅发送和接收1条消息。没有要求像queue_declare这样的异步调用。这样的结果对于一次RPC请求,RPC客户端只需要一次网络来回(network round trip)。

这里的代码只是示例代码而已,过度简化了,有一些复杂且重要的问题没有解决,比如说:

  • 如果没有server端运行的话,那么客户端会是什么反应?
  • 客户端是否应该针对RPC设置超时时长?
  • 如果服务器出现故障并且抛出了一个异常,那么这个异常是否应该被转发给客户端?
  • 处理数据前是否判断消息的有效性,比如我们只允许接收100以内的数等等这类的边界检查机制。

 

最后,tutorial在这里提到了 management UI ,这是一个以Web(GUI)形式展示RabbitMQ数据以及提供一些操作的插件,蛮适合不会或者不经常写代码操作RabbitMQ的人员,比如运维工程师。但是像生产者和消费者这种,还是得通过实际的代码来实现的。由此也可以看出,很多开源软件的应用,想要入门并且学好、用好的话,其实运维和开发相关的知识是都需要掌握的。

标签:07,self,rpc,RabbitMQ,queue,RPC,id,客户端
From: https://www.cnblogs.com/alongdidi/p/rabbitmq_tutorial_six.html

相关文章

  • 洛谷 P1098 [NOIP2007 提高组] 字符串的展开
    洛谷链接牛客链接两个平台都过了题目:题解:本题是一道比较硬核的模拟题,思路方面其实问题不大,但是难在模拟情况上面而且测试数据里还包含了一些题目中没有提到的情况,所......
  • 代码随想录算法训练营day07 | leetcode 242、349 、202、1
    基础知识哈希常见的结构(不要忘记数组)数组set(集合)map(映射)注意哈希冲突哈希函数LeetCode242分析1.0 HashMap<Character,Integer>记录字符元素及其个数,再......
  • gRPC之初体验
    前言经常看到说gRPC怎么好的文章,实际工作中也没有体验过,这次看了一下它的HelloWorld程序,记录一下这个过程。RPC是RemoteProduceCall的缩写,就是远程调用,调用远程的代......
  • 【拓扑排序】LeetCode 207. 课程表
    题目链接207.课程表思路参考Krahets大佬的思路代码classSolution{publicbooleancanFinish(intnumCourses,int[][]prerequisites){int[]inde......
  • RabbitMQ GUI客户端工具
    RabbitMQGUI客户端工具(RabbitMQAssistant)平时用控制台或者网页进行管理不免有点不方便,尤其在读取消息的时候不支持过滤和批量发送消息,在此推荐一个漂亮的GUI客户端工具。......
  • AcWing. 1072 树的最长路径
    题目描述给定一棵树,树中包含\(n\)个结点(编号\(1\)~\(n\))和\(n-1\)条无向边,每条边都有一个权值。现在请你找到树中的一条最长路径。换句话说,要找到一条路径,使得使得......
  • RabbitMQ说明与安装
    一、RabbitMQ介绍1.RabbitMQ的相关概念2007年发布,是一个在AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。RabbitMQ是......
  • RabbitMQ 高级消息队列协议
    https://blog.csdn.net/qq_46127735/article/details/120498245http://www.codebaoku.com/it-linux/it-linux-112105.htmlhttps://baike.baidu.com/item/rabbitmq/937214......
  • docker 创建seafile 1bec7de0740f4117904bf3be5122ecaf
    docker创建seafile部署Portainerdockerpullportainer/portainerdockerrun-p9000:9000-p8000:8000--nameportainer\--restart=always\-v/var/run/docker.......
  • 编译打包rabbitmq然后一键部署的简单方法
    摘要之前总结过一版,但是感觉不太全面想着本次能够将使用中遇到的问题总结一下.所以本次是第二版介质下载rabbitmq不区分介质的打包文件rabbitmq-server-generic-unix-3.11......