首页 > 数据库 >redis 源码分析:Jedis 哨兵模式连接原理

redis 源码分析:Jedis 哨兵模式连接原理

时间:2023-10-02 10:33:05浏览次数:39  
标签:sentinels redis masterName 源码 Jedis master sentinel new final

1. 可以从单元测试开始入手

查看类JedisSentinelPool
  private static final String MASTER_NAME = "mymaster";

  protected static final HostAndPort sentinel1 = HostAndPorts.getSentinelServers().get(1);
  protected static final HostAndPort sentinel2 = HostAndPorts.getSentinelServers().get(3);
  
  @Before
  public void setUp() throws Exception {
    sentinels.clear();

    sentinels.add(sentinel1.toString());
    sentinels.add(sentinel2.toString());
  }

  @Test
  public void repeatedSentinelPoolInitialization() {

    for (int i = 0; i < 20; ++i) {
      GenericObjectPoolConfig<Jedis> config = new GenericObjectPoolConfig<>();

      JedisSentinelPool pool = new JedisSentinelPool(MASTER_NAME, sentinels, config, 1000,
          "foobared", 2);
      pool.getResource().close();
      pool.destroy();
    }
  }

可以看到首先是创建了sentinel 的HostAndPort 对象,然后创建了连接池

2. 查看 JedisSentinelPool 构造器,正式进入源码

  public JedisSentinelPool(String masterName, Set<String> sentinels,
      final GenericObjectPoolConfig<Jedis> poolConfig, int timeout, final String password,
      final int database) {
    this(masterName, sentinels, poolConfig, timeout, timeout, null, password, database);
  }
  
  ...
    public JedisSentinelPool(String masterName, Set<String> sentinels,
      final GenericObjectPoolConfig<Jedis> poolConfig,
      final int connectionTimeout, final int soTimeout, final int infiniteSoTimeout,
      final String user, final String password, final int database, final String clientName,
      final int sentinelConnectionTimeout, final int sentinelSoTimeout, final String sentinelUser,
      final String sentinelPassword, final String sentinelClientName) {
    this(masterName, parseHostAndPorts(sentinels), poolConfig,
        DefaultJedisClientConfig.builder().connectionTimeoutMillis(connectionTimeout)
            .socketTimeoutMillis(soTimeout).blockingSocketTimeoutMillis(infiniteSoTimeout)
            .user(user).password(password).database(database).clientName(clientName).build(),
        DefaultJedisClientConfig.builder().connectionTimeoutMillis(sentinelConnectionTimeout)
            .socketTimeoutMillis(sentinelSoTimeout).user(sentinelUser).password(sentinelPassword)
            .clientName(sentinelClientName).build()
    );
  }
  
  ...
    public JedisSentinelPool(String masterName, Set<HostAndPort> sentinels,
      final GenericObjectPoolConfig<Jedis> poolConfig, final JedisClientConfig masterClientConfig,
      final JedisClientConfig sentinelClientConfig) {
    this(masterName, sentinels, poolConfig, new JedisFactory(masterClientConfig), sentinelClientConfig);
  }
  
    public JedisSentinelPool(String masterName, Set<HostAndPort> sentinels,
      final GenericObjectPoolConfig<Jedis> poolConfig, final JedisFactory factory,
      final JedisClientConfig sentinelClientConfig) {
    super(poolConfig, factory);

    this.factory = factory;
    this.sentinelClientConfig = sentinelClientConfig;

    HostAndPort master = initSentinels(sentinels, masterName);
    initMaster(master);
  }

这里执行了两个重要方法
initSentinelsinitMaster

