首页 > 数据库 >redisson-2.10.4源代码分析

redisson-2.10.4源代码分析

时间:2023-04-28 12:32:26浏览次数:57  
标签:redisson return entry new 源代码 config 连接 连接池 2.10


    

redis 学习问题总结

http://aperise.iteye.com/blog/2310639

ehcache memcached redis 缓存技术总结

http://aperise.iteye.com/blog/2296219

redis-stat 离线安装

http://aperise.iteye.com/blog/2310254

redis  cluster 非ruby方式启动

http://aperise.iteye.com/blog/2310254

redis-sentinel安装部署

http://aperise.iteye.com/blog/2342693

spring-data-redis使用

 http://aperise.iteye.com/blog/2342615

redis客户端redisson实战

http://aperise.iteye.com/blog/2396196

redisson-2.10.4源代码分析

http://aperise.iteye.com/blog/2400528

tcmalloc jemalloc libc选择


 

1.RedissonClient一主两从部署时连接池组成

主从部署(1主2从): 

redisson-2.10.4源代码分析_redis

       redisson纯java操作代码如下: 

Config config = new Config();// 创建配置
// 指定使用主从部署方式
//.setReadMode(ReadMode.SLAVE) 默认值SLAVE,读操作只在从节点进行
//.setSubscriptionMode(SubscriptionMode.SLAVE) 默认值SLAVE,订阅操作只在从节点进行
//.setMasterConnectionMinimumIdleSize(10) 默认值10,针对每个master节点初始化10个连接
//.setMasterConnectionPoolSize(64) 默认值64,针对每个master节点初始化10个连接,最大可以扩展至64个连接
//.setSlaveConnectionMinimumIdleSize(10) 默认值10,针对每个slave节点初始化10个连接
//.setSlaveConnectionPoolSize(64) 默认值,针对每个slave节点初始化10个连接,最大可以扩展至64个连接
//.setSubscriptionConnectionMinimumIdleSize(1) 默认值1,在Subscriptinotallow=SLAVE时候,针对每个slave节点初始化1个连接
//.setSubscriptionConnectionPoolSize(50) 默认值50,在Subscriptinotallow=SLAVE时候,针对每个slave节点初始化1个连接,最大可以扩展至50个连接
"redis://192.168.29.24:6379")  // 设置redis主节点  
"redis://192.168.29.24:7000") // 设置redis从节点  
"redis://192.168.29.24:7001"); // 设置redis从节点  
RedissonClient redisson = Redisson.create(config);// 创建客户端(发现这一操作非常耗时,基本在2秒-4秒左右)


       上面代码执行完毕后,如果在redis服务端所在服务器执行以下linux命令:

#6379上建立了10个连接  
netstat -ant |grep 6379|grep  ESTABLISHED  
#7000上建立了11个连接  
netstat -ant |grep 7000|grep  ESTABLISHED  
#7001上建立了11个连接  
netstat -ant |grep 7001|grep  ESTABLISHED

       你会发现redisson连接到redis服务端总计建立了32个连接,其中masterpool占据10个连接,slavepool占据20个连接,另外pubSubConnectionPool占据2个连接,连接池中池化对象分布如下图:

redisson-2.10.4源代码分析_java_02

MasterConnectionPool:默认针对每个不同的IP+port组合,初始化10个对象,最大可扩展至64个,因为只有一个master,所以上图创建了10个连接;

MasterPubSubConnectionPool:默认针对每个不同的IP+port组合,初始化1个对象,最大可扩展至50个,因为默认SubscriptionMode=SubscriptionMode.SLAVE,所以master上不会创建连接池所以上图MasterPubSubConnectionPool里没有创建任何连接;

SlaveConnectionPool:默认针对每个不同的IP+port组合,初始化10个对象,最大可扩展至64个,因为有两个slave,每个slave上图创建了10个连接,总计创建了20个连接;

PubSubConnectionPool:默认针对每个不同的IP+port组合,初始化1个对象,最大可扩展至50个,因为有两个slave,每个slave上图创建了1个连接,总计创建了2个连接。

哪里初始化的?如何初始化的?读操作和写操作如何进行的?这就是今天要解答的问题,要解答这些问题最好还是查看redisson的源码。

 

2.Redisson初始化连接池源码分析

    2.1 Redisson.java 

       RedissonClient.java是一个接口类,它的实现类是Redisson.java,对于Redisson.java的介绍先以一张Redisson的4大组件关系图开始,如下图:

redisson-2.10.4源代码分析_redis_03


       对Redisson.java的代码注释如下:

/**  
* 根据配置Config创建redisson操作类RedissonClient  
* @param config for Redisson  
* @return Redisson instance  
*/
public static
//调用构造方法  
new
if
        redisson.enableRedissonReferenceSupport();    
    }    
return
}    
  
/**  
* Redisson构造方法  
* @param config for Redisson  
* @return Redisson instance  
*/
protected
//赋值变量config  
this.config = config;    
//产生一份对于传入config的备份  
new
  
//根据配置config的类型(主从模式、单机模式、哨兵模式、集群模式、亚马逊云模式、微软云模式)而进行不同的初始化  
    connectionManager = ConfigSupport.createConnectionManager(configCopy);    
//连接池对象回收调度器  
new
//Redisson的对象编码类  
    codecProvider = configCopy.getCodecProvider();    
//Redisson的ResolverProvider,默认为org.redisson.liveobject.provider.DefaultResolverProvider  
    resolverProvider = configCopy.getResolverProvider();    
}


       其中与连接池相关的就是ConnectionManager,ConnectionManager的初始化转交工具类ConfigSupport.java进行,ConfigSupport.java会根据部署方式(主从模式、单机模式、哨兵模式、集群模式、亚马逊云模式、微软云模式)的不同而分别进行。

 

    2.2 ConfigSupport.java

       这里现将ConfigSupport.java创建ConnectionManager的核心代码注释如下: 

