首页 > 数据库 >redisson-2.10.4源代码分析

redisson-2.10.4源代码分析

时间:2022-11-25 22:37:23浏览次数:80  
标签:redisson return details entry 源代码 config 连接 连接池 2.10


    

redis 学习问题总结

​http://aperise.iteye.com/blog/2310639​

ehcache memcached redis 缓存技术总结

​http://aperise.iteye.com/blog/2296219​

redis-stat 离线安装

​http://aperise.iteye.com/blog/2310254​

redis  cluster 非ruby方式启动

​http://aperise.iteye.com/blog/2310254​

redis-sentinel安装部署

​http://aperise.iteye.com/blog/2342693​

spring-data-redis使用

​ http://aperise.iteye.com/blog/2342615 ​

redis客户端redisson实战

​http://aperise.iteye.com/blog/2396196​

redisson-2.10.4源代码分析

​http://aperise.iteye.com/blog/2400528​

tcmalloc jemalloc libc选择

 

1.RedissonClient一主两从部署时连接池组成

主从部署(1主2从): 

redisson-2.10.4源代码分析_java

       redisson纯java操作代码如下: 

redisson-2.10.4源代码分析_java_02

1. Config config = new Config();// 创建配置
2. // 指定使用主从部署方式
3. //.setReadMode(ReadMode.SLAVE) 默认值SLAVE,读操作只在从节点进行
4. //.setSubscriptionMode(SubscriptionMode.SLAVE) 默认值SLAVE,订阅操作只在从节点进行
5. //.setMasterConnectionMinimumIdleSize(10) 默认值10,针对每个master节点初始化10个连接
6. //.setMasterConnectionPoolSize(64) 默认值64,针对每个master节点初始化10个连接,最大可以扩展至64个连接
7. //.setSlaveConnectionMinimumIdleSize(10) 默认值10,针对每个slave节点初始化10个连接
8. //.setSlaveConnectionPoolSize(64) 默认值,针对每个slave节点初始化10个连接,最大可以扩展至64个连接
9. //.setSubscriptionConnectionMinimumIdleSize(1) 默认值1,在SubscriptionMode=SLAVE时候,针对每个slave节点初始化1个连接
10. //.setSubscriptionConnectionPoolSize(50) 默认值50,在SubscriptionMode=SLAVE时候,针对每个slave节点初始化1个连接,最大可以扩展至50个连接
11. "redis://192.168.29.24:6379") // 设置redis主节点
12. "redis://192.168.29.24:7000") // 设置redis从节点
13. "redis://192.168.29.24:7001"); // 设置redis从节点
14. RedissonClient redisson = Redisson.create(config);// 创建客户端(发现这一操作非常耗时,基本在2秒-4秒左右)

       上面代码执行完毕后,如果在redis服务端所在服务器执行以下linux命令:

1. #6379上建立了10个连接  
2. netstat -ant |grep 6379|grep ESTABLISHED
3. #7000上建立了11个连接
4. netstat -ant |grep 7000|grep ESTABLISHED
5. #7001上建立了11个连接
6. netstat -ant |grep 7001|grep ESTABLISHED

       你会发现redisson连接到redis服务端总计建立了32个连接,其中masterpool占据10个连接,slavepool占据20个连接,另外pubSubConnectionPool占据2个连接,连接池中池化对象分布如下图:

redisson-2.10.4源代码分析_数据库_03

  • MasterConnectionPool:默认针对每个不同的IP+port组合,初始化10个对象,最大可扩展至64个,因为只有一个master,所以上图创建了10个连接;
  • MasterPubSubConnectionPool:默认针对每个不同的IP+port组合,初始化1个对象,最大可扩展至50个,因为默认SubscriptionMode=SubscriptionMode.SLAVE,所以master上不会创建连接池所以上图MasterPubSubConnectionPool里没有创建任何连接;
  • SlaveConnectionPool:默认针对每个不同的IP+port组合,初始化10个对象,最大可扩展至64个,因为有两个slave,每个slave上图创建了10个连接,总计创建了20个连接;
  • PubSubConnectionPool:默认针对每个不同的IP+port组合,初始化1个对象,最大可扩展至50个,因为有两个slave,每个slave上图创建了1个连接,总计创建了2个连接。

哪里初始化的?如何初始化的?读操作和写操作如何进行的?这就是今天要解答的问题,要解答这些问题最好还是查看redisson的源码。

 

2.Redisson初始化连接池源码分析

    2.1 Redisson.java 

       RedissonClient.java是一个接口类,它的实现类是Redisson.java,对于Redisson.java的介绍先以一张Redisson的4大组件关系图开始,如下图:

redisson-2.10.4源代码分析_初始化_04


       对Redisson.java的代码注释如下:

1. /**  
2. * 根据配置Config创建redisson操作类RedissonClient
3. * @param config for Redisson
4. * @return Redisson instance
5. */
6. public static
7. //调用构造方法
8. new
9. if
10. redisson.enableRedissonReferenceSupport();
11. }
12. return
13. }
14.
15. /**
16. * Redisson构造方法
17. * @param config for Redisson
18. * @return Redisson instance
19. */
20. protected
21. //赋值变量config
22. this.config = config;
23. //产生一份对于传入config的备份
24. new
25.
26. //根据配置config的类型(主从模式、单机模式、哨兵模式、集群模式、亚马逊云模式、微软云模式)而进行不同的初始化
27. connectionManager = ConfigSupport.createConnectionManager(configCopy);
28. //连接池对象回收调度器
29. new
30. //Redisson的对象编码类
31. codecProvider = configCopy.getCodecProvider();
32. //Redisson的ResolverProvider,默认为org.redisson.liveobject.provider.DefaultResolverProvider
33. resolverProvider = configCopy.getResolverProvider();
34. }

       其中与连接池相关的就是ConnectionManager,ConnectionManager的初始化转交工具类ConfigSupport.java进行,ConfigSupport.java会根据部署方式(主从模式、单机模式、哨兵模式、集群模式、亚马逊云模式、微软云模式)的不同而分别进行。

 

    2.2 ConfigSupport.java

       这里现将ConfigSupport.java创建ConnectionManager的核心代码注释如下: 

