首页 > 其他分享 >Rpc-实现Zookeeper注册中心

Rpc-实现Zookeeper注册中心

时间:2023-02-17 16:24:25浏览次数:75  
标签:return String Zookeeper private Rpc serviceName static 注册 public

1.前言

本文章是笔主在声哥的手写RPC框架的学习下,对注册中心的一个拓展。因为声哥某些部分没有保留拓展性,所以本文章的项目与声哥的工程有部分区别,核心内容在Curator的注册发现与注销,思想看准即可。

本文章Git仓库:zko0/zko0-rpc

声哥的RPC项目写的确实很详细,跟学一遍受益匪浅:

何人听我楚狂声的博客

在声哥的项目里使用Nacos作为了服务注册中心。本人拓展添加了ZooKeeper实现服务注册。

Nacos的服务注册和发现,设计的不是非常好,每次服务的发现都需要去注册中心拉取。本人实现ZooKeeper注册中心时,参考了Dubbo的设计原理,结合本人自身想法,添加了本地缓存:

  • Client发现服务后缓存在本地,维护一个服务——实例列表
  • 当监听到注册中心的服务列表发生了变化,Client更新本地列表
  • 当注册中心宕机,Client能够依靠本地的服务列表继续提供服务

问题:

  1. 实现服务注册的本地缓存,还需要实现注册中心的监听,当注册中心的服务发生更改时能够实现动态更新。或者用轮训的方式,定时更新,不过这种方式的服务实时性较差
  2. 当Server宕机,非临时节点注册容易出现服务残留无法清除的问题。所以我建议全部使用临时节点去注册。

2.内容

zookeeper需要简单学一下,知识内容非常简单,搭建也很简单,在此跳过。

如果你感兴趣,可以参考我的ZooKeeper的文章:Zookeeper学习笔记 - zko0

①添加依赖

Curator:(简化ZooKeeper客户端使用)(Netfix研发,捐给Apache,是Apache顶级项目)

这里排除slf4j依赖,因为笔主使用的slf4j存在冲突

<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.2.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>

②代码编写

1.首先创建一个连接类:

@Slf4j
public class ZookeeperUtil {

    //内部化构造方法
    private ZookeeperUtil(){
    }

    private static final String SERVER_HOSTNAME= RegisterCenterConfig.getHostName();

    private static final Integer SERVER_PORT=RegisterCenterConfig.getServerPort();

    private static CuratorFramework zookeeperClient;

    public static CuratorFramework getZookeeperClient(){
        if (zookeeperClient==null){
            synchronized (ZookeeperUtil.class){
                if (zookeeperClient==null){
                    RetryPolicy retryPolic=new ExponentialBackoffRetry(3000,10);
                    zookeeperClient = CuratorFrameworkFactory.builder()
                            .connectString(SERVER_HOSTNAME+":"+SERVER_PORT)
                            .retryPolicy(retryPolic)
                            //  zookeeper根目录为/serviceRegister,不为/
                            .namespace("serviceRegister")
                            .build();
                    zookeeperClient.start();
                }
            }
        }
        return zookeeperClient;
    }

    public static String getServerHostname(){
        return SERVER_HOSTNAME;
    }

    public static Integer getServerPort(){
        return SERVER_PORT;
    }

}

其中HOST,PORT信息我保存在regiserCenter.properties配置文件夹中,使用类读取:

public class RpcConfig {


    //注册中心类型
    private static String registerCenterType;

    //序列化类型
    private static String serializerType;

    //负载均衡类型
    private static String loadBalanceType;

    //配置Nacos地址
    private static String registerCenterHost;

    private static Integer registerCenterPort;


    private static boolean zookeeperDestoryIsEphemeral;

    private static String serverHostName;

    private static Integer serverPort;