/**  
* 据配置config的类型(主从模式、单机模式、哨兵模式、集群模式、亚马逊云模式、微软云模式)而进行不同的初始化  
* @param configCopy for Redisson  
* @return ConnectionManager instance  
*/
public static
if (configCopy.getMasterSlaveServersConfig() != null) {//配置configCopy类型为主从模式  
        validate(configCopy.getMasterSlaveServersConfig());    
return new
else if (configCopy.getSingleServerConfig() != null) {//配置configCopy类型为单机模式  
        validate(configCopy.getSingleServerConfig());    
return new
else if (configCopy.getSentinelServersConfig() != null) {//配置configCopy类型为哨兵模式  
        validate(configCopy.getSentinelServersConfig());    
return new
else if (configCopy.getClusterServersConfig() != null) {//配置configCopy类型为集群模式  
        validate(configCopy.getClusterServersConfig());    
return new
else if (configCopy.getElasticacheServersConfig() != null) {//配置configCopy类型为亚马逊云模式  
        validate(configCopy.getElasticacheServersConfig());    
return new
else if (configCopy.getReplicatedServersConfig() != null) {//配置configCopy类型为微软云模式  
        validate(configCopy.getReplicatedServersConfig());    
return new
else if (configCopy.getConnectionManager() != null) {//直接返回configCopy自带的默认ConnectionManager  
return
else
throw new IllegalArgumentException("server(s) address(es) not defined!");    
    }    
}


       上面可以看到根据传入的配置Config.java的不同,会分别创建不同的ConnectionManager的实现类。

 

    2.3 MasterSlaveConnectionManager.java

主从模式、单机模式、哨兵模式、集群模式、亚马逊云模式、微软云模式),如下如所示:

redisson-2.10.4源代码分析_初始化_04

       这里以主从部署方式进行讲解,先通过一张图了解MasterSlaveConnectionManager的组成:

redisson-2.10.4源代码分析_java_05


       上图中最终要的组件要数MasterSlaveEntry,在后面即将进行介绍,这里注释MasterSlaveConnectionManager.java的核心代码如下:  

/**   
* MasterSlaveConnectionManager的构造方法 
* @param cfg for MasterSlaveServersConfig 
* @param config for Config   
*/
public
//调用构造方法
this(config);  
//
    initTimer(cfg);  
this.config = cfg;  
//初始化MasterSlaveEntry
    initSingleEntry();  
}  
/**   
* MasterSlaveConnectionManager的构造方法 
* @param cfg for Config 
*/
public
//读取redisson的jar中的文件META-INF/MANIFEST.MF,打印出Bundle-Version对应的Redisson版本信息
    Version.logVersion();  
//EPOLL是linux的多路复用IO模型的增强版本,这里如果启用EPOLL,就让redisson底层netty使用EPOLL的方式,否则配置netty里的NIO非阻塞方式
if
if (cfg.getEventLoopGroup() == null) {  
//使用linux IO非阻塞模型EPOLL
this.group = new EpollEventLoopGroup(cfg.getNettyThreads(), new DefaultThreadFactory("redisson-netty"));  
else
this.group = cfg.getEventLoopGroup();  
        }  
this.socketChannelClass = EpollSocketChannel.class;  
else
if (cfg.getEventLoopGroup() == null) {  
//使用linux IO非阻塞模型NIO
this.group = new NioEventLoopGroup(cfg.getNettyThreads(), new DefaultThreadFactory("redisson-netty"));  
else
this.group = cfg.getEventLoopGroup();  
        }  
this.socketChannelClass = NioSocketChannel.class;  
    }  
if (cfg.getExecutor() == null) {  
//线程池大小,对于2U 2CPU 8cores/cpu,意思是有2块板子,每个板子上8个物理CPU,那么总计物理CPU个数为16
//对于linux有个超线程概念,意思是每个物理CPU可以虚拟出2个逻辑CPU,那么总计逻辑CPU个数为32
//这里Runtime.getRuntime().availableProcessors()取的是逻辑CPU的个数,所以这里线程池大小会是64
int threads = Runtime.getRuntime().availableProcessors() * 2;  
if (cfg.getThreads() != 0) {  
            threads = cfg.getThreads();  
        }  
new DefaultThreadFactory("redisson"));  
else
        executor = cfg.getExecutor();  
    }  
  
this.cfg = cfg;  
this.codec = cfg.getCodec();  
//一个可以获取异步执行任务返回值的回调对象,本质是对于java的Future的实现,监控MasterSlaveConnectionManager的shutdown进行一些必要的处理
this.shutdownPromise = newPromise();  
//一个持有MasterSlaveConnectionManager的异步执行服务
this.commandExecutor = new CommandSyncService(this);  
}  
/**   
* 初始化定时调度器
* @param config for MasterSlaveServersConfig 
*/
protected void
//读取超时时间配置信息
int[] timeouts = new int[]{config.getRetryInterval(), config.getTimeout(), config.getReconnectionTimeout()};  
    Arrays.sort(timeouts);  
int minTimeout = timeouts[0];  
//设置默认超时时间
if (minTimeout % 100 != 0) {  
100) / 2;  
else if (minTimeout == 100) {  
50;  
else
100;  
    }  
//创建定时调度器
new HashedWheelTimer(Executors.defaultThreadFactory(), minTimeout, TimeUnit.MILLISECONDS, 1024);  
      
// to avoid assertion error during timer.stop invocation
try
class.getDeclaredField("leak");  
true);  
null);  
catch
throw new
    }  
//检测MasterSlaveConnectionManager的空闲连接的监视器IdleConnectionWatcher,会清理不用的空闲的池中连接对象
new IdleConnectionWatcher(this, config);  
}  
  
/**   
* 创建MasterSlaveConnectionManager的MasterSlaveEntry  
*/
protected void
try
//主从模式下0~16383加入到集合slots  
new
        slots.add(singleSlotRange);  
  
        MasterSlaveEntry entry;  
if (config.checkSkipSlavesInit()) {//ReadMode不为MASTER并且SubscriptionMode不为MASTER才执行
new SingleEntry(slots, this, config);  
            RFuture<Void> f = entry.setupMasterEntry(config.getMasterAddress());  
            f.syncUninterruptibly();  
else {//默认主从部署ReadMode=SLAVE,Subscriptinotallow=SLAVE,这里会执行
            entry = createMasterSlaveEntry(config, slots);  
        }  
//将每个分片0~16383都指向创建的MasterSlaveEntry
for (int slot = singleSlotRange.getStartSlot(); slot < singleSlotRange.getEndSlot() + 1; slot++) {  
            addEntry(slot, entry);  
        }  
//DNS相关
if (config.getDnsMonitoringInterval() != -1) {  
new DNSMonitor(this, Collections.singleton(config.getMasterAddress()),   
                    config.getSlaveAddresses(), config.getDnsMonitoringInterval());  
            dnsMonitor.start();  
        }  
catch
        stopThreads();  
throw
    }  
}  
/**   
* MasterSlaveEntry的构造方法 
* @param config for MasterSlaveServersConfig  
* @param slots for HashSet<ClusterSlotRange> 
* @return MasterSlaveEntry
*/
protected
//创建MasterSlaveEntry
new MasterSlaveEntry(slots, this, config);  
//从节点连接池SlaveConnectionPool和PubSubConnectionPool的默认的最小连接数初始化
    List<RFuture<Void>> fs = entry.initSlaveBalancer(java.util.Collections.<URI>emptySet());  
for
        future.syncUninterruptibly();  
    }  
    主节点连接池MasterConnectionPool和MasterPubSubConnectionPool的默认的最小连接数初始化  
    RFuture<Void> f = entry.setupMasterEntry(config.getMasterAddress());  
    f.syncUninterruptibly();  
return
}


       上面个人觉得有两处代码值得我们特别关注,特别说明如下:

entry.initSlaveBalancer:从节点连接池SlaveConnectionPoolPubSubConnectionPool的默认的最小连接数初始化。

entry.setupMasterEntry:主节点连接池MasterConnectionPoolMasterPubSubConnectionPool的默认的最小连接数初始化。

 

    2.4 MasterSlaveEntry.java

       用一张图来解释MasterSlaveEntry的组件如下:

redisson-2.10.4源代码分析_netty_06


       MasterSlaveEntry.java里正是我们一直在寻找着的四个连接池MasterConnectionPool、MasterPubSubConnectionPool、SlaveConnectionPool和PubSubConnectionPool,这里注释MasterSlaveEntry.java的核心代码如下:

/**   
* MasterSlaveEntry的构造方法 
* @param slotRanges for Set<ClusterSlotRange>   
* @param connectionManager for ConnectionManager   
* @param config for MasterSlaveServersConfig 
*/
public
//主从模式下0~16383加入到集合slots  
for
for (int i = clusterSlotRange.getStartSlot(); i < clusterSlotRange.getEndSlot() + 1; i++) {    
            slots.add(i);    
        }    
    }    
//赋值MasterSlaveConnectionManager给connectionManager  
this.connectionManager = connectionManager;    
//赋值config  
this.config = config;    
    
//创建LoadBalancerManager  
//其实LoadBalancerManager里持有者从节点的SlaveConnectionPool和PubSubConnectionPool  
//并且此时连接池里还没有初始化默认的最小连接数  
new LoadBalancerManager(config, connectionManager, this);    
//创建主节点连接池MasterConnectionPool,此时连接池里还没有初始化默认的最小连接数  
new MasterConnectionPool(config, connectionManager, this);    
//创建主节点连接池MasterPubSubConnectionPool,此时连接池里还没有初始化默认的最小连接数  
new MasterPubSubConnectionPool(config, connectionManager, this);    
}    
    
/**   
* 从节点连接池SlaveConnectionPool和PubSubConnectionPool的默认的最小连接数初始化 
* @param disconnectedNodes for Collection<URI> 
* @return List<RFuture<Void>>   
*/
public
//这里freezeMasterAsSlave=true  
boolean
    
new
//把主节点当作从节点处理,因为默认ReadMode=ReadMode.SLAVE,所以这里不会添加针对该节点的连接池  
    RFuture<Void> f = addSlave(config.getMasterAddress(), freezeMasterAsSlave, NodeType.MASTER);    
    result.add(f);    
//读取从节点的地址信息,然后针对每个从节点地址创建SlaveConnectionPool和PubSubConnectionPool  
//SlaveConnectionPool【初始化10个RedisConnection,最大可以扩展至64个】  
//PubSubConnectionPool【初始化1个RedisPubSubConnection,最大可以扩展至50个】  
for
        f = addSlave(address, disconnectedNodes.contains(address), NodeType.SLAVE);    
        result.add(f);    
    }    
return
}    
    
/**   
* 从节点连接池SlaveConnectionPool和PubSubConnectionPool的默认的最小连接数初始化 
* @param address for URI 
* @param freezed for boolean 
* @param nodeType for NodeType 
* @return RFuture<Void> 
*/
private RFuture<Void> addSlave(URI address, boolean
//创建到从节点的连接RedisClient  
    RedisClient client = connectionManager.createClient(NodeType.SLAVE, address);    
new
this.config.getSlaveConnectionMinimumIdleSize(),    
this.config.getSlaveConnectionPoolSize(),    
this.config.getSubscriptionConnectionMinimumIdleSize(),    
this.config.getSubscriptionConnectionPoolSize(), connectionManager, nodeType);    
//默认只有主节点当作从节点是会设置freezed=true  
if
synchronized
            entry.setFreezed(freezed);    
            entry.setFreezeReason(FreezeReason.SYSTEM);    
        }    
    }    
//调用slaveBalancer来对从节点连接池SlaveConnectionPool和PubSubConnectionPool的默认的最小连接数初始化  
return
}    
    
/**   
* 主节点连接池MasterConnectionPool和MasterPubSubConnectionPool的默认的最小连接数初始化 
* @param address for URI 
* @return RFuture<Void> 
*/
public
//创建到主节点的连接RedisClient  
    RedisClient client = connectionManager.createClient(NodeType.MASTER, address);    
new
            client,     
            config.getMasterConnectionMinimumIdleSize(),     
            config.getMasterConnectionPoolSize(),    
            config.getSubscriptionConnectionMinimumIdleSize(),    
            config.getSubscriptionConnectionPoolSize(),     
            connectionManager,     
            NodeType.MASTER);    
//如果配置的Subscriptinotallow=SubscriptionMode.MASTER就初始化MasterPubSubConnectionPool  
//默认Subscriptinotallow=SubscriptionMode.SLAVE,MasterPubSubConnectionPool这里不会初始化最小连接数  
if
//MasterPubSubConnectionPool【初始化1个RedisPubSubConnection,最大可以扩展至50个】  
        RFuture<Void> f = writeConnectionHolder.add(masterEntry);    
        RFuture<Void> s = pubSubConnectionHolder.add(masterEntry);    
return
    }    
//调用MasterConnectionPool使得连接池MasterConnectionPool里的对象最小个数为10个  
//MasterConnectionPool【初始化10个RedisConnection,最大可以扩展至64个】  
return
}


writeConnectionHolder.add(masterEntry):其实writeConnectionHolder的类型就是MasterConnectionPool,这里是连接池MasterConnectionPool里添加对象 

pubSubConnectionHolder.add(masterEntry):其实pubSubConnectionHolder的类型是MasterPubSubConnectionPool,这里是连接池MasterPubSubConnectionPool添加对象

slaveConnectionPool.add(entry):这里是连接池SlaveConnectionPool里添加对象

pubSubConnectionPool.add(entry):这里是连接池PubSubConnectionPool里添加对象

 

 

    2.5 LoadBalancerManager.java

       图解LoadBalancerManager.java的内部组成如下:

redisson-2.10.4源代码分析_数据库_07


       LoadBalancerManager.java里面有着从节点相关的两个重要的连接池SlaveConnectionPoolPubSubConnectionPool,这里注释LoadBalancerManager.java的核心代码如下:

/**   
* LoadBalancerManager的构造方法 
* @param config for MasterSlaveServersConfig  
* @param connectionManager for ConnectionManager   
* @param entry for MasterSlaveEntry 
*/
public
//赋值connectionManager  
this.connectionManager = connectionManager;    
//创建连接池SlaveConnectionPool  
new
//创建连接池PubSubConnectionPool  
new
}    
/**   
* LoadBalancerManager的连接池SlaveConnectionPool和PubSubConnectionPool里池化对象添加方法,也即池中需要对象时,调用此方法添加 
* @param entry for ClientConnectionsEntry 
* @return RFuture<Void> 
*/
public RFuture<Void> add(final
final
//创建一个回调监听器,在池中对象创建失败时进行2次莫仍尝试  
new
new AtomicInteger(2);    
@Override
public void operationComplete(Future<Void> future) throws
if
                result.tryFailure(future.cause());    
return;    
            }    
if (counter.decrementAndGet() == 0) {    
                String addr = entry.getClient().getIpAddr();    
                ip2Entry.put(addr, entry);    
null);    
            }    
        }    
    };    
//调用slaveConnectionPool添加RedisConnection对象到池中  
    RFuture<Void> slaveFuture = slaveConnectionPool.add(entry);    
    slaveFuture.addListener(listener);    
//调用pubSubConnectionPool添加RedisPubSubConnection对象到池中  
    RFuture<Void> pubSubFuture = pubSubConnectionPool.add(entry);    
    pubSubFuture.addListener(listener);    