1. initSentinels 负责初始化sentinel ,并获得master的地址
2. 有了master地址,就可以 initMaster 了
  private HostAndPort initSentinels(Set<HostAndPort> sentinels, final String masterName) {

    HostAndPort master = null;
    boolean sentinelAvailable = false;

    LOG.info("Trying to find master from available Sentinels...");

    for (HostAndPort sentinel : sentinels) {

      LOG.debug("Connecting to Sentinel {}", sentinel);
      //连接sentinel 节点
      try (Jedis jedis = new Jedis(sentinel, sentinelClientConfig)) {

        // 向sentinel发送命令  sentinel get-master-addr-by-name mymaster
        List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName);

        // connected to sentinel...
        sentinelAvailable = true;

        if (masterAddr == null || masterAddr.size() != 2) {
          LOG.warn("Can not get master addr, master name: {}. Sentinel: {}", masterName, sentinel);
          continue;
        }

        master = toHostAndPort(masterAddr);
        LOG.debug("Found Redis master at {}", master);
        break;
      } catch (JedisException e) {
        // resolves #1036, it should handle JedisException there's another chance
        // of raising JedisDataException
        LOG.warn(
          "Cannot get master address from sentinel running @ {}. Reason: {}. Trying next one.", sentinel, e);
      }
    }

    if (master == null) {
      if (sentinelAvailable) {
        // can connect to sentinel, but master name seems to not monitored
        throw new JedisException("Can connect to sentinel, but " + masterName
            + " seems to be not monitored...");
      } else {
        throw new JedisConnectionException("All sentinels down, cannot determine where is "
            + masterName + " master is running...");
      }
    }

    LOG.info("Redis master running at {}, starting Sentinel listeners...", master);

    for (HostAndPort sentinel : sentinels) {

      MasterListener masterListener = new MasterListener(masterName, sentinel.getHost(), sentinel.getPort());
      // whether MasterListener threads are alive or not, process can be stopped
      masterListener.setDaemon(true);
      masterListeners.add(masterListener);
      masterListener.start();
    }

    return master;
  }
这里最终要的一步就是jedis.sentinelGetMasterAddrByName(masterName);,即向sentinel发送命令

sentinel get-master-addr-by-name mymaster, 用来获取master节点的地址,并将地址返回

然后initMaster(master);
  private void initMaster(HostAndPort master) {
    synchronized (initPoolLock) {
      if (!master.equals(currentHostMaster)) {
        currentHostMaster = master;
        // 这里是容易忽略但非常关键的一步
        factory.setHostAndPort(currentHostMaster);
        // although we clear the pool, we still have to check the returned object in getResource,
        // this call only clears idle instances, not borrowed instances
        super.clear();

        LOG.info("Created JedisSentinelPool to master at {}", master);
      }
    }
  }
在这里对factory的连接地址进行了设置(在之前这里还是空值)

它是由 构造方法中的 new JedisFactory(masterClientConfig) 构造出来,在单元测试中我们得知masterClientConfig里面的属性都是空值

到这里 sentinelpool 就构造完毕了,其实这里还没有初始化出一个连接到master节点的实例,我们继续往后看

3. 单元测试中下一步 getResouce()

  @Override
  public Jedis getResource() {
    while (true) {
      // 关键一步
      Jedis jedis = super.getResource();
      // 这里没啥大用,容易误导
      jedis.setDataSource(this);

      // get a reference because it can change concurrently
      final HostAndPort master = currentHostMaster;
      final HostAndPort connection = jedis.getClient().getHostAndPort();

      if (master.equals(connection)) {
        // connected to the correct master
        return jedis;
      } else {
        returnBrokenResource(jedis);
      }
    }
  }
这里调用 super.getResource(), 父类是Pool, 而Pool 的对象一般是由Factory 构建出来
  public JedisSentinelPool(String masterName, Set<HostAndPort> sentinels,
      final GenericObjectPoolConfig<Jedis> poolConfig, final JedisClientConfig masterClientConfig,
      final JedisClientConfig sentinelClientConfig) {
    this(masterName, sentinels, poolConfig, new JedisFactory(masterClientConfig), sentinelClientConfig);
  }

  public JedisSentinelPool(String masterName, Set<HostAndPort> sentinels,
      final JedisFactory factory, final JedisClientConfig sentinelClientConfig) {
    super(factory);

    this.factory = factory;
    this.sentinelClientConfig = sentinelClientConfig;

    HostAndPort master = initSentinels(sentinels, masterName);
    initMaster(master);
  }

由此可知 factory 是 new JedisFactory(masterClientConfig), 并且由 父类子类都引用到,并且在 initMaster 方法中调用factory.setHostAndPort(currentHostMaster); 更新了master的地址。

