Redission实现分布式锁:
Redission是什么?
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid),它充分利用了Redis键值数据库提供的一系列优势,基于Java实用工具包中常用接口,为使用者提供了一系列具有分布式特性的常用工具类,让使用Redis更加简单、便捷,从而让使用者能够将更多精力集中到业务逻辑处理上。
Redission能解决什么问题?
解决分布式环境下锁的竞争问题,导致的比如说超卖等
基本环境搭建:
实现的需求就是,每访问一次controller:localhost:8080/deductStock,Redis存的数据就会-1
1、创建一个SpringBoot项目,添加pom依赖
<dependencies>
<!--Redis依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!--springboot2.x以后用得是lettuce:lettuce默认连接池使用 common-pool2 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<!--Redission实现分布式锁-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
<!--SpringBoot依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--Lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
2、application.properties
# 应用名称
spring.application.name=redission-demo
# 应用服务 WEB 访问端口
server.port=8080
##########################Redis配置###################################
# 连接的那个数据库(默认为0)
spring.redis.database=1
# redis服务的ip地址(默认是本机-127.0.0.1)
spring.redis.host=127.0.0.1
# redis端口号(默认)
spring.redis.port=6379
# redis的密码,没设置过密码,可为空
spring.redis.password=
# 连接超时时间
spring.redis.timeout=10s
# 连接池中的最小空闲连接
spring.redis.lettuce.pool.min-idle=0
# 连接池中的最大空闲连接
spring.redis.lettuce.pool.max-idle=8
# 连接池中的最大连接数
spring.redis.lettuce.pool.max-active=8
# 连接池中的最大等待时间(-1表示没有限制)
spring.redis.lettuce.pool.max-wait=-1ms
3、Redis配置文件
package com.zhixi.config.redis;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import java.io.File;
/**
* @ClassName RedisConfig
* @Author zhangzhixi
* @Description Redis配置类
* @Date 2022-4-29 10:23
* @Version 1.0
*/
@Configuration
public class RedisConfig {
/**
* @param redisConnectionFactory:配置不同的客户端,这里注入的redis连接工厂不同: JedisConnectionFactory、LettuceConnectionFactory
* @功能描述 :配置Redis序列化,原因如下:
* (1) StringRedisTemplate的序列化方式为字符串序列化,
* RedisTemplate的序列化方式默为jdk序列化(实现Serializable接口)
* (2) RedisTemplate的jdk序列化方式在Redis的客户端中为乱码,不方便查看,
* 因此一般修改RedisTemplate的序列化为方式为JSON方式【建议使用GenericJackson2JsonRedisSerializer】
*/
@Bean(name = "redisTemplate")
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
GenericJackson2JsonRedisSerializer genericJackson2JsonRedisSerializer = serializer();
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
// key采用String的序列化方式
redisTemplate.setKeySerializer(StringRedisSerializer.UTF_8);
// value序列化方式采用jackson
redisTemplate.setValueSerializer(genericJackson2JsonRedisSerializer);
// hash的key也采用String的序列化方式
redisTemplate.setHashKeySerializer(StringRedisSerializer.UTF_8);
//hash的value序列化方式采用jackson
redisTemplate.setHashValueSerializer(genericJackson2JsonRedisSerializer);
redisTemplate.setConnectionFactory(redisConnectionFactory);
return redisTemplate;
}
/**
* 此方法不能用@Ben注解,避免替换Spring容器中的同类型对象
*/
public GenericJackson2JsonRedisSerializer serializer() {
return new GenericJackson2JsonRedisSerializer();
}
@Value("${spring.redis.host}")
private String redisHost;
@Value("${spring.redis.port}")
private String redisPort;
@Value("${spring.redis.database}")
private Integer redisDatabase;
/**
* 注册Redission对象
*
* @return Redission客户端对象
*/
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
config.useSingleServer().setAddress("redis://" + redisHost + ":" + redisPort).setDatabase(redisDatabase);
return Redisson.create(config);
}
}
4、Controller
package com.zhixi.controller;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RedissonClient;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @ClassName RedissionController
* @Author zhangzhixi
* @Description
* @Date 2022-12-12 23:09
* @Version 1.0
*/
@Slf4j
@RestController
public class RedissionController {
/**
* 商品在Redis中的key名称
*/
private static final String KEY_STOCK = "stock";
@Resource
RedisTemplate<String, String> redisTemplate;
@Resource
RedissonClient redissonClient;
@RequestMapping("/deductStock")
public String deductStock() {
// 从redis中获取商品库存
int stock = Integer.parseInt(Objects.requireNonNull(redisTemplate.opsForValue().get(KEY_STOCK)));
AtomicInteger atomicInteger = new AtomicInteger(stock);
if (atomicInteger.get() > 0) {
// 库存-1
int remainingStock = atomicInteger.decrementAndGet();
// 更新库存
redisTemplate.opsForValue().set(KEY_STOCK, String.valueOf(remainingStock));
log.info("扣减成功,剩余库存:" + remainingStock);
} else {
log.info("扣减失败,库存不足");
}
return "end";
}
}
5、向Redis中设置值
@SpringBootTest
class RedissionDemoApplicationTests {
@Resource
RedisTemplate<String, String > redisTemplate;
@Test
void contextLoads() {
redisTemplate.opsForValue().set("stock","100");
}
}
出现的问题以及解决方案
商品超卖(并发问题)
问题出现:
比如有三个线程同时来访问这个请求,比如三个线程同时获得商品库存(100),然后-1,正常来说剩余库存是97,但是在并发环境下,可能会出现超卖问题。
下面准备三个线程,同时访问deductStock这个方法,看下是否会出现超卖问题?
@GetMapping("/test")
public void testThread() {
for (int i = 0; i < 3; i++) {
new Thread(()->{
deductStock();
}).start();
}
}
解决办法:
对代码进行加锁
@RequestMapping("/deductStock")
public String deductStock() {
// 在单机情况下,有且仅有一个线程能够访问
synchronized (this) {
// 从redis中获取商品库存
int stock = Integer.parseInt(Objects.requireNonNull(redisTemplate.opsForValue().get(KEY_STOCK)));
AtomicInteger atomicInteger = new AtomicInteger(stock);
if (atomicInteger.get() > 0) {
// 库存-1
int remainingStock = atomicInteger.decrementAndGet();
// 更新库存
redisTemplate.opsForValue().set(KEY_STOCK, String.valueOf(remainingStock));
log.info("扣减成功,剩余库存:" + remainingStock);
} else {
log.info("扣减失败,库存不足");
}
return "end";
}
}
通过下图可以看到,解决了问题
模拟高并发下商品超卖问题
在上个例子中,我们通过了Synchronized锁,成功解决了多个线程争抢导致的超卖问题,但是有个问题,假设后期公司为了保证服务可用性。
将单击的应用,升级称为了集群的模式,那么是否会有超卖问题呢?
模拟两台应用服务器:使用Nginx做负载均衡
1、修改application.properties配置文件,将端口分别修改为8080、8090,并启动应用。
2、下载Nginx:http://nginx.org/download/nginx-1.18.0.zip
3、编辑Nginx配置文件:config/nginx.config
upstream redislock{
server localhost:8080 weight=1;
server localhost:8090 weight=1;
}
server {
listen 80;
server_name localhost;
location / {
root html;
index index.html index.htm;
proxy_pass http://redislock;
}
4、启动nginx
双击nginx.exe文件即可
5、访问应用:http://localhost/deductStock
就可以看到了IDEA控制台打印了日志。
模拟高并发:使用Jmeter
1、添加线程组
2、添加HTTP请求
3、添加聚合报告
4、执行压测
IDEA控制台输出内容:
可以发现,通过Synchronized没有锁住,还是出现了超卖的问题,Synchronized只在JVM进程内部有效,也就是一个Tomcat有效。
如果在一个分布式环境下,我们要控制一段资源的争抢问题,应该怎么做呢?
使用Redis命令解决并发环境下超卖问题
在上个例子中,模拟了在高并发环境下的商品超卖问题,那么应该如何通过Redis来解决这个问题呢?
setnx命令:
将key的值设置为value,当且仅当key不存在的情况下。
若给定的key已经存在,则setnx不做任何动作。
Redis简单命令式分布式锁
@RequestMapping("/deductStock")
public String deductStock() {
// 分布式锁名称,随便什么都可以,重要的是两个应用要共享这一个Redis的key
String lockKey = "lockDeductStock";
// redis实现基础版的分布式锁
Boolean lockResult = redisTemplate.opsForValue().setIfAbsent(lockKey, lockKey);
// 加锁不成功,返回给前端错误码,前端给用户友好提示
if (Boolean.FALSE.equals(lockResult)) {
log.info("系统繁忙,请稍后再试!");
return "系统繁忙,请稍后再试!";
}
// 从redis中获取商品库存
int stock = Integer.parseInt(Objects.requireNonNull(redisTemplate.opsForValue().get(KEY_STOCK)));
AtomicInteger atomicInteger = new AtomicInteger(stock);
if (atomicInteger.get() > 0) {
// 库存-1
int remainingStock = atomicInteger.decrementAndGet();
// 更新库存
redisTemplate.opsForValue().set(KEY_STOCK, String.valueOf(remainingStock));
log.info("扣减成功,剩余库存:" + remainingStock);
} else {
log.info("扣减失败,库存不足");
}
// 业务执行完成,删除这个锁
redisTemplate.delete(lockKey);
return "end";
}
修改Jmeter参数
最开始设置的Jmeter参数Ramp-up时间是0,为了明显看到效果,这里修改成2,表示2s发送完毕这些800个请求
测试并发访问
使用Redis做分布式锁时候出现的问题?
锁永久失效问题
假如我们在setnx分布式锁的时候,正常执行,但是如果代码执行到业务逻辑地方的时候。
这时候8080程序挂掉了,那么这时候这个key就无法删除,另外一个8090服务器发现这个分布式锁的key还在,就在那边排队等着
给用户提示“系统繁忙,请稍后再试!”。显然这是不合理的。
如果我们在程序挂了,但是这个key是一直存在Redis中的,其他的用户访问这个请求也进不到后面的业务处理逻辑代码中去,应该怎么办呢?那么这时候就需要使用到了redis原生API:
即使程序挂掉了,没有删除key,10S钟后,这个key也会自动被删除。
// redis实现基础版的分布式锁
Boolean lockResult = redisTemplate.opsForValue().setIfAbsent(lockKey, lockKey,30, TimeUnit.SECONDS);
但是这一个代码,虽然比上个分布式超卖的问题优化了一点点,但是还是有问题的,比如说,
- 问题1:如果程序的执行时间,大于了锁的释放时间。这就会导致程序在最后删除的key为空,显然这个失效时间确实不容易写的比较“合理”。
- 问题2:8090给8080程序的锁释放了,如果9090这个程序进来了,拿到了锁,但是8090的锁无法释放,导致了锁一直失效,这也是导致了超卖问题。
如何解决线程之间锁释放错了?
上个问题,问题2锁释放错了应该如何解决呢?通过UUID解决,每次释放锁的时候,判断是不是当前请求的UUID,如果是则可以正常释放锁。如果不是,则释放锁失败!
@RequestMapping("/deductStock")
public String deductStock() {
// 分布式锁名称,随便什么都可以,重要的是两个应用要共享这一个Redis的key
String lockKey = "lockDeductStock";
// 分布式锁的值
String lockValue = UUID.randomUUID().toString().replaceAll("-", "");
try {
// redis实现基础版的分布式锁,设置了key的过期时间
Boolean lockResult = redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, 30, TimeUnit.SECONDS);
// 加锁不成功,返回给前端错误码,前端给用户友好提示
if (Boolean.FALSE.equals(lockResult)) {
log.info("系统繁忙,请稍后再试!");
return "系统繁忙,请稍后再试!";
}
// 从redis中获取商品库存
int stock = Integer.parseInt(Objects.requireNonNull(redisTemplate.opsForValue().get(KEY_STOCK)));
AtomicInteger atomicInteger = new AtomicInteger(stock);
if (atomicInteger.get() > 0) {
// 库存-1
int remainingStock = atomicInteger.decrementAndGet();
// 更新库存
redisTemplate.opsForValue().set(KEY_STOCK, String.valueOf(remainingStock));
log.info("扣减成功,剩余库存:" + remainingStock);
} else {
log.info("扣减失败,库存不足");
}
} finally {
// 判断是不是当前请求的UUID,如果是则可以正常释放锁。如果不是,则释放锁失败!
if (lockValue.equals(redisTemplate.opsForValue().get(lockKey))) {
redisTemplate.delete(lockKey);
}
}
return "end";
}
-------------------------------------------------使用Redis实现分布式锁有两个问题:----------------------------------------------------------------------------------------------
分别是:
- 1、锁超时问题
- 2、锁被误释放问题
在上个代码中我们通过UUID解决了锁被误释放的问题,还有一个锁超时的问题应该如何解决呢?
写一个定时任务,分线程每隔十秒去查看一次主线程是否持有这把锁,如果这个锁存在,重新将这个锁的超时时间设置为30S,对锁续命~
当然自己写上面的代码肯定无比麻烦的,当前市面上有很多开源框架已经帮助我们封装好了这些逻辑!
使用redisson解决超卖问题
Redission来帮我们实现锁续命的分布式锁。
@RequestMapping("/deductStock")
public String deductStock() {
// 分布式锁名称,随便什么都可以,重要的是两个应用要共享这一个Redis的key
String lockKey = "lockDeductStock";
// 获取锁对象
RLock redissonLock = redissonClient.getLock(lockKey);
try {
// 加锁
// 类似于Redis的一条setnx命令:Boolean lockResult = redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, 30, TimeUnit.SECONDS);
redissonLock.lock();
// 从redis中获取商品库存
int stock = Integer.parseInt(Objects.requireNonNull(redisTemplate.opsForValue().get(KEY_STOCK)));
AtomicInteger atomicInteger = new AtomicInteger(stock);
if (atomicInteger.get() > 0) {
// 库存-1
int remainingStock = atomicInteger.decrementAndGet();
// 更新库存
redisTemplate.opsForValue().set(KEY_STOCK, String.valueOf(remainingStock));
log.info("扣减成功,剩余库存:" + remainingStock);
} else {
log.info("扣减失败,库存不足");
}
} finally {
// 释放锁
redissonLock.unlock();
}
return "end";
}
Redisson分布式锁实现原理:
Redisson分布式锁底层
Lua脚本
if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);