return
}


我们已经了解了开篇提到的四个连接池是在哪里创建的。

 

3. Redisson的4类连接池

MasterConnectionPool、MasterPubSubConnectionPool、SlaveConnectionPool和PubSubConnectionPool,它们的父类都是ConnectionPool,其类继承关系图如下:  

redisson-2.10.4源代码分析_数据库_08


       通过上图我们了解了ConnectionPool类的继承关系图,再来一张图来了解下ConnectionPool.java类的组成,如下:

redisson-2.10.4源代码分析_数据库_09


       好了,再来图就有点啰嗦了,注释ConnectionPool.java代码如下:

abstract class ConnectionPool<T extends
private final
//维持着连接池对应的redis节点信息  
//比如1主2从部署MasterConnectionPool里的entries只有一个主节点(192.168.29.24 6379)  
//比如1主2从部署MasterPubSubConnectionPool里的entries为空,因为Subscriptinotallow=SubscriptionMode.SLAVE  
//比如1主2从部署SlaveConnectionPool里的entries有3个节点(192.168.29.24 6379,192.168.29.24 7000,192.168.29.24 7001,但是注意192.168.29.24 6379冻结属性freezed=true不会参与读操作除非2个从节点全部宕机才参与读操作)  
//比如1主2从部署PubSubConnectionPool里的entries有2个节点(192.168.29.24 7000,192.168.29.24 7001),因为Subscriptinotallow=SubscriptionMode.SLAVE,主节点不会加入  
protected final List<ClientConnectionsEntry> entries = new
//持有者RedissonClient的组件ConnectionManager  
final
//持有者RedissonClient的组件ConnectionManager里的MasterSlaveServersConfig  
final
//持有者RedissonClient的组件ConnectionManager里的MasterSlaveEntry  
final
    
//构造函数  
public
this.config = config;    
this.masterSlaveEntry = masterSlaveEntry;    
this.connectionManager = connectionManager;    
    }    
    
//连接池中需要增加对象时候调用此方法  
public RFuture<Void> add(final
final
new
@Override
public void operationComplete(Future<Void> future) throws
                entries.add(entry);    
            }    
        });    
true);    
return
    }    
    
//初始化连接池中最小连接数  
private void initConnections(final ClientConnectionsEntry entry, final RPromise<Void> initPromise, boolean
final int
    
if (minimumIdleSize == 0
null);    
return;    
        }    
    
final AtomicInteger initializedConnections = new
int startAmount = Math.min(50, minimumIdleSize);    
final AtomicInteger requests = new
for (int i = 0; i < startAmount; i++) {    
            createConnection(checkFreezed, requests, entry, initPromise, minimumIdleSize, initializedConnections);    
        }    
    }    
    
//创建连接对象到连接池中  
private void createConnection(final boolean checkFreezed, final AtomicInteger requests, final ClientConnectionsEntry entry, final
final int minimumIdleSize, final
    
if
int
new
"Unable to init enough connections amount! Only " + totalInitializedConnections + " from " + minimumIdleSize + " were initialized. Server: "
                                        + entry.getClient().getAddr());    
            initPromise.tryFailure(cause);    
return;    
        }    
            
new
                
@Override
public void
                RPromise<T> promise = connectionManager.newPromise();    
                createConnection(entry, promise);    
new
@Override
public void operationComplete(Future<T> future) throws
if
                            T conn = future.getNow();    
    
                            releaseConnection(entry, conn);    
                        }    
    
                        releaseConnection(entry);    
    
if
int
                            String errorMsg;    
if (totalInitializedConnections == 0) {    
"Unable to connect to Redis server: "
else
"Unable to init enough connections amount! Only "
" from " + minimumIdleSize + " were initialized. Redis server: "
                            }    
new
                            initPromise.tryFailure(cause);    
return;    
                        }    
    
int
if (value == 0) {    
"{} connections initialized for {}", minimumIdleSize, entry.getClient().getAddr());    
if (!initPromise.trySuccess(null)) {    
throw new
                            }    
else if (value > 0
if
                                createConnection(checkFreezed, requests, entry, initPromise, minimumIdleSize, initializedConnections);    
                            }    
                        }    
                    }    
                });    
            }    
        });    
    
    }    
    
//连接池中租借出连接对象  
public
for (int j = entries.size() - 1; j >= 0; j--) {    
final
if
                    && tryAcquireConnection(entry)) {    
return
            }    
        }    
            
new
new
for
if
                freezed.add(entry.getClient().getAddr());    
else
                failedAttempts.add(entry.getClient().getAddr());    
            }    
        }    
    
new StringBuilder(getClass().getSimpleName() + " no available Redis entries. ");    
if
" Disconnected hosts: "
        }    
if
" Hosts disconnected due to `failedAttempts` limit reached: "
        }    
    
new
return
    }    
    
//连接池中租借出连接对象执行操作RedisCommand  
public
if
                tryAcquireConnection(entry)) {    
return
        }    
    
new
"Can't aquire connection to "
return
    }    
      
//通过向redis服务端发送PING看是否返回PONG来检测连接
private void ping(RedisConnection c, final
        RFuture<String> f = c.async(RedisCommands.PING);    
        f.addListener(pingListener);    
    }    
    
//归还连接对象到连接池  
public void
if
            connection.closeAsync();    
else
            releaseConnection(entry, connection);    
        }    
        releaseConnection(entry);    
    }    
    
//释放连接池中连接对象  
protected void
        entry.releaseConnection();    
    }    
    
//释放连接池中连接对象  
protected void
        entry.releaseConnection(conn);    
    }    
}

       用一张图来解释ConnectionPool干了些啥,如下图:

redisson-2.10.4源代码分析_数据库_10


       都到这里了,不介意再送一张图了解各种部署方式下的连接池分布了,如下图:

redisson-2.10.4源代码分析_java_11


4.Redisson的读写操作句柄类RedissonObject

操作句柄类RedissonObject,RedissonObject根据不同的数据类型有不同的RedissonObject实现类,RedissonObject的类继承关系图如下:

redisson-2.10.4源代码分析_java_12


       例如想设置redis服务端的key=key的值value=123,你需要查询Redis命令和Redisson对象匹配列表,找到如下对应关系:

redisson-2.10.4源代码分析_netty_13


       然后我们就知道调用代码这么写:

Config config = new Config();// 创建配置  
// 指定使用主从部署方式  
"redis://192.168.29.24:6379")  // 设置redis主节点    
"redis://192.168.29.24:7000") // 设置redis从节点    
"redis://192.168.29.24:7001"); // 设置redis从节点    
RedissonClient redisson = Redisson.create(config);// 创建客户端(发现这一操作非常耗时,基本在2秒-4秒左右) 
  
//任何Redisson操作首先需要获取对应的操作句柄
//RBucket是操作句柄之一,实现类是RedissonBucket
RBucket<String> rBucket = redissonClient.getBucket("key");  
  
//通过操作句柄rBucket进行读操作
rBucket.get();  
  
//通过操作句柄rBucket进行写操作
rBucket.set("123");


       至于其它的redis命令对应的redisson操作对象,都可以官网的Redis命令和Redisson对象匹配列表 查到。

 