而Pool extends GenericObjectPool , 这里GenericObjectPool 来自包 org.apache.commons.pool2
  public T getResource() {
    try {
      return super.borrowObject();
    } catch (JedisException je) {
      throw je;
    } catch (Exception e) {
      throw new JedisException("Could not get a resource from the pool", e);
    }
  }

这里borrowObject 时,实际是调用工厂的方法干活,直接看工厂类JedisFactory

  @Override
  public PooledObject<Jedis> makeObject() throws Exception {
    Jedis jedis = null;
    try {
      jedis = new Jedis(jedisSocketFactory, clientConfig);
      return new DefaultPooledObject<>(jedis);
    } catch (JedisException je) {
      logger.debug("Error while makeObject", je);
      throw je;
    }
  }
在这里会构建出Jedis对象 ,注意这里的jedisSocketFactory对象,实在构造方法中构造出
  protected JedisFactory(final URI uri, final int connectionTimeout, final int soTimeout,
      final int infiniteSoTimeout, final String clientName, final SSLSocketFactory sslSocketFactory,
      final SSLParameters sslParameters, final HostnameVerifier hostnameVerifier) {
    if (!JedisURIHelper.isValid(uri)) {
      throw new InvalidURIException(String.format(
          "Cannot open Redis connection due invalid URI. %s", uri.toString()));
    }
    this.clientConfig = DefaultJedisClientConfig.builder().connectionTimeoutMillis(connectionTimeout)
        .socketTimeoutMillis(soTimeout).blockingSocketTimeoutMillis(infiniteSoTimeout)
        .user(JedisURIHelper.getUser(uri)).password(JedisURIHelper.getPassword(uri))
        .database(JedisURIHelper.getDBIndex(uri)).clientName(clientName)
        .protocol(JedisURIHelper.getRedisProtocol(uri))
        .ssl(JedisURIHelper.isRedisSSLScheme(uri)).sslSocketFactory(sslSocketFactory)
        .sslParameters(sslParameters).hostnameVerifier(hostnameVerifier).build();
    this.jedisSocketFactory = new DefaultJedisSocketFactory(new HostAndPort(uri.getHost(), uri.getPort()), this.clientConfig);
  }

  void setHostAndPort(final HostAndPort hostAndPort) {
    if (!(jedisSocketFactory instanceof DefaultJedisSocketFactory)) {
      throw new IllegalStateException("setHostAndPort method has limited capability.");
    }
    ((DefaultJedisSocketFactory) jedisSocketFactory).updateHostAndPort(hostAndPort);
  }

4. 再看Jedis


  public Jedis(final JedisSocketFactory jedisSocketFactory, final JedisClientConfig clientConfig) {
    connection = new Connection(jedisSocketFactory, clientConfig);
    RedisProtocol proto = clientConfig.getRedisProtocol();
    if (proto != null) commandObjects.setProtocol(proto);
  }

这里直接构造出Connectrion 对象, 并传入socketFactory

  public Connection(final JedisSocketFactory socketFactory, JedisClientConfig clientConfig) {
    this.socketFactory = socketFactory;
    this.soTimeout = clientConfig.getSocketTimeoutMillis();
    this.infiniteSoTimeout = clientConfig.getBlockingSocketTimeoutMillis();
    initializeFromClientConfig(clientConfig);
  }

在这个构造方法中执行关键方法initializeFromClientConfig

