redis 学习问题总结 | |
ehcache memcached redis 缓存技术总结 | |
redis-stat 离线安装 | |
redis cluster 非ruby方式启动 | |
redis-sentinel安装部署 | |
spring-data-redis使用 | |
redis客户端redisson实战 | |
redisson-2.10.4源代码分析 | |
tcmalloc jemalloc libc选择 |
1.RedissonClient一主两从部署时连接池组成
主从部署(1主2从):
redisson纯java操作代码如下:
Config config = new Config();// 创建配置
// 指定使用主从部署方式
//.setReadMode(ReadMode.SLAVE) 默认值SLAVE,读操作只在从节点进行
//.setSubscriptionMode(SubscriptionMode.SLAVE) 默认值SLAVE,订阅操作只在从节点进行
//.setMasterConnectionMinimumIdleSize(10) 默认值10,针对每个master节点初始化10个连接
//.setMasterConnectionPoolSize(64) 默认值64,针对每个master节点初始化10个连接,最大可以扩展至64个连接
//.setSlaveConnectionMinimumIdleSize(10) 默认值10,针对每个slave节点初始化10个连接
//.setSlaveConnectionPoolSize(64) 默认值,针对每个slave节点初始化10个连接,最大可以扩展至64个连接
//.setSubscriptionConnectionMinimumIdleSize(1) 默认值1,在Subscriptinotallow=SLAVE时候,针对每个slave节点初始化1个连接
//.setSubscriptionConnectionPoolSize(50) 默认值50,在Subscriptinotallow=SLAVE时候,针对每个slave节点初始化1个连接,最大可以扩展至50个连接
"redis://192.168.29.24:6379") // 设置redis主节点
"redis://192.168.29.24:7000") // 设置redis从节点
"redis://192.168.29.24:7001"); // 设置redis从节点
RedissonClient redisson = Redisson.create(config);// 创建客户端(发现这一操作非常耗时,基本在2秒-4秒左右)
上面代码执行完毕后,如果在redis服务端所在服务器执行以下linux命令:
#6379上建立了10个连接
netstat -ant |grep 6379|grep ESTABLISHED
#7000上建立了11个连接
netstat -ant |grep 7000|grep ESTABLISHED
#7001上建立了11个连接
netstat -ant |grep 7001|grep ESTABLISHED
你会发现redisson连接到redis服务端总计建立了32个连接,其中masterpool占据10个连接,slavepool占据20个连接,另外pubSubConnectionPool占据2个连接,连接池中池化对象分布如下图:
MasterConnectionPool:默认针对每个不同的IP+port组合,初始化10个对象,最大可扩展至64个,因为只有一个master,所以上图创建了10个连接;
MasterPubSubConnectionPool:默认针对每个不同的IP+port组合,初始化1个对象,最大可扩展至50个,因为默认SubscriptionMode=SubscriptionMode.SLAVE,所以master上不会创建连接池,所以上图MasterPubSubConnectionPool里没有创建任何连接;
SlaveConnectionPool:默认针对每个不同的IP+port组合,初始化10个对象,最大可扩展至64个,因为有两个slave,每个slave上图创建了10个连接,总计创建了20个连接;
PubSubConnectionPool:默认针对每个不同的IP+port组合,初始化1个对象,最大可扩展至50个,因为有两个slave,每个slave上图创建了1个连接,总计创建了2个连接。
哪里初始化的?如何初始化的?读操作和写操作如何进行的?这就是今天要解答的问题,要解答这些问题最好还是查看redisson的源码。
2.Redisson初始化连接池源码分析
2.1 Redisson.java
RedissonClient.java是一个接口类,它的实现类是Redisson.java,对于Redisson.java的介绍先以一张Redisson的4大组件关系图开始,如下图:
对Redisson.java的代码注释如下:
/**
* 根据配置Config创建redisson操作类RedissonClient
* @param config for Redisson
* @return Redisson instance
*/
public static
//调用构造方法
new
if
redisson.enableRedissonReferenceSupport();
}
return
}
/**
* Redisson构造方法
* @param config for Redisson
* @return Redisson instance
*/
protected
//赋值变量config
this.config = config;
//产生一份对于传入config的备份
new
//根据配置config的类型(主从模式、单机模式、哨兵模式、集群模式、亚马逊云模式、微软云模式)而进行不同的初始化
connectionManager = ConfigSupport.createConnectionManager(configCopy);
//连接池对象回收调度器
new
//Redisson的对象编码类
codecProvider = configCopy.getCodecProvider();
//Redisson的ResolverProvider,默认为org.redisson.liveobject.provider.DefaultResolverProvider
resolverProvider = configCopy.getResolverProvider();
}
其中与连接池相关的就是ConnectionManager,ConnectionManager的初始化转交工具类ConfigSupport.java进行,ConfigSupport.java会根据部署方式(主从模式、单机模式、哨兵模式、集群模式、亚马逊云模式、微软云模式)的不同而分别进行。
2.2 ConfigSupport.java
这里现将ConfigSupport.java创建ConnectionManager的核心代码注释如下:
/**
* 据配置config的类型(主从模式、单机模式、哨兵模式、集群模式、亚马逊云模式、微软云模式)而进行不同的初始化
* @param configCopy for Redisson
* @return ConnectionManager instance
*/
public static
if (configCopy.getMasterSlaveServersConfig() != null) {//配置configCopy类型为主从模式
validate(configCopy.getMasterSlaveServersConfig());
return new
else if (configCopy.getSingleServerConfig() != null) {//配置configCopy类型为单机模式
validate(configCopy.getSingleServerConfig());
return new
else if (configCopy.getSentinelServersConfig() != null) {//配置configCopy类型为哨兵模式
validate(configCopy.getSentinelServersConfig());
return new
else if (configCopy.getClusterServersConfig() != null) {//配置configCopy类型为集群模式
validate(configCopy.getClusterServersConfig());
return new
else if (configCopy.getElasticacheServersConfig() != null) {//配置configCopy类型为亚马逊云模式
validate(configCopy.getElasticacheServersConfig());
return new
else if (configCopy.getReplicatedServersConfig() != null) {//配置configCopy类型为微软云模式
validate(configCopy.getReplicatedServersConfig());
return new
else if (configCopy.getConnectionManager() != null) {//直接返回configCopy自带的默认ConnectionManager
return
else
throw new IllegalArgumentException("server(s) address(es) not defined!");
}
}
上面可以看到根据传入的配置Config.java的不同,会分别创建不同的ConnectionManager的实现类。
2.3 MasterSlaveConnectionManager.java
主从模式、单机模式、哨兵模式、集群模式、亚马逊云模式、微软云模式),如下如所示:
这里以主从部署方式进行讲解,先通过一张图了解MasterSlaveConnectionManager的组成:
上图中最终要的组件要数MasterSlaveEntry,在后面即将进行介绍,这里注释MasterSlaveConnectionManager.java的核心代码如下:
/**
* MasterSlaveConnectionManager的构造方法
* @param cfg for MasterSlaveServersConfig
* @param config for Config
*/
public
//调用构造方法
this(config);
//
initTimer(cfg);
this.config = cfg;
//初始化MasterSlaveEntry
initSingleEntry();
}
/**
* MasterSlaveConnectionManager的构造方法
* @param cfg for Config
*/
public
//读取redisson的jar中的文件META-INF/MANIFEST.MF,打印出Bundle-Version对应的Redisson版本信息
Version.logVersion();
//EPOLL是linux的多路复用IO模型的增强版本,这里如果启用EPOLL,就让redisson底层netty使用EPOLL的方式,否则配置netty里的NIO非阻塞方式
if
if (cfg.getEventLoopGroup() == null) {
//使用linux IO非阻塞模型EPOLL
this.group = new EpollEventLoopGroup(cfg.getNettyThreads(), new DefaultThreadFactory("redisson-netty"));
else
this.group = cfg.getEventLoopGroup();
}
this.socketChannelClass = EpollSocketChannel.class;
else
if (cfg.getEventLoopGroup() == null) {
//使用linux IO非阻塞模型NIO
this.group = new NioEventLoopGroup(cfg.getNettyThreads(), new DefaultThreadFactory("redisson-netty"));
else
this.group = cfg.getEventLoopGroup();
}
this.socketChannelClass = NioSocketChannel.class;
}
if (cfg.getExecutor() == null) {
//线程池大小,对于2U 2CPU 8cores/cpu,意思是有2块板子,每个板子上8个物理CPU,那么总计物理CPU个数为16
//对于linux有个超线程概念,意思是每个物理CPU可以虚拟出2个逻辑CPU,那么总计逻辑CPU个数为32
//这里Runtime.getRuntime().availableProcessors()取的是逻辑CPU的个数,所以这里线程池大小会是64
int threads = Runtime.getRuntime().availableProcessors() * 2;
if (cfg.getThreads() != 0) {
threads = cfg.getThreads();
}
new DefaultThreadFactory("redisson"));
else
executor = cfg.getExecutor();
}
this.cfg = cfg;
this.codec = cfg.getCodec();
//一个可以获取异步执行任务返回值的回调对象,本质是对于java的Future的实现,监控MasterSlaveConnectionManager的shutdown进行一些必要的处理
this.shutdownPromise = newPromise();
//一个持有MasterSlaveConnectionManager的异步执行服务
this.commandExecutor = new CommandSyncService(this);
}
/**
* 初始化定时调度器
* @param config for MasterSlaveServersConfig
*/
protected void
//读取超时时间配置信息
int[] timeouts = new int[]{config.getRetryInterval(), config.getTimeout(), config.getReconnectionTimeout()};
Arrays.sort(timeouts);
int minTimeout = timeouts[0];
//设置默认超时时间
if (minTimeout % 100 != 0) {
100) / 2;
else if (minTimeout == 100) {
50;
else
100;
}
//创建定时调度器
new HashedWheelTimer(Executors.defaultThreadFactory(), minTimeout, TimeUnit.MILLISECONDS, 1024);
// to avoid assertion error during timer.stop invocation
try
class.getDeclaredField("leak");
true);
null);
catch
throw new
}
//检测MasterSlaveConnectionManager的空闲连接的监视器IdleConnectionWatcher,会清理不用的空闲的池中连接对象
new IdleConnectionWatcher(this, config);
}
/**
* 创建MasterSlaveConnectionManager的MasterSlaveEntry
*/
protected void
try
//主从模式下0~16383加入到集合slots
new
slots.add(singleSlotRange);
MasterSlaveEntry entry;
if (config.checkSkipSlavesInit()) {//ReadMode不为MASTER并且SubscriptionMode不为MASTER才执行
new SingleEntry(slots, this, config);
RFuture<Void> f = entry.setupMasterEntry(config.getMasterAddress());
f.syncUninterruptibly();
else {//默认主从部署ReadMode=SLAVE,Subscriptinotallow=SLAVE,这里会执行
entry = createMasterSlaveEntry(config, slots);
}
//将每个分片0~16383都指向创建的MasterSlaveEntry
for (int slot = singleSlotRange.getStartSlot(); slot < singleSlotRange.getEndSlot() + 1; slot++) {
addEntry(slot, entry);
}
//DNS相关
if (config.getDnsMonitoringInterval() != -1) {
new DNSMonitor(this, Collections.singleton(config.getMasterAddress()),
config.getSlaveAddresses(), config.getDnsMonitoringInterval());
dnsMonitor.start();
}
catch
stopThreads();
throw
}
}
/**
* MasterSlaveEntry的构造方法
* @param config for MasterSlaveServersConfig
* @param slots for HashSet<ClusterSlotRange>
* @return MasterSlaveEntry
*/
protected
//创建MasterSlaveEntry
new MasterSlaveEntry(slots, this, config);
//从节点连接池SlaveConnectionPool和PubSubConnectionPool的默认的最小连接数初始化
List<RFuture<Void>> fs = entry.initSlaveBalancer(java.util.Collections.<URI>emptySet());
for
future.syncUninterruptibly();
}
主节点连接池MasterConnectionPool和MasterPubSubConnectionPool的默认的最小连接数初始化
RFuture<Void> f = entry.setupMasterEntry(config.getMasterAddress());
f.syncUninterruptibly();
return
}
上面个人觉得有两处代码值得我们特别关注,特别说明如下:
entry.initSlaveBalancer:从节点连接池SlaveConnectionPool和PubSubConnectionPool的默认的最小连接数初始化。
entry.setupMasterEntry:主节点连接池MasterConnectionPool和MasterPubSubConnectionPool的默认的最小连接数初始化。
2.4 MasterSlaveEntry.java
用一张图来解释MasterSlaveEntry的组件如下:
MasterSlaveEntry.java里正是我们一直在寻找着的四个连接池MasterConnectionPool、MasterPubSubConnectionPool、SlaveConnectionPool和PubSubConnectionPool,这里注释MasterSlaveEntry.java的核心代码如下:
/**
* MasterSlaveEntry的构造方法
* @param slotRanges for Set<ClusterSlotRange>
* @param connectionManager for ConnectionManager
* @param config for MasterSlaveServersConfig
*/
public
//主从模式下0~16383加入到集合slots
for
for (int i = clusterSlotRange.getStartSlot(); i < clusterSlotRange.getEndSlot() + 1; i++) {
slots.add(i);
}
}
//赋值MasterSlaveConnectionManager给connectionManager
this.connectionManager = connectionManager;
//赋值config
this.config = config;
//创建LoadBalancerManager
//其实LoadBalancerManager里持有者从节点的SlaveConnectionPool和PubSubConnectionPool
//并且此时连接池里还没有初始化默认的最小连接数
new LoadBalancerManager(config, connectionManager, this);
//创建主节点连接池MasterConnectionPool,此时连接池里还没有初始化默认的最小连接数
new MasterConnectionPool(config, connectionManager, this);
//创建主节点连接池MasterPubSubConnectionPool,此时连接池里还没有初始化默认的最小连接数
new MasterPubSubConnectionPool(config, connectionManager, this);
}
/**
* 从节点连接池SlaveConnectionPool和PubSubConnectionPool的默认的最小连接数初始化
* @param disconnectedNodes for Collection<URI>
* @return List<RFuture<Void>>
*/
public
//这里freezeMasterAsSlave=true
boolean
new
//把主节点当作从节点处理,因为默认ReadMode=ReadMode.SLAVE,所以这里不会添加针对该节点的连接池
RFuture<Void> f = addSlave(config.getMasterAddress(), freezeMasterAsSlave, NodeType.MASTER);
result.add(f);
//读取从节点的地址信息,然后针对每个从节点地址创建SlaveConnectionPool和PubSubConnectionPool
//SlaveConnectionPool【初始化10个RedisConnection,最大可以扩展至64个】
//PubSubConnectionPool【初始化1个RedisPubSubConnection,最大可以扩展至50个】
for
f = addSlave(address, disconnectedNodes.contains(address), NodeType.SLAVE);
result.add(f);
}
return
}
/**
* 从节点连接池SlaveConnectionPool和PubSubConnectionPool的默认的最小连接数初始化
* @param address for URI
* @param freezed for boolean
* @param nodeType for NodeType
* @return RFuture<Void>
*/
private RFuture<Void> addSlave(URI address, boolean
//创建到从节点的连接RedisClient
RedisClient client = connectionManager.createClient(NodeType.SLAVE, address);
new
this.config.getSlaveConnectionMinimumIdleSize(),
this.config.getSlaveConnectionPoolSize(),
this.config.getSubscriptionConnectionMinimumIdleSize(),
this.config.getSubscriptionConnectionPoolSize(), connectionManager, nodeType);
//默认只有主节点当作从节点是会设置freezed=true
if
synchronized
entry.setFreezed(freezed);
entry.setFreezeReason(FreezeReason.SYSTEM);
}
}
//调用slaveBalancer来对从节点连接池SlaveConnectionPool和PubSubConnectionPool的默认的最小连接数初始化
return
}
/**
* 主节点连接池MasterConnectionPool和MasterPubSubConnectionPool的默认的最小连接数初始化
* @param address for URI
* @return RFuture<Void>
*/
public
//创建到主节点的连接RedisClient
RedisClient client = connectionManager.createClient(NodeType.MASTER, address);
new
client,
config.getMasterConnectionMinimumIdleSize(),
config.getMasterConnectionPoolSize(),
config.getSubscriptionConnectionMinimumIdleSize(),
config.getSubscriptionConnectionPoolSize(),
connectionManager,
NodeType.MASTER);
//如果配置的Subscriptinotallow=SubscriptionMode.MASTER就初始化MasterPubSubConnectionPool
//默认Subscriptinotallow=SubscriptionMode.SLAVE,MasterPubSubConnectionPool这里不会初始化最小连接数
if
//MasterPubSubConnectionPool【初始化1个RedisPubSubConnection,最大可以扩展至50个】
RFuture<Void> f = writeConnectionHolder.add(masterEntry);
RFuture<Void> s = pubSubConnectionHolder.add(masterEntry);
return
}
//调用MasterConnectionPool使得连接池MasterConnectionPool里的对象最小个数为10个
//MasterConnectionPool【初始化10个RedisConnection,最大可以扩展至64个】
return
}
writeConnectionHolder.add(masterEntry):其实writeConnectionHolder的类型就是MasterConnectionPool,这里是连接池MasterConnectionPool里添加对象
pubSubConnectionHolder.add(masterEntry):其实pubSubConnectionHolder的类型是MasterPubSubConnectionPool,这里是连接池MasterPubSubConnectionPool添加对象
slaveConnectionPool.add(entry):这里是连接池SlaveConnectionPool里添加对象
pubSubConnectionPool.add(entry):这里是连接池PubSubConnectionPool里添加对象
2.5 LoadBalancerManager.java
图解LoadBalancerManager.java的内部组成如下:
LoadBalancerManager.java里面有着从节点相关的两个重要的连接池SlaveConnectionPool和PubSubConnectionPool,这里注释LoadBalancerManager.java的核心代码如下:
/**
* LoadBalancerManager的构造方法
* @param config for MasterSlaveServersConfig
* @param connectionManager for ConnectionManager
* @param entry for MasterSlaveEntry
*/
public
//赋值connectionManager
this.connectionManager = connectionManager;
//创建连接池SlaveConnectionPool
new
//创建连接池PubSubConnectionPool
new
}
/**
* LoadBalancerManager的连接池SlaveConnectionPool和PubSubConnectionPool里池化对象添加方法,也即池中需要对象时,调用此方法添加
* @param entry for ClientConnectionsEntry
* @return RFuture<Void>
*/
public RFuture<Void> add(final
final
//创建一个回调监听器,在池中对象创建失败时进行2次莫仍尝试
new
new AtomicInteger(2);
@Override
public void operationComplete(Future<Void> future) throws
if
result.tryFailure(future.cause());
return;
}
if (counter.decrementAndGet() == 0) {
String addr = entry.getClient().getIpAddr();
ip2Entry.put(addr, entry);
null);
}
}
};
//调用slaveConnectionPool添加RedisConnection对象到池中
RFuture<Void> slaveFuture = slaveConnectionPool.add(entry);
slaveFuture.addListener(listener);
//调用pubSubConnectionPool添加RedisPubSubConnection对象到池中
RFuture<Void> pubSubFuture = pubSubConnectionPool.add(entry);
pubSubFuture.addListener(listener);
return
}
我们已经了解了开篇提到的四个连接池是在哪里创建的。
3. Redisson的4类连接池
MasterConnectionPool、MasterPubSubConnectionPool、SlaveConnectionPool和PubSubConnectionPool,它们的父类都是ConnectionPool,其类继承关系图如下:
通过上图我们了解了ConnectionPool类的继承关系图,再来一张图来了解下ConnectionPool.java类的组成,如下:
好了,再来图就有点啰嗦了,注释ConnectionPool.java代码如下:
abstract class ConnectionPool<T extends
private final
//维持着连接池对应的redis节点信息
//比如1主2从部署MasterConnectionPool里的entries只有一个主节点(192.168.29.24 6379)
//比如1主2从部署MasterPubSubConnectionPool里的entries为空,因为Subscriptinotallow=SubscriptionMode.SLAVE
//比如1主2从部署SlaveConnectionPool里的entries有3个节点(192.168.29.24 6379,192.168.29.24 7000,192.168.29.24 7001,但是注意192.168.29.24 6379冻结属性freezed=true不会参与读操作除非2个从节点全部宕机才参与读操作)
//比如1主2从部署PubSubConnectionPool里的entries有2个节点(192.168.29.24 7000,192.168.29.24 7001),因为Subscriptinotallow=SubscriptionMode.SLAVE,主节点不会加入
protected final List<ClientConnectionsEntry> entries = new
//持有者RedissonClient的组件ConnectionManager
final
//持有者RedissonClient的组件ConnectionManager里的MasterSlaveServersConfig
final
//持有者RedissonClient的组件ConnectionManager里的MasterSlaveEntry
final
//构造函数
public
this.config = config;
this.masterSlaveEntry = masterSlaveEntry;
this.connectionManager = connectionManager;
}
//连接池中需要增加对象时候调用此方法
public RFuture<Void> add(final
final
new
@Override
public void operationComplete(Future<Void> future) throws
entries.add(entry);
}
});
true);
return
}
//初始化连接池中最小连接数
private void initConnections(final ClientConnectionsEntry entry, final RPromise<Void> initPromise, boolean
final int
if (minimumIdleSize == 0
null);
return;
}
final AtomicInteger initializedConnections = new
int startAmount = Math.min(50, minimumIdleSize);
final AtomicInteger requests = new
for (int i = 0; i < startAmount; i++) {
createConnection(checkFreezed, requests, entry, initPromise, minimumIdleSize, initializedConnections);
}
}
//创建连接对象到连接池中
private void createConnection(final boolean checkFreezed, final AtomicInteger requests, final ClientConnectionsEntry entry, final
final int minimumIdleSize, final
if
int
new
"Unable to init enough connections amount! Only " + totalInitializedConnections + " from " + minimumIdleSize + " were initialized. Server: "
+ entry.getClient().getAddr());
initPromise.tryFailure(cause);
return;
}
new
@Override
public void
RPromise<T> promise = connectionManager.newPromise();
createConnection(entry, promise);
new
@Override
public void operationComplete(Future<T> future) throws
if
T conn = future.getNow();
releaseConnection(entry, conn);
}
releaseConnection(entry);
if
int
String errorMsg;
if (totalInitializedConnections == 0) {
"Unable to connect to Redis server: "
else
"Unable to init enough connections amount! Only "
" from " + minimumIdleSize + " were initialized. Redis server: "
}
new
initPromise.tryFailure(cause);
return;
}
int
if (value == 0) {
"{} connections initialized for {}", minimumIdleSize, entry.getClient().getAddr());
if (!initPromise.trySuccess(null)) {
throw new
}
else if (value > 0
if
createConnection(checkFreezed, requests, entry, initPromise, minimumIdleSize, initializedConnections);
}
}
}
});
}
});
}
//连接池中租借出连接对象
public
for (int j = entries.size() - 1; j >= 0; j--) {
final
if
&& tryAcquireConnection(entry)) {
return
}
}
new
new
for
if
freezed.add(entry.getClient().getAddr());
else
failedAttempts.add(entry.getClient().getAddr());
}
}
new StringBuilder(getClass().getSimpleName() + " no available Redis entries. ");
if
" Disconnected hosts: "
}
if
" Hosts disconnected due to `failedAttempts` limit reached: "
}
new
return
}
//连接池中租借出连接对象执行操作RedisCommand
public
if
tryAcquireConnection(entry)) {
return
}
new
"Can't aquire connection to "
return
}
//通过向redis服务端发送PING看是否返回PONG来检测连接
private void ping(RedisConnection c, final
RFuture<String> f = c.async(RedisCommands.PING);
f.addListener(pingListener);
}
//归还连接对象到连接池
public void
if
connection.closeAsync();
else
releaseConnection(entry, connection);
}
releaseConnection(entry);
}
//释放连接池中连接对象
protected void
entry.releaseConnection();
}
//释放连接池中连接对象
protected void
entry.releaseConnection(conn);
}
}
用一张图来解释ConnectionPool干了些啥,如下图:
都到这里了,不介意再送一张图了解各种部署方式下的连接池分布了,如下图:
4.Redisson的读写操作句柄类RedissonObject
操作句柄类RedissonObject,RedissonObject根据不同的数据类型有不同的RedissonObject实现类,RedissonObject的类继承关系图如下:
例如想设置redis服务端的key=key的值value=123,你需要查询Redis命令和Redisson对象匹配列表,找到如下对应关系:
然后我们就知道调用代码这么写:
Config config = new Config();// 创建配置
// 指定使用主从部署方式
"redis://192.168.29.24:6379") // 设置redis主节点
"redis://192.168.29.24:7000") // 设置redis从节点
"redis://192.168.29.24:7001"); // 设置redis从节点
RedissonClient redisson = Redisson.create(config);// 创建客户端(发现这一操作非常耗时,基本在2秒-4秒左右)
//任何Redisson操作首先需要获取对应的操作句柄
//RBucket是操作句柄之一,实现类是RedissonBucket
RBucket<String> rBucket = redissonClient.getBucket("key");
//通过操作句柄rBucket进行读操作
rBucket.get();
//通过操作句柄rBucket进行写操作
rBucket.set("123");
至于其它的redis命令对应的redisson操作对象,都可以官网的Redis命令和Redisson对象匹配列表 查到。
6.Redisson的读写操作源码分析
从一个读操作的代码作为入口分析代码,如下:
//任何Redisson操作首先需要获取对应的操作句柄,RBucket是操作句柄之一,实现类是RedissonBucket
RBucket<String> rBucket = redissonClient.getBucket("key");
//通过操作句柄rBucket进行读操作
rBucket.get();
继续追踪上面RBucket的get方法,如下:
上面我们看到不管是读操作还是写操作都转交 CommandAsyncExecutor进行处理,那么这里我们需要看一下 CommandAsyncExecutor.java里关于读写操作处理的核心代码,注释代码如下:
private
//通过公式CRC16.crc16(key.getBytes()) % MAX_SLOT
//计算出一个字符串key对应的分片在0~16383中哪个分片
int
//之前已经将0~16383每个分片对应到唯一的一个MasterSlaveEntry,这里取出来
MasterSlaveEntry entry = connectionManager.getEntry(slot);
//这里将MasterSlaveEntry包装成NodeSource【slot=null,addr=null,redirect=null,entry=MasterSlaveEntry】
return new
}
@Override
public
RPromise<R> mainPromise = connectionManager.newPromise();
//获取NodeSource【slot=null,addr=null,redirect=null,entry=MasterSlaveEntry】
NodeSource source = getNodeSource(key);
调用异步执行方法async
true, source, codec, command, params, mainPromise, 0);
return
}
protected <V, R> void async(final boolean readOnlyMode, final NodeSource source, final
final RedisCommand<V> command, final Object[] params, final RPromise<R> mainPromise, final int
//操作被取消,那么直接返回
if
free(params);
return;
}
//连接管理器无法连接,释放参数所占资源,然后返回
if
free(params);
new RedissonShutdownException("Redisson is shutdown"));
return;
}
final
if
try
for (int i = 0; i < params.length; i++) {
RedissonReference reference = RedissonObjectFactory.toReference(getConnectionManager().getCfg(), params[i]);
if (reference != null) {
params[i] = reference;
}
}
catch
connectionManager.getShutdownLatch().release();
free(params);
mainPromise.tryFailure(e);
return;
}
}
//开始从connectionManager获取池中的连接
//这里采用异步方式,创建一个RFuture对象,等待池中连接,一旦获得连接,然后进行读和写操作
final
if (readOnlyMode) {//对于读操作默认readOnlyMode=true,这里会执行
connectionFuture = connectionManager.connectionReadOp(source, command);
else {//对于写操作默认readOnlyMode=false,这里会执行
connectionFuture = connectionManager.connectionWriteOp(source, command);
}
//创建RPromise,用于操作失败时候重试
final
details.init(connectionFuture, attemptPromise, readOnlyMode, source, codec, command, params, mainPromise, attempt);
//创建FutureListener,监测外部请求是否已经取消了之前提交的读写操作,如果取消了,那么就让正在执行的读写操作停止
new
@Override
public void operationComplete(Future<R> future) throws
if (future.isCancelled() && connectionFuture.cancel(false)) {
"Connection obtaining canceled for {}", command);
details.getTimeout().cancel();
if (details.getAttemptPromise().cancel(false)) {
free(params);
}
}
}
};
//创建TimerTask,用于操作失败后通过定时器进行操作重试
final TimerTask retryTimerTask = new
@Override
public void run(Timeout t) throws
if
return;
}
if (details.getConnectionFuture().cancel(false)) {
connectionManager.getShutdownLatch().release();
else
if
if (details.getWriteFuture() == null
if
if (details.getWriteFuture().cancel(false)) {
if (details.getException() == null) {
new RedisTimeoutException("Unable to send command: " + command + " with params: " + LogHelper.toString(details.getParams()) + " after " + connectionManager.getConfig().getRetryAttempts() + " retry attempts"));
}
details.getAttemptPromise().tryFailure(details.getException());
}
return;
}
details.incAttempt();
this, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
details.setTimeout(timeout);
return;
}
if
return;
}
}
}
if
if (details.getAttemptPromise().cancel(false)) {
free(details);
AsyncDetails.release(details);
}
return;
}
if
if (details.getException() == null) {
new RedisTimeoutException("Unable to send command: " + command + " with params: " + LogHelper.toString(details.getParams() + " after " + connectionManager.getConfig().getRetryAttempts() + " retry attempts")));
}
details.getAttemptPromise().tryFailure(details.getException());
return;
}
if (!details.getAttemptPromise().cancel(false)) {
return;
}
int count = details.getAttempt() + 1;
if
"attempt {} for command {} and params {}",
count, details.getCommand(), Arrays.toString(details.getParams()));
}
details.removeMainPromiseListener();
async(details.isReadOnlyMode(), details.getSource(), details.getCodec(), details.getCommand(), details.getParams(), details.getMainPromise(), count);
AsyncDetails.release(details);
}
};
//配置对于读写操作的超时时间
Timeout timeout = connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
details.setTimeout(timeout);
details.setupMainPromiseListener(mainPromiseListener);
//给connectionFuture增加监听事件,当从连接池中获取连接成功,成功的事件会被触发,通知这里执行后续读写动作
new
@Override
public void operationComplete(Future<RedisConnection> connFuture) throws
if (connFuture.isCancelled()) {//从池中获取连接被取消,直接返回
return;
}
if (!connFuture.isSuccess()) {//从池中获取连接失败
connectionManager.getShutdownLatch().release();
details.setException(convertException(connectionFuture));
return;
}
if (details.getAttemptPromise().isDone() || details.getMainPromise().isDone()) {//从池中获取连接失败,并且尝试了一定次数仍然失败,默认尝试次数为0
releaseConnection(source, connectionFuture, details.isReadOnlyMode(), details.getAttemptPromise(), details);
return;
}
//从池中获取连接成功,这里取出连接对象RedisConnection
final
//如果需要重定向,这里进行重定向
//重定向的情况有:集群模式对应的slot分布在其他节点,就需要进行重定向
if
new ArrayList<CommandData<?, ?>>(2);
RPromise<Void> promise = connectionManager.newPromise();
new CommandData<Void, Void>(promise, details.getCodec(), RedisCommands.ASKING, new
new
RPromise<Void> main = connectionManager.newPromise();
new
details.setWriteFuture(future);
else
if
"acquired connection for command {} and params {} from slot {} using node {}... {}",
details.getCommand(), Arrays.toString(details.getParams()), details.getSource(), connection.getRedisClient().getAddr(), connection);
}
//发送读写操作到RedisConnection,进行执行
new
details.setWriteFuture(future);
}
//对于写操作增加监听事件回调,对写操作是否成功,失败原因进行日志打印
new
@Override
public void operationComplete(ChannelFuture future) throws
checkWriteFuture(details, connection);
}
});
//返回RedisConnection连接到连接池
releaseConnection(source, connectionFuture, details.isReadOnlyMode(), details.getAttemptPromise(), details);
}
});
new
@Override
public void operationComplete(Future<R> future) throws
checkAttemptFuture(source, details, future);
}
});
}
上面的代码我用一张读写操作处理流程图总结如下:
至此,关于读写操作的源码讲解完毕。在上面的代码注释中,列出如下重点。
6.1 分片SLOT的计算公式
SLOT=CRC16.crc16(key.getBytes()) % MAX_SLOT
6.2 每个ConnectionPool持有的ClientConnectionsEntry对象冻结判断条件
一个节点被判断为冻结,必须同时满足以下条件:
该节点有slave节点,并且从节点个数大于0;
设置的配置ReadMode不为并且SubscriptionMode不为MASTER;
该节点的从节点至少有一个存活着,也即如果有从节点宕机,宕机的从节点的个数小于该节点总的从节点个数
6.3 读写负载图
7.Redisson的读写操作从连接池获取连接对象源码分析和Redisson里RedisClient使用netty源码分析
CommandAsyncExecutor.java里的如下代码获取连接对象:
//开始从connectionManager获取池中的连接
//这里采用异步方式,创建一个RFuture对象,等待池中连接,一旦获得连接,然后进行读和写操作
final
if (readOnlyMode) {//对于读操作默认readOnlyMode=true,这里会执行
connectionFuture = connectionManager.connectionReadOp(source, command);
} else {//对于写操作默认readOnlyMode=false,这里会执行
connectionFuture = connectionManager.connectionWriteOp(source, command);
}
上面读操作调用了 connectionManager.connectionReadOp从连接池获取连接对象,写操作调用了 connectionManager.connectionWriteOp从连接池获取连接对象,我们继续跟进 connectionManager关于connectionReadOp和connectionWriteOp的源代码,注释如下:
/**
* 读操作通过ConnectionManager从连接池获取连接对象
* @param source for NodeSource
* @param command for RedisCommand<?>
* @return RFuture<RedisConnection>
*/
public
//这里之前分析过source=NodeSource【slot=null,addr=null,redirect=null,entry=MasterSlaveEntry】
MasterSlaveEntry entry = source.getEntry();
if (entry == null && source.getSlot() != null) {//这里不会执行source里slot=null
entry = getEntry(source.getSlot());
}
if (source.getAddr() != null) {//这里不会执行source里addr=null
entry = getEntry(source.getAddr());
if (entry == null) {
for
if
entry = e;
break;
}
}
}
if (entry == null) {
new RedisNodeNotFoundException("Node: " + source.getAddr() + " for slot: " + source.getSlot() + " hasn't been discovered yet");
return
}
return
}
if (entry == null) {//这里不会执行source里entry不等于null
new RedisNodeNotFoundException("Node: " + source.getAddr() + " for slot: " + source.getSlot() + " hasn't been discovered yet");
return
}
//MasterSlaveEntry里从连接池获取连接对象
return
}
/**
* 写操作通过ConnectionManager从连接池获取连接对象
* @param source for NodeSource
* @param command for RedisCommand<?>
* @return RFuture<RedisConnection>
*/
public
//这里之前分析过source=NodeSource【slot=null,addr=null,redirect=null,entry=MasterSlaveEntry】
MasterSlaveEntry entry = source.getEntry();
if (entry == null) {
entry = getEntry(source);
}
if (entry == null) {//这里不会执行source里entry不等于null
new RedisNodeNotFoundException("Node: " + source.getAddr() + " for slot: " + source.getSlot() + " hasn't been discovered yet");
return
}
//MasterSlaveEntry里从连接池获取连接对象
return
}
我们看到上面调用 ConnectionManager从连接池获取连接对象,但是 ConnectionManager却将获取连接操作转交 MasterSlaveEntry处理,我们再一次回顾一下 MasterSlaveEntry的组成:
MasterSlaveEntry里持有中我们开篇所提到的 四个连接池,那么这里我们继续关注 MasterSlaveEntry.java的源代码:
/**
* 写操作从MasterConnectionPool连接池里获取连接对象
* @param command for RedisCommand<?>
* @return RFuture<RedisConnection>
*/
public
//我们知道writeConnectionHolder的类型为MasterConnectionPool
//这里就是从MasterConnectionPool里获取连接对象
return
}
/**
* 写操作从LoadBalancerManager里获取连接对象
* @param command for RedisCommand<?>
* @return RFuture<RedisConnection>
*/
public
if
//我们知道默认ReadMode=ReadMode.SLAVE,所以对于读操作这里不会执行
return
}
//我们知道slaveBalancer里持有者SlaveConnectionPool和PubSubConnectionPool
//这里就是从SlaveConnectionPool里获取连接对象
return
}
似乎又绕回来了,最终的获取连接对象都转交到了从连接池 ConnectionPool里获取连接对象,注释 ConnectionPool里的获取连接对象代码如下:
/**
* 读写操作从ConnectionPool.java连接池里获取连接对象
* @param command for RedisCommand<?>
* @return RFuture<T>
*/
public
for (int j = entries.size() - 1; j >= 0; j--) {
final
if
//遍历ConnectionPool里维持的ClientConnectionsEntry列表
//遍历的算法默认为RoundRobinLoadBalancer
//ClientConnectionsEntry里对应的redis节点为非冻结节点,也即freezed=false
return
}
}
//记录失败重试信息
new
new
for
if
freezed.add(entry.getClient().getAddr());
else
failedAttempts.add(entry.getClient().getAddr());
}
}
new StringBuilder(getClass().getSimpleName() + " no available Redis entries. ");
if
" Disconnected hosts: "
}
if
" Hosts disconnected due to `failedAttempts` limit reached: "
}
//获取连接失败抛出异常
new
return
}
/**
* 读写操作从ConnectionPool.java连接池里获取连接对象
* @param command for RedisCommand<?>
* @param entry for ClientConnectionsEntry
* @return RFuture<T>
*/
private RFuture<T> acquireConnection(RedisCommand<?> command, final
//创建一个异步结果获取RPromise
final
//获取连接前首先将ClientConnectionsEntry里的空闲连接信号freeConnectionsCounter值减1
//该操作成功后将调用这里的回调函数AcquireCallback<T>
new
@Override
public void
this);
//freeConnectionsCounter值减1成功,说明获取可以获取到连接
//这里才是真正获取连接的操作
connectTo(entry, result);
}
@Override
public void operationComplete(Future<T> future) throws
this);
}
};
//异步结果获取RPromise绑定到上面的回调函数callback
result.addListener(callback);
//尝试将ClientConnectionsEntry里的空闲连接信号freeConnectionsCounter值减1,如果成功就调用callback从连接池获取连接
acquireConnection(entry, callback);
//返回异步结果获取RPromise
return
}
/**
* 真正从连接池中获取连接
* @param entry for ClientConnectionsEntry
* @param promise for RPromise<T>
*/
private void
if
releaseConnection(entry);
return;
}
//从连接池中取出一个连接
T conn = poll(entry);
if (conn != null) {
if
promiseFailure(entry, promise, conn);
return;
}
connectedSuccessful(entry, promise, conn);
return;
}
//如果仍然获取不到连接,可能连接池中连接对象都被租借了,这里开始创建一个新的连接对象放到连接池中
createConnection(entry, promise);
}
/**
* 从连接池中获取连接
* @param entry for ClientConnectionsEntry
* @return T
*/
protected
return
}
/**
* 调用ClientConnectionsEntry创建一个连接放置到连接池中并返回此连接
* @param entry for ClientConnectionsEntry
* @param promise for RPromise<T>
*/
private void createConnection(final ClientConnectionsEntry entry, final
//调用ClientConnectionsEntry创建一个连接放置到连接池中并返回此连接
RFuture<T> connFuture = connect(entry);
new
@Override
public void operationComplete(Future<T> future) throws
if
promiseFailure(entry, promise, future.cause());
return;
}
T conn = future.getNow();
if
promiseFailure(entry, promise, conn);
return;
}
connectedSuccessful(entry, promise, conn);
}
});
}
ConnectionPool.java里获取读写操作的连接,是遍历ConnectionPool里维持的ClientConnectionsEntry列表,找到一非冻结的ClientConnectionsEntry,然后调用ClientConnectionsEntry里的freeConnectionsCounter尝试将值减1,如果成功,说明连接池中可以获取到连接,那么就从ClientConnectionsEntry里获取一个连接出来,如果拿不到连接,会调用ClientConnectionsEntry创建一个新连接放置到连接池中,并返回此连接,这里回顾一下 ClientConnectionsEntry的组成图:
我们继续跟进 ClientConnectionsEntry.java的源代码,注释如下:
/**
* ClientConnectionsEntry里从freeConnections里获取一个连接并返回给读写操作使用
*/
public
return
}
/**
* ClientConnectionsEntry里新创建一个连接对象返回给读写操作使用
*/
public
//调用RedisClient利用netty连接redis服务端,将返回的netty的outboundchannel包装成RedisConnection并返回
RFuture<RedisConnection> future = client.connectAsync();
new
@Override
public void operationComplete(Future<RedisConnection> future) throws
if
return;
}
RedisConnection conn = future.getNow();
onConnect(conn);
"new connection created: {}", conn);
}
});
return
}
上面的代码说明如果 ClientConnectionsEntry 里的 freeConnections 有空闲连接,那么直接返回该连接,如果没有那么调用 RedisClient.connectAsync创建一个新的连接 ,这里我继续注释一下 RedisClient.java 的源代码如下:
package
import
import
import
import
import
import
import
import
import
import
import
import
import
import
import
import
import
import
import
import
import
import
import
import
import
import
import
import
/**
* 使用java里的网络编程框架Netty连接redis服务端
* 作者: Nikita Koksharov
*/
public class
private final Bootstrap bootstrap;//Netty的工具类Bootstrap,用于连接建立等作用
private final Bootstrap pubSubBootstrap;//Netty的工具类Bootstrap,用于连接建立等作用
private final InetSocketAddress addr;//socket连接的地址
//channels是netty提供的一个全局对象,里面记录着当前socket连接上的所有处于可用状态的连接channel
//channels会自动监测里面的channel,当channel断开时,会主动踢出该channel,永远保留当前可用的channel列表
private final ChannelGroup channels = new
private ExecutorService executor;//REACOTR模型的java异步执行线程池
private final long commandTimeout;//超时时间
private Timer timer;//定时器
private boolean
private RedisClientConfig config;//redis连接配置信息
//构造方法
public static
if (config.getTimer() == null) {
new
}
return new
}
//构造方法
private
this.config = config;
this.executor = config.getExecutor();
this.timer = config.getTimer();
new
bootstrap = createBootstrap(config, Type.PLAIN);
pubSubBootstrap = createBootstrap(config, Type.PUBSUB);
this.commandTimeout = config.getCommandTimeout();
}
//java的网路编程框架Netty工具类Bootstrap初始化
private
new
.channel(config.getSocketChannelClass())
.group(config.getGroup())
.remoteAddress(addr);
//注册netty相关socket数据处理RedisChannelInitializer
new RedisChannelInitializer(bootstrap, config, this, channels, type));
//设置超时时间
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout());
return
}
//构造方法
@Deprecated
public
this(URIBuilder.create(address));
}
//构造方法
@Deprecated
public
this(new HashedWheelTimer(), Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2), new
true;
}
//构造方法
@Deprecated
public
this(timer, executor, group, address.getHost(), address.getPort());
}
//构造方法
@Deprecated
public RedisClient(String host, int
this(new HashedWheelTimer(), Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2), new NioEventLoopGroup(), NioSocketChannel.class, host, port, 10000, 10000);
true;
}
//构造方法
@Deprecated
public RedisClient(Timer timer, ExecutorService executor, EventLoopGroup group, String host, int
this(timer, executor, group, NioSocketChannel.class, host, port, 10000, 10000);
}
//构造方法
@Deprecated
public RedisClient(String host, int port, int connectTimeout, int
this(new HashedWheelTimer(), Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2), new NioEventLoopGroup(), NioSocketChannel.class, host, port, connectTimeout, commandTimeout);
}
//构造方法
@Deprecated
public RedisClient(final Timer timer, ExecutorService executor, EventLoopGroup group, Class<? extends SocketChannel> socketChannelClass, String host, int
int connectTimeout, int
new
config.setTimer(timer).setExecutor(executor).setGroup(group).setSocketChannelClass(socketChannelClass)
.setAddress(host, port).setConnectTimeout(connectTimeout).setCommandTimeout(commandTimeout);
this.config = config;
this.executor = config.getExecutor();
this.timer = config.getTimer();
new
//java的网路编程框架Netty工具类Bootstrap初始化
bootstrap = createBootstrap(config, Type.PLAIN);
pubSubBootstrap = createBootstrap(config, Type.PUBSUB);
this.commandTimeout = config.getCommandTimeout();
}
//获取连接的IP地址
public
return addr.getAddress().getHostAddress() + ":"
}
//获取socket连接的地址
public
return
}
//获取超时时间
public long
return
}
//获取netty的线程池
public
return
}
//获取redis连接配置
public
return
}
//获取连接RedisConnection
public
try
return
catch
throw new RedisConnectionException("Unable to connect to: "
}
}
//启动netty去连接redis服务端,设置java的Future尝试将netty连接上的OutBoundChannel包装成RedisConnection并返回RedisConnection
public
final RPromise<RedisConnection> f = new
//netty连接redis服务端
ChannelFuture channelFuture = bootstrap.connect();
new
@Override
public void operationComplete(final ChannelFuture future) throws
if
//将netty连接上的OutBoundChannel包装成RedisConnection并返回RedisConnection
final
new
@Override
public void operationComplete(final Future<RedisConnection> future) throws
new
@Override
public void
if
if
c.closeAsync();
}
else
f.tryFailure(future.cause());
c.closeAsync();
}
}
});
}
});
else
new
public void
f.tryFailure(future.cause());
}
});
}
}
});
return
}
//获取订阅相关连接RedisPubSubConnection
public
try
return
catch
throw new RedisConnectionException("Unable to connect to: "
}
}
//启动netty去连接redis服务端,设置java的Future尝试将netty连接上的OutBoundChannel包装成RedisPubSubConnection并返回RedisPubSubConnection
public
final RPromise<RedisPubSubConnection> f = new
//netty连接redis服务端
ChannelFuture channelFuture = pubSubBootstrap.connect();
new
@Override
public void operationComplete(final ChannelFuture future) throws
if
//将netty连接上的OutBoundChannel包装成RedisPubSubConnection并返回RedisPubSubConnection
final
new
@Override
public void operationComplete(final Future<RedisPubSubConnection> future) throws
new
@Override
public void
if
if
c.closeAsync();
}
else
f.tryFailure(future.cause());
c.closeAsync();
}
}
});
}
});
else
new
public void
f.tryFailure(future.cause());
}
});
}
}
});
return
}
//关闭netty网络连接
public void
shutdownAsync().syncUninterruptibly();
if
timer.stop();
executor.shutdown();
try
15, TimeUnit.SECONDS);
catch
Thread.currentThread().interrupt();
}
bootstrap.config().group().shutdownGracefully();
}
}
//异步关闭netty网络连接
public
for
RedisConnection connection = RedisConnection.getFrom(channel);
if (connection != null) {
true);
}
}
return
}
@Override
public
return "[addr=" + addr + "]";
}
}
上面就是Redisson利用java网络编程框架netty连接redis的全过程 ,如果你对 netty 比较熟悉,阅读上面的代码应该不是问题。