6.Redisson的读写操作源码分析

       从一个读操作的代码作为入口分析代码,如下:

//任何Redisson操作首先需要获取对应的操作句柄,RBucket是操作句柄之一,实现类是RedissonBucket  
RBucket<String> rBucket = redissonClient.getBucket("key");    
    
//通过操作句柄rBucket进行读操作  
rBucket.get();

       继续追踪上面RBucket的get方法,如下:

redisson-2.10.4源代码分析_redis_14


       上面我们看到不管是读操作还是写操作都转交 CommandAsyncExecutor进行处理,那么这里我们需要看一下 CommandAsyncExecutor.java里关于读写操作处理的核心代码,注释代码如下:

private
//通过公式CRC16.crc16(key.getBytes()) % MAX_SLOT
//计算出一个字符串key对应的分片在0~16383中哪个分片
int
//之前已经将0~16383每个分片对应到唯一的一个MasterSlaveEntry,这里取出来
    MasterSlaveEntry entry = connectionManager.getEntry(slot);  
//这里将MasterSlaveEntry包装成NodeSource【slot=null,addr=null,redirect=null,entry=MasterSlaveEntry】
return new
}  
@Override
public
    RPromise<R> mainPromise = connectionManager.newPromise();  
//获取NodeSource【slot=null,addr=null,redirect=null,entry=MasterSlaveEntry】
    NodeSource source = getNodeSource(key);  
    调用异步执行方法async  
true, source, codec, command, params, mainPromise, 0);  
return
}  
protected <V, R> void async(final boolean readOnlyMode, final NodeSource source, final
final RedisCommand<V> command, final Object[] params, final RPromise<R> mainPromise, final int
//操作被取消,那么直接返回
if
        free(params);  
return;  
    }  
//连接管理器无法连接,释放参数所占资源,然后返回
if
        free(params);  
new RedissonShutdownException("Redisson is shutdown"));  
return;  
    }  
      
final
if
try
for (int i = 0; i < params.length; i++) {  
                RedissonReference reference = RedissonObjectFactory.toReference(getConnectionManager().getCfg(), params[i]);  
if (reference != null) {  
                    params[i] = reference;  
                }  
            }  
catch
            connectionManager.getShutdownLatch().release();  
            free(params);  
            mainPromise.tryFailure(e);  
return;  
        }  
    }  
      
//开始从connectionManager获取池中的连接
//这里采用异步方式,创建一个RFuture对象,等待池中连接,一旦获得连接,然后进行读和写操作
final
if (readOnlyMode) {//对于读操作默认readOnlyMode=true,这里会执行
        connectionFuture = connectionManager.connectionReadOp(source, command);  
else {//对于写操作默认readOnlyMode=false,这里会执行
        connectionFuture = connectionManager.connectionWriteOp(source, command);  
    }  
      
//创建RPromise,用于操作失败时候重试
final
    details.init(connectionFuture, attemptPromise, readOnlyMode, source, codec, command, params, mainPromise, attempt);  
//创建FutureListener,监测外部请求是否已经取消了之前提交的读写操作,如果取消了,那么就让正在执行的读写操作停止
new
@Override
public void operationComplete(Future<R> future) throws
if (future.isCancelled() && connectionFuture.cancel(false)) {  
"Connection obtaining canceled for {}", command);  
                details.getTimeout().cancel();  
if (details.getAttemptPromise().cancel(false)) {  
                    free(params);  
                }  
            }  
        }  
    };  
  
//创建TimerTask,用于操作失败后通过定时器进行操作重试
final TimerTask retryTimerTask = new
@Override
public void run(Timeout t) throws
if
return;  
            }  
if (details.getConnectionFuture().cancel(false)) {  
                connectionManager.getShutdownLatch().release();  
else
if
if (details.getWriteFuture() == null
if
if (details.getWriteFuture().cancel(false)) {  
if (details.getException() == null) {  
new RedisTimeoutException("Unable to send command: " + command + " with params: " + LogHelper.toString(details.getParams()) + " after " + connectionManager.getConfig().getRetryAttempts() + " retry attempts"));  
                                }  
                                details.getAttemptPromise().tryFailure(details.getException());  
                            }  
return;  
                        }  
                        details.incAttempt();  
this, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);  
                        details.setTimeout(timeout);  
return;  
                    }  
  
if
return;  
                    }  
                }  
            }  
if
if (details.getAttemptPromise().cancel(false)) {  
                    free(details);  
                    AsyncDetails.release(details);  
                }  
return;  
            }  
if
if (details.getException() == null) {  
new RedisTimeoutException("Unable to send command: " + command + " with params: " + LogHelper.toString(details.getParams() + " after " + connectionManager.getConfig().getRetryAttempts() + " retry attempts")));  
                }  
                details.getAttemptPromise().tryFailure(details.getException());  
return;  
            }  
if (!details.getAttemptPromise().cancel(false)) {  
return;  
            }  
int count = details.getAttempt() + 1;  
if
"attempt {} for command {} and params {}",  
                        count, details.getCommand(), Arrays.toString(details.getParams()));  
            }  
            details.removeMainPromiseListener();  
            async(details.isReadOnlyMode(), details.getSource(), details.getCodec(), details.getCommand(), details.getParams(), details.getMainPromise(), count);  
            AsyncDetails.release(details);  
        }  
    };  
  
//配置对于读写操作的超时时间
    Timeout timeout = connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);  
    details.setTimeout(timeout);  
    details.setupMainPromiseListener(mainPromiseListener);  
  
//给connectionFuture增加监听事件,当从连接池中获取连接成功,成功的事件会被触发,通知这里执行后续读写动作
new
@Override
public void operationComplete(Future<RedisConnection> connFuture) throws
if (connFuture.isCancelled()) {//从池中获取连接被取消,直接返回
return;  
            }  
  
if (!connFuture.isSuccess()) {//从池中获取连接失败
                connectionManager.getShutdownLatch().release();  
                details.setException(convertException(connectionFuture));  
return;  
            }  
  
if (details.getAttemptPromise().isDone() || details.getMainPromise().isDone()) {//从池中获取连接失败,并且尝试了一定次数仍然失败,默认尝试次数为0
                releaseConnection(source, connectionFuture, details.isReadOnlyMode(), details.getAttemptPromise(), details);  
return;  
            }  
  
//从池中获取连接成功,这里取出连接对象RedisConnection
final
//如果需要重定向,这里进行重定向
//重定向的情况有:集群模式对应的slot分布在其他节点,就需要进行重定向
if
new ArrayList<CommandData<?, ?>>(2);  
                RPromise<Void> promise = connectionManager.newPromise();  
new CommandData<Void, Void>(promise, details.getCodec(), RedisCommands.ASKING, new
new
                RPromise<Void> main = connectionManager.newPromise();  
new
                details.setWriteFuture(future);  
else
if
"acquired connection for command {} and params {} from slot {} using node {}... {}",  
                            details.getCommand(), Arrays.toString(details.getParams()), details.getSource(), connection.getRedisClient().getAddr(), connection);  
                }  