private void initializeFromClientConfig(final JedisClientConfig config) {
    try {
      connect();
	......
      
  }

connect()

  public void connect() throws JedisConnectionException {
    if (!isConnected()) {
      try {
        socket = socketFactory.createSocket();
        soTimeout = socket.getSoTimeout(); //?

        outputStream = new RedisOutputStream(socket.getOutputStream());
        inputStream = new RedisInputStream(socket.getInputStream());

        broken = false; // unset broken status when connection is (re)initialized

      } catch (JedisConnectionException jce) {

        setBroken();
        throw jce;

      } catch (IOException ioe) {

        setBroken();
        throw new JedisConnectionException("Failed to create input/output stream", ioe);

      } finally {

        if (broken) {
          IOUtils.closeQuietly(socket);
        }
      }
    }
  }

最终通过 soeckFactory 构建出socket,完成对redis 的master节点的连接

标签:sentinels,redis,masterName,源码,Jedis,master,sentinel,new,final
From: https://www.cnblogs.com/gradyblog/p/17739750.html

相关文章

  • Redis数据结构
    本文大部分知识整理自网上,在正文结束后都会附上参考地址。如果想要深入或者详细学习可以通过文末链接跳转学习。前言本文主要介绍关于Redis的五种基本数据结构的底层实现原理,然后来分析我们常用的使用场景。先简单回顾一下知识点。Redis是一个开源(BSD许可)的,内存中的数据结......
  • token+redis的简单使用方式
    以用户登录为例,讲解token+redis的使用方式,环境是vue和springboot。一、用户登录时序图二、前端代码分析1、前端使用vuestore保存token2、在每次发起请求时进行响应拦截,从vuestore取出token,放在每次请求的请求头上三、后端代码分析1、在控制层接收账号,密码,调用服务层代......
  • Redis 常见面试题总结
    什么是Redis?Redis(RemoteDictionaryServer)是一个开源的使用ANSIC语言编写、遵守BSD协议、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库,并提供多种语言的API的非关系型数据库。传统数据库遵循ACID规则。而Nosql(非关系型数据库)一般为分布式而分布式一......
  • Redis实现分布式锁
    一、分布式锁参考资料:www.cnblogs.com/wangyingshu…很多场景中,需要使用分布式事务、分布式锁等技术来保证数据最终一致性。有的时候,我们需要保证某一方法同一时刻只能被一个线程执行。在单机(单进程)环境中,JAVA提供了很多并发相关API,但在多机(多进程)环境中就无能为力了。对于分......
  • springboot 与 Redis整合
    SpringBoot操作数据:Spring-datajpajdbcmongodbredis!SpringData也是和SpringBoot齐名的项目!说明:在SpringBoot2.X之后,原来使用的jedis被替换成了lettucejedis:采用的直连,多个线程操作的话,是不安全的,如果想要避免不安全的,使用jedispool连接池,更新BIO模式lettuce:采用ne......
  • Redis哨兵集群原理
    单节点Redis的并发能力是有上限的,要进一步提高Redis的并发能力,就需要搭建主从集群,实现读写分离主节点:可以对Redis实现读写操作从节点: 只可以对Redis实现读操作但是,当master节点宕机后,我们就不能写数据到Redis,所以需要搭建一个三节点形成的Sentinel集群,来监管之前的Redis主从集......
  • diskqueue的数据定义,运转核心ioloop()源码详解
     nsq中diskqueue是nsq消息持久化的核心,内容较多,一共分为多篇1.diskqueue是什么,为什么需要它,整体架构图,对外接口2.diskqueue的元数据文件,数据文件,启动入口,元数据文件读写及保存3.diskqueue的数据定义,运转核心ioloop()源码详解4. diskqueue怎么写入消息,怎么对外发送消息前面一篇......
  • Go每日一库之161:grm(Redis Web管理工具)
    GRM是基于go+vue的web版redis管理工具,部署简单便捷,支持SSH连接,用户校验,操作日志、命令行模式、LUA脚本执行等功能。介绍基于go+vue的web版redis管理工具【Webredismanagementtoolbasedongolangandvue】功能清单管理连接(直连和SSH)、切换DB支持string/lis......
  • lapce源码学习-编译调试
    master分支调试1、报错:`#![feature]`maynotbeusedonthestablereleasechannel2、Channel切换到nightly,报错:thetraitbound`file_type::FileType:std::sealed::Sealed`isnotsatisfied3、Channel切换到beta,编译ok,但提示不能调试rustupinstallbeta4、编译成功后,......
  • redis key 被访问后不会自动延长过期时间
    Redis的过期策略按照两个维度工作:被动过期和主动过期。被动过期:只有当有客户端尝试访问一个已经过期的key时,Redis才会删除该内容。主动过期:为了防止过期的key未被立即清理,造成内存浪费,Redis会周期性地随机检查一些key是否已经过期,如果过期,则予以删除。Redis的过期时间是静态的,......