首页 > 其他分享 >如何用MQ实现RPC远程调用?(附代码)

如何用MQ实现RPC远程调用?(附代码)

时间:2023-03-20 10:37:30浏览次数:53  
标签:调用 String factory RPC MQ new public 客户端


文章目录

  • ​​4.6 RPC 模式​​
  • ​​4.6.1 简介​​
  • ​​4.6.2 客户端​​
  • ​​4.6.2 服务端​​
  • ​​4.6.3 RPC模式小结​​

4.6 RPC 模式

4.6.1 简介

以前的几种模式的通信都是基于Producer发送消息到Consumer,然后Consumer进行消费,假设我们需要Consumer操作完毕之后返回给Producer一个回调呢?前面几种模式就行不通了;

例如我们要做一个远程调用加钱操作,客户端远程调用服务端进行加钱操作,操作完毕之后服务端将用户最新的余额返回给客户端;客户端进行后续操作,例如更新到数据库等;

  • RPC业务分析

如何用MQ实现RPC远程调用?(附代码)_消息队列

在RPC模式中,客户端和服务器都是Producer也都是Consumer;

RPC模式官网介绍:https://www.rabbitmq.com/tutorials/tutorial-five-java.html

  • RPC调用图解:

如何用MQ实现RPC远程调用?(附代码)_消息队列_02

4.6.2 客户端

package com.dfbz.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;

public class RPCClient implements AutoCloseable {

public Connection connection;
public Channel channel;
public static final String RPC_QUEUE_NAME = "rpc_queue";

public RPCClient() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.40.132");
factory.setPort(5672);
factory.setUsername("lscl");
factory.setPassword("admin");
factory.setVirtualHost("/lscl");

connection = factory.newConnection();
channel = connection.createChannel();
}

public static void main(String[] argv) throws Exception {
// 初始化信息
RPCClient rpcClient = new RPCClient();
// 发起远程调用
Integer response = rpcClient.call(20);

System.out.println(response);

rpcClient.channel.close();
rpcClient.connection.close();
}

public Integer call(Integer money) throws IOException, InterruptedException {

// 随机生成一个correlationId(密钥)
final String corrId = UUID.randomUUID().toString();

// 后期服务端回调给客户端的队列名(随机生成的回调队列名)
String replyQueueName = channel.queueDeclare().getQueue();

// 设置发送消息的一些参数
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.correlationId(corrId) // 密钥
.replyTo(replyQueueName) // 回调队列名
.build();

// 采用Simple模式发送给Server端
channel.basicPublish("", RPC_QUEUE_NAME, props, (money + "").getBytes("UTF-8"));

// 定义延迟队列
final BlockingQueue<Integer> response = new ArrayBlockingQueue<>(1);

channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {

// 回调方法,当收到消息之后,会自动执行该方法
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

if (properties.getCorrelationId().equals(corrId)) {
System.out.println("响应的消息:" + new String(body));

// 往延迟队列中添加信息(服务端响应的最新余额)
response.offer(Integer.parseInt(new String(body, "UTF-8")));
}
}
});

// 获取延迟队列中的信息(如果没有信息将一直阻塞)
return response.take();
}

public void close() throws IOException {
connection.close();
}
}

4.6.2 服务端

package com.dfbz.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;

