待到秋风四起时,孤舟远赴倒悬山。
官方文档:https://github.com/redisson/redisson
使用示例
更多请参考官方文档
引入依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.4</version>
</dependency>
后面可以使用redisson-spring-boot-starter
编写配置类(单节点模式)
@Configuration
public class MyRedisConfig {
@Value("${ipAddr}")
private String ipAddr;
// redission通过redissonClient对象使用 // 如果是多个redis集群,可以配置
@Bean(destroyMethod = "shutdown")
public RedissonClient redisson() {
Config config = new Config();
// 创建单例模式的配置
config.useSingleServer().setAddress("redis://" + ipAddr + ":6379");
return Redisson.create(config);
}
}
可重入锁(Reentrant Lock)
A调用B。AB都需要同一锁,此时可重入锁就可以重入,A就可以调用B。不可重入锁时,A调用B将死锁。
// 参数为锁名字
RLock lock = redissonClient.getLock("lock");
// 该锁实现了JUC.locks.lock接口
lock.lock();// 阻塞等待方式,而不是自旋等待
// 解锁放到finally // 如果这里宕机:有看门狗,不用担心
lock.unlock();
大家都知道,如果负责储存这个分布式锁的Redisson
节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson
内部提供了一个监控锁的看门狗,它的作用是在Redisson
实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。
另外Redisson
还通过加锁的方法提供了leaseTime
的参数来指定加锁的时间。超过这个时间后锁便自动解开了。
// 加锁以后10秒钟自动解锁
// 无需调用unlock方法手动解锁
lock.lock(10, TimeUnit.SECONDS);
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
try {
...
} finally {
lock.unlock();
}
}
Redisson
同时还为分布式锁提供了异步执行的相关方法:
RLock lock = redisson.getLock("anyLock");
lock.lockAsync();
lock.lockAsync(10, TimeUnit.SECONDS);
Future<Boolean> res = lock.tryLockAsync(100, 10, TimeUnit.SECONDS);
RLock
对象完全符合Java的Lock规范。也就是说只有拥有锁的进程才能解锁,其他进程解锁则会抛出IllegalMonitorStateException
错误。但是如果遇到需要其他进程也能解锁的情况,请使用分布式信号量Semaphore
对象.
public Map<String, List<Catalog2Vo>> getCatalogJsonDbWithRedisson() {
Map<String, List<Catalog2Vo>> categoryMap=null;
RLock lock = redissonClient.getLock("CatalogJson-Lock");
lock.lock();
try {
Thread.sleep(30000);
categoryMap = getCategoryMap();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
return categoryMap;
}
}
@ResponseBody
@GetMapping("/hello")
public String hello() {
//获取一把锁,只要锁的名字相同,就是同一把锁
RLock lock = redisson.getLock("my-lock");
//加锁
lock.lock();//阻塞式等待,默认加的锁都是30s的时间
//1)锁的自动续期,如果业务超长,运行期间自动给锁续上新的30s。不用担心业务时间长,锁自动过期被删除。
//2)加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认在30s之后自动删除。
// lock.lock(10, TimeUnit.SECONDS);//10s自动解锁,自动解锁时间一定要大于业务执行时间。
//问题:lock.lock(10, TimeUnit.SECONDS);在锁时间到了以后,不会自动续期。
//1. 如果我们传递了超时时间,就发送给redis执行脚本,进行占锁,默认超时就是我们指定的时间
//2. 如果我们未指定超时时间,就使用30 * 1000【lockWatchdogTimeout看门狗的默认时间】
// 只要占锁成功,就会启动一个定时任务【重新给锁设置过期时间,新的过期时间就是看门狗的默认时间】
// internalLockLeaseTime【看门狗的的时间】 / 3 ,也就是10s。每隔10s都会自动再次续期,续成30s
//最佳实战:lock.lock(30, TimeUnit.SECONDS);省掉了整个续期操作,手动操作。将解锁时间设大一些 为30s
try {
System.out.println("加锁成功,指定业务代码...." + Thread.currentThread().getId());
Thread.sleep(30000);
} catch (Exception e) {
} finally {
//解锁 假设解锁代码没有运行,redisson会不会出现死锁。 结果是不会。
System.out.println("释放锁..." + Thread.currentThread().getId() );
lock.unlock();
}
return "hello";
}
读写锁(ReadWrite Lock)
基于Redis
的Redisson
分布式可重入读写锁RReadWriteLock
Java对象实现了java.util.concurrent.locks.ReadWriteLock
接口。其中读锁和写锁都继承了RLock接口。
分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。
RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");
// 最常见的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();
// 10秒钟以后自动解锁
// 无需调用unlock方法手动解锁
rwlock.readLock().lock(10, TimeUnit.SECONDS);
// 或
rwlock.writeLock().lock(10, TimeUnit.SECONDS);
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);
// 或
boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();
@ResponseBody
@GetMapping("/write")
public String writeValue() {
RReadWriteLock lock = redisson.getReadWriteLock("wr-lock");
String s = "";
RLock rLock = lock.writeLock();//改数据加写锁
try {
rLock.lock();
System.out.println("写锁加锁成功..." + Thread.currentThread().getId());
s = UUID.randomUUID().toString();
Thread.sleep(30000);
redisTemplate.opsForValue().set("writeValue",s);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
rLock.unlock();
System.out.println("写锁释放成功..." + Thread.currentThread().getId());
}
return s;
}
@ResponseBody
@GetMapping("/read")
public String readValue() {
RReadWriteLock lock = redisson.getReadWriteLock("wr-lock");
String s = "";
RLock rLock = lock.readLock();//读数据加读锁
try {
rLock.lock();
System.out.println("读锁加锁成功..." + Thread.currentThread().getId());
Thread.sleep(30000);
s = redisTemplate.opsForValue().get("writeValue");
} catch (Exception e) {
e.printStackTrace();
} finally {
rLock.unlock();
System.out.println("读锁释放成功..." + Thread.currentThread().getId());
}
return s;
}
-
保证一定能读到最新数据,修改期间,写锁是一个排他锁(互斥锁 / 独享锁),读锁是一个共享锁
-
写锁没有释放 读锁就必须等待
-
读 + 读:相当于无锁,并发读只会在redis中记录好所有当前的读锁,他们都会同时加锁成功
-
写 + 读:等待写锁释放
-
写 + 写:阻塞方式
-
读 + 写:有读锁,写锁也需要等待
-
总结:只要有写锁的存在,都必须要等待。
信号量(Semaphore)
信号量为存储在redis
中的一个数字,当这个数字大于0时,即可以调用acquire()
方法增加数量,也可以调用release()
方法减少数量,但是当调用release()
之后小于0的话方法就会阻塞,直到数字大于0。
RSemaphore semaphore = redisson.getSemaphore("semaphore");
semaphore.acquire();
//或
semaphore.acquireAsync();
semaphore.acquire(23);
semaphore.tryAcquire();
//或
semaphore.tryAcquireAsync();
semaphore.tryAcquire(23, TimeUnit.SECONDS);
//或
semaphore.tryAcquireAsync(23, TimeUnit.SECONDS);
semaphore.release(10);
semaphore.release();
//或
semaphore.releaseAsync();
@GetMapping("/park")
@ResponseBody
public String park() {
RSemaphore park = redissonClient.getSemaphore("park");
try {
park.acquire(2);
//信号量 可以用于分布式限流。
// boolean b = park.tryAcquire();
// if (b){
// //执行业务
// } else {
// return "error";
// }
} catch (InterruptedException e) {
e.printStackTrace();
}
return "停进2";
}
@GetMapping("/go")
@ResponseBody
public String go() {
RSemaphore park = redissonClient.getSemaphore("park");
park.release(2);
return "开走2";
}
闭锁(CountDownLatch)
基于Redisson的Redisson分布式闭锁(CountDownLatch)Java对象RCountDownLatch
采用了与java.util.concurrent.CountDownLatch
相似的接口和用法。
以下代码只有offLatch()
被调用5次后 setLatch()
才能继续执行。
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(1);
latch.await();
// 在其他线程或其他JVM里
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.countDown();
/**
* 举例:放假锁门
* 5个班全部走完,我们就可以锁门
*/
@ResponseBody
@GetMapping("/lockDoor")
public String lockDoor() throws InterruptedException {
RCountDownLatch latch = redisson.getCountDownLatch("door");
latch.trySetCount(5);
latch.await();
return "放假了...";
}
@ResponseBody
@GetMapping("/gogogo/{id}")
public String gogogo(@PathVariable("id") Long id) {
RCountDownLatch latch = redisson.getCountDownLatch("door");
latch.countDown();//计数减一
return id + "班的人都走了...";
}
使用redisson
改造代码
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedissonLock() {
// 注意锁的名字,锁的粒度,越细越快
// 锁的粒度:具体缓存的是某个数据,11-号商品:product-11-lock
RLock lock = redisson.getLock("catalogJson-lock");
lock.lock();
Map<String, List<Catalog2Vo>> dataFromDb;
try {
dataFromDb = getDataFromDb();
} finally {
lock.unlock();
}
return dataFromDb;
}
缓存和数据库一致性
缓存里面的数据如何和数据库中数据保持一致
-
双写模式:写数据库后,更新缓存。
-
出现问题:由于卡顿等原因,导致写缓存2在最前,写缓存1在后面,就出现数据不一致。【脏数据问题】
-
解决方案:① 整个操作加锁。将写数据库和写缓存这段逻辑加锁,同时只有一个线程可以操作。
-
解决方案:②看业务允不允许数据暂时不一致问题,如果允许,可以不管这个缓存不一致的事情,将数据放入缓存的时候,设置缓存过期时间,只要数据过期了,就会重新从数据库中加载数据。
-
-
失效模式:写数据库后,删除缓存。
-
一个线程先写数据库db-1,然后删除缓存;另一个线程接着写数据库db-2,还没来得写,第三个线程就读取了数据库db-1,并更新了缓存,因此数据库中存放的是db-2,而缓存中存放的是db-1。【脏数据问题】
-
解决方案:
-
无论是双写模式还是失效模式,都会存在缓存不一致的问题。即多个实例同时更新会出事,怎么办?
-
如果是用户维度数据(订单数据,用户数据),这种并发几率非常小,不用考虑这个问题,缓存数据加上过期时间,每隔一段时间触发读的主动更新即可。
-
如果是菜单,商品介绍等基础数据,也可以去使用canal订阅binlog方式。
-
缓存数据 + 过期时间 也足够解决大部分业务对缓存的要求。
-
通过加锁保证并发读写,写写的时候按顺序排好队,读读无所谓。所以适合使用读写锁。(业务不关系脏数据,允许临时脏数据可忽略)
总结:
-
我们能放入缓存的数据本就不应该是实时性、一致性要求超高的。缓存的所有数据都有过期时间,数据过期下一次查询出发主动更新。
-
读写数据的时候,加上分布式读写锁。
-
遇到实时性、一致性要求高的数据,就应该查数据库,即使慢点。
标签:10,加锁,lock,解锁,缓存,Redisson,redisson,分布式 From: https://www.cnblogs.com/l12138h/p/16779231.html