1. /**  
2. * 据配置config的类型(主从模式、单机模式、哨兵模式、集群模式、亚马逊云模式、微软云模式)而进行不同的初始化
3. * @param configCopy for Redisson
4. * @return ConnectionManager instance
5. */
6. public static
7. if (configCopy.getMasterSlaveServersConfig() != null) {//配置configCopy类型为主从模式
8. validate(configCopy.getMasterSlaveServersConfig());
9. return new
10. else if (configCopy.getSingleServerConfig() != null) {//配置configCopy类型为单机模式
11. validate(configCopy.getSingleServerConfig());
12. return new
13. else if (configCopy.getSentinelServersConfig() != null) {//配置configCopy类型为哨兵模式
14. validate(configCopy.getSentinelServersConfig());
15. return new
16. else if (configCopy.getClusterServersConfig() != null) {//配置configCopy类型为集群模式
17. validate(configCopy.getClusterServersConfig());
18. return new
19. else if (configCopy.getElasticacheServersConfig() != null) {//配置configCopy类型为亚马逊云模式
20. validate(configCopy.getElasticacheServersConfig());
21. return new
22. else if (configCopy.getReplicatedServersConfig() != null) {//配置configCopy类型为微软云模式
23. validate(configCopy.getReplicatedServersConfig());
24. return new
25. else if (configCopy.getConnectionManager() != null) {//直接返回configCopy自带的默认ConnectionManager
26. return
27. else
28. throw new IllegalArgumentException("server(s) address(es) not defined!");
29. }
30. }


       上面可以看到根据传入的配置Config.java的不同,会分别创建不同的ConnectionManager的实现类。

 

    2.3 MasterSlaveConnectionManager.java

主从模式、单机模式、哨兵模式、集群模式、亚马逊云模式、微软云模式),如下如所示:

redisson-2.10.4源代码分析_netty_05

       这里以主从部署方式进行讲解,先通过一张图了解MasterSlaveConnectionManager的组成:

redisson-2.10.4源代码分析_初始化_06


       上图中最终要的组件要数MasterSlaveEntry,在后面即将进行介绍,这里注释MasterSlaveConnectionManager.java的核心代码如下:  

redisson-2.10.4源代码分析_java_02


    1. /**   
    2. * MasterSlaveConnectionManager的构造方法
    3. * @param cfg for MasterSlaveServersConfig
    4. * @param config for Config
    5. */
    6. public
    7. //调用构造方法
    8. this(config);
    9. //
    10. initTimer(cfg);
    11. this.config = cfg;
    12. //初始化MasterSlaveEntry
    13. initSingleEntry();
    14. }
    15. /**
    16. * MasterSlaveConnectionManager的构造方法
    17. * @param cfg for Config
    18. */
    19. public
    20. //读取redisson的jar中的文件META-INF/MANIFEST.MF,打印出Bundle-Version对应的Redisson版本信息
    21. Version.logVersion();
    22. //EPOLL是linux的多路复用IO模型的增强版本,这里如果启用EPOLL,就让redisson底层netty使用EPOLL的方式,否则配置netty里的NIO非阻塞方式
    23. if
    24. if (cfg.getEventLoopGroup() == null) {
    25. //使用linux IO非阻塞模型EPOLL
    26. this.group = new EpollEventLoopGroup(cfg.getNettyThreads(), new DefaultThreadFactory("redisson-netty"));
    27. else
    28. this.group = cfg.getEventLoopGroup();
    29. }
    30. this.socketChannelClass = EpollSocketChannel.class;
    31. else
    32. if (cfg.getEventLoopGroup() == null) {
    33. //使用linux IO非阻塞模型NIO
    34. this.group = new NioEventLoopGroup(cfg.getNettyThreads(), new DefaultThreadFactory("redisson-netty"));
    35. else
    36. this.group = cfg.getEventLoopGroup();
    37. }
    38. this.socketChannelClass = NioSocketChannel.class;
    39. }
    40. if (cfg.getExecutor() == null) {
    41. //线程池大小,对于2U 2CPU 8cores/cpu,意思是有2块板子,每个板子上8个物理CPU,那么总计物理CPU个数为16
    42. //对于linux有个超线程概念,意思是每个物理CPU可以虚拟出2个逻辑CPU,那么总计逻辑CPU个数为32
    43. //这里Runtime.getRuntime().availableProcessors()取的是逻辑CPU的个数,所以这里线程池大小会是64
    44. int threads = Runtime.getRuntime().availableProcessors() * 2;
    45. if (cfg.getThreads() != 0) {
    46. threads = cfg.getThreads();
    47. }
    48. new DefaultThreadFactory("redisson"));
    49. else
    50. executor = cfg.getExecutor();
    51. }
    52.
    53. this.cfg = cfg;
    54. this.codec = cfg.getCodec();
    55. //一个可以获取异步执行任务返回值的回调对象,本质是对于java的Future的实现,监控MasterSlaveConnectionManager的shutdown进行一些必要的处理
    56. this.shutdownPromise = newPromise();
    57. //一个持有MasterSlaveConnectionManager的异步执行服务
    58. this.commandExecutor = new CommandSyncService(this);
    59. }
    60. /**
    61. * 初始化定时调度器
    62. * @param config for MasterSlaveServersConfig
    63. */
    64. protected void
    65. //读取超时时间配置信息
    66. int[] timeouts = new int[]{config.getRetryInterval(), config.getTimeout(), config.getReconnectionTimeout()};
    67. Arrays.sort(timeouts);
    68. int minTimeout = timeouts[0];
    69. //设置默认超时时间
    70. if (minTimeout % 100 != 0) {
    71. 100) / 2;
    72. else if (minTimeout == 100) {
    73. 50;
    74. else
    75. 100;
    76. }
    77. //创建定时调度器
    78. new HashedWheelTimer(Executors.defaultThreadFactory(), minTimeout, TimeUnit.MILLISECONDS, 1024);
    79.
    80. // to avoid assertion error during timer.stop invocation
    81. try
    82. class.getDeclaredField("leak");
    83. true);
    84. null);
    85. catch
    86. throw new
    87. }
    88. //检测MasterSlaveConnectionManager的空闲连接的监视器IdleConnectionWatcher,会清理不用的空闲的池中连接对象
    89. new IdleConnectionWatcher(this, config);
    90. }
    91.
    92. /**
    93. * 创建MasterSlaveConnectionManager的MasterSlaveEntry
    94. */
    95. protected void
    96. try
    97. //主从模式下0~16383加入到集合slots
    98. new
    99. slots.add(singleSlotRange);
    100.
    101. MasterSlaveEntry entry;
    102. if (config.checkSkipSlavesInit()) {//ReadMode不为MASTER并且SubscriptionMode不为MASTER才执行
    103. new SingleEntry(slots, this, config);
    104. RFuture<Void> f = entry.setupMasterEntry(config.getMasterAddress());
    105. f.syncUninterruptibly();
    106. else {//默认主从部署ReadMode=SLAVE,SubscriptionMode=SLAVE,这里会执行
    107. entry = createMasterSlaveEntry(config, slots);
    108. }
    109. //将每个分片0~16383都指向创建的MasterSlaveEntry
    110. for (int slot = singleSlotRange.getStartSlot(); slot < singleSlotRange.getEndSlot() + 1; slot++) {
    111. addEntry(slot, entry);
    112. }
    113. //DNS相关
    114. if (config.getDnsMonitoringInterval() != -1) {
    115. new DNSMonitor(this, Collections.singleton(config.getMasterAddress()),
    116. config.getSlaveAddresses(), config.getDnsMonitoringInterval());
    117. dnsMonitor.start();
    118. }
    119. catch
    120. stopThreads();
    121. throw
    122. }
    123. }
    124. /**
    125. * MasterSlaveEntry的构造方法
    126. * @param config for MasterSlaveServersConfig
    127. * @param slots for HashSet<ClusterSlotRange>
    128. * @return MasterSlaveEntry
    129. */
    130. protected
    131. //创建MasterSlaveEntry
    132. new MasterSlaveEntry(slots, this, config);
    133. //从节点连接池SlaveConnectionPool和PubSubConnectionPool的默认的最小连接数初始化
    134. List<RFuture<Void>> fs = entry.initSlaveBalancer(java.util.Collections.<URI>emptySet());
    135. for
    136. future.syncUninterruptibly();
    137. }
    138. 主节点连接池MasterConnectionPool和MasterPubSubConnectionPool的默认的最小连接数初始化
    139. RFuture<Void> f = entry.setupMasterEntry(config.getMasterAddress());
    140. f.syncUninterruptibly();
    141. return
    142. }


           上面个人觉得有两处代码值得我们特别关注,特别说明如下:

    • entry.initSlaveBalancer:从节点连接池SlaveConnectionPoolPubSubConnectionPool的默认的最小连接数初始化。
    • entry.setupMasterEntry:主节点连接池MasterConnectionPoolMasterPubSubConnectionPool的默认的最小连接数初始化。

     

        2.4 MasterSlaveEntry.java

           用一张图来解释MasterSlaveEntry的组件如下:

    redisson-2.10.4源代码分析_数据库_08


           MasterSlaveEntry.java里正是我们一直在寻找着的四个连接池MasterConnectionPool、MasterPubSubConnectionPool、SlaveConnectionPool和PubSubConnectionPool,这里注释MasterSlaveEntry.java的核心代码如下:

      1. /**   
      2. * MasterSlaveEntry的构造方法
      3. * @param slotRanges for Set<ClusterSlotRange>
      4. * @param connectionManager for ConnectionManager
      5. * @param config for MasterSlaveServersConfig
      6. */
      7. public
      8. //主从模式下0~16383加入到集合slots
      9. for
      10. for (int i = clusterSlotRange.getStartSlot(); i < clusterSlotRange.getEndSlot() + 1; i++) {
      11. slots.add(i);
      12. }
      13. }
      14. //赋值MasterSlaveConnectionManager给connectionManager
      15. this.connectionManager = connectionManager;
      16. //赋值config
      17. this.config = config;
      18.
      19. //创建LoadBalancerManager
      20. //其实LoadBalancerManager里持有者从节点的SlaveConnectionPool和PubSubConnectionPool
      21. //并且此时连接池里还没有初始化默认的最小连接数
      22. new LoadBalancerManager(config, connectionManager, this);
      23. //创建主节点连接池MasterConnectionPool,此时连接池里还没有初始化默认的最小连接数
      24. new MasterConnectionPool(config, connectionManager, this);
      25. //创建主节点连接池MasterPubSubConnectionPool,此时连接池里还没有初始化默认的最小连接数
      26. new MasterPubSubConnectionPool(config, connectionManager, this);
      27. }
      28.
      29. /**
      30. * 从节点连接池SlaveConnectionPool和PubSubConnectionPool的默认的最小连接数初始化
      31. * @param disconnectedNodes for Collection<URI>
      32. * @return List<RFuture<Void>>
      33. */
      34. public
      35. //这里freezeMasterAsSlave=true
      36. boolean
      37.
      38. new
      39. //把主节点当作从节点处理,因为默认ReadMode=ReadMode.SLAVE,所以这里不会添加针对该节点的连接池
      40. RFuture<Void> f = addSlave(config.getMasterAddress(), freezeMasterAsSlave, NodeType.MASTER);
      41. result.add(f);
      42. //读取从节点的地址信息,然后针对每个从节点地址创建SlaveConnectionPool和PubSubConnectionPool
      43. //SlaveConnectionPool【初始化10个RedisConnection,最大可以扩展至64个】
      44. //PubSubConnectionPool【初始化1个RedisPubSubConnection,最大可以扩展至50个】
      45. for
      46. f = addSlave(address, disconnectedNodes.contains(address), NodeType.SLAVE);
      47. result.add(f);
      48. }
      49. return
      50. }
      51.
      52. /**
      53. * 从节点连接池SlaveConnectionPool和PubSubConnectionPool的默认的最小连接数初始化
      54. * @param address for URI
      55. * @param freezed for boolean
      56. * @param nodeType for NodeType
      57. * @return RFuture<Void>
      58. */
      59. private RFuture<Void> addSlave(URI address, boolean
      60. //创建到从节点的连接RedisClient
      61. RedisClient client = connectionManager.createClient(NodeType.SLAVE, address);
      62. new
      63. this.config.getSlaveConnectionMinimumIdleSize(),
      64. this.config.getSlaveConnectionPoolSize(),
      65. this.config.getSubscriptionConnectionMinimumIdleSize(),
      66. this.config.getSubscriptionConnectionPoolSize(), connectionManager, nodeType);
      67. //默认只有主节点当作从节点是会设置freezed=true
      68. if
      69. synchronized
      70. entry.setFreezed(freezed);
      71. entry.setFreezeReason(FreezeReason.SYSTEM);
      72. }
      73. }
      74. //调用slaveBalancer来对从节点连接池SlaveConnectionPool和PubSubConnectionPool的默认的最小连接数初始化
      75. return
      76. }
      77.
      78. /**
      79. * 主节点连接池MasterConnectionPool和MasterPubSubConnectionPool的默认的最小连接数初始化
      80. * @param address for URI
      81. * @return RFuture<Void>
      82. */
      83. public
      84. //创建到主节点的连接RedisClient
      85. RedisClient client = connectionManager.createClient(NodeType.MASTER, address);
      86. new
      87. client,
      88. config.getMasterConnectionMinimumIdleSize(),
      89. config.getMasterConnectionPoolSize(),
      90. config.getSubscriptionConnectionMinimumIdleSize(),
      91. config.getSubscriptionConnectionPoolSize(),
      92. connectionManager,
      93. NodeType.MASTER);
      94. //如果配置的SubscriptionMode=SubscriptionMode.MASTER就初始化MasterPubSubConnectionPool
      95. //默认SubscriptionMode=SubscriptionMode.SLAVE,MasterPubSubConnectionPool这里不会初始化最小连接数
      96. if
      97. //MasterPubSubConnectionPool【初始化1个RedisPubSubConnection,最大可以扩展至50个】
      98. RFuture<Void> f = writeConnectionHolder.add(masterEntry);
      99. RFuture<Void> s = pubSubConnectionHolder.add(masterEntry);
      100. return
      101. }
      102. //调用MasterConnectionPool使得连接池MasterConnectionPool里的对象最小个数为10个
      103. //MasterConnectionPool【初始化10个RedisConnection,最大可以扩展至64个】
      104. return
      105. }


      • writeConnectionHolder.add(masterEntry):其实writeConnectionHolder的类型就是MasterConnectionPool,这里是连接池MasterConnectionPool里添加对象 
      • pubSubConnectionHolder.add(masterEntry):其实pubSubConnectionHolder的类型是MasterPubSubConnectionPool,这里是连接池MasterPubSubConnectionPool添加对象
      • slaveConnectionPool.add(entry):这里是连接池SlaveConnectionPool里添加对象
      • pubSubConnectionPool.add(entry):这里是连接池PubSubConnectionPool里添加对象

       

       

          2.5 LoadBalancerManager.java

             图解LoadBalancerManager.java的内部组成如下:

      redisson-2.10.4源代码分析_netty_09


             LoadBalancerManager.java里面有着从节点相关的两个重要的连接池SlaveConnectionPoolPubSubConnectionPool,这里注释LoadBalancerManager.java的核心代码如下:

        1. /**   
        2. * LoadBalancerManager的构造方法
        3. * @param config for MasterSlaveServersConfig
        4. * @param connectionManager for ConnectionManager
        5. * @param entry for MasterSlaveEntry
        6. */
        7. public
        8. //赋值connectionManager
        9. this.connectionManager = connectionManager;
        10. //创建连接池SlaveConnectionPool
        11. new
        12. //创建连接池PubSubConnectionPool
        13. new
        14. }
        15. /**
        16. * LoadBalancerManager的连接池SlaveConnectionPool和PubSubConnectionPool里池化对象添加方法,也即池中需要对象时,调用此方法添加
        17. * @param entry for ClientConnectionsEntry
        18. * @return RFuture<Void>
        19. */
        20. public RFuture<Void> add(final
        21. final
        22. //创建一个回调监听器,在池中对象创建失败时进行2次莫仍尝试
        23. new
        24. new AtomicInteger(2);
        25. @Override
        26. public void operationComplete(Future<Void> future) throws
        27. if
        28. result.tryFailure(future.cause());
        29. return;
        30. }
        31. if (counter.decrementAndGet() == 0) {
        32. String addr = entry.getClient().getIpAddr();
        33. ip2Entry.put(addr, entry);
        34. null);
        35. }
        36. }
        37. };
        38. //调用slaveConnectionPool添加RedisConnection对象到池中
        39. RFuture<Void> slaveFuture = slaveConnectionPool.add(entry);
        40. slaveFuture.addListener(listener);
        41. //调用pubSubConnectionPool添加RedisPubSubConnection对象到池中
        42. RFuture<Void> pubSubFuture = pubSubConnectionPool.add(entry);
        43. pubSubFuture.addListener(listener);
        44. return
        45. }


        我们已经了解了开篇提到的四个连接池是在哪里创建的。

         

        3. Redisson的4类连接池

        MasterConnectionPool、MasterPubSubConnectionPool、SlaveConnectionPool和PubSubConnectionPool,它们的父类都是ConnectionPool,其类继承关系图如下:  

        redisson-2.10.4源代码分析_初始化_10


               通过上图我们了解了ConnectionPool类的继承关系图,再来一张图来了解下ConnectionPool.java类的组成,如下:

        redisson-2.10.4源代码分析_java_11


               好了,再来图就有点啰嗦了,注释ConnectionPool.java代码如下:

        1. abstract class ConnectionPool<T extends
        2. private final
        3. //维持着连接池对应的redis节点信息
        4. //比如1主2从部署MasterConnectionPool里的entries只有一个主节点(192.168.29.24 6379)
        5. //比如1主2从部署MasterPubSubConnectionPool里的entries为空,因为SubscriptionMode=SubscriptionMode.SLAVE
        6. //比如1主2从部署SlaveConnectionPool里的entries有3个节点(192.168.29.24 6379,192.168.29.24 7000,192.168.29.24 7001,但是注意192.168.29.24 6379冻结属性freezed=true不会参与读操作除非2个从节点全部宕机才参与读操作)
        7. //比如1主2从部署PubSubConnectionPool里的entries有2个节点(192.168.29.24 7000,192.168.29.24 7001),因为SubscriptionMode=SubscriptionMode.SLAVE,主节点不会加入
        8. protected final List<ClientConnectionsEntry> entries = new
        9. //持有者RedissonClient的组件ConnectionManager
        10. final
        11. //持有者RedissonClient的组件ConnectionManager里的MasterSlaveServersConfig
        12. final
        13. //持有者RedissonClient的组件ConnectionManager里的MasterSlaveEntry
        14. final
        15.
        16. //构造函数
        17. public
        18. this.config = config;
        19. this.masterSlaveEntry = masterSlaveEntry;
        20. this.connectionManager = connectionManager;
        21. }
        22.
        23. //连接池中需要增加对象时候调用此方法
        24. public RFuture<Void> add(final
        25. final
        26. new
        27. @Override
        28. public void operationComplete(Future<Void> future) throws
        29. entries.add(entry);
        30. }
        31. });
        32. true);
        33. return
        34. }
        35.
        36. //初始化连接池中最小连接数
        37. private void initConnections(final ClientConnectionsEntry entry, final RPromise<Void> initPromise, boolean
        38. final int
        39.
        40. if (minimumIdleSize == 0
        41. null);
        42. return;
        43. }
        44.
        45. final AtomicInteger initializedConnections = new
        46. int startAmount = Math.min(50, minimumIdleSize);
        47. final AtomicInteger requests = new
        48. for (int i = 0; i < startAmount; i++) {
        49. createConnection(checkFreezed, requests, entry, initPromise, minimumIdleSize, initializedConnections);
        50. }
        51. }
        52.
        53. //创建连接对象到连接池中
        54. private void createConnection(final boolean checkFreezed, final AtomicInteger requests, final ClientConnectionsEntry entry, final
        55. final int minimumIdleSize, final
        56.
        57. if
        58. int
        59. new
        60. "Unable to init enough connections amount! Only " + totalInitializedConnections + " from " + minimumIdleSize + " were initialized. Server: "
        61. + entry.getClient().getAddr());
        62. initPromise.tryFailure(cause);
        63. return;
        64. }
        65.
        66. new
        67.
        68. @Override
        69. public void
        70. RPromise<T> promise = connectionManager.newPromise();
        71. createConnection(entry, promise);
        72. new
        73. @Override
        74. public void operationComplete(Future<T> future) throws
        75. if
        76. T conn = future.getNow();
        77.
        78. releaseConnection(entry, conn);
        79. }
        80.
        81. releaseConnection(entry);
        82.
        83. if
        84. int
        85. String errorMsg;
        86. if (totalInitializedConnections == 0) {
        87. "Unable to connect to Redis server: "
        88. else
        89. "Unable to init enough connections amount! Only "
        90. " from " + minimumIdleSize + " were initialized. Redis server: "
        91. }
        92. new
        93. initPromise.tryFailure(cause);
        94. return;
        95. }
        96.
        97. int
        98. if (value == 0) {
        99. "{} connections initialized for {}", minimumIdleSize, entry.getClient().getAddr());
        100. if (!initPromise.trySuccess(null)) {
        101. throw new
        102. }
        103. else if (value > 0
        104. if
        105. createConnection(checkFreezed, requests, entry, initPromise, minimumIdleSize, initializedConnections);
        106. }
        107. }
        108. }
        109. });
        110. }
        111. });
        112.
        113. }
        114.
        115. //连接池中租借出连接对象
        116. public
        117. for (int j = entries.size() - 1; j >= 0; j--) {
        118. final
        119. if
        120. && tryAcquireConnection(entry)) {
        121. return
        122. }
        123. }
        124.
        125. new
        126. new
        127. for
        128. if
        129. freezed.add(entry.getClient().getAddr());
        130. else
        131. failedAttempts.add(entry.getClient().getAddr());
        132. }
        133. }
        134.
        135. new StringBuilder(getClass().getSimpleName() + " no available Redis entries. ");
        136. if
        137. " Disconnected hosts: "
        138. }
        139. if
        140. " Hosts disconnected due to `failedAttempts` limit reached: "
        141. }
        142.
        143. new
        144. return
        145. }
        146.
        147. //连接池中租借出连接对象执行操作RedisCommand
        148. public
        149. if
        150. tryAcquireConnection(entry)) {
        151. return
        152. }
        153.
        154. new
        155. "Can't aquire connection to "
        156. return
        157. }
        158.
        159. //通过向redis服务端发送PING看是否返回PONG来检测连接
        160. private void ping(RedisConnection c, final
        161. RFuture<String> f = c.async(RedisCommands.PING);
        162. f.addListener(pingListener);
        163. }
        164.
        165. //归还连接对象到连接池
        166. public void
        167. if
        168. connection.closeAsync();
        169. else
        170. releaseConnection(entry, connection);
        171. }
        172. releaseConnection(entry);
        173. }
        174.
        175. //释放连接池中连接对象
        176. protected void
        177. entry.releaseConnection();
        178. }
        179.
        180. //释放连接池中连接对象
        181. protected void
        182. entry.releaseConnection(conn);
        183. }
        184. }

               用一张图来解释ConnectionPool干了些啥,如下图:

        redisson-2.10.4源代码分析_数据库_12


               都到这里了,不介意再送一张图了解各种部署方式下的连接池分布了,如下图:

        redisson-2.10.4源代码分析_redis_13


        4.Redisson的读写操作句柄类RedissonObject

        操作句柄类RedissonObject,RedissonObject根据不同的数据类型有不同的RedissonObject实现类,RedissonObject的类继承关系图如下:

        redisson-2.10.4源代码分析_初始化_14


               例如想设置redis服务端的key=key的值value=123,你需要查询​​Redis命令和Redisson对象匹配列表​​,找到如下对应关系:

        redisson-2.10.4源代码分析_redis_15


               然后我们就知道调用代码这么写:

        redisson-2.10.4源代码分析_java_02

        1. Config config = new Config();// 创建配置  
        2. // 指定使用主从部署方式
        3. "redis://192.168.29.24:6379") // 设置redis主节点
        4. "redis://192.168.29.24:7000") // 设置redis从节点
        5. "redis://192.168.29.24:7001"); // 设置redis从节点
        6. RedissonClient redisson = Redisson.create(config);// 创建客户端(发现这一操作非常耗时,基本在2秒-4秒左右)
        7.
        8. //任何Redisson操作首先需要获取对应的操作句柄
        9. //RBucket是操作句柄之一,实现类是RedissonBucket
        10. RBucket<String> rBucket = redissonClient.getBucket("key");
        11.
        12. //通过操作句柄rBucket进行读操作
        13. rBucket.get();
        14.
        15. //通过操作句柄rBucket进行写操作
        16. rBucket.set("123");


               至于其它的redis命令对应的redisson操作对象,都可以官网的​​Redis命令和Redisson对象匹配列表​​ 查到。

        6.Redisson的读写操作源码分析

               从一个读操作的代码作为入口分析代码,如下:


        redisson-2.10.4源代码分析_java_02


        1. //任何Redisson操作首先需要获取对应的操作句柄,RBucket是操作句柄之一,实现类是RedissonBucket  
        2. RBucket<String> rBucket = redissonClient.getBucket("key");    
        3.     
        4. //通过操作句柄rBucket进行读操作  
        5. rBucket.get();    

               继续追踪上面RBucket的get方法,如下:

        redisson-2.10.4源代码分析_数据库_18


               上面我们看到不管是读操作还是写操作都转交 CommandAsyncExecutor进行处理,那么这里我们需要看一下 CommandAsyncExecutor.java里关于读写操作处理的核心代码,注释代码如下:

        redisson-2.10.4源代码分析_java_02


        1. private
        2. //通过公式CRC16.crc16(key.getBytes()) % MAX_SLOT
        3. //计算出一个字符串key对应的分片在0~16383中哪个分片
        4. int
        5. //之前已经将0~16383每个分片对应到唯一的一个MasterSlaveEntry,这里取出来
        6. MasterSlaveEntry entry = connectionManager.getEntry(slot);
        7. //这里将MasterSlaveEntry包装成NodeSource【slot=null,addr=null,redirect=null,entry=MasterSlaveEntry】
        8. return new
        9. }
        10. @Override
        11. public
        12. RPromise<R> mainPromise = connectionManager.newPromise();
        13. //获取NodeSource【slot=null,addr=null,redirect=null,entry=MasterSlaveEntry】
        14. NodeSource source = getNodeSource(key);
        15. 调用异步执行方法async
        16. true, source, codec, command, params, mainPromise, 0);
        17. return
        18. }
        19. protected <V, R> void async(final boolean readOnlyMode, final NodeSource source, final
        20. final RedisCommand<V> command, final Object[] params, final RPromise<R> mainPromise, final int
        21. //操作被取消,那么直接返回
        22. if
        23. free(params);
        24. return;
        25. }
        26. //连接管理器无法连接,释放参数所占资源,然后返回
        27. if
        28. free(params);
        29. new RedissonShutdownException("Redisson is shutdown"));
        30. return;
        31. }
        32.
        33. final
        34. if
        35. try
        36. for (int i = 0; i < params.length; i++) {
        37. RedissonReference reference = RedissonObjectFactory.toReference(getConnectionManager().getCfg(), params[i]);
        38. if (reference != null) {
        39. params[i] = reference;
        40. }
        41. }
        42. catch
        43. connectionManager.getShutdownLatch().release();
        44. free(params);
        45. mainPromise.tryFailure(e);
        46. return;
        47. }
        48. }
        49.
        50. //开始从connectionManager获取池中的连接
        51. //这里采用异步方式,创建一个RFuture对象,等待池中连接,一旦获得连接,然后进行读和写操作
        52. final
        53. if (readOnlyMode) {//对于读操作默认readOnlyMode=true,这里会执行
        54. connectionFuture = connectionManager.connectionReadOp(source, command);
        55. else {//对于写操作默认readOnlyMode=false,这里会执行
        56. connectionFuture = connectionManager.connectionWriteOp(source, command);
        57. }
        58.
        59. //创建RPromise,用于操作失败时候重试
        60. final
        61. details.init(connectionFuture, attemptPromise, readOnlyMode, source, codec, command, params, mainPromise, attempt);
        62. //创建FutureListener,监测外部请求是否已经取消了之前提交的读写操作,如果取消了,那么就让正在执行的读写操作停止
        63. new
        64. @Override
        65. public void operationComplete(Future<R> future) throws
        66. if (future.isCancelled() && connectionFuture.cancel(false)) {
        67. "Connection obtaining canceled for {}", command);
        68. details.getTimeout().cancel();
        69. if (details.getAttemptPromise().cancel(false)) {
        70. free(params);
        71. }
        72. }
        73. }
        74. };
        75.
        76. //创建TimerTask,用于操作失败后通过定时器进行操作重试
        77. final TimerTask retryTimerTask = new
        78. @Override
        79. public void run(Timeout t) throws
        80. if
        81. return;
        82. }
        83. if (details.getConnectionFuture().cancel(false)) {
        84. connectionManager.getShutdownLatch().release();
        85. else
        86. if
        87. if (details.getWriteFuture() == null
        88. if
        89. if (details.getWriteFuture().cancel(false)) {
        90. if (details.getException() == null) {
        91. new RedisTimeoutException("Unable to send command: " + command + " with params: " + LogHelper.toString(details.getParams()) + " after " + connectionManager.getConfig().getRetryAttempts() + " retry attempts"));
        92. }
        93. details.getAttemptPromise().tryFailure(details.getException());
        94. }
        95. return;
        96. }
        97. details.incAttempt();
        98. this, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
        99. details.setTimeout(timeout);
        100. return;
        101. }
        102.
        103. if
        104. return;
        105. }
        106. }
        107. }
        108. if
        109. if (details.getAttemptPromise().cancel(false)) {
        110. free(details);
        111. AsyncDetails.release(details);
        112. }
        113. return;
        114. }
        115. if
        116. if (details.getException() == null) {
        117. new RedisTimeoutException("Unable to send command: " + command + " with params: " + LogHelper.toString(details.getParams() + " after " + connectionManager.getConfig().getRetryAttempts() + " retry attempts")));
        118. }
        119. details.getAttemptPromise().tryFailure(details.getException());
        120. return;
        121. }
        122. if (!details.getAttemptPromise().cancel(false)) {
        123. return;
        124. }
        125. int count = details.getAttempt() + 1;
        126. if
        127. "attempt {} for command {} and params {}",
        128. count, details.getCommand(), Arrays.toString(details.getParams()));
        129. }
        130. details.removeMainPromiseListener();
        131. async(details.isReadOnlyMode(), details.getSource(), details.getCodec(), details.getCommand(), details.getParams(), details.getMainPromise(), count);
        132. AsyncDetails.release(details);
        133. }
        134. };
        135.
        136. //配置对于读写操作的超时时间
        137. Timeout timeout = connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
        138. details.setTimeout(timeout);
        139. details.setupMainPromiseListener(mainPromiseListener);
        140.
        141. //给connectionFuture增加监听事件,当从连接池中获取连接成功,成功的事件会被触发,通知这里执行后续读写动作
        142. new
        143. @Override
        144. public void operationComplete(Future<RedisConnection> connFuture) throws
        145. if (connFuture.isCancelled()) {//从池中获取连接被取消,直接返回
        146. return;
        147. }
        148.
        149. if (!connFuture.isSuccess()) {//从池中获取连接失败
        150. connectionManager.getShutdownLatch().release();
        151. details.setException(convertException(connectionFuture));
        152. return;
        153. }
        154.
        155. if (details.getAttemptPromise().isDone() || details.getMainPromise().isDone()) {//从池中获取连接失败,并且尝试了一定次数仍然失败,默认尝试次数为0
        156. releaseConnection(source, connectionFuture, details.isReadOnlyMode(), details.getAttemptPromise(), details);
        157. return;
        158. }
        159.
        160. //从池中获取连接成功,这里取出连接对象RedisConnection
        161. final
        162. //如果需要重定向,这里进行重定向
        163. //重定向的情况有:集群模式对应的slot分布在其他节点,就需要进行重定向
        164. if
        165. new ArrayList<CommandData<?, ?>>(2);
        166. RPromise<Void> promise = connectionManager.newPromise();
        167. new CommandData<Void, Void>(promise, details.getCodec(), RedisCommands.ASKING, new
        168. new
        169. RPromise<Void> main = connectionManager.newPromise();
        170. new
        171. details.setWriteFuture(future);
        172. else
        173. if
        174. "acquired connection for command {} and params {} from slot {} using node {}... {}",
        175. details.getCommand(), Arrays.toString(details.getParams()), details.getSource(), connection.getRedisClient().getAddr(), connection);
        176. }
        177. //发送读写操作到RedisConnection,进行执行
        178. new
        179. details.setWriteFuture(future);
        180. }
        181. //对于写操作增加监听事件回调,对写操作是否成功,失败原因进行日志打印
        182. new
        183. @Override
        184. public void operationComplete(ChannelFuture future) throws
        185. checkWriteFuture(details, connection);
        186. }
        187. });
        188. //返回RedisConnection连接到连接池
        189. releaseConnection(source, connectionFuture, details.isReadOnlyMode(), details.getAttemptPromise(), details);
        190. }
        191. });
        192.
        193. new
        194. @Override
        195. public void operationComplete(Future<R> future) throws
        196. checkAttemptFuture(source, details, future);
        197. }
        198. });
        199. }

               上面的代码我用一张读写操作处理流程图总结如下:

        redisson-2.10.4源代码分析_初始化_20


         

         

               至此,关于读写操作的源码讲解完毕。在上面的代码注释中,列出如下重点。

            6.1 分片SLOT的计算公式

        SLOT=CRC16.crc16(key.getBytes()) % MAX_SLOT

         

            6.2 每个ConnectionPool持有的ClientConnectionsEntry对象冻结判断条件

               一个节点被判断为冻结,必须同时满足以下条件:

        • 该节点有slave节点,并且从节点个数大于0;
        • 设置的配置ReadMode不为并且SubscriptionMode不为MASTER;
        • 该节点的从节点至少有一个存活着,也即如果有从节点宕机,宕机的从节点的个数小于该节点总的从节点个数

            6.3 读写负载图

         

        redisson-2.10.4源代码分析_数据库_21


         

        redisson-2.10.4源代码分析_数据库_22


         

        7.Redisson的读写操作从连接池获取连接对象源码分析和Redisson里RedisClient使用netty源码分析

        CommandAsyncExecutor.java里的如下代码获取连接对象:

        1. //开始从connectionManager获取池中的连接  
        2. //这里采用异步方式,创建一个RFuture对象,等待池中连接,一旦获得连接,然后进行读和写操作
        3. final
        4. if (readOnlyMode) {//对于读操作默认readOnlyMode=true,这里会执行
        5. connectionFuture = connectionManager.connectionReadOp(source, command);
        6. } else {//对于写操作默认readOnlyMode=false,这里会执行
        7. connectionFuture = connectionManager.connectionWriteOp(source, command);
        8. }

               上面读操作调用了 connectionManager.connectionReadOp从连接池获取连接对象,写操作调用了 connectionManager.connectionWriteOp从连接池获取连接对象,我们继续跟进 connectionManager关于connectionReadOp和connectionWriteOp的源代码,注释如下:

        1. /**   
        2. * 读操作通过ConnectionManager从连接池获取连接对象
        3. * @param source for NodeSource
        4. * @param command for RedisCommand<?>
        5. * @return RFuture<RedisConnection>
        6. */
        7. public
        8. //这里之前分析过source=NodeSource【slot=null,addr=null,redirect=null,entry=MasterSlaveEntry】
        9. MasterSlaveEntry entry = source.getEntry();
        10. if (entry == null && source.getSlot() != null) {//这里不会执行source里slot=null
        11. entry = getEntry(source.getSlot());
        12. }
        13. if (source.getAddr() != null) {//这里不会执行source里addr=null
        14. entry = getEntry(source.getAddr());
        15. if (entry == null) {
        16. for
        17. if
        18. entry = e;
        19. break;
        20. }
        21. }
        22. }
        23. if (entry == null) {
        24. new RedisNodeNotFoundException("Node: " + source.getAddr() + " for slot: " + source.getSlot() + " hasn't been discovered yet");
        25. return
        26. }
        27.
        28. return
        29. }
        30.
        31. if (entry == null) {//这里不会执行source里entry不等于null
        32. new RedisNodeNotFoundException("Node: " + source.getAddr() + " for slot: " + source.getSlot() + " hasn't been discovered yet");
        33. return
        34. }
        35. //MasterSlaveEntry里从连接池获取连接对象
        36. return
        37. }
        38. /**
        39. * 写操作通过ConnectionManager从连接池获取连接对象
        40. * @param source for NodeSource
        41. * @param command for RedisCommand<?>
        42. * @return RFuture<RedisConnection>
        43. */
        44. public
        45. //这里之前分析过source=NodeSource【slot=null,addr=null,redirect=null,entry=MasterSlaveEntry】
        46. MasterSlaveEntry entry = source.getEntry();
        47. if (entry == null) {
        48. entry = getEntry(source);
        49. }
        50. if (entry == null) {//这里不会执行source里entry不等于null
        51. new RedisNodeNotFoundException("Node: " + source.getAddr() + " for slot: " + source.getSlot() + " hasn't been discovered yet");
        52. return
        53. }
        54. //MasterSlaveEntry里从连接池获取连接对象
        55. return
        56. }

               我们看到上面调用 ConnectionManager从连接池获取连接对象,但是 ConnectionManager却将获取连接操作转交 MasterSlaveEntry处理,我们再一次回顾一下 MasterSlaveEntry的组成: 

        redisson-2.10.4源代码分析_初始化_06


               MasterSlaveEntry里持有中我们开篇所提到的 四个连接池,那么这里我们继续关注 MasterSlaveEntry.java的源代码.

        1. /**   
        2. * 写操作从MasterConnectionPool连接池里获取连接对象
        3. * @param command for RedisCommand<?>
        4. * @return RFuture<RedisConnection>
        5. */
        6. public
        7. //我们知道writeConnectionHolder的类型为MasterConnectionPool
        8. //这里就是从MasterConnectionPool里获取连接对象
        9. return
        10. }
        11.
        12. /**
        13. * 写操作从LoadBalancerManager里获取连接对象
        14. * @param command for RedisCommand<?>
        15. * @return RFuture<RedisConnection>
        16. */
        17. public
        18. if
        19. //我们知道默认ReadMode=ReadMode.SLAVE,所以对于读操作这里不会执行
        20. return
        21. }
        22. //我们知道slaveBalancer里持有者SlaveConnectionPool和PubSubConnectionPool
        23. //这里就是从SlaveConnectionPool里获取连接对象
        24. return
        25. }

               似乎又绕回来了,最终的获取连接对象都转交到了从连接池 ConnectionPool里获取连接对象,注释 ConnectionPool里的获取连接对象代码如下: 

          1. /**   
          2. * 读写操作从ConnectionPool.java连接池里获取连接对象
          3. * @param command for RedisCommand<?>
          4. * @return RFuture<T>
          5. */
          6. public
          7. for (int j = entries.size() - 1; j >= 0; j--) {
          8. final
          9. if
          10. //遍历ConnectionPool里维持的ClientConnectionsEntry列表
          11. //遍历的算法默认为RoundRobinLoadBalancer
          12. //ClientConnectionsEntry里对应的redis节点为非冻结节点,也即freezed=false
          13. return
          14. }
          15. }
          16.
          17. //记录失败重试信息
          18. new
          19. new
          20. for
          21. if
          22. freezed.add(entry.getClient().getAddr());
          23. else
          24. failedAttempts.add(entry.getClient().getAddr());
          25. }
          26. }
          27.
          28. new StringBuilder(getClass().getSimpleName() + " no available Redis entries. ");
          29. if
          30. " Disconnected hosts: "
          31. }
          32. if
          33. " Hosts disconnected due to `failedAttempts` limit reached: "
          34. }
          35. //获取连接失败抛出异常
          36. new
          37. return
          38. }
          39.
          40. /**
          41. * 读写操作从ConnectionPool.java连接池里获取连接对象
          42. * @param command for RedisCommand<?>
          43. * @param entry for ClientConnectionsEntry
          44. * @return RFuture<T>
          45. */
          46. private RFuture<T> acquireConnection(RedisCommand<?> command, final
          47. //创建一个异步结果获取RPromise
          48. final
          49. //获取连接前首先将ClientConnectionsEntry里的空闲连接信号freeConnectionsCounter值减1
          50. //该操作成功后将调用这里的回调函数AcquireCallback<T>
          51. new
          52. @Override
          53. public void
          54. this);
          55. //freeConnectionsCounter值减1成功,说明获取可以获取到连接
          56. //这里才是真正获取连接的操作
          57. connectTo(entry, result);
          58. }
          59.
          60. @Override
          61. public void operationComplete(Future<T> future) throws
          62. this);
          63. }
          64. };
          65. //异步结果获取RPromise绑定到上面的回调函数callback
          66. result.addListener(callback);
          67. //尝试将ClientConnectionsEntry里的空闲连接信号freeConnectionsCounter值减1,如果成功就调用callback从连接池获取连接
          68. acquireConnection(entry, callback);
          69. //返回异步结果获取RPromise
          70. return
          71. }
          72.
          73. /**
          74. * 真正从连接池中获取连接
          75. * @param entry for ClientConnectionsEntry
          76. * @param promise for RPromise<T>
          77. */
          78. private void
          79. if
          80. releaseConnection(entry);
          81. return;
          82. }
          83. //从连接池中取出一个连接
          84. T conn = poll(entry);
          85. if (conn != null) {
          86. if
          87. promiseFailure(entry, promise, conn);
          88. return;
          89. }
          90.
          91. connectedSuccessful(entry, promise, conn);
          92. return;
          93. }
          94. //如果仍然获取不到连接,可能连接池中连接对象都被租借了,这里开始创建一个新的连接对象放到连接池中
          95. createConnection(entry, promise);
          96. }
          97.
          98. /**
          99. * 从连接池中获取连接
          100. * @param entry for ClientConnectionsEntry
          101. * @return T
          102. */
          103. protected
          104. return
          105. }
          106.
          107. /**
          108. * 调用ClientConnectionsEntry创建一个连接放置到连接池中并返回此连接
          109. * @param entry for ClientConnectionsEntry
          110. * @param promise for RPromise<T>
          111. */
          112. private void createConnection(final ClientConnectionsEntry entry, final
          113. //调用ClientConnectionsEntry创建一个连接放置到连接池中并返回此连接
          114. RFuture<T> connFuture = connect(entry);
          115. new
          116. @Override
          117. public void operationComplete(Future<T> future) throws
          118. if
          119. promiseFailure(entry, promise, future.cause());
          120. return;
          121. }
          122.
          123. T conn = future.getNow();
          124. if
          125. promiseFailure(entry, promise, conn);
          126. return;
          127. }
          128.
          129. connectedSuccessful(entry, promise, conn);
          130. }
          131. });
          132. }


          ConnectionPool.java里获取读写操作的连接,是遍历ConnectionPool里维持的ClientConnectionsEntry列表,找到一非冻结的ClientConnectionsEntry,然后调用ClientConnectionsEntry里的freeConnectionsCounter尝试将值减1,如果成功,说明连接池中可以获取到连接,那么就从ClientConnectionsEntry里获取一个连接出来,如果拿不到连接,会调用ClientConnectionsEntry创建一个新连接放置到连接池中,并返回此连接,这里回顾一下 ClientConnectionsEntry的组成图

          redisson-2.10.4源代码分析_数据库_21

                 我们继续跟进 ClientConnectionsEntry.java的源代码,注释如下:


          redisson-2.10.4源代码分析_java_02

          1. /**   
          2. * ClientConnectionsEntry里从freeConnections里获取一个连接并返回给读写操作使用
          3. */
          4. public
          5. return
          6. }
          7.
          8. /**
          9. * ClientConnectionsEntry里新创建一个连接对象返回给读写操作使用
          10. */
          11. public
          12. //调用RedisClient利用netty连接redis服务端,将返回的netty的outboundchannel包装成RedisConnection并返回
          13. RFuture<RedisConnection> future = client.connectAsync();
          14. new
          15. @Override
          16. public void operationComplete(Future<RedisConnection> future) throws
          17. if
          18. return;
          19. }
          20.
          21. RedisConnection conn = future.getNow();
          22. onConnect(conn);
          23. "new connection created: {}", conn);
          24. }
          25. });
          26. return
          27. }

                 上面的代码说明如果 ClientConnectionsEntry

          里的 freeConnections

          有空闲连接,那么直接返回该连接,如果没有那么调用 RedisClient.connectAsync创建一个新的连接

          ,这里我继续注释一下 RedisClient.java

          的源代码如下:

          redisson-2.10.4源代码分析_java_02

          1. package
          2.
          3. import
          4. import
          5. import
          6. import
          7. import
          8. import
          9. import
          10. import
          11. import
          12. import
          13. import
          14. import
          15. import
          16. import
          17. import
          18. import
          19. import
          20. import
          21. import
          22. import
          23. import
          24. import
          25. import
          26. import
          27. import
          28. import
          29. import
          30. import
          31.
          32. /**
          33. * 使用java里的网络编程框架Netty连接redis服务端
          34. * 作者: Nikita Koksharov
          35. */
          36. public class
          37. private final Bootstrap bootstrap;//Netty的工具类Bootstrap,用于连接建立等作用
          38. private final Bootstrap pubSubBootstrap;//Netty的工具类Bootstrap,用于连接建立等作用
          39. private final InetSocketAddress addr;//socket连接的地址
          40. //channels是netty提供的一个全局对象,里面记录着当前socket连接上的所有处于可用状态的连接channel
          41. //channels会自动监测里面的channel,当channel断开时,会主动踢出该channel,永远保留当前可用的channel列表
          42. private final ChannelGroup channels = new
          43.
          44. private ExecutorService executor;//REACOTR模型的java异步执行线程池
          45. private final long commandTimeout;//超时时间
          46. private Timer timer;//定时器
          47. private boolean
          48. private RedisClientConfig config;//redis连接配置信息
          49.
          50. //构造方法
          51. public static
          52. if (config.getTimer() == null) {
          53. new
          54. }
          55. return new
          56. }
          57. //构造方法
          58. private
          59. this.config = config;
          60. this.executor = config.getExecutor();
          61. this.timer = config.getTimer();
          62.
          63. new
          64.
          65. bootstrap = createBootstrap(config, Type.PLAIN);
          66. pubSubBootstrap = createBootstrap(config, Type.PUBSUB);
          67.
          68. this.commandTimeout = config.getCommandTimeout();
          69. }
          70.
          71. //java的网路编程框架Netty工具类Bootstrap初始化
          72. private
          73. new
          74. .channel(config.getSocketChannelClass())
          75. .group(config.getGroup())
          76. .remoteAddress(addr);
          77. //注册netty相关socket数据处理RedisChannelInitializer
          78. new RedisChannelInitializer(bootstrap, config, this, channels, type));
          79. //设置超时时间
          80. bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout());
          81. return
          82. }
          83.
          84. //构造方法
          85. @Deprecated
          86. public
          87. this(URIBuilder.create(address));
          88. }
          89.
          90. //构造方法
          91. @Deprecated
          92. public
          93. this(new HashedWheelTimer(), Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2), new
          94. true;
          95. }
          96.
          97. //构造方法
          98. @Deprecated
          99. public
          100. this(timer, executor, group, address.getHost(), address.getPort());
          101. }
          102.
          103. //构造方法
          104. @Deprecated
          105. public RedisClient(String host, int
          106. this(new HashedWheelTimer(), Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2), new NioEventLoopGroup(), NioSocketChannel.class, host, port, 10000, 10000);
          107. true;
          108. }
          109.
          110. //构造方法
          111. @Deprecated
          112. public RedisClient(Timer timer, ExecutorService executor, EventLoopGroup group, String host, int
          113. this(timer, executor, group, NioSocketChannel.class, host, port, 10000, 10000);
          114. }
          115.
          116. //构造方法
          117. @Deprecated
          118. public RedisClient(String host, int port, int connectTimeout, int
          119. this(new HashedWheelTimer(), Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2), new NioEventLoopGroup(), NioSocketChannel.class, host, port, connectTimeout, commandTimeout);
          120. }
          121.
          122. //构造方法
          123. @Deprecated
          124. public RedisClient(final Timer timer, ExecutorService executor, EventLoopGroup group, Class<? extends SocketChannel> socketChannelClass, String host, int
          125. int connectTimeout, int
          126. new
          127. config.setTimer(timer).setExecutor(executor).setGroup(group).setSocketChannelClass(socketChannelClass)
          128. .setAddress(host, port).setConnectTimeout(connectTimeout).setCommandTimeout(commandTimeout);
          129.
          130. this.config = config;
          131. this.executor = config.getExecutor();
          132. this.timer = config.getTimer();
          133.
          134. new
          135.
          136. //java的网路编程框架Netty工具类Bootstrap初始化
          137. bootstrap = createBootstrap(config, Type.PLAIN);
          138. pubSubBootstrap = createBootstrap(config, Type.PUBSUB);
          139.
          140. this.commandTimeout = config.getCommandTimeout();
          141. }
          142.
          143. //获取连接的IP地址
          144. public
          145. return addr.getAddress().getHostAddress() + ":"
          146. }
          147. //获取socket连接的地址
          148. public
          149. return
          150. }
          151. //获取超时时间
          152. public long
          153. return
          154. }
          155. //获取netty的线程池
          156. public
          157. return
          158. }
          159. //获取redis连接配置
          160. public
          161. return
          162. }
          163. //获取连接RedisConnection
          164. public
          165. try
          166. return
          167. catch
          168. throw new RedisConnectionException("Unable to connect to: "
          169. }
          170. }
          171. //启动netty去连接redis服务端,设置java的Future尝试将netty连接上的OutBoundChannel包装成RedisConnection并返回RedisConnection
          172. public
          173. final RPromise<RedisConnection> f = new
          174. //netty连接redis服务端
          175. ChannelFuture channelFuture = bootstrap.connect();
          176. new
          177. @Override
          178. public void operationComplete(final ChannelFuture future) throws
          179. if
          180. //将netty连接上的OutBoundChannel包装成RedisConnection并返回RedisConnection
          181. final
          182. new
          183. @Override
          184. public void operationComplete(final Future<RedisConnection> future) throws
          185. new
          186. @Override
          187. public void
          188. if
          189. if
          190. c.closeAsync();
          191. }
          192. else
          193. f.tryFailure(future.cause());
          194. c.closeAsync();
          195. }
          196. }
          197. });
          198. }
          199. });
          200. else
          201. new
          202. public void
          203. f.tryFailure(future.cause());
          204. }
          205. });
          206. }
          207. }
          208. });
          209. return
          210. }
          211. //获取订阅相关连接RedisPubSubConnection
          212. public
          213. try
          214. return
          215. catch
          216. throw new RedisConnectionException("Unable to connect to: "
          217. }
          218. }
          219.
          220. //启动netty去连接redis服务端,设置java的Future尝试将netty连接上的OutBoundChannel包装成RedisPubSubConnection并返回RedisPubSubConnection
          221. public
          222. final RPromise<RedisPubSubConnection> f = new
          223. //netty连接redis服务端
          224. ChannelFuture channelFuture = pubSubBootstrap.connect();
          225. new
          226. @Override
          227. public void operationComplete(final ChannelFuture future) throws
          228. if
          229. //将netty连接上的OutBoundChannel包装成RedisPubSubConnection并返回RedisPubSubConnection
          230. final
          231. new
          232. @Override
          233. public void operationComplete(final Future<RedisPubSubConnection> future) throws
          234. new
          235. @Override
          236. public void
          237. if
          238. if
          239. c.closeAsync();
          240. }
          241. else
          242. f.tryFailure(future.cause());
          243. c.closeAsync();
          244. }
          245. }
          246. });
          247. }
          248. });
          249. else
          250. new
          251. public void
          252. f.tryFailure(future.cause());
          253. }
          254. });
          255. }
          256. }
          257. });
          258. return
          259. }
          260.
          261. //关闭netty网络连接
          262. public void
          263. shutdownAsync().syncUninterruptibly();
          264. if
          265. timer.stop();
          266. executor.shutdown();
          267. try
          268. 15, TimeUnit.SECONDS);
          269. catch
          270. Thread.currentThread().interrupt();
          271. }
          272. bootstrap.config().group().shutdownGracefully();
          273.
          274. }
          275. }
          276.
          277. //异步关闭netty网络连接
          278. public
          279. for
          280. RedisConnection connection = RedisConnection.getFrom(channel);
          281. if (connection != null) {
          282. true);
          283. }
          284. }
          285. return
          286. }
          287.
          288. @Override
          289. public
          290. return "[addr=" + addr + "]";
          291. }
          292. }

                  上面就是Redisson利用java网络编程框架netty连接redis的全过程

          ,如果你对 netty

          比较熟悉,阅读上面的代码应该不是问题。

          标签:redisson,return,details,entry,源代码,config,连接,连接池,2.10
          From: https://blog.51cto.com/u_13991401/5887918

          相关文章