//发送读写操作到RedisConnection,进行执行
new
                details.setWriteFuture(future);  
            }  
//对于写操作增加监听事件回调,对写操作是否成功,失败原因进行日志打印
new
@Override
public void operationComplete(ChannelFuture future) throws
                    checkWriteFuture(details, connection);  
                }  
            });  
//返回RedisConnection连接到连接池
            releaseConnection(source, connectionFuture, details.isReadOnlyMode(), details.getAttemptPromise(), details);  
        }  
    });  
  
new
@Override
public void operationComplete(Future<R> future) throws
            checkAttemptFuture(source, details, future);  
        }  
    });  
}

       上面的代码我用一张读写操作处理流程图总结如下:

redisson-2.10.4源代码分析_数据库_15


 

 

       至此,关于读写操作的源码讲解完毕。在上面的代码注释中,列出如下重点。

    6.1 分片SLOT的计算公式

SLOT=CRC16.crc16(key.getBytes()) % MAX_SLOT

 

    6.2 每个ConnectionPool持有的ClientConnectionsEntry对象冻结判断条件

       一个节点被判断为冻结,必须同时满足以下条件:

该节点有slave节点,并且从节点个数大于0;

设置的配置ReadMode不为并且SubscriptionMode不为MASTER;

该节点的从节点至少有一个存活着,也即如果有从节点宕机,宕机的从节点的个数小于该节点总的从节点个数

    6.3 读写负载图

 

redisson-2.10.4源代码分析_java_16


 

redisson-2.10.4源代码分析_redis_17


 

7.Redisson的读写操作从连接池获取连接对象源码分析和Redisson里RedisClient使用netty源码分析

CommandAsyncExecutor.java里的如下代码获取连接对象:

//开始从connectionManager获取池中的连接  
//这里采用异步方式,创建一个RFuture对象,等待池中连接,一旦获得连接,然后进行读和写操作  
final
if (readOnlyMode) {//对于读操作默认readOnlyMode=true,这里会执行  
    connectionFuture = connectionManager.connectionReadOp(source, command);    
} else {//对于写操作默认readOnlyMode=false,这里会执行  
    connectionFuture = connectionManager.connectionWriteOp(source, command);    
}

       上面读操作调用了 connectionManager.connectionReadOp从连接池获取连接对象,写操作调用了 connectionManager.connectionWriteOp从连接池获取连接对象,我们继续跟进 connectionManager关于connectionReadOp和connectionWriteOp的源代码,注释如下:

/**   
* 读操作通过ConnectionManager从连接池获取连接对象
* @param source for NodeSource 
* @param command for RedisCommand<?> 
* @return RFuture<RedisConnection>
*/
public
//这里之前分析过source=NodeSource【slot=null,addr=null,redirect=null,entry=MasterSlaveEntry】
    MasterSlaveEntry entry = source.getEntry();  
if (entry == null && source.getSlot() != null) {//这里不会执行source里slot=null
        entry = getEntry(source.getSlot());  
    }  
if (source.getAddr() != null) {//这里不会执行source里addr=null
        entry = getEntry(source.getAddr());  
if (entry == null) {  
for
if
                    entry = e;  
break;  
                }  
            }  
        }  
if (entry == null) {  
new RedisNodeNotFoundException("Node: " + source.getAddr() + " for slot: " + source.getSlot() + " hasn't been discovered yet");  
return
        }  
          
return
    }  
      
if (entry == null) {//这里不会执行source里entry不等于null
new RedisNodeNotFoundException("Node: " + source.getAddr() + " for slot: " + source.getSlot() + " hasn't been discovered yet");  
return
    }  
//MasterSlaveEntry里从连接池获取连接对象
return
}  
/**   
* 写操作通过ConnectionManager从连接池获取连接对象
* @param source for NodeSource 
* @param command for RedisCommand<?> 
* @return RFuture<RedisConnection>
*/
public
//这里之前分析过source=NodeSource【slot=null,addr=null,redirect=null,entry=MasterSlaveEntry】
    MasterSlaveEntry entry = source.getEntry();  
if (entry == null) {  
        entry = getEntry(source);  
    }  
if (entry == null) {//这里不会执行source里entry不等于null
new RedisNodeNotFoundException("Node: " + source.getAddr() + " for slot: " + source.getSlot() + " hasn't been discovered yet");  
return
    }  
//MasterSlaveEntry里从连接池获取连接对象
return
}

       我们看到上面调用 ConnectionManager从连接池获取连接对象,但是 ConnectionManager却将获取连接操作转交 MasterSlaveEntry处理,我们再一次回顾一下 MasterSlaveEntry的组成: 

redisson-2.10.4源代码分析_java_05


       MasterSlaveEntry里持有中我们开篇所提到的 四个连接池,那么这里我们继续关注 MasterSlaveEntry.java的源代码:

/**   
* 写操作从MasterConnectionPool连接池里获取连接对象
* @param command for RedisCommand<?> 
* @return RFuture<RedisConnection>
*/
public
//我们知道writeConnectionHolder的类型为MasterConnectionPool
//这里就是从MasterConnectionPool里获取连接对象
return
}  
  
/**   
* 写操作从LoadBalancerManager里获取连接对象
* @param command for RedisCommand<?> 
* @return RFuture<RedisConnection>
*/
public
if
//我们知道默认ReadMode=ReadMode.SLAVE,所以对于读操作这里不会执行
return
    }  
//我们知道slaveBalancer里持有者SlaveConnectionPool和PubSubConnectionPool
//这里就是从SlaveConnectionPool里获取连接对象
return
}

       似乎又绕回来了,最终的获取连接对象都转交到了从连接池 ConnectionPool里获取连接对象,注释 ConnectionPool里的获取连接对象代码如下: 

/**   
* 读写操作从ConnectionPool.java连接池里获取连接对象
* @param command for RedisCommand<?> 
* @return RFuture<T>
*/
public
for (int j = entries.size() - 1; j >= 0; j--) {  
final
if
//遍历ConnectionPool里维持的ClientConnectionsEntry列表
//遍历的算法默认为RoundRobinLoadBalancer
//ClientConnectionsEntry里对应的redis节点为非冻结节点,也即freezed=false
return
        }  
    }  
      
//记录失败重试信息
new
new
for
if
            freezed.add(entry.getClient().getAddr());  
else
            failedAttempts.add(entry.getClient().getAddr());  
        }  
    }  
  
new StringBuilder(getClass().getSimpleName() + " no available Redis entries. ");  
if
" Disconnected hosts: "
    }  
if
" Hosts disconnected due to `failedAttempts` limit reached: "
    }  
//获取连接失败抛出异常
new
return
}  
  
/**   
* 读写操作从ConnectionPool.java连接池里获取连接对象
* @param command for RedisCommand<?> 
* @param entry for ClientConnectionsEntry
* @return RFuture<T>
*/
private RFuture<T> acquireConnection(RedisCommand<?> command, final
//创建一个异步结果获取RPromise
final
//获取连接前首先将ClientConnectionsEntry里的空闲连接信号freeConnectionsCounter值减1
//该操作成功后将调用这里的回调函数AcquireCallback<T>
new
@Override
public void
this);  
//freeConnectionsCounter值减1成功,说明获取可以获取到连接
//这里才是真正获取连接的操作
            connectTo(entry, result);  
        }  
          