    static {
        ResourceBundle bundle = ResourceBundle.getBundle("rpc");
        registerCenterType=bundle.getString("registerCenter.type");
        loadBalanceType=bundle.getString("loadBalance.type");
        registerCenterHost=bundle.getString("registerCenter.host");
        registerCenterPort = Integer.parseInt(bundle.getString("registerCenter.port"));
        try {
            zookeeperDestoryIsEphemeral="true".equals(bundle.getString("registerCenter.destory.isEphemeral"));
        } catch (Exception e) {
            zookeeperDestoryIsEphemeral=false;
        }
        serializerType=bundle.getString("serializer.type");
        serverHostName=bundle.getString("server.hostName");
        serverPort=Integer.parseInt(bundle.getString("server.port"));
    }

    public static String getRegisterCenterType() {
        return registerCenterType;
    }

    public static String getSerializerType() {
        return serializerType;
    }

    public static String getLoadBalanceType() {
        return loadBalanceType;
    }

    public static String getRegisterCenterHost() {
        return registerCenterHost;
    }

    public static Integer getRegisterCenterPort() {
        return registerCenterPort;
    }


    public static String getServerHostName() {
        return serverHostName;
    }

    public static Integer getServerPort() {
        return serverPort;
    }

    public static boolean isZookeeperDestoryIsEphemeral() {
        return zookeeperDestoryIsEphemeral;
    }

}

下面的代码我和声哥有些不同,我将服务注册,注销方法放在ServerUtils中,服务发现方法放在ClientUtils中:

服务的高一致性存在两种做法:

  • 因为ZooKeeper存在临时节点,注册中心可以实现Client(RPC的Server)断开,注册服务信息的自动丢失
  • 不设置为临时节点,手动的服务注册清除

我这里两种都实现了,虽然做两种方式不同但是功能相同的代码放在一起看起来很奇怪,这里只是做演示。选择其中一种即可。(我建议使用临时节点,当Server宕机,残留的服务信息也能及时清除)

注册实现原理图:

626723792516afae5530e634e691794

接口:

public interface ServiceDiscovery {
    InetSocketAddress searchService(String serviceName);

    void cleanLoaclCache(String serviceName);
}
public interface ServiceRegistry {
    //服务注册
    void register(String serviceName, InetSocketAddress inetAddress);

    void cleanRegistry();
}

ZooKeeper接口实现:

public class ZookeeperServiceDiscovery implements ServiceDiscovery{

    private final LoadBalancer loadBalancer;

    public ZookeeperServiceDiscovery(LoadBalancer loadBalancer) {
        this.loadBalancer = loadBalancer;
    }
    
    @Override
    public InetSocketAddress searchService(String serviceName) {
        return ZookeeperClientUtils.searchService(serviceName,loadBalancer);
    }

    @Override
    public void cleanLoaclCache(String serviceName) {
        ZookeeperClientUtils.cleanLocalCache(serviceName);
    }
}
public class ZookeeperServiceRegistry implements ServiceRegistry{
    @Override
    public void register(String serviceName, InetSocketAddress inetAddress) {
        ZookeeperServerUitls.register(serviceName,inetAddress);
    }

    @Override
    public void cleanRegistry() {
        ZookeeperServerUitls.cleanRegistry();
    }
}

Factory工厂:

public class ServiceFactory {

    private static String center = RpcConfig.getRegisterCenterType();
    private static String lb= RpcConfig.getLoadBalanceType();

    private static  ServiceRegistry registry;

    private static  ServiceDiscovery discovery;

    private static Object registerLock=new Object();

    private static Object discoveryLock=new Object();

    public static ServiceDiscovery getServiceDiscovery(){
        if (discovery==null){
            synchronized (discoveryLock){
                if (discovery==null){
                    if ("nacos".equalsIgnoreCase(center)){
                        discovery= new NacosServiceDiscovery(LoadBalancerFactory.getLoadBalancer(lb));
                    }else if ("zookeeper".equalsIgnoreCase(center)){
                        discovery= new ZookeeperServiceDiscovery(LoadBalancerFactory.getLoadBalancer(lb));
                    }
                }
            }
        }
        return discovery;
    }

