首页 > 其他分享 >最简最快了解RPC核心流程

最简最快了解RPC核心流程

时间:2024-02-23 13:55:20浏览次数:22  
标签:最简 调用 String 流程 Object RPC Proxy new rpcRequest

本文主要以最简易最快速的方式介绍RPC调用核心流程,文中以Dubbo为例。同时,会写一个简易的RPC调用代码,方便理解和记忆核心组件和核心流程。

1、核心思想

RPC调用过程中,最粗矿的核心组件3个:RegistryProviderConsumer。最粗矿的流程4个:注册、订阅、通知、调用。最简单的流程图就1个:

本文会继续细粒度地拆解以上流程,拆解之前,请牢记这段话:

RPC调用,不管中间流程多么复杂,不管代码多么复杂,所有的努力也只为做2件事情:

  1. 在Consumer端,将ReferenceConfig配置的类转换成Proxy代理。
  1. 在Provider端,将ServiceConfig配置的类转换成Proxy代理。

2、核心组件

为了能在Consumer端和Provider端生成各自的Proxy代理,并且发起调用和响应,需要如下核心组件:

  • Registry:注册中心,主要是为了实现 Provider接口注册、Consumer订阅接口、接口变更通知、接口查找等功能。
  • Proxy:服务代理,核心中的核心,一切的努力都是为了生成合适的Proxy服务代理。
    • Consumer的Proxy:Consumer端根据ReferenceConfig生成Proxy,此Proxy主要用于找到合适的Provider接口,然后发起网络调用。
    • Provider的Proxy:Provider端根据ServiceConfig生成Proxy,此Proxy主要作用是通过类似反射的方法调用本地代码,再将结果返回给Consumer。
  • Protocol:服务协议,它相当于一个中间层,用于与注册中心打交道 和 封装 RPC 调用。它在初始化时会创建Client模块 与 服务端建立连接,也会生成用于远程调用的Invoker
  • Cluster:服务集群,主要用于路由、负载均衡、服务容错等。
  • Invoker:服务调用者。
    • Consumer的服务调用者主要是利用Client模块发起远程调用,然后等待Provider返回结果。
    • Provider的服务调用者主要是根据接收到的消息利用反射生成本地代理,然后执行方法,再将结果返回到Consumer。
  • Client:客户端模块,默认是Netty实现,主要用于客户端和服务端通讯(主要是服务调用),比如将请求的接口、参数、请求ID等封装起来发给Server端。
  • Server:服务端模拟,默认是Netty实现。主要是用于客户端和服务端通讯。

3、核心流程

3.1、Consumer流程

流程:

Consumer的流程实际上就是一个从ReferenceConfig 生成Proxy代理的过程。核心事情由Protocol完成。

  1. 根据ReferenceConfig生成代理
  2. 注册到注册中心、订阅注册中心事件
  3. 建立NettyClient,并且与NettyServer建立连接
  4. 生成客户端的ClientInvoker
  5. 选择负载均衡和集群容错
  6. ClientInvoker发起网络调用和等待结果

流程图:

3.2、Provider流程

流程

Provider的流程实际上就是个从ServiceConfig生成Proxy代理的过程。核心事情由PorxyProtocol完成。

  1. 根据ServiceConfig生成本地代理
  2. 注册到注册中心
  3. 启动NettyServer等待客户端连接
  4. 生成服务端Invoker
  5. Invoker监听调用请求
  6. 接收到请求后新建任务丢入到线程池去执行
  7. 执行时会生成本地代理执行(比如通过反射去调用具体的方法),再将返回结果写出去

流程图:

3.3、整体流程图

4、简易代码实现

4.1、核心代码介绍

客户端Proxy

/**
 * 获取代理Service
 */
@SuppressWarnings("unchecked")
public <T> T getService(Class clazz) throws Exception {

    return (T) Proxy.newProxyInstance(getClass().getClassLoader(), new Class[]{clazz}, new InvocationHandler() {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            String methodName = method.getName();

            if ("equals".equals(methodName) || "hashCode".equals(methodName)) {
                throw new IllegalAccessException("不能访问" + methodName + "方法");
            }
            if ("toString".equals(methodName)) {
                return clazz.getName() + "#" + methodName;
            }

            List<RegistryInfo> registryInfoList = interfaceMethodsRegistryInfoMap.get(clazz);
            if (registryInfoList == null) {
                throw new RuntimeException("无法找到对应的服务提供者");
            }

            LoadBalancer loadBalancer = new RandomLoadBalancer();
            RegistryInfo registryInfo = loadBalancer.choose(registryInfoList);

            ChannelHandlerContext ctx = registryChannelMap.get(registryInfo);

            String identity = InvokeUtils.buildInterfaceMethodIdentify(clazz, method);
            String requestId;

            synchronized (ProxyProtocol.this) {
                requestIdWorker.increment();
                requestId = String.valueOf(requestIdWorker.longValue());
            }

            ClientInvoker clientInvoker = new DefaultClientInvoker(method.getReturnType(), ctx, requestId, identity);

            inProcessInvokerMap.put(identity + "#" + requestId, clientInvoker);

            return clientInvoker.invoke(args);
        }
    });
}

