前言
- 分布式应用进行逻辑处理时经常会遇到并发问题。
- 比如一个操作要修改用户的状态,修改状态需要先读出用户的状态,在内存里进行修改,改完了再存回去。如果这样的操作同时进行了,就会出现并发问题,因为读取和保存状态这两个操作不是原子的。(Wiki 解释:所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch 线程切换。)
- 这个时候就要使用到分布式锁来限制程序的并发执行。
- 单机情况下可以使用synchronized、ReentrantLock来保证数据一致性,但不适用于分布式环境中
分布式锁
方式一
下面使用redis的set命令来实现分布式加锁
SET KEY VALUE [EX seconds] [PX milliseconds] [NX|XX]
EX seconds 设置指定的到期时间(以秒为单位)
PX milliseconds 设置指定的到期时间(以毫秒为单位)
NX 仅在键不存在时设置键
XX 只有在键已存在时才设置
@RestController
public class IndexController4 {
// Redis分布式锁的key
public static final String REDIS_LOCK = "good_lock";
@Autowired
StringRedisTemplate template;
@RequestMapping("/buy4")
public String index(){
// 每个人进来先要进行加锁,key值为"good_lock",value随机生成
String value = UUID.randomUUID().toString().replace("-","");
try{
// 加锁
Boolean flag = template.opsForValue().setIfAbsent(REDIS_LOCK, value);
// 加锁失败
if(!flag){
return "抢锁失败!";
}
System.out.println( value+ " 抢锁成功");
String result = template.opsForValue().get("goods:001");
int total = result == null ? 0 : Integer.parseInt(result);
if (total > 0) {
int realTotal = total - 1;
template.opsForValue().set("goods:001", String.valueOf(realTotal));
// 如果在抢到所之后,删除锁之前,发生了异常,锁就无法被释放,
// 释放锁操作不能在此操作,要在finally处理
// template.delete(REDIS_LOCK);
System.out.println("购买商品成功,库存还剩:" + realTotal + "件, 服务端口为8001");
return "购买商品成功,库存还剩:" + realTotal + "件, 服务端口为8001";
} else {
System.out.println("购买商品失败,服务端口为8001");
}
return "购买商品失败,服务端口为8001";
}finally {
// 释放锁
template.delete(REDIS_LOCK);
}
}
}
方式二(改进方式一)
- 在上面的代码中,如果程序在运行期间,部署了微服务jar包的机器突然挂了,代码层面根本就没有走到finally代码块,也就是说在宕机前,锁并没有被删除掉,这样的话,就没办法保证解锁
- 所以,这里需要对这个key加一个过期时间,Redis中设置过期时间有两种方法:
template.expire(REDIS_LOCK,10, TimeUnit.SECONDS)
template.opsForValue().setIfAbsent(REDIS_LOCK, value,10L,TimeUnit.SECONDS)
- 第一种方法需要单独的一行代码,且并没有与加锁放在同一步操作,所以不具备原子性,也会出问题
- 第二种方法在加锁的同时就进行了设置过期时间,所有没有问题,这里采用这种方式
方式三(改进方式二)
- 方式二设置了key的过期时间,解决了key无法删除的问题,但问题又来了上面设置了key的过期时间为10秒,如果业务逻辑比较复杂,需要调用其他微服务,处理时间需要15秒(模拟场景,别较真),而当10秒钟过去之后,这个key就过期了,其他请求就又可以设置这个key,此时如果耗时15秒的请求处理完了,回来继续执行程序,就会把别人设置的key给删除了,这是个很严重的问题!
- 所以,谁上的锁,谁才能删除
@RestController
public class IndexController6 {
public static final String REDIS_LOCK = "good_lock";
@Autowired
StringRedisTemplate template;
@RequestMapping("/buy6")
public String index(){
// 每个人进来先要进行加锁,key值为"good_lock"
String value = UUID.randomUUID().toString().replace("-","");
try{
// 为key加一个过期时间
Boolean flag = template.opsForValue().setIfAbsent(REDIS_LOCK, value,10L,TimeUnit.SECONDS);
// 加锁失败
if(!flag){
return "抢锁失败!";
}
System.out.println( value+ " 抢锁成功");
String result = template.opsForValue().get("goods:001");
int total = result == null ? 0 : Integer.parseInt(result);
if (total > 0) {
// 如果在此处需要调用其他微服务,处理时间较长。。。
int realTotal = total - 1;
template.opsForValue().set("goods:001", String.valueOf(realTotal));
System.out.println("购买商品成功,库存还剩:" + realTotal + "件, 服务端口为8001");
return "购买商品成功,库存还剩:" + realTotal + "件, 服务端口为8001";
} else {
System.out.println("购买商品失败,服务端口为8001");
}
return "购买商品失败,服务端口为8001";
}finally {
// 谁加的锁,谁才能删除!!!!
if(template.opsForValue().get(REDIS_LOCK).equals(value)){
template.delete(REDIS_LOCK);
}
}
}
}
方式四(改进方式三)
- 在上面方式三下,规定了谁上的锁,谁才能删除,但finally快的判断和del删除操作不是原子操作,并发的时候也会出问题,并发嘛,就是要保证数据的一致性,保证数据的一致性,最好要保证对数据的操作具有原子性。
- 在Redis的set命令介绍中,最后推荐Lua脚本进行锁的删除,(可以使用RedisTemplate的API调用Lua脚本)
@RestController
public class IndexController7 {
public static final String REDIS_LOCK = "good_lock";
@Autowired
StringRedisTemplate template;
@RequestMapping("/buy7")
public String index(){
// 每个人进来先要进行加锁,key值为"good_lock"
String value = UUID.randomUUID().toString().replace("-","");
try{
// 为key加一个过期时间
Boolean flag = template.opsForValue().setIfAbsent(REDIS_LOCK, value,10L,TimeUnit.SECONDS);
// 加锁失败
if(!flag){
return "抢锁失败!";
}
System.out.println( value+ " 抢锁成功");
String result = template.opsForValue().get("goods:001");
int total = result == null ? 0 : Integer.parseInt(result);
if (total > 0) {
// 如果在此处需要调用其他微服务,处理时间较长。。。
int realTotal = total - 1;
template.opsForValue().set("goods:001", String.valueOf(realTotal));
System.out.println("购买商品成功,库存还剩:" + realTotal + "件, 服务端口为8001");
return "购买商品成功,库存还剩:" + realTotal + "件, 服务端口为8001";
} else {
System.out.println("购买商品失败,服务端口为8001");
}
return "购买商品失败,服务端口为8001";
}finally {
// 谁加的锁,谁才能删除,使用Lua脚本,进行锁的删除
Jedis jedis = null;
try{
jedis = RedisUtils.getJedis();
String script = "if redis.call('get',KEYS[1]) == ARGV[1] " +
"then " +
"return redis.call('del',KEYS[1]) " +
"else " +
" return 0 " +
"end";
Object eval = jedis.eval(script, Collections.singletonList(REDIS_LOCK), Collections.singletonList(value));
if("1".equals(eval.toString())){
System.out.println("-----del redis lock ok....");
}else{
System.out.println("-----del redis lock error ....");
}
}catch (Exception e){
}finally {
if(null != jedis){
jedis.close();
}
}
}
}
}
方式五(改进方式四)
- 在方式四下,规定了谁上的锁,谁才能删除,并且解决了删除操作没有原子性问题。但还没有考虑缓存续命,以及Redis集群部署下,异步复制造成的锁丢失:主节点没来得及把刚刚set进来这条数据给从节点,就挂了。所以直接上RedLock的Redisson落地实现。
@RestController
public class IndexController8 {
public static final String REDIS_LOCK = "good_lock";
@Autowired
StringRedisTemplate template;
@Autowired
Redisson redisson;
@RequestMapping("/buy8")
public String index(){
RLock lock = redisson.getLock(REDIS_LOCK);
lock.lock();
// 每个人进来先要进行加锁,key值为"good_lock"
String value = UUID.randomUUID().toString().replace("-","");
try{
String result = template.opsForValue().get("goods:001");
int total = result == null ? 0 : Integer.parseInt(result);
if (total > 0) {
// 如果在此处需要调用其他微服务,处理时间较长。。。
int realTotal = total - 1;
template.opsForValue().set("goods:001", String.valueOf(realTotal));
System.out.println("购买商品成功,库存还剩:" + realTotal + "件, 服务端口为8001");
return "购买商品成功,库存还剩:" + realTotal + "件, 服务端口为8001";
} else {
System.out.println("购买商品失败,服务端口为8001");
}
return "购买商品失败,服务端口为8001";
}finally {
if(lock.isLocked() && lock.isHeldByCurrentThread()){
lock.unlock();
}
}
}
}
标签:REDIS,realTotal,String,lock,redis,template,8001,分布式
From: https://www.cnblogs.com/hasome/p/18067808