public class RPCServer {

private static final String RPC_QUEUE_NAME = "rpc_queue";

// 总金额
private static Integer money = 0;

/**
* 加钱方法
* @param n
* @return
*/
private static Integer addMoney(int n) {
money += n;
return money;
}

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.40.132");
factory.setPort(5672);
factory.setUsername("lscl");
factory.setPassword("admin");
factory.setVirtualHost("/lscl");

try (
Connection connection = factory.newConnection();
Channel channel = connection.createChannel()
) {

channel.queueDeclare(RPC_QUEUE_NAME, true, false, false, null);

System.out.println("等待客户端请求.....");

while (true) {

// 接受到客户端的请求(消息)
channel.basicConsume(RPC_QUEUE_NAME, true, new DefaultConsumer(channel) {

// 回调方法,当收到消息之后,会自动执行该方法
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

// 本次的消息配置
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
.Builder()
.correlationId(properties.getCorrelationId()) // 客户端发送的密钥
.build();

System.out.println("客户端的消息: "+new String(body,"UTF-8"));
String response = "";

try {
String message = new String(body, "UTF-8");

// 调用加钱方法
response = addMoney(Integer.parseInt(message)) + "";

} finally {

// 发送一个消息给客户端
/*
properties.getReplyTo(): Client端设置的回调队列名
replyProps: 封装的参数(主要是CorrelationId)
*/
channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
}
}
});
}

}
}
}

4.6.3 RPC模式小结

严格意义上来说RPC并不是一种新的交换模式,他其实还是借助原有的模式(上述案例是采用Simple模式)来达到一些不同的功能,在RPC模式中只有客户端和服务端,并且客户端和服务端都既是Producer也是Consumer;

如何用MQ实现RPC远程调用?(附代码)_MQ_03

Tips:RPC模式已经违背了消息队列设计的初衷了;即一些无需及时返回且耗时的操作;


标签:调用,String,factory,RPC,MQ,new,public,客户端
From: https://blog.51cto.com/lscl/6132071

相关文章

  • 面试官:MQ的好处到底有哪些?
    文章目录​​一、消息中间件简介​​​​1.1概述​​​​1.2消息中间件的好处​​​​1.2.1应用解耦​​​​1.2.2异步处理​​​​1.2.3流量削峰​​​​1.3消息中间......
  • vue子组件怎么调用父组件的方法
    方法总结:1、子组件中通过“this.$parent.event”来调用父组件的方法。2、子组件用“$emit”向父组件触发一个事件,父组件监听这个事件即可。3、父组件把方法传入子组件中......
  • 分布式调用跟踪
    随着服务的拆分,系统的模块变得越来越多,不同的模块可能由不同的团队维护,一个请求可能会涉及几十个服务的协同处理,牵扯到多个团队的业务系统。假设现在某次服务调用失败,或......
  • 使用Docker部署Consul集群并由Ocelot调用
    关于consul的介绍就不写了百度就行,我们直接开干。一、部署consul集群拉取consul的镜像dockerpullconsul然后部署consul容器dockerrun--nameconsul1-d-p85......
  • RabbitMQ 01 概述
    什么是消息队列进行大量的远程调用时,传统的Http方式容易造成阻塞,所以引入了消息队列的概念,即让消息排队,按照队列进行消费。它能够将发送方发送的信息放入队列中,当新的......
  • RabbitMQ 02 安装
    由于现在Docker的流行,这里使用Docker进行安装。执行如下命令。dockerrun-d--restartalways--namerabbit-eRABBITMQ_DEFAULT_USER=admin-eRABBITMQ_DEFAULT_P......
  • Linux下安装RabbitMQ
     1.文件上传将文件上传到/usr/local/rabbitmq目录下(如果没有rabbitmq文件夹,自己手动创建一个)mkdirrabbitmq2.安装文件(分别按照顺序安装)rpm-ivherlang-21.3-1.el7......
  • Rabbitmq
    一、消息队列的概念及应用场景什么是消息队列消息是在不同应用间传递的数据。这里的消息可以非常简单,比如只包含字符串,也可以非常复杂,包含多个嵌套的对象。消息队列(Messa......
  • Python中通过反射来调用方法
    Isthereawaytopassinvokefunctionbymethodnameinstring,whichmeanscallthemethodbyreflectionYes,youcanusereflectioninPythontoinvokeame......
  • 【RPC高性能框架总结】5.高性能nio框架netty(中)
    接上一篇《​​4.高性能nio框架netty(上)​​》上一篇我们编写了使用Netty框架开发的客户端的启动类“NettyTestClient”以及业务处理类“NettyTestClientHandler”,本篇我......