首页 > 其他分享 >RabbitMQ的 RPC 消息模式你会了吗?

RabbitMQ的 RPC 消息模式你会了吗?

时间:2024-09-10 23:25:24浏览次数:11  
标签:请求 队列 模式 响应 RPC RabbitMQ 服务器 客户端

前文学习了如何使用工作队列在多个工作者之间分配耗时的任务。若需要在远程计算机上运行一个函数并等待结果呢?这种模式通常被称为远程过程调用 (RPC)。

本节使用 RabbitMQ 构建一个 RPC 系统:一个客户端和一个可扩展的 RPC 服务器。由于我们没有耗时的任务可以分配,因此我们将创建一个返回斐波那契数的虚拟 RPC 服务。

客户端接口

创建一个简单的客户端类,暴露 call 方法,该方法发送一个 RPC 请求并阻塞,直到收到响应:

FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call("4");
System.out.println("fib(4) 是 " + result);

虽然 RPC 是计算中很常见的模式,但它经常受到批评。问题在于当程序员不确定函数调用是本地调用还是缓慢的 RPC 调用时,会引发困惑。这种混淆会导致系统不可预测,并增加调试的复杂性。错误使用 RPC 不仅没有简化软件,反而可能导致难以维护的“代码结构混乱”。
鉴于此,请遵循以下建议:
确保明确区分本地函数调用和远程函数调用。
记录你的系统,使组件之间的依赖关系清晰。
处理错误情况。例如,当 RPC 服务器长时间不可用时,客户端应如何响应?
如有疑虑,请尽量避免使用 RPC。如果可能,应该使用异步管道——与 RPC 类似的阻塞操作不同,结果将被异步推送到下一个计算阶段。

回调队列

在 RabbitMQ 上实现 RPC 很简单。客户端发送一个请求消息,服务器通过响应消息进行回复。为接收响应,需要在请求中附上一个“回调”队列地址。可用默认的队列(在 Java 客户端中是独占的)。试试这个代码:

callbackQueueName = channel.queueDeclare().getQueue();

BasicProperties props = new BasicProperties.Builder()
    .replyTo(callbackQueueName)
    .build();

channel.basicPublish("", "rpc_queue", props, message.getBytes());
// ...然后从 callback_queue 读取响应消息...

需要导入:

import com.rabbitmq.client.AMQP.BasicProperties;

消息属性
AMQP 0-9-1 协议预定义了一组 14 个与消息一起发送的属性。大多数属性很少使用,以下属性是常用的:
deliveryMode:标记消息为持久 (值为 2) 或瞬时 (其他值) 的模式
contentType:用于描述编码的 mime 类型。例如,对于常用的 JSON 编码,建议将此属性设置为:application/json
replyTo:通常用于命名回调队列
correlationId:用于将 RPC 响应与请求相关联

Correlation Id

在前面提到的方法中,我们建议为每个 RPC 请求创建一个回调队列。这很低效,但幸好有一个更好的方法——为每个客户端创建一个回调队列。

这会引发一个新问题:在回调队列中收到响应时,不清楚该响应属于哪个请求。这时 correlationId 属性派上用场。为每个请求设置一个唯一值。稍后,回调队列中收到消息时,看此属性,并根据它来匹配响应和请求。如看到一个未知 correlationId 值,可以安全地丢弃消息——它不属于我们的请求。

为啥应该忽略回调队列中的未知消息,而不非直接失败?因为服务器端可能会发生竞态条件。虽然不太可能,但可能 RPC 服务器在发送完答案后崩溃,但在为请求发送确认消息之前就崩溃了。如果发生这种情况,重启后的 RPC 服务器将重新处理该请求。因此,客户端的我们必须优雅地处理重复的响应,RPC 最好是幂等的。

总结

RPC模式工作流程:

  • 对于一个 RPC 请求,客户端发送一条带有两个属性的消息:replyTo,其值设置为为该请求创建的匿名独占队列;correlationId,其值为每个请求设置的唯一标识。
  • 请求被发送到 rpc_queue 队列。
  • RPC 工作者(即服务器)在该队列上等待请求。一旦收到请求,它将完成任务,并通过 replyTo 字段指定的队列将结果发送回客户端。
  • 客户端在回复队列中等待数据。当消息到达时,它检查 correlationId 属性。如果匹配请求中的值,它将响应返回给应用程序。

实现全流程

斐波那契任务:

private static int fib(int n) {
    if (n == 0) return 0;
    if (n == 1) return 1;
    return fib(n-1) + fib(n-2);
}

我们定义了斐波那契函数。该函数假设只接收有效的正整数输入。(对于较大数字,该算法效率较低,它可能是最慢的递归实现。)

服务器代码可在此处找到:RPCServer.java

客户端代码略显复杂,完整的示例源代码可参考 RPCClient.java

编译并设置类路径:

javac -cp $CP RPCClient.java RPCServer.java

我们的 RPC 服务现在已准备就绪。启动服务器:

java -cp $CP RPCServer
# => [x] 正在等待 RPC 请求

要请求斐波那契数,运行客户端:

java -cp $CP RPCClient
# => [x] 请求 fib(30)

此处展示的设计并非 RPC 服务的唯一实现方式,但它有以下优势:

  • 如果 RPC 服务器太慢,你可以通过运行另一个服务器实例进行扩展。试着在新的控制台中运行第二个 RPCServer
  • 在客户端,RPC 只需发送和接收一条消息。无需像 queueDeclare 这样的同步调用。因此,RPC 客户端只需一个网络往返即可完成一次 RPC 请求。

