首页 > 编程语言 >手写基于Java RMI的RPC框架

手写基于Java RMI的RPC框架

时间:2022-10-21 23:04:11浏览次数:69  
标签:Java connection rpc fyp RPC registry import RMI public

留给读者

其中最大的区别就是ZooKeeper注册中心,注册中心可以有读写监听器,这是一个优势,可以用来实现订阅通知,也能做数据的同步,甚至可以做基于读写分离的RPC框架,而且它是基于一种树结构key-value的,它可以实现很多自己需要的,只不过不想Nacos一样,直接提供API注册ip地址和端口,以及对应的类,之后我会尽可能扩展一个新的注册中心到rpc-netty-framework中,满足各种不同的需求而做出改变! 使用Zookeeper作为注册中心,RMI作为连接技术,手写RPC框架

1.框架结构

连接器:提供默认链接信息配置和提供连接 ● 注册器:提供注册服务和获取代理对象(没有具体的注册信息) ● RPC静态工厂:创建注册器、获取连接、注册服务和获取代理对象(已经通过静态初始化注册信息)

2.依赖

<dependencies>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
          <!--具体以zookeeper的版本为准-->
            <version>3.4.11</version>
        </dependency>
    </dependencies>

3.项目

3.1 连接器

package com.fyp.rpc.connection;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;

/**
 * @Auther: fyp
 * @Date: 2022/1/6
 * @Description: 提供Zookeeper连接的自定义类型
 * @Package: com.fyp.rpc.connection
 * @Version: 1.0
 */
public class ZkConnection {
    private String zkServer;
    private int sessionTimeout;

    public ZkConnection() {
        super();
        this.zkServer = "localhost:2181";
        this.sessionTimeout = 10000;
    }

    public ZkConnection(String zkServer, int sessionTimeout) {
        this.zkServer = zkServer;
        this.sessionTimeout = sessionTimeout;
    }

    public ZooKeeper getConnection() throws IOException {
        return new ZooKeeper(zkServer, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {

            }
        });
    }
}

3.2 注册器

package com.fyp.rpc.registry;

import com.fyp.rpc.connection.ZkConnection;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.rmi.Naming;
import java.rmi.NotBoundException;
import java.rmi.Remote;
import java.util.List;

/**
 * @Auther: fyp
 * @Date: 2022/1/6
 * @Description: RPC注册器
 * @Package: com.fyp.rpc.registry
 * @Version: 1.0
 */
public class FypRpcRegistry {
    private ZkConnection connection;
    private String ip;
    private int port;