服务端Proxy

private class RpcInvokerTask implements Runnable {
    private RpcRequest rpcRequest;

    public RpcInvokerTask(RpcRequest rpcRequest) {
        this.rpcRequest = rpcRequest;
    }

    @Override
    public void run() {
        try {
            ChannelHandlerContext ctx = rpcRequest.getCtx();
            String interfaceIdentity = rpcRequest.getInterfaceIdentity();
            String requestId = rpcRequest.getRequestId();
            Map<String, Object> parameterMap = rpcRequest.getParameterMap();

            //interfaceIdentity组成:接口类+方法+参数类型
            Map<String, String> interfaceIdentityMap = string2Map(interfaceIdentity);

            //拿出是哪个类
            String interfaceName = interfaceIdentityMap.get("interface");
            Class interfaceClass = Class.forName(interfaceName);
            Object o = interfaceInstanceMap.get(interfaceClass);

            //拿出是哪个方法
            Method method = interfaceMethodMap.get(interfaceIdentity);

            //反射执行
            Object result = null;
            String parameterStr = interfaceIdentityMap.get("parameter");
            if (parameterStr != null && parameterStr.length() > 0) {
                String[] parameterTypeClasses = parameterStr.split(",");//接口方法参数参数可能有多个,用,号隔开
                Object[] parameterInstance = new Object[parameterTypeClasses.length];
                for (int i = 0; i < parameterTypeClasses.length; i++) {
                    parameterInstance[i] = parameterMap.get(parameterTypeClasses[i]);
                }
                result = method.invoke(o, parameterInstance);
            } else {
                result = method.invoke(o);
            }

            //将结果封装成rcpResponse
            RpcResponse rpcResponse = RpcResponse.create(JSONObject.toJSONString(result), interfaceIdentity, requestId);

            //ctx返回执行结果
            String resultStr = JSONObject.toJSONString(rpcResponse) + DELIMITER_STR;

            ByteBuf byteBuf = Unpooled.copiedBuffer(resultStr.getBytes());
            ctx.writeAndFlush(byteBuf);

            System.out.println("响应给客户端:" + resultStr);

        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

Protocol

public ProxyProtocol(String registryUrl, List<ServiceConfig> serviceConfigList, List<ReferenceConfig> referenceConfigList, int port) throws Exception {
    this.serviceConfigList = serviceConfigList == null ? new ArrayList<>() : serviceConfigList;
    this.registryUrl = registryUrl;
    this.port = port;
    this.referenceConfigList = referenceConfigList == null ? new ArrayList<>() : referenceConfigList;

    //1、初始化注册中心
    initRegistry(this.registryUrl);

    //2、将服务注册到注册中心
    InetAddress addr = InetAddress.getLocalHost();
    String hostName = addr.getHostName();
    String hostAddr = addr.getHostAddress();
    registryInfo = new RegistryInfo(hostName, hostAddr, this.port);
    doRegistry(registryInfo);

    //3、初始化nettyServer,启动nettyServer
    if (!this.serviceConfigList.isEmpty()) {
        nettyServer = new NettyServer(this.serviceConfigList, this.interfaceMethodMap);
        nettyServer.init(this.port);
    }

    //如果是客户端引用启动,则初始化处理线程
    if (!this.referenceConfigList.isEmpty()) {
        initProcessor();
    }
}

客户端Invoker

@Override
public T invoke(Object[] args) {
    JSONObject jsonObject = new JSONObject();
    jsonObject.put("interfaces", identity);

    JSONObject param = new JSONObject();
    if (args != null) {
        for (Object obj : args) {
            param.put(obj.getClass().getName(), obj);
        }
    }
    jsonObject.put("parameter", param);
    jsonObject.put("requestId", requestId);
    String msg = jsonObject.toJSONString() + Constants.DELIMITER_STR;
    System.out.println("发送给服务端JSON为:" + msg);

    ByteBuf byteBuf = Unpooled.copiedBuffer(msg.getBytes());
    ctx.writeAndFlush(byteBuf);

    wait4Result();

    return result;
}

private void wait4Result() {
    synchronized (this) {
        try {
            wait();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

@Override
public void setResult(String result) {
    synchronized (this) {
        this.result = (T) JSONObject.parseObject(result, returnType);
        notifyAll();
    }
}

服务端Invoker

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    String message = (String) msg;
    System.out.println("提供者收到消息:" + message);
    //解析消费者发来的消息
    RpcRequest rpcRequest = RpcRequest.parse(message, ctx);
    //接受到消息,启动线程池处理消费者发过来的请求
    threadPoolExecutor.execute(new RpcInvokerTask(rpcRequest));
}

/**
 * 处理消费者发过来的请求
 */
private class RpcInvokerTask implements Runnable {
    private RpcRequest rpcRequest;

    public RpcInvokerTask(RpcRequest rpcRequest) {
        this.rpcRequest = rpcRequest;
    }

    @Override
    public void run() {
        try {
            ChannelHandlerContext ctx = rpcRequest.getCtx();
            String interfaceIdentity = rpcRequest.getInterfaceIdentity();
            String requestId = rpcRequest.getRequestId();
            Map<String, Object> parameterMap = rpcRequest.getParameterMap();

            //interfaceIdentity组成:接口类+方法+参数类型
            Map<String, String> interfaceIdentityMap = string2Map(interfaceIdentity);

            //拿出是哪个类
            String interfaceName = interfaceIdentityMap.get("interface");
            Class interfaceClass = Class.forName(interfaceName);
            Object o = interfaceInstanceMap.get(interfaceClass);

            //拿出是哪个方法
            Method method = interfaceMethodMap.get(interfaceIdentity);

            //反射执行
            Object result = null;
            String parameterStr = interfaceIdentityMap.get("parameter");
            if (parameterStr != null && parameterStr.length() > 0) {
                String[] parameterTypeClasses = parameterStr.split(",");//接口方法参数参数可能有多个,用,号隔开
                Object[] parameterInstance = new Object[parameterTypeClasses.length];
                for (int i = 0; i < parameterTypeClasses.length; i++) {
                    parameterInstance[i] = parameterMap.get(parameterTypeClasses[i]);
                }
                result = method.invoke(o, parameterInstance);
            } else {
                result = method.invoke(o);
            }

            //将结果封装成rcpResponse
            RpcResponse rpcResponse = RpcResponse.create(JSONObject.toJSONString(result), interfaceIdentity, requestId);

            //ctx返回执行结果
            String resultStr = JSONObject.toJSONString(rpcResponse) + DELIMITER_STR;

            ByteBuf byteBuf = Unpooled.copiedBuffer(resultStr.getBytes());
            ctx.writeAndFlush(byteBuf);

            System.out.println("响应给客户端:" + resultStr);

        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

Client

EventLoopGroup group = new NioEventLoopGroup();
try {
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(group)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .handler(new ChannelInitializer() {
                @Override
                protected void initChannel(Channel ch) throws Exception {
                    ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024 * 1024, Constants.DELIMITER));
                    ch.pipeline().addLast(new StringDecoder());
                    ch.pipeline().addLast(new NettyClientHandler());

                    System.out.println("initChannel - " + Thread.currentThread().getName());
                }
            });
    ChannelFuture cf = bootstrap.connect(ip, port).sync();
//            cf.channel().closeFuture().sync();
    System.out.println("客户端启动成功");
} catch (Exception e) {
    e.printStackTrace();
    group.shutdownGracefully();
}

Server

public NettyServer(List<ServiceConfig> serviceConfigList, Map<String, Method> interfaceMethodMap) {
    this.serviceConfigList = serviceConfigList;
    this.interfaceMethodMap = interfaceMethodMap;
}

public int init(int port) throws InterruptedException {
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 1024)
            .childHandler(new ChannelInitializer() {
                @Override
                protected void initChannel(Channel channel) throws Exception {
                    channel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024 * 1024, DELIMITER));
                    channel.pipeline().addLast(new StringDecoder());
                    channel.pipeline().addLast(new RpcInvokeHandler(serviceConfigList, interfaceMethodMap));
                }
            });
    ChannelFuture cf = bootstrap.bind(port).sync();
    System.out.println("启动NettyServer,端口为:" + port);
    return port;
}

4.2、项目地址

https://github.com/yclxiao/rpc-demo.git

5、总结

本文主要以Dubbo为例介绍了RPC调用核心流程,同时,写了个简易的RPC调用代码。

记住以上的流程图即可搞明白整个调用流程。然后再记住最核心的2句话:

  • 所有的努力都是为了能在Consumer端和Provider端生成功能丰富的Proxy。核心事情由Protocol完成。
  • 核心的5个部件:Registry、ProxyProtocolInvokerClientServer

本篇完结!欢迎点赞 关注 收藏!!!

原文链接:https://mp.weixin.qq.com/s/9fF2weLLBR7SChOxPEEqEA

======>>>>>> 关于我 <<<<<<======

标签:最简,调用,String,流程,Object,RPC,Proxy,new,rpcRequest
From: https://www.cnblogs.com/mangod/p/18029345

相关文章

  • java的基本流程控制情况
    1、顺序结构顺序结构◆JAVA的基本结构就是顺序结构除非特别指明,否则就按照顺序一句一句执行。◆顺序结构是最简单的算法结构。语句与语句之间,框与框之间是按从上到下的顺序进行的,它是由若干个依次执行的处理步骤组成的,它是任何一个算法都离不开的一种基本算法结构。2、if单......
  • MySQL进阶语法【视图 触发器 事务 存储过程 流程控制】
    MySQL进阶语法【一】视图【1】概念在MySQL数据库中,视图(View)是虚拟的表,其内容是基于执行特定查询所得到的结果集。视图是通过查询来定义的,它可以包含一个或多个表中的特定列,也可以包含计算字段、聚合函数等。视图并不实际存储数据,而是动态地从基本表中获取数据并进行组合,每当......
  • Solana 开发学习之通过RPC与Solana交互
    Solana开发学习之通过RPC与Solana交互相关链接https://solana.com/docs/rpc/httphttps://www.jsonrpc.org/specificationhttps://www.json.org/json-en.htmlJSON-RPC2.0规范JSON-RPC是一种无状态、轻量级远程过程调用(RPC)协议。该规范主要定义了几种数据结构及其处......
  • 车载打气泵应用方案设计流程
    便携车载打气泵主要使用在汽车轮胎充气及车胎检测上,是一个气压精度测量产品。打气泵方案则是通过马达运转工作而进行设计,利用芯片和气压传感器所做的一个智能化便携车载打气泵方案。下面就来说说关于一个打气泵应用解决方案的开发流程。打气泵方案确定开发需求......
  • ubuntu server 22.04.2 LTS安装流程
    1、下载Ubuntu镜像下载地址:清华大学开源软件镜像站 选择Ubuntu版本22.04.XX(amd64,Server),其中XX小版本自选(当前示例版本为22.04.2)。 2、本地加载Ubuntu的iso镜像默认选中「TryorInstallUbuntuServer」安装选项,回车(或等待30秒后),等待系统镜像自检并进行安装初始化......
  • 微信小游戏开发流程
    微信小游戏开发相对于传统的网页小游戏开发有一些特定的步骤和要求,以下是一个微信小游戏开发的基本流程。1.注册和准备:注册微信开发者账号,并在微信开放平台创建小游戏项目。下载并安装微信开发者工具。2.项目初始化:在微信开发者工具中创建一个新的小游戏项目。选择合适的开......
  • AppBox快速开发框架(开源)开发流程介绍
      目前很多低代码平台都是基于Web用拖拽方式生成界面,确实可以极大的提高开发效率,但也存在一些问题:大部分平台灵活性不够,特殊需求需要较大的自定义开发;解析json配置的执行效率不是太高;大部分平台缺乏后端支持或复杂的业务逻辑支持;与后端的数据结构及业务服务不存在强关联,修......
  • 最简单的深拷贝和浅拷贝(原理)
    拷贝:其实就是一个对象复制给另外一整个对象,让对象相互不影响。对象的拷贝又分为浅拷贝和深拷贝对象的浅拷贝:浅拷贝是指只复制对象的第一层属性,如果对象的属性值是引用类型(如对象、数组等),则复制的是引用,而不是真正的拷贝。对象的深拷贝:深拷贝是指在复制对象时,不仅复制对象本身,还......
  • 流程控制和方法
    流程控制和方法1.1用户交互Scanner基本语法Scanners=newScanner(System.in);通过Scanner类的next()与nextLine()方法获取用户的字符串,读取前一般用hasNext()与hasNextLine()判断是否还有输入的数据if(s.hasNext()){Stringstr=s.next();System.out.printl......
  • 全流程点云机器学习(二)使用PaddlePaddle进行PointNet的机器学习训练和评估
    前言这不是高支模项目需要嘛,他们用传统算法切那个横杆竖杆流程复杂耗时很长,所以想能不能用机器学习完成这些工作,所以我就来整这个工作了。基于上文的数据集切分,现在来对切分好的数据来进行正式的训练。本系列文章所用的核心骨干网络代码主要来自点云处理:实现PointNet点云分割......