@Override
public void operationComplete(Future<T> future) throws
this);  
        }  
    };  
//异步结果获取RPromise绑定到上面的回调函数callback
    result.addListener(callback);  
//尝试将ClientConnectionsEntry里的空闲连接信号freeConnectionsCounter值减1,如果成功就调用callback从连接池获取连接
    acquireConnection(entry, callback);  
//返回异步结果获取RPromise
return
}  
  
/**   
* 真正从连接池中获取连接
* @param entry for ClientConnectionsEntry
* @param promise for RPromise<T>
*/
private void
if
        releaseConnection(entry);  
return;  
    }  
//从连接池中取出一个连接
    T conn = poll(entry);  
if (conn != null) {  
if
            promiseFailure(entry, promise, conn);  
return;  
        }  
  
        connectedSuccessful(entry, promise, conn);  
return;  
    }  
//如果仍然获取不到连接,可能连接池中连接对象都被租借了,这里开始创建一个新的连接对象放到连接池中
    createConnection(entry, promise);  
}  
  
/**   
* 从连接池中获取连接
* @param entry for ClientConnectionsEntry
* @return T
*/
protected
return
}  
  
/**   
* 调用ClientConnectionsEntry创建一个连接放置到连接池中并返回此连接
* @param entry for ClientConnectionsEntry
* @param promise for RPromise<T>
*/
private void createConnection(final ClientConnectionsEntry entry, final
//调用ClientConnectionsEntry创建一个连接放置到连接池中并返回此连接
    RFuture<T> connFuture = connect(entry);  
new
@Override
public void operationComplete(Future<T> future) throws
if
                promiseFailure(entry, promise, future.cause());  
return;  
            }  
  
            T conn = future.getNow();  
if
                promiseFailure(entry, promise, conn);  
return;  
            }  
  
            connectedSuccessful(entry, promise, conn);  
        }  
    });  
}


ConnectionPool.java里获取读写操作的连接,是遍历ConnectionPool里维持的ClientConnectionsEntry列表,找到一非冻结的ClientConnectionsEntry,然后调用ClientConnectionsEntry里的freeConnectionsCounter尝试将值减1,如果成功,说明连接池中可以获取到连接,那么就从ClientConnectionsEntry里获取一个连接出来,如果拿不到连接,会调用ClientConnectionsEntry创建一个新连接放置到连接池中,并返回此连接,这里回顾一下 ClientConnectionsEntry的组成图

redisson-2.10.4源代码分析_java_16

       我们继续跟进 ClientConnectionsEntry.java的源代码,注释如下:

/**   
*  ClientConnectionsEntry里从freeConnections里获取一个连接并返回给读写操作使用
*/
public
return
}  
  
/**   
*  ClientConnectionsEntry里新创建一个连接对象返回给读写操作使用
*/
public
//调用RedisClient利用netty连接redis服务端,将返回的netty的outboundchannel包装成RedisConnection并返回
    RFuture<RedisConnection> future = client.connectAsync();  
new
@Override
public void operationComplete(Future<RedisConnection> future) throws
if
return;  
            }  
              
            RedisConnection conn = future.getNow();  
            onConnect(conn);  
"new connection created: {}", conn);  
        }  
    });  
return
}

       上面的代码说明如果 ClientConnectionsEntry 里的 freeConnections 有空闲连接,那么直接返回该连接,如果没有那么调用 RedisClient.connectAsync创建一个新的连接 ,这里我继续注释一下 RedisClient.java 的源代码如下:

package
  
import
import
import
import
import
import
import
import
import
import
import
import
import
import
import
import
import
import
import
import
import
import
import
import
import
import
import
import
  
/**
 * 使用java里的网络编程框架Netty连接redis服务端
 * 作者: Nikita Koksharov
 */
public class
private final Bootstrap bootstrap;//Netty的工具类Bootstrap,用于连接建立等作用
private final Bootstrap pubSubBootstrap;//Netty的工具类Bootstrap,用于连接建立等作用
private final InetSocketAddress addr;//socket连接的地址
//channels是netty提供的一个全局对象,里面记录着当前socket连接上的所有处于可用状态的连接channel
//channels会自动监测里面的channel,当channel断开时,会主动踢出该channel,永远保留当前可用的channel列表
private final ChannelGroup channels = new
  
private ExecutorService executor;//REACOTR模型的java异步执行线程池
private final long commandTimeout;//超时时间
private Timer timer;//定时器
private boolean
private RedisClientConfig config;//redis连接配置信息
  
//构造方法
public static
if (config.getTimer() == null) {  
new
        }  
return new
    }  
//构造方法
private
this.config = config;  
this.executor = config.getExecutor();  
this.timer = config.getTimer();  
          
new
          
        bootstrap = createBootstrap(config, Type.PLAIN);  
        pubSubBootstrap = createBootstrap(config, Type.PUBSUB);  
          
this.commandTimeout = config.getCommandTimeout();  
    }  
  
//java的网路编程框架Netty工具类Bootstrap初始化
private
new
                        .channel(config.getSocketChannelClass())  
                        .group(config.getGroup())  
                        .remoteAddress(addr);  
//注册netty相关socket数据处理RedisChannelInitializer
new RedisChannelInitializer(bootstrap, config, this, channels, type));  
//设置超时时间
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout());  
return
    }  
      
//构造方法
@Deprecated
public
this(URIBuilder.create(address));  
    }  
      
//构造方法
@Deprecated
public
this(new HashedWheelTimer(), Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2), new
true;  
    }  
  
//构造方法
@Deprecated
public
this(timer, executor, group, address.getHost(), address.getPort());  
    }  
      
//构造方法
@Deprecated
public RedisClient(String host, int
this(new HashedWheelTimer(), Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2), new NioEventLoopGroup(), NioSocketChannel.class, host, port, 10000, 10000);  
true;  
    }  
  
//构造方法
@Deprecated
public RedisClient(Timer timer, ExecutorService executor, EventLoopGroup group, String host, int
this(timer, executor, group, NioSocketChannel.class, host, port, 10000, 10000);  
    }  
      
//构造方法
@Deprecated
public RedisClient(String host, int port, int connectTimeout, int
this(new HashedWheelTimer(), Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2), new NioEventLoopGroup(), NioSocketChannel.class, host, port, connectTimeout, commandTimeout);  
    }  
  
//构造方法
@Deprecated
public RedisClient(final Timer timer, ExecutorService executor, EventLoopGroup group, Class<? extends SocketChannel> socketChannelClass, String host, int
int connectTimeout, int
new
        config.setTimer(timer).setExecutor(executor).setGroup(group).setSocketChannelClass(socketChannelClass)  
        .setAddress(host, port).setConnectTimeout(connectTimeout).setCommandTimeout(commandTimeout);  
          
this.config = config;  
this.executor = config.getExecutor();  
this.timer = config.getTimer();  
          
new
          
//java的网路编程框架Netty工具类Bootstrap初始化
        bootstrap = createBootstrap(config, Type.PLAIN);  
        pubSubBootstrap = createBootstrap(config, Type.PUBSUB);  
          
this.commandTimeout = config.getCommandTimeout();  
    }  
  
