留给读者
其中最大的区别就是ZooKeeper
注册中心,注册中心可以有读写监听器,这是一个优势,可以用来实现订阅通知,也能做数据的同步,甚至可以做基于读写分离的RPC
框架,而且它是基于一种树结构key-value
的,它可以实现很多自己需要的,只不过不想Nacos
一样,直接提供API
注册ip
地址和端口,以及对应的类,之后我会尽可能扩展一个新的注册中心到rpc-netty-framework
中,满足各种不同的需求而做出改变!
使用Zookeeper
作为注册中心,RMI
作为连接技术,手写RPC
框架
1.框架结构
● 连接器:提供默认链接信息配置和提供连接 ● 注册器:提供注册服务和获取代理对象(没有具体的注册信息) ● RPC静态工厂:创建注册器、获取连接、注册服务和获取代理对象(已经通过静态初始化注册信息)
2.依赖
<dependencies>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<!--具体以zookeeper的版本为准-->
<version>3.4.11</version>
</dependency>
</dependencies>
3.项目
3.1 连接器
package com.fyp.rpc.connection;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
/**
* @Auther: fyp
* @Date: 2022/1/6
* @Description: 提供Zookeeper连接的自定义类型
* @Package: com.fyp.rpc.connection
* @Version: 1.0
*/
public class ZkConnection {
private String zkServer;
private int sessionTimeout;
public ZkConnection() {
super();
this.zkServer = "localhost:2181";
this.sessionTimeout = 10000;
}
public ZkConnection(String zkServer, int sessionTimeout) {
this.zkServer = zkServer;
this.sessionTimeout = sessionTimeout;
}
public ZooKeeper getConnection() throws IOException {
return new ZooKeeper(zkServer, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
}
});
}
}
3.2 注册器
package com.fyp.rpc.registry;
import com.fyp.rpc.connection.ZkConnection;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.rmi.Naming;
import java.rmi.NotBoundException;
import java.rmi.Remote;
import java.util.List;
/**
* @Auther: fyp
* @Date: 2022/1/6
* @Description: RPC注册器
* @Package: com.fyp.rpc.registry
* @Version: 1.0
*/
public class FypRpcRegistry {
private ZkConnection connection;
private String ip;
private int port;
/**
* 注册服务的方法
* @param serviceInterface 服务接口对象 如 : com.fyp.service.UserService.class
* @param serviceObject 服务实现类型的对象 如: new com.fyp.service.impl.UserServiceImpl
* @throws Exception 抛出异常,代表注册失败
*/
public void registerService(Class<? extends Remote> serviceInterface, Remote serviceObject) throws IOException, KeeperException, InterruptedException {
// rmi = rmi://ip:port/com.fyp.service.UsrService
String rmi = "rmi://" + ip + ":" + port + "/" + serviceInterface.getName();
// 拼接一个有规则的zk存储节点命名
String path = "/fyp/rpc/" + serviceInterface.getName();
List<String> children = connection.getConnection().getChildren("/fyp/rpc", false);
if(!children.contains(serviceInterface.getName())) {
Stat stat = new Stat();
connection.getConnection().getData(path, false, stat);
connection.getConnection().delete(path, stat.getCversion());
}
connection.getConnection().create(path,rmi.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
Naming.rebind(rmi, serviceObject);
}
/**
* 根据服务接口类型,访问zk,获取RMI的远程代理对象
* 1. 拼接一个zk中的节点名称
* 2. 访问zk,查询节点中存储的数据
* 3. 根据查询的结果,创建一个代理对象
* @return
*/
public <T extends Remote> T getServiceProxy(Class<T> serviceInterface) throws IOException, NotBoundException, KeeperException, InterruptedException {
String path = "/fyp/rpc/" + serviceInterface.getName();
byte[] datas = connection.getConnection().getData(path, false, null);
String rmi = new String(datas);
Object obj = Naming.lookup(rmi);
return (T) obj;
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public ZkConnection getConnection() {
return connection;
}
public void setConnection(ZkConnection connection) {
this.connection = connection;
}
}
3.3 RPC静态工厂
package com.fyp.rpc;
import com.fyp.rpc.connection.ZkConnection;
import com.fyp.rpc.registry.FypRpcRegistry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import java.io.IOException;
import java.io.InputStream;
import java.rmi.NotBoundException;
import java.rmi.Remote;
import java.rmi.registry.LocateRegistry;
import java.util.List;
import java.util.Properties;
/**
* @Auther: fyp
* @Date: 2022/1/7
* @Description: RPC工厂
* @Package: com.fyp.rpc
* @Version: 1.0
*/
public class FypRpcFactory {
private static final Properties config = new Properties();
private static final ZkConnection connection;
private static final FypRpcRegistry registry;
/**
* 初始化过程、
* 固定逻辑,在classpath下,提供配置文件,命名为;fyp-rpc.properties
* registry.ip=服务端IP地址,默认为localhost
* registry.port=服务端端口号,默认为9090
* zk.server=Zookeeper访问地址,默认为localhost:2181
* zk.sessionTimeout=Zookeeper连接回话超时,默认为10000
*
*/
static {
try {
InputStream input = FypRpcRegistry.class.getClassLoader().getResourceAsStream("fyp-rpc.properties");
config.load(input);
String serverIp = config.getProperty("registry.ip") == null ? "localhost" : config.getProperty("registry.ip");
int serverPort= config.getProperty("registry.port") == null ? 9090 : Integer.parseInt(config.getProperty("registry.port"));
String zkServe = config.getProperty("zk.server") == null ? "localhost:2181" : config.getProperty("zk.server");
int zkSessionTimeout = config.getProperty("zk.sessionTimeout") == null ? 10000 : Integer.parseInt(config.getProperty("zk.sessionTimeout"));
connection = new ZkConnection(zkServe,zkSessionTimeout);
registry = new FypRpcRegistry();
registry.setIp(serverIp);
registry.setPort(serverPort);
registry.setConnection(connection);
LocateRegistry.createRegistry(serverPort);
List<String> children = connection.getConnection().getChildren("/", false);
if(!children.contains("fyp")) {
connection.getConnection().create("/fyp",null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
List<String> fypChildren = connection.getConnection().getChildren("/fyp", false);
if(!fypChildren.contains("rpc")) {
connection.getConnection().create("/fyp/rpc",null,ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
}
} catch (Exception e) {
e.printStackTrace();
// 初始化发生异常,中断虚拟机
throw new ExceptionInInitializerError(e);
}
}
public static void registerService(Class<? extends Remote> serviceInterface, Remote serviceObject) throws IOException, InterruptedException, KeeperException {
registry.registerService(serviceInterface, serviceObject);
}
public static <T extends Remote> T getServiceProxy(Class<T> serviceInterface) throws IOException, KeeperException, InterruptedException, NotBoundException {
return registry.getServiceProxy(serviceInterface);
}
}
总结:
说白了,RPC
框架已经被实现了,最大众的Dubbo
大家应该都用过了,这篇文章就是基于RMI
技术实现的简易版dubbo,后续会给出优化————服务自动发现注册、服务容错和负载均衡,想了解的的不妨加个收藏。
最后,如果有需要先了解Dubbo
再来学习RPC框架的,可以参考学习下面这篇文章。
《Linux环境下Dubbo环境搭建及启动》
学习参考: 尚学堂RPC远程过程调用:https://www.bilibili.com/video/BV11i4y1N7LQ
标签:Java,connection,rpc,fyp,RPC,registry,import,RMI,public From: https://blog.51cto.com/fyphome/5784628