    /**
     *  注册服务的方法
     * @param serviceInterface 服务接口对象 如 : com.fyp.service.UserService.class
     * @param serviceObject 服务实现类型的对象 如: new com.fyp.service.impl.UserServiceImpl
     * @throws Exception 抛出异常,代表注册失败
     */
    public void registerService(Class<? extends Remote> serviceInterface, Remote serviceObject) throws IOException, KeeperException, InterruptedException {
        // rmi = rmi://ip:port/com.fyp.service.UsrService
        String rmi = "rmi://" + ip + ":" + port + "/" + serviceInterface.getName();
        // 拼接一个有规则的zk存储节点命名
        String path = "/fyp/rpc/" + serviceInterface.getName();

        List<String> children = connection.getConnection().getChildren("/fyp/rpc", false);
        if(!children.contains(serviceInterface.getName())) {
            Stat stat = new Stat();
            connection.getConnection().getData(path, false, stat);
            connection.getConnection().delete(path, stat.getCversion());
        }
        connection.getConnection().create(path,rmi.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        Naming.rebind(rmi, serviceObject);
    }

    /**
     * 根据服务接口类型,访问zk,获取RMI的远程代理对象
     * 1. 拼接一个zk中的节点名称
     * 2. 访问zk,查询节点中存储的数据
     * 3. 根据查询的结果,创建一个代理对象
     * @return
     */
    public <T extends Remote> T getServiceProxy(Class<T> serviceInterface) throws IOException, NotBoundException, KeeperException, InterruptedException {
        String path = "/fyp/rpc/" + serviceInterface.getName();
        byte[] datas = connection.getConnection().getData(path, false, null);
        String rmi = new String(datas);
        Object obj = Naming.lookup(rmi);
        return (T) obj;
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public ZkConnection getConnection() {
        return connection;
    }

    public void setConnection(ZkConnection connection) {
        this.connection = connection;
    }
}

3.3 RPC静态工厂

package com.fyp.rpc;

import com.fyp.rpc.connection.ZkConnection;
import com.fyp.rpc.registry.FypRpcRegistry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;

import java.io.IOException;
import java.io.InputStream;
import java.rmi.NotBoundException;
import java.rmi.Remote;
import java.rmi.registry.LocateRegistry;
import java.util.List;
import java.util.Properties;

/**
 * @Auther: fyp
 * @Date: 2022/1/7
 * @Description: RPC工厂
 * @Package: com.fyp.rpc
 * @Version: 1.0
 */
public class FypRpcFactory {
    private static final Properties config = new Properties();
    private static final ZkConnection connection;
    private static final FypRpcRegistry registry;
    /**
     * 初始化过程、
     * 固定逻辑,在classpath下,提供配置文件,命名为;fyp-rpc.properties
     * registry.ip=服务端IP地址,默认为localhost
     * registry.port=服务端端口号,默认为9090
     * zk.server=Zookeeper访问地址,默认为localhost:2181
     * zk.sessionTimeout=Zookeeper连接回话超时,默认为10000
     *
     */
    static {
        try {
            InputStream input = FypRpcRegistry.class.getClassLoader().getResourceAsStream("fyp-rpc.properties");
            config.load(input);
            String serverIp = config.getProperty("registry.ip") == null ? "localhost" : config.getProperty("registry.ip");
            int serverPort= config.getProperty("registry.port") == null ? 9090 : Integer.parseInt(config.getProperty("registry.port"));
            String zkServe = config.getProperty("zk.server") == null ? "localhost:2181" : config.getProperty("zk.server");
            int zkSessionTimeout = config.getProperty("zk.sessionTimeout") == null ? 10000 : Integer.parseInt(config.getProperty("zk.sessionTimeout"));
            connection = new ZkConnection(zkServe,zkSessionTimeout);
            registry = new FypRpcRegistry();
            registry.setIp(serverIp);
            registry.setPort(serverPort);
            registry.setConnection(connection);
            LocateRegistry.createRegistry(serverPort);

            List<String> children = connection.getConnection().getChildren("/", false);
            if(!children.contains("fyp")) {
                connection.getConnection().create("/fyp",null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
            List<String> fypChildren = connection.getConnection().getChildren("/fyp", false);
            if(!fypChildren.contains("rpc")) {
                connection.getConnection().create("/fyp/rpc",null,ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
            }
        } catch (Exception e) {
            e.printStackTrace();
            // 初始化发生异常,中断虚拟机
            throw new ExceptionInInitializerError(e);
        }
    }

    public static void registerService(Class<? extends Remote> serviceInterface, Remote serviceObject) throws IOException, InterruptedException, KeeperException {
        registry.registerService(serviceInterface, serviceObject);
    }

    public static <T extends Remote> T getServiceProxy(Class<T> serviceInterface) throws IOException, KeeperException, InterruptedException, NotBoundException {
        return registry.getServiceProxy(serviceInterface);
    }
}

总结: 说白了,RPC框架已经被实现了,最大众的Dubbo大家应该都用过了,这篇文章就是基于RMI技术实现的简易版dubbo,后续会给出优化————服务自动发现注册、服务容错和负载均衡,想了解的的不妨加个收藏。 最后,如果有需要先了解Dubbo再来学习RPC框架的,可以参考学习下面这篇文章。 《Linux环境下Dubbo环境搭建及启动》

学习参考: 尚学堂RPC远程过程调用:https://www.bilibili.com/video/BV11i4y1N7LQ

标签:Java,connection,rpc,fyp,RPC,registry,import,RMI,public
From: https://blog.51cto.com/fyphome/5784628

相关文章

  • JavaScript实现数据结构 -- 集合
    集合集合是一种无序且唯一的数据结构,在ES6中有集合Set。集合的常用操作去重使用Set结合展开运算符实现数组去重。判断元素是非在集合中使用Set的has方法判断元素是......
  • JavaScript实现数据结构 -- 字典
    字典字典与集合类似,也是一种存储唯一值的数据结构,字典以键值对的形式进行存储,在ES6中有字典Map。字典的常用操作增使用set()方法可以向字典中添加新成员,可连续添加。......
  • JAVA基本类型和包装类型
    JAVA基本类型和包装类型前言Java语言中的数据类型分为基本数据类型和引用类型,而我们进行Java开发的时候都听说过基本数据类型和包装类型,今天我们就来详细聊一聊Java中的......
  • Java中Final、 finally 、finalize的区别
    1、final可以修饰类、变量、方法,修饰类表示该类不能被继承、修饰方法表示该方法不能被重写、修饰变量表示该变量是一个常量不能被重新赋值。2、finally一般作用在try-catch......
  • Java_6
    代号为Mustang。版本发布于2006年12月11日,Sun把原本的名称“J2SE”改为“JavaSE”,然后再从版本号中去掉“.0”[23],而开发者内部编号仍然是1.6.0。[24]这......
  • 常用的Java开发IDE
    IDE(IntegratedDevelopmentEnvironment),集成开发环境。NetBeans。https://netbeans.orgJBuilder。IntelliJIDEAhttps://www.jetbrains.com/idea/Eclipse中......
  • SpringBoot2.0上启动RPC框架RNF2.0已发布
    使用效果:用户访问客户端:GEThttp://localhost:8081/user/hello?name="张三来访"浏览器访问客户端:服务端接收情况:服务端负载注册服务:上面的实现就好比客户端只......
  • javaSE基础-内部类
    内部类定义:一个类A定义在类B中,则类A为内部类,类B为外部类分类:成员内部类静态成员内部类非静态成员内部类局部内部类方法内代码块内构造器内示例一://InnerCla......
  • java语言中的运算符
    java语言中的运算符java语言支持如下运算符算术运算符:+,-,*,/,%,++,--赋值运算符:=关系运算符:>,<,>=,<=,==,!=instanceof逻辑运算符:&&,||,!位运算符:&,|,^,~,>>,<<,>>>(了解!!!)......
  • 【json报错】java json can not cast to JSONObject 报错解决方法【java json】【
     请问:如下:是否会报错?  解决方法:请先格式化json,再转对象。   原因:后端只图省事,直接这样写:获取的是json字符串没有转换直接put.   返回的报文就会不......