//获取连接的IP地址
public
return addr.getAddress().getHostAddress() + ":"
    }  
//获取socket连接的地址
public
return
    }  
//获取超时时间
public long
return
    }  
//获取netty的线程池
public
return
    }  
//获取redis连接配置
public
return
    }  
//获取连接RedisConnection
public
try
return
catch
throw new RedisConnectionException("Unable to connect to: "
        }  
    }  
//启动netty去连接redis服务端,设置java的Future尝试将netty连接上的OutBoundChannel包装成RedisConnection并返回RedisConnection
public
final RPromise<RedisConnection> f = new
//netty连接redis服务端
        ChannelFuture channelFuture = bootstrap.connect();  
new
@Override
public void operationComplete(final ChannelFuture future) throws
if
//将netty连接上的OutBoundChannel包装成RedisConnection并返回RedisConnection
final
new
@Override
public void operationComplete(final Future<RedisConnection> future) throws
new
@Override
public void
if
if
                                            c.closeAsync();  
                                        }  
else
                                        f.tryFailure(future.cause());  
                                        c.closeAsync();  
                                    }  
                                }  
                            });  
                        }  
                    });  
else
new
public void
                            f.tryFailure(future.cause());  
                        }  
                    });  
                }  
            }  
        });  
return
    }  
//获取订阅相关连接RedisPubSubConnection
public
try
return
catch
throw new RedisConnectionException("Unable to connect to: "
        }  
    }  
  
//启动netty去连接redis服务端,设置java的Future尝试将netty连接上的OutBoundChannel包装成RedisPubSubConnection并返回RedisPubSubConnection
public
final RPromise<RedisPubSubConnection> f = new
//netty连接redis服务端
        ChannelFuture channelFuture = pubSubBootstrap.connect();  
new
@Override
public void operationComplete(final ChannelFuture future) throws
if
//将netty连接上的OutBoundChannel包装成RedisPubSubConnection并返回RedisPubSubConnection
final
new
@Override
public void operationComplete(final Future<RedisPubSubConnection> future) throws
new
@Override
public void
if
if
                                            c.closeAsync();  
                                        }  
else
                                        f.tryFailure(future.cause());  
                                        c.closeAsync();  
                                    }  
                                }  
                            });  
                        }  
                    });  
else
new
public void
                            f.tryFailure(future.cause());  
                        }  
                    });  
                }  
            }  
        });  
return
    }  
  
//关闭netty网络连接
public void
        shutdownAsync().syncUninterruptibly();  
if
            timer.stop();  
            executor.shutdown();  
try
15, TimeUnit.SECONDS);  
catch
                Thread.currentThread().interrupt();  
            }  
            bootstrap.config().group().shutdownGracefully();  
              
        }  
    }  
  
//异步关闭netty网络连接
public
for
            RedisConnection connection = RedisConnection.getFrom(channel);  
if (connection != null) {  
true);  
            }  
        }  
return
    }  
  
@Override
public
return "[addr=" + addr + "]";  
    }  
}

        上面就是Redisson利用java网络编程框架netty连接redis的全过程 ,如果你对 netty 比较熟悉,阅读上面的代码应该不是问题。


标签:redisson,return,entry,new,源代码,config,连接,连接池,2.10
From: https://blog.51cto.com/u_16091571/6233834

相关文章

  • 超大文件上传和断点续传的源代码
    ​ 前言文件上传是一个老生常谈的话题了,在文件相对比较小的情况下,可以直接把文件转化为字节流上传到服务器,但在文件比较大的情况下,用普通的方式进行上传,这可不是一个好的办法,毕竟很少有人会忍受,当文件上传到一半中断后,继续上传却只能重头开始上传,这种让人不爽的体验。那有没有......
  • redisson 分布式锁
    @RequestMapping(value="/testLock",method=RequestMethod.POST)publicBaseResponse<Boolean>testLock(@RequestBodyTestLockRequesttestLockRequest){RLockrLock=null;booleanisLocked=false;try{......
  • 分布式锁-Redisson
    分布式锁1、分布式锁1.1本地锁的局限性1.1.1测试代码1.1.2使用ab工具测试(单节点)1.1.3本地锁问题演示(集群情况)1.2分布式锁实现的解决方案1.3使用Redis实现分布式锁(了解即可)1.3.1编写代码1.3.2压测1.4使用Redisson解决分布式锁1.4.1实现代码1.4.1压测1.4.2可重入......
  • 仿chatGPT或chatPDF的前端界面布局,css实现对话聊天布局代码,响应式左右分栏布局(附完整
    chatPDF或者chatGPT的界面挺简洁的,就是一个左侧的列表以及右侧的对话列表,现在使用css实现这样的布局充分运用了flex布局方式实现,左右分栏,以及对话形式展示效果下面是效果图: 在手机设备看就隐藏左侧,右侧100%适应 下面就是html和css的布局代码<style>.chatpdf{......
  • 如何阅读Tomcat源代码?
    容器,简单理解就是用来装东西的工具。在Tomcat里面,容器被设计用来装载Servlet,也就是我们平常写的普通的Servlet,就会存放在容器里面。这也就是咱们平常念叨的Servlet容器,其实从广义上理解,Servlet容器是指Tomcat,从狭义上理解,Servlet容器,只是Tomcat里面的一个组件而已。1.容器概述To......
  • redisson lock的使用
    1.现在错误的用法:RLocklock=redisson.getLock(String.format(LOCK_KEY,2));try{if(lock.tryLock()){//处理logger.info("aaaaaaaaaaaaaaaaaa");}catch(Exceptione){//处理异常}finally{if(lock.isLocked()){lock.unlock();}}测试......
  • 直播平台源代码,input密码框显示与隐藏
    直播平台源代码,input密码框显示与隐藏一、html部分   <divstyle="margin-top:200px;background:#42b983;width:200px;height:100px">   <input:type="passwordType"v-model="password"placeholder="请输入账号"/>   <img......
  • UNIX环境高级编程 第三版 源代码编译及使用
    UNIX环境高级编程(第3版)中的代码示例多次包含了一下头文件:#include"apue.h"搜索发现原来这个头文件是作者自定义的一个文件,并在官网提供了源代码供下载。下载之后解压该文件:tar-zxfsrc.3e.tar.gz进入文件夹并编译:cdapue.3emake等待结束,如果没有报错就成功了。(如......
  • 如何将之前编辑的文章HTML源代码导入到TinyMCE编辑器中
    如果你想用TinyMCE来修改你之前写的文章那么你需要将源代码放到TinyMCE中,如果服务器把HTML源码发给我们可是我们应该怎样调用?方法为使用 tinymce.activeEditor.setContent()这个函数具体用法为:tinymce.activeEditor.setContent()//设置TinyMCE编辑器里的内容源代码tin......
  • 直播平台源代码,图片放大浏览功能
    直播平台源代码,图片放大浏览功能HTML <view><view><blockv-for="(item,index)infen"><view><image@click="previewImage(index)":src="item.picture"mode=""></image></view></block><......