基于秒杀案例的锁的思考与实践
在跟黑马点评项目时,发现其中的对于“秒杀”过程的讨论非常有意思,
该项目讨论了“超买/超卖”、“一人一单”、“集群模式下的一人一单”问题
分别引出了乐观锁、悲观锁,分布式锁的概念,值得一学!
超买/超卖问题-乐观锁
秒杀的基本流程
乐观锁的常见实现手段:
版本号法
行数据的一次修改操作,使版本号自增1,利用版本号判断当前读的数据有没有被修改过s
在执行更新操作之前,匹配表中的版本号字段与之前查询出来的字段,一致则更新,不一致则自旋
核心是利用SQL语句的版本号字段
set stock = stock - 1, version = version + 1 where id = 10 and version = 1;
CAS法
只需要判断当前的库存是否能够满足当前的售卖需求,例如此处指一次请求售卖一份,那么只需要判断当前的库存是否>0即可
核心SQL:
set stock = stock - 1 where id = 10 and stock > 0;
下图中所有的SQL都是判断是否与当前库存一致,这种做法无疑需要更多的自旋次数
单机一人一单问题-悲观锁
一人一单的基本流程图
注意,这里的操作有两种思路:
基于SQL的Unique_Key实现
流程图中可以产看,我们只需要保证订单表中,(用户id,优惠券id)是唯一的,就能够保证一人一单的业务逻辑,
因此最简单的方法就是修改/维护数据库表,添加 Unique_key
这样就通过MySQL实现了一人一单,非常简单,如果是后端集群模式下, 这种设置应该也能替代分布式悲观锁
UNIQUE KEY `one_user_one_order` (`user_id`,`voucher_id`)
测试可以得到,这种基于Unique_key的方法是可以应对集群操作的,下面的案例是通过NGINX的轮询负载均衡的后的debug模式实现的“同时”请求
### The error occurred while setting parameters
### SQL: INSERT INTO tb_voucher_order ( id, user_id, voucher_id, create_time, update_time ) VALUES ( ?, ?, ?, ?, ? )
### Cause: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '1010-3' for key 'one_user_one_order'
; Duplicate entry '1010-3' for key 'one_user_one_order'; nested exception is java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '1010-3' for key 'one_user_one_order'
不要在意后面的 data字段的时间顺序问题,测试通过就好了
请求1 --打到--> 后端服务1
{
"success": true,
"data": 133060663810457608
}
请求2 --打到--> 后端服务2(代码没有做Unique_key验证,因此报错了)
{
"success": false,
"errorMsg": "服务器异常"
}
而当去掉这个联合Unique_key时,一人一单就失效了(同时进入了锁,同时查完了一人一单,同时完成了购买)
请求1 --打到--> 后端服务1
{
"success": true,
"data": 133059980910657542
}
请求2 --打到--> 后端服务2
{
"success": true,
"data": 133059989500592135
}
基于悲观锁的实现
如果表没有添加联合的Unique_key,那么只能通过加悲观锁,让所有的请求排队,进而保证线程安全和一人一单
(1)选择什么当做锁
显然,经过上述的分析,锁应当是用户的id对象
多插一句,通常,SpringBoot项目中,用户的信息是存储在 ThreadLocal 中的
用户的每一次请求,后端都会从线程池中调一个线程,响应用户的请求,也就是每一个请求可以认为都是一个新线程
ThreadLocal 提供线程的局部变量,为每一个使用该变量的线程都提供一个变量值的副本
用户的id是恒不变的,而id多是数值类型,如Long,long等,因此将其转为String类型
userId.toString()
但是,相同的字符串值并不意味着相同的对象,synchronized的加锁对象必须一致,因此使用JVM的字符串常量池
userId.toString().intern()
.itern()
方法保证了,如果字符串常量池中存在相同值的字符串,那么直接给出其引用,而不是创建新的字符串对象
引用字符串常量池,保证了锁可行性
synchronized (userId.toString().intern()) {
return createVocherOrder(voucherId, userId);
}
createVocherOrder(Long voucherId, Long userId)
@Transactional
public Result createVocherOrder(Long voucherId, Long userId) {
// 同一用户的的 session 也可以作为锁
// 每次进来的 userId 都是一个新的对象, 同一用户的值是相同的
LambdaQueryWrapper<VoucherOrder> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper
.eq(VoucherOrder::getVoucherId, voucherId)
.eq(VoucherOrder::getUserId, userId);
int count = count(lambdaQueryWrapper);
if (count > 0) {
return Result.fail("该用户已经购买过一次");
}
// 5. 扣库存
// CAS
/* SQL:
*
* UPDATE tb_seckill_voucher SET stock = stock-1 WHERE (voucher_id = ? AND stock > ?)
*
*/
LambdaUpdateWrapper<SeckillVoucher> lambdaUpdateWrapper = new LambdaUpdateWrapper<>();
lambdaUpdateWrapper.eq(SeckillVoucher::getVoucherId, voucherId);
lambdaUpdateWrapper.gt(SeckillVoucher::getStock, 0); // stock > 0
lambdaUpdateWrapper.setSql("stock = stock-1");
boolean updateFlag = seckillVoucherService.update(lambdaUpdateWrapper);
if (!updateFlag) {
return Result.fail("库存不足");
}
// 6. 创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// id, userId, voucherId
voucherOrder.setId(redisIdCreator.nextId(SECKILL_VOUCHER_REDIS_KEY_PREFIX));
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
// 7. 返回订单
return Result.ok(voucherOrder.getId());
}
(2)加锁后的Spring事务失效
上述代码中锁的范围是执行 createVocherOrder()
的前后,
但是,我们这里需要讨论流程上的问题,createVocherOrder(Long voucherId, Long userId)
方法是事务的,从注解可以看到
但是,我们的调用方法是 this.createVocherOrder(voucherId, userId);
这样会导致Spring的事务失效
原因在于,spring的事务是基于代理对象的,也就是 ProxyObj..createVocherOrder(voucherId, userId);
才是支持事务的,我们的写法
是基于普通对象的this,这样必然导致了事务失效,也即 @Transactional 注解的失效
因此,这里插入解决Spring事务失效的一种做法
-
给项目启动类添加允许暴露动态代理对象的注解
@EnableAspectJAutoProxy(exposeProxy = true) // 允许暴露动态代理对象
-
添加依赖
<!--aspectJ--> <dependency> <groupId>org.aspectj</groupId> <artifactId>aspectjweaver</artifactId> </dependency>
-
获取动态代理对象
synchronized (userId.toString().intern()) { // Spring 实现事务是基于代理对象的, // 即此处的 VoucherOrderServiceImpl 实例的代理对象是 IVoucherOrderService 类型的 // 这种写法是调用的 VoucherOrderServiceImpl 对象的方法, 此时Spring事务失效 // return createVocherOrder(voucherId, userId); // 为了使Spring事务生效, 必须获取Spring代理对象, 通过代理对象调用 createVocherOrder 方法 // 这种做法,底层使用了 AspectJ + @EnableAspectJAutoProxy(exposeProxy = true) // 获取代理对象 IVoucherOrderService currentProxy = (IVoucherOrderService) AopContext.currentProxy(); return currentProxy.createVocherOrder(voucherId, userId); // 事务提交以后, 这里才释放锁 }
可以看到,代理对象的类型是当前类的接口类型
@Service
public class VoucherOrderServiceImpl
extends ServiceImpl<VoucherOrderMapper, VoucherOrder>
implements IVoucherOrderService {}
集群模式下的一人一单并发问题-分布式悲观锁
这种情况下,JDK的常用锁,如JUC、synchronized等字段,锁监视器(图中JVM1、JVM2内的)只对当前的JVM负责,集群模式下所有前述的锁都是失效的。。。
思路是:让多个JVM使用同一个锁监视器
Redis分布式锁(TTL)
(1)基本流程
-
获取锁(保证互斥)
-
没有原子性操作的获取锁
redis> SETNX lock_name thread_info redis> EXPIRE lock_name 10 # 添加过期时间,避免服务器宕机引起的死锁(锁不释放)
-
保证了原子性操作的获取锁
非阻塞锁:成果-true,失败-false
redis> SET lock_name thread_info EX 10 NX # EX 秒 | PX 毫秒
-
-
释放锁(手动释放、超时释放)
redis> DEL lock_name
(2)实现
核心点:
锁的key使用 前缀+业务后缀的形式
锁的val是线程的id
获取锁使用 SET NX
释放锁使用 DEL
public class SimpleRedisLock implements ILock{
private static final String LOCK_KEY_PREFIX = "lock:";
private final StringRedisTemplate stringRedisTemplate;
private final String lockKey;
public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String lockKey) {
this.stringRedisTemplate = stringRedisTemplate;
this.lockKey = lockKey;
}
@Override
public boolean tryLock(long timeoutSec) {
// 获取线程的标识
String threadId = String.valueOf(Thread.currentThread().getId());
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(LOCK_KEY_PREFIX + lockKey, threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
@Override
public void unlock() {
stringRedisTemplate.delete(LOCK_KEY_PREFIX + lockKey);
}
}
(3)问题分析
TTL通常是不能太长的,否则会导致业务执行效率太低,因此总是设置成一个贴近真实业务执行时间的上限值
极端情况:当业务成功获取锁之后,如果该业务出现了阻塞,在TTL内没有完成业务,锁超时释放。那么第二个线程可以拿到锁,此时如果前业务执行完毕,释放锁,那么后来的线程锁被误删除,导致后续的多个线程都会出现同样的情况,出现了并行执行的情况
核心问题在于,TTL与阻塞业务的矛盾,释放锁时没有看锁的val(线程id)就盲目释放锁
(4)改进方案
释放锁之前,判断val,看看是不是当前线程的锁
同时,不同的JVM的线程id可能会重复,因此要重新考虑存入的val
1改-Redis分布式锁(TTL)
public class SimpleRedisLock implements ILock{
private static final String LOCK_KEY_PREFIX = "lock:";
private static final String THREAD_PREFIX = UUID.randomUUID().toString(true) + "-";
private final StringRedisTemplate stringRedisTemplate;
private final String lockKey;
public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String lockKey) {
this.stringRedisTemplate = stringRedisTemplate;
this.lockKey = lockKey;
}
@Override
public boolean tryLock(long timeoutSec) {
// 获取线程的标识 UUID-ThreadId
String threadId = THREAD_PREFIX + Thread.currentThread().getId();
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(LOCK_KEY_PREFIX + lockKey, threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
@Override
public void unlock() {
// 获取线程的标识 UUID-ThreadId
String threadId = THREAD_PREFIX + Thread.currentThread().getId();
String key = LOCK_KEY_PREFIX + lockKey;
// 获取锁
String val = stringRedisTemplate.opsForValue().get(key);
if (!threadId.equals(val)) {
return;
}
// 一致则删除
stringRedisTemplate.delete(key);
}
}
问题分析
判断锁和释放之间可能出现阻塞(判断和释放是两步的,不是原子的)
阻塞的原因可以是JVM的GC(这是极端情况,核心点在于讨论原子性操作的必要性)
2改-Redis分布式锁(TTL)
解决锁释放的两步操作的原子性:
(1)判断锁val(2)删除锁,
即,两个步骤合为一个步骤去做,不分开
改进方案,使用Redis的Lua脚本,将多条redis命令写到一个脚本中
Redis的调用函数
redis.call('命令名称', 'key', '其他参数', ...)
# 案例 SET name Jack
redis.call('SET', 'name', 'Jack')
# GET name
redis.call('GET', 'name')
命令案例
redis.call('SET', 'name', 'Jack')
local name = redis.call('GET', 'name')
return name
Redis 调用脚本
redis> EVAL script numkeys key [key ...] arg [arg ...]
无参
# SET name Jack
redis> EVAL "return redis.call('set', 'name', 'Jack')" 0
有参
# SET name jack
redis> EVAL "return redis.call('set', KEYS[1], ARGV[1])" 1 name Rose
根据释放锁的流程,写出的LUA脚本 unlock.lua
-- KEYS[1]: 分布式锁的key
-- ARGV[1]: 分布式锁的值
-- 获取redis中分布式锁的val 比较与传入的val: threadId 是否一致
if (redis.call('get', KEYS[1]) == ARGV[1])
then
-- 一致则删除锁
return redis.call('del', KEYS[1])
end
-- 不一致则返回0
return 0
RedisTemplate 接口
将unlock.lua
放置于项目的Resource目录下
给出完整的分布式锁代码
public class SimpleRedisLock implements ILock{
private static final String LOCK_KEY_PREFIX = "lock:";
private static final String THREAD_PREFIX = UUID.randomUUID().toString(true) + "-";
private static final DefaultRedisScript<Long> UNLOCK_LUA_SCRIPT;
private final StringRedisTemplate stringRedisTemplate;
private final String lockKey;
static {
UNLOCK_LUA_SCRIPT = new DefaultRedisScript<>();
// 使用Spring的ClassPathResource 指定脚本路径
UNLOCK_LUA_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
// 指定返回值类型(与泛型一致即可)
UNLOCK_LUA_SCRIPT.setResultType(Long.class);
}
public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String lockKey) {
this.stringRedisTemplate = stringRedisTemplate;
this.lockKey = lockKey;
}
@Override
public boolean tryLock(long timeoutSec) {
// 获取线程的标识 UUID-ThreadId
String threadId = THREAD_PREFIX + Thread.currentThread().getId();
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(LOCK_KEY_PREFIX + lockKey, threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
@Override
public void unlock() {
// 调用LUA脚本
// 脚本一定要提前读取好, 防止每次调用都要IO
// 获取线程的标识 UUID-ThreadId
String threadId = THREAD_PREFIX + Thread.currentThread().getId();
String key = LOCK_KEY_PREFIX + lockKey;
stringRedisTemplate.execute(UNLOCK_LUA_SCRIPT, Collections.singletonList(key), threadId);
}
}
问题分析
当前的分布式锁已经足够可用,但是对于一个高可用的分布式锁来说,固定的TTL超时释放、不可重入、不可重试、主从一致性(Redis集群的主从同步延迟),是不够完美的,引出Redisson
标签:key,String,redis,stringRedisTemplate,案例,秒杀,思考,线程,lockKey From: https://www.cnblogs.com/jentreywang/p/17008117.html