    public static ServiceRegistry getServiceRegistry(){
        if (registry==null){
            synchronized (registerLock){
                if (registry==null){
                    if ("nacos".equalsIgnoreCase(center)){
                        registry=  new NacosServiceRegistry();
                    }else if ("zookeeper".equalsIgnoreCase(center)){
                        registry= new ZookeeperServiceRegistry();
                    }
                }
            }
        }
        return registry;
    }

}

使用Gson序列化InetSocketAddress存在问题,编写Util:

public class InetSocketAddressSerializerUtil {
    public static String getJsonByInetSockerAddress(InetSocketAddress address){
        HashMap<String, String> map = new HashMap<>();
        map.put("host",address.getHostName());
        map.put("port",address.getPort()+"");
        return new Gson().toJson(map);
    }

    public static InetSocketAddress getInetSocketAddressByJson(String json){
        HashMap<String,String> hashMap = new Gson().fromJson(json, HashMap.class);
        String host = hashMap.get("host");
        Integer port=Integer.parseInt(hashMap.get("port"));
        return new InetSocketAddress(host,port);
    }

}

上面主要是注册,发现的逻辑,我把主要方法写在了Utils中:

@Slf4j
public class ZookeeperServerUitls {

    private static CuratorFramework client = ZookeeperUtil.getZookeeperClient();

    private static final Set<String> instances=new ConcurrentHashSet<>();

    public static void register(String serviceName, InetSocketAddress inetSocketAddress){

        serviceName=ZookeeperUtil.serviceName2Path(serviceName);;
        String uuid = UUID.randomUUID().toString();
        serviceName=serviceName+"/"+uuid;
        String json = InetSocketAddressSerializerUtil.getJsonByInetSockerAddress(inetSocketAddress);
        try {
            if (RpcConfig.isZookeeperDestoryIsEphemeral()){
                //会话结束节点,创建消失
                client.create()
                        .creatingParentsIfNeeded()
                        .withMode(CreateMode.EPHEMERAL)
                        .forPath(serviceName,json.getBytes());
            } else {
                client.create()
                        .creatingParentsIfNeeded()
                        .forPath(serviceName,json.getBytes());
            }
        }
            catch (Exception e) {
            log.error("服务注册失败");
            throw  new RpcException(RpcError.REGISTER_SERVICE_FAILED);
        }
        //放入map
        instances.add(serviceName);
    }

    public static void cleanRegistry(){
        log.info("注销所有注册的服务");
        //如果自动销毁,不需要清除
        if (RpcConfig.isZookeeperDestoryIsEphemeral()) return;
        if (ZookeeperUtil.getServerHostname()!=null&&ZookeeperUtil.getServerPort()!=null&&!instances.isEmpty()){
            for (String path:instances) {
                try {
                    client.delete().forPath(path);
                } catch (Exception e) {
                    log.error("服务注销失败");
                    throw new RpcException(RpcError.DESTORY_REGISTER_FALL);
                }
            }
        }
    }
}
@Slf4j
public class ZookeeperClientUtils {

    private static CuratorFramework client = ZookeeperUtil.getZookeeperClient();

    private static final Map<String,  List<InetSocketAddress>> instances=new ConcurrentHashMap<>();

    public static InetSocketAddress searchService(String serviceName, LoadBalancer loadBalancer) {
        InetSocketAddress address;
        //本地缓存查询
        if (instances.containsKey(serviceName)){
            List<InetSocketAddress> addressList = instances.get(serviceName);
            if (!addressList.isEmpty()){
                //使用lb进行负载均衡
                return loadBalancer.select(addressList);
            }
        }
        try {
            String path = ZookeeperUtil.serviceName2Path(serviceName);
            //获取路径下所有的实现
            List<String> instancePaths = client.getChildren().forPath(path);
            List<InetSocketAddress> addressList = new ArrayList<>();
            for (String instancePath : instancePaths) {
                byte[] bytes = client.getData().forPath(path+"/"+instancePath);
                String json = new String(bytes);
                InetSocketAddress instance = InetSocketAddressSerializerUtil.getInetSocketAddressByJson(json);
                addressList.add(instance);
            }
            addLocalCache(serviceName,addressList);
            return loadBalancer.select(addressList);
        } catch (Exception e) {
            log.error("服务获取失败====>{}",e);
            throw new RpcException(RpcError.SERVICE_NONE_INSTANCE);
        }
    }