代码仍然相对简单,并未尝试解决更复杂但重要的问题,如:

  • 如果没有服务器运行,客户端应该如何响应?
  • RPC 是否需要某种超时机制?
  • 如果服务器发生故障并引发异常,是否应该将其转发给客户端?
  • 在处理消息前,是否应检查其有效性(如范围、类型)以防止无效消息的进入?

关注我,紧跟本系列专栏文章,咱们下篇再续!

作者简介:魔都架构师,多家大厂后端一线研发经验,在分布式系统设计、数据平台架构和AI应用开发等领域都有丰富实践经验。

各大技术社区头部专家博主。具有丰富的引领团队经验,深厚业务架构和解决方案的积累。

负责:

  • 中央/分销预订系统性能优化
  • 活动&券等营销中台建设
  • 交易平台及数据中台等架构和开发设计
  • 车联网核心平台-物联网连接平台、大数据平台架构设计及优化
  • LLM Agent应用开发
  • 区块链应用开发
  • 大数据开发挖掘经验
  • 推荐系统项目

目前主攻市级软件项目设计、构建服务全社会的应用系统。

参考:

本文由博客一文多发平台 OpenWrite 发布!

标签:请求,队列,模式,响应,RPC,RabbitMQ,服务器,客户端
From: https://www.cnblogs.com/JavaEdge/p/18407457

相关文章

  • RabbitMQ的 RPC 消息模式你会了吗?
    前文学习了如何使用工作队列在多个工作者之间分配耗时的任务。若需要在远程计算机上运行一个函数并等待结果呢?这种模式通常被称为远程过程调用(RPC)。本节使用RabbitMQ构建一个RPC系统:一个客户端和一个可扩展的RPC服务器。由于我们没有耗时的任务可以分配,因此我们将创建一......
  • 企业数智化升级新模式【数据飞轮】,高效挖掘数据资产价值
    一、前言:企业数智化升级新模式【数据飞轮】,它是一种企业数智化升级新模式,旨在通过数据和业务的双向良性驱动,有效提升工作质效。数据飞轮在多个行业中被用来充分发挥数据中台的价值,为业务发展提供参考。数据飞轮的工作原理可以类比为飞轮效应:一开始,为了使飞轮转动起来,需要花费较大的......
  • Android Studio -> Android Studio 获取release模式和debug模式的APK
    AndroidStudio上鼠标修改构建类型Release版本激活路径:Moretoolwindows->BuildVariants->ActiveBuildVariant->releaseAPK路径:Project\app\build\intermediates\apk\app-release.apkDebug版本激活路径:Moretoolwindows->BuildVariants->ActiveBuildVariant->......
  • IIC工作模式时序分析
    IIC工作模式时序分析此处利用IO口模拟IIC通信过程中的时序。通信过程在IIC通信过程SDA存在两种模式(接收模式和发送模式),发送或接受一个字节(器件的7个bit+1个bit方向(1-读方向,0-写方向))模式配置当SDA为接入模式接收了1字节数据后在第九个时钟脉冲期间就要变成输出模式发送as......
  • Java 设计模式-状态模式
    目录一.概述二.主要角色三.代码示例四.优缺点优点:缺点:五.常见应用场景一.概述        状态模式是一种行为设计模式,它允许一个对象在其内部状态改变时改变它的行为。对象看起来好像修改了它的类。状态模式把所有的与一个特定的状态相关的行为放到一个类......
  • Java 设计模式-代理模式
    目录概述一.什么是代理模式1.举例说明二.代理模式作用1.保护代理2.增强功能3.代理交互4.远程代理:三.代理模式3个角色四.静态代理1.代码示例:五.JDK动态代理1.代码示例:六.CGLIB动态代理1.代码示例 七.JDK动态代理和CGLIB动态代理区别八.两种在......
  • [设计模式] Cola-StateMachine : 一个轻量实用的Java状态机框架
    1概述:状态机1.0状态机vs工作流在介绍状态机之前,先介绍一个工作流(WorkFlow),初学者通常容易将两个概念混淆。工作流(WorkFlow),大体是指业务过程(整体或者部分)在计算机应用环境下的自动化,是对工作流程及其各操作步骤之间业务规则的描述。在计算机系统中,工作流属于计算机支持的......
  • 【学习】为什么许多大型APP会采用RPC而不是HTTP协议呢?
    https://mp.weixin.qq.com/s/JD5qDjYFVn37pAC6W5JuDw原创前端欧巴在回答这个问题之前我们有必要系统性的了解一下RPC是何方神圣。---------------------------------------------------------------------------------------------RPC(RemoteProcedureCall,远程过程调用)是......
  • Activity启动模式
    Activity启动模式1.Activity启动模式介绍1.1任务栈在Android开发中,任务栈(TaskStack)是一个非常重要的概念,主要用于管理应用程序中的Activity及其启动模式。它帮助开发者了解当用户在不同应用之间切换,或者应用内部不同Activity之间跳转时,系统如何管理这些Activity的生命周期与......
  • NS4263 3.0Wx2 双声道 AB/D 类双模音频功率放大器附加耳机模式
    1特性●工作电压范围:3.0V-5.25V●AB类和D类工作模式切换●一线脉冲控制工作模式与关断模式●内置立体声耳机输出功能●输出功率3W@ClassD/Load=4ohm●THD+N=0.1%@VDD=5V/Po=1W●优异的全带宽EMI抑制能力●优异的“上电和掉电”噪声抑制●内置过流保护......