    public static void cleanLocalCache(String serviceName){
        log.info("服务调用失败,清除本地缓存,重新获取实例===>{}",serviceName);
        instances.remove(serviceName);
    }

    public static void addLocalCache(String serviceName,List<InetSocketAddress> addressList){
        //直接替换原本的缓存
        instances.put(serviceName,addressList);
    }
}

③配置文件

rpc.properties放在resources下

#nacos    zookeeper
#registerCenter.type=nacos
registerCenter.type=zookeeper

#registerCenter.host=127.0.0.1
registerCenter.host=101.43.244.40

#zookeeper port default 2181
#registerCenter.port=9000
registerCenter.port=2181

registerCenter.destory.isEphemeral=false

#??random?roundRobin
loadBalance.type=random

#kryo json jdk
serializer.type=kryo

server.hostName=127.0.0.1
server.port=9999

④更多

声哥的代码我做了很多修改,如果上述代码和你参考的项目代码出入比较大,可以查看本文章的工程阅读。

标签:return,String,Zookeeper,private,Rpc,serviceName,static,注册,public
From: https://www.cnblogs.com/zko0/p/17130553.html

相关文章

  • Nacos注册中心
    Nacos注册中心在cloud-demo父工程中添加spring-cloud-alilbaba的管理依赖:<!--nacos的管理依赖--><dependency><groupId>com.alibaba.cl......
  • Eureka注册中心搭建
    Eureka注册中心消费者该如何获取服务提供者具体信息?服务提供者启动时向eureka注册自己的信息eureka保存这些信息消费者根据服务名称向eureka拉取提供者信息如果有多个服务......
  • zookeeper
    zookeeper使用一、docker-compose方式安装3个节点集群version:'3.1'services:zoo1:container_name:zoo1image:zookeeperhostname:zoo1port......
  • Mac上zookeeper的安装与启动
    转载自:https://www.codenong.com/cs107101300/========== 下载压缩包从 https://archive.apache.org/dist/zookeeper/下载zookeeper-3.4.10.tar.gz解压123......
  • SpringBoot注册Servlet、Filter和Listener的方式和原理
    1实战在SpringBoot项目中,如果使用内嵌Web服务器,可以很方便地注册Servlet、Filter和Listener等组件。总的来说,包括以下方式:创建实现ServletContextInitializer接口的b......
  • [C#]注册表操作
    创建RegistryKeytheRegistryKey=Registry.CurrentUser;theRegistryKey.CreateSubKey("MyApp");theRegistryKey.SetValue("MyValue0","MyValueData0");theRegistry......
  • .Net Core(.Net6)创建grpc
    1.环境要求.Net6,VisualStudio2019以上官方文档:https://learn.microsoft.com/zh-cn/aspnet/core/tutorials/grpc/grpc-startNetFramework版本:https://www.cnblo......
  • .Net Framework创建grpc
    1.环境要求.NetFramework4.8.NetCore版本:https://www.cnblogs.com/dennisdong/p/17120990.html2.Stub和Proto2.1新建类库GrpcCommon2.2新建文件夹和配置文件......
  • Kubernetes CSI插件注册(二)—— Kubelet CSI插件注册机制源码分析
    1、概述Kubernetes的CSIPlugin注册机制的实现分为两个部分,第一部分是sidecar"node-driver-registrar",第二部分是Kubelet的pluginManager,第一部分详细内容请参见《Kuber......
  • .Net Core(.Net6)创建grpc
    1.环境要求.Net6,VisualStudio2019以上官方文档:https://learn.microsoft.com/zh-cn/aspnet/core/tutorials/grpc/grpc-startNetFramework版本:https://www.cnblog......