首页 > 数据库 >巧用Redis实现分布式锁详细介绍_Redis

巧用Redis实现分布式锁详细介绍_Redis

时间:2023-08-03 16:32:06浏览次数:53  
标签:Redisson return lock Redis key TimeUnit 巧用 public 分布式

目录
前言
手写Redis分布式锁
Redisson
lock()
lock(long leaseTime, TimeUnit unit)
tryLock(long waitTime, long leaseTime, TimeUnit unit)
RedLock红锁
总结

前言
无论是synchronized还是Lock,都运行在线程级别上,必须运行在同一个JVM中。如果竞争资源的进程不在同一个JVM中时,这样线程锁就无法起到作用,必须使用分布式锁来控制多个进程对资源的访问。
分布式锁的实现一般有三种方式,使用MySql数据库行锁,基于Redis的分布式锁,以及基于Zookeeper的分布式锁。本文中我们重点看一下Redis如何实现分布式锁。
首先,看一下用于实现分布式锁的两个Redis基础命令:
setnx key value
这里的setnx,是"set if Not eXists"的缩写,表示当指定的key值不存在时,为key设定值为value。如果key存在,则设定失败。
setex key timeout value
setex命令为指定的key设置值及其过期时间(以秒为单位)。如果key已经存在,setex命令将会替换旧的值。
基于这两个指令,我们能够实现:
使用setnx 命令,保证同一时刻只有一个线程能够获取到锁使用setex 命令,保证锁会超期释放,从而不因一个线程长期占有一个锁而导致死锁。
这里将两个命令结合在一起使用的原因是,在正常情况下,如果只使用setnx 命令,使用完成后使用delete命令删除锁进行释放,不存在什么问题。但是如果获取分布式锁的线程在运行中挂掉了,那么锁将不被释放。如果使用setex 设置了过期时间,即使线程挂掉,也可以自动进行锁的释放。

手写Redis分布式锁
接下来,我们基于Redis+Spring手写实现一个分布式锁。首先配置Jedis连接池:
@Configuration
public class Config {
    @Bean
    public JedisPool jedisPool(){
        JedisPoolConfig jedisPoolConfig=new JedisPoolConfig();
        jedisPoolConfig.setMaxIdle(100);
        jedisPoolConfig.setMinIdle(1);
        jedisPoolConfig.setMaxWaitMillis(2000);
        jedisPoolConfig.setTestOnBorrow(true);
        jedisPoolConfig.setTestOnReturn(true);
        JedisPool jedisPool=new JedisPool(jedisPoolConfig,"127.0.0.1",6379);
        return  jedisPool;
    }
}

实现RedisLock分布式锁:
public class RedisLock implements Lock {    @Autowired    JedisPool jedisPool;    private static final String key = "lock";    private ThreadLocal threadLocal = new ThreadLocal<>();    @Override    public void lock() {        boolean b = tryLock();        if (b) {            return;        }        try {            TimeUnit.MILLISECONDS.sleep(50);        } catch (Exception e) {            e.printStackTrace();        }        lock();//递归调用    }    @Override    public boolean tryLock() {        SetParams setParams = new SetParams();        setParams.ex(10);        setParams.nx();        String s = UUID.randomUUID().toString();        Jedis resource = jedisPool.getResource();        String lock = resource.set(key, s, setParams);        resource.close();        if ("OK".equals(lock)) {            threadLocal.set(s);            return true;        }        return false;    }    //解锁判断锁是不是自己加的    @Override    public void unlock(){        //调用lua脚本解锁        String script="if redis.call(\"get\",KEYS[1]==ARGV[1] then\n"+                "   return redis.call(\"del\",KEYS[1])\n"+                "else\n"+                "   return 0\n"+                "end";        Jedis resource = jedisPool.getResource();        Object eval=resource.eval(script, Arrays.asList(key),Arrays.asList(threadLocal.get()));        if (Integer.valueOf(eval.toString())==0){            resource.close();            throw new RuntimeException("解锁失败");        }        /*        *不写成下面这种也是因为不是原子操作,和ex、nx相同        String s = resource.get(key);        if (threadLocal.get().equals(s)){            resource.del(key);        }        */        resource.close();    }    @Override    public void lockInterruptibly() throws InterruptedException {    }    @Override    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {        return false;    }    @Override    public Condition newCondition() {        return null;    }}
简单对上面代码中需要注意的地方做一解释:
加锁过程中,使用SetParams 同时设置nx和ex的值,保证原子操作通过ThreadLocal保存key对应的value,通过value来判断锁是否当前线程自己加的,避免线程错乱解锁释放锁的过程中,使用lua脚本进行删除,保证Redis在执行此脚本时不执行其他操作,从而保证操作的原子性
但是,这段手写的代码可能会存在一个问题,就是不能保证业务逻辑一定能被执行完成,因为设置了锁的过期时间可能导致过期。

Redisson
基于上面存在的问题,我们可以使用Redisson分布式可重入锁。Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。
引入依赖:
    org.redisson    redisson    3.10.7
配置RedissonClient,然后我们对常用方法进行测试。
@Configuration
public class RedissonConfig {
    @Bean
    public RedissonClient redissonClient(){
        Config config=new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        RedissonClient redissonClient= Redisson.create(config);
        return redissonClient;
    }
}


lock()
先写一个测试接口:
@GetMapping("/lock")
public String test() {
    RLock lock = redissonClient.getLock("lock");
    lock.lock();
    System.out.println(Thread.currentThread().getName()+" get redisson lock");

    try {
        System.out.println("do something");
        TimeUnit.SECONDS.sleep(20);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    lock.unlock();
    System.out.println(Thread.currentThread().getName()+ " release lock");

   return "locked";
}

进行测试,同时发送两个请求,redisson锁生效:



lock(long leaseTime, TimeUnit unit)
Redisson可以给lock()方法提供leaseTime参数来指定加锁的时间,超过这个时间后锁可以自动释放。测试接口:
@GetMapping("/lock2")
public String test2() {
    RLock lock = redissonClient.getLock("lock");
    lock.lock(10,TimeUnit.SECONDS);
    System.out.println(Thread.currentThread().getName()+" get redisson lock");

    try {
        System.out.println("do something");
        TimeUnit.SECONDS.sleep(20);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println(Thread.currentThread().getName()+ " release lock");
    return "locked";
}

运行结果:


可以看出,在第一个线程还没有执行完成时,就释放了redisson锁,第二个线程进入后,两个线程可以同时执行被锁住的代码逻辑。这样可以实现无需调用unlock方法手动解锁。

tryLock(long waitTime, long leaseTime, TimeUnit unit)
tryLock方法会尝试加锁,最多等待waitTime秒,上锁以后过leaseTime秒自动解锁;如果没有等待时间,锁不住直接返回false。
@GetMapping("/lock3")
public String test3() {
    RLock lock = redissonClient.getLock("lock");
    try {
        boolean res = lock.tryLock(5, 30, TimeUnit.SECONDS);
        if (res){
            try{
                System.out.println(Thread.currentThread().getName()+" 获取到锁,返回true");
                System.out.println("do something");
                TimeUnit.SECONDS.sleep(20);
            }finally {
                lock.unlock();
                System.out.println(Thread.currentThread().getName()+" 释放锁");
            }
        }else {
            System.out.println(Thread.currentThread().getName()+" 未获取到锁,返回false");
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "lock";
}

运行结果:


可见在第一个线程获得锁后,第二个线程超过等待时间仍未获得锁,返回false放弃获得锁的过程。
除了以上单机Redisson锁以外,还支持我们之前提到过的哨兵模式和集群模式,只需要改变Config的配置即可。以集群模式为例:
@Bean
public RedissonClient redissonClient(){
    Config config=new Config();
    config.useClusterServers().addNodeAddress("redis://172.20.5.170:7000")
        .addNodeAddress("redis://172.20.5.170:7001")
        .addNodeAddress("redis://172.20.5.170:7002")
        .addNodeAddress("redis://172.20.5.170:7003")
        .addNodeAddress("redis://172.20.5.170:7004")
        .addNodeAddress("redis://172.20.5.170:7005");
    RedissonClient redissonClient = Redisson.create(config);
    return redissonClient;
}


RedLock红锁
下面介绍一下Redisson红锁RedissonRedLock,该对象也可以用来将多个RLock对象关联为一个红锁,每个RLock对象实例可以来自于不同的Redisson实例。
RedissonRedLock针对的多个Redis节点,这多个节点可以是集群,也可以不是集群。当我们使用RedissonRedLock时,只要在大部分节点上加锁成功就算成功。看一下使用:
@GetMapping("/testRedLock")
public void testRedLock() {
    Config config1 = new Config();
    config1.useSingleServer().setAddress("redis://172.20.5.170:6379");
    RedissonClient redissonClient1 = Redisson.create(config1);

    Config config2 = new Config();
    config2.useSingleServer().setAddress("redis://172.20.5.170:6380");
    RedissonClient redissonClient2 = Redisson.create(config2);

    Config config3 = new Config();
    config3.useSingleServer().setAddress("redis://172.20.5.170:6381");
    RedissonClient redissonClient3 = Redisson.create(config3);

    String resourceName = "REDLOCK";
    RLock lock1 = redissonClient1.getLock(resourceName);
    RLock lock2 = redissonClient2.getLock(resourceName);
    RLock lock3 = redissonClient3.getLock(resourceName);

    RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
    boolean isLock;
    try {
        isLock = redLock.tryLock(5, 30, TimeUnit.SECONDS);
        if (isLock) {
            System.out.println("do something");
            TimeUnit.SECONDS.sleep(20);
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        redLock.unlock();
    }
}

相对于单Redis节点来说,RedissonRedLock的优点在于防止了单节点故障造成整个服务停止运行的情况;并且在多节点中锁的设计,及多节点同时崩溃等各种意外情况有自己独特的设计方法。使用RedissonRedLock,性能方面会比单节点Redis分布式锁差一些,但可用性比普通锁高很多。

总结
目录
前言
手写Redis分布式锁
Redisson
lock()
lock(long leaseTime, TimeUnit unit)
tryLock(long waitTime, long leaseTime, TimeUnit unit)
RedLock红锁
总结

前言
无论是synchronized还是Lock,都运行在线程级别上,必须运行在同一个JVM中。如果竞争资源的进程不在同一个JVM中时,这样线程锁就无法起到作用,必须使用分布式锁来控制多个进程对资源的访问。
分布式锁的实现一般有三种方式,使用MySql数据库行锁,基于Redis的分布式锁,以及基于Zookeeper的分布式锁。本文中我们重点看一下Redis如何实现分布式锁。
首先,看一下用于实现分布式锁的两个Redis基础命令:
setnx key value
这里的setnx,是"set if Not eXists"的缩写,表示当指定的key值不存在时,为key设定值为value。如果key存在,则设定失败。
setex key timeout value
setex命令为指定的key设置值及其过期时间(以秒为单位)。如果key已经存在,setex命令将会替换旧的值。
基于这两个指令,我们能够实现:
使用setnx 命令,保证同一时刻只有一个线程能够获取到锁使用setex 命令,保证锁会超期释放,从而不因一个线程长期占有一个锁而导致死锁。
这里将两个命令结合在一起使用的原因是,在正常情况下,如果只使用setnx 命令,使用完成后使用delete命令删除锁进行释放,不存在什么问题。但是如果获取分布式锁的线程在运行中挂掉了,那么锁将不被释放。如果使用setex 设置了过期时间,即使线程挂掉,也可以自动进行锁的释放。

手写Redis分布式锁
接下来,我们基于Redis+Spring手写实现一个分布式锁。首先配置Jedis连接池:
@Configuration
public class Config {
    @Bean
    public JedisPool jedisPool(){
        JedisPoolConfig jedisPoolConfig=new JedisPoolConfig();
        jedisPoolConfig.setMaxIdle(100);
        jedisPoolConfig.setMinIdle(1);
        jedisPoolConfig.setMaxWaitMillis(2000);
        jedisPoolConfig.setTestOnBorrow(true);
        jedisPoolConfig.setTestOnReturn(true);
        JedisPool jedisPool=new JedisPool(jedisPoolConfig,"127.0.0.1",6379);
        return  jedisPool;
    }
}

实现RedisLock分布式锁:
public class RedisLock implements Lock {
    @Autowired
    JedisPool jedisPool;

    private static final String key = "lock";
    private ThreadLocal<String> threadLocal = new ThreadLocal<>();

    @Override
    public void lock() {
        boolean b = tryLock();
        if (b) {
            return;
        }
        try {
            TimeUnit.MILLISECONDS.sleep(50);
        } catch (Exception e) {
            e.printStackTrace();
        }
        lock();//递归调用
    }

    @Override
    public boolean tryLock() {
        SetParams setParams = new SetParams();
        setParams.ex(10);
        setParams.nx();
        String s = UUID.randomUUID().toString();
        Jedis resource = jedisPool.getResource();
        String lock = resource.set(key, s, setParams);
        resource.close();
        if ("OK".equals(lock)) {
            threadLocal.set(s);
            return true;
        }
        return false;
    }

    //解锁判断锁是不是自己加的
    @Override
    public void unlock(){
        //调用lua脚本解锁
        String script="if redis.call(\"get\",KEYS[1]==ARGV[1] then\n"+
                "   return redis.call(\"del\",KEYS[1])\n"+
                "else\n"+
                "   return 0\n"+
                "end";
        Jedis resource = jedisPool.getResource();
        Object eval=resource.eval(script, Arrays.asList(key),Arrays.asList(threadLocal.get()));
        if (Integer.valueOf(eval.toString())==0){
            resource.close();
            throw new RuntimeException("解锁失败");
        }
        /*
        *不写成下面这种也是因为不是原子操作,和ex、nx相同
        String s = resource.get(key);
        if (threadLocal.get().equals(s)){
            resource.del(key);
        }
        */
        resource.close();
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public Condition newCondition() {
        return null;
    }
}

简单对上面代码中需要注意的地方做一解释:
加锁过程中,使用SetParams 同时设置nx和ex的值,保证原子操作通过ThreadLocal保存key对应的value,通过value来判断锁是否当前线程自己加的,避免线程错乱解锁释放锁的过程中,使用lua脚本进行删除,保证Redis在执行此脚本时不执行其他操作,从而保证操作的原子性
但是,这段手写的代码可能会存在一个问题,就是不能保证业务逻辑一定能被执行完成,因为设置了锁的过期时间可能导致过期。

Redisson
基于上面存在的问题,我们可以使用Redisson分布式可重入锁。Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。
引入依赖:
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.10.7</version>
</dependency>

配置RedissonClient,然后我们对常用方法进行测试。
@Configuration
public class RedissonConfig {
    @Bean
    public RedissonClient redissonClient(){
        Config config=new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        RedissonClient redissonClient= Redisson.create(config);
        return redissonClient;
    }
}


lock()
先写一个测试接口:
@GetMapping("/lock")
public String test() {
    RLock lock = redissonClient.getLock("lock");
    lock.lock();
    System.out.println(Thread.currentThread().getName()+" get redisson lock");

    try {
        System.out.println("do something");
        TimeUnit.SECONDS.sleep(20);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    lock.unlock();
    System.out.println(Thread.currentThread().getName()+ " release lock");

   return "locked";
}

进行测试,同时发送两个请求,redisson锁生效:



lock(long leaseTime, TimeUnit unit)
Redisson可以给lock()方法提供leaseTime参数来指定加锁的时间,超过这个时间后锁可以自动释放。测试接口:
@GetMapping("/lock2")
public String test2() {
    RLock lock = redissonClient.getLock("lock");
    lock.lock(10,TimeUnit.SECONDS);
    System.out.println(Thread.currentThread().getName()+" get redisson lock");

    try {
        System.out.println("do something");
        TimeUnit.SECONDS.sleep(20);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println(Thread.currentThread().getName()+ " release lock");
    return "locked";
}

运行结果:


可以看出,在第一个线程还没有执行完成时,就释放了redisson锁,第二个线程进入后,两个线程可以同时执行被锁住的代码逻辑。这样可以实现无需调用unlock方法手动解锁。

tryLock(long waitTime, long leaseTime, TimeUnit unit)
tryLock方法会尝试加锁,最多等待waitTime秒,上锁以后过leaseTime秒自动解锁;如果没有等待时间,锁不住直接返回false。
@GetMapping("/lock3")
public String test3() {
    RLock lock = redissonClient.getLock("lock");
    try {
        boolean res = lock.tryLock(5, 30, TimeUnit.SECONDS);
        if (res){
            try{
                System.out.println(Thread.currentThread().getName()+" 获取到锁,返回true");
                System.out.println("do something");
                TimeUnit.SECONDS.sleep(20);
            }finally {
                lock.unlock();
                System.out.println(Thread.currentThread().getName()+" 释放锁");
            }
        }else {
            System.out.println(Thread.currentThread().getName()+" 未获取到锁,返回false");
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "lock";
}

运行结果:


可见在第一个线程获得锁后,第二个线程超过等待时间仍未获得锁,返回false放弃获得锁的过程。
除了以上单机Redisson锁以外,还支持我们之前提到过的哨兵模式和集群模式,只需要改变Config的配置即可。以集群模式为例:
@Bean
public RedissonClient redissonClient(){
    Config config=new Config();
    config.useClusterServers().addNodeAddress("redis://172.20.5.170:7000")
        .addNodeAddress("redis://172.20.5.170:7001")
        .addNodeAddress("redis://172.20.5.170:7002")
        .addNodeAddress("redis://172.20.5.170:7003")
        .addNodeAddress("redis://172.20.5.170:7004")
        .addNodeAddress("redis://172.20.5.170:7005");
    RedissonClient redissonClient = Redisson.create(config);
    return redissonClient;
}


RedLock红锁
下面介绍一下Redisson红锁RedissonRedLock,该对象也可以用来将多个RLock对象关联为一个红锁,每个RLock对象实例可以来自于不同的Redisson实例。
RedissonRedLock针对的多个Redis节点,这多个节点可以是集群,也可以不是集群。当我们使用RedissonRedLock时,只要在大部分节点上加锁成功就算成功。看一下使用:
@GetMapping("/testRedLock")
public void testRedLock() {
    Config config1 = new Config();
    config1.useSingleServer().setAddress("redis://172.20.5.170:6379");
    RedissonClient redissonClient1 = Redisson.create(config1);

    Config config2 = new Config();
    config2.useSingleServer().setAddress("redis://172.20.5.170:6380");
    RedissonClient redissonClient2 = Redisson.create(config2);

    Config config3 = new Config();
    config3.useSingleServer().setAddress("redis://172.20.5.170:6381");
    RedissonClient redissonClient3 = Redisson.create(config3);

    String resourceName = "REDLOCK";
    RLock lock1 = redissonClient1.getLock(resourceName);
    RLock lock2 = redissonClient2.getLock(resourceName);
    RLock lock3 = redissonClient3.getLock(resourceName);

    RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
    boolean isLock;
    try {
        isLock = redLock.tryLock(5, 30, TimeUnit.SECONDS);
        if (isLock) {
            System.out.println("do something");
            TimeUnit.SECONDS.sleep(20);
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        redLock.unlock();
    }
}

相对于单Redis节点来说,RedissonRedLock的优点在于防止了单节点故障造成整个服务停止运行的情况;并且在多节点中锁的设计,及多节点同时崩溃等各种意外情况有自己独特的设计方法。使用RedissonRedLock,性能方面会比单节点Redis分布式锁差一些,但可用性比普通锁高很多。

标签:Redisson,return,lock,Redis,key,TimeUnit,巧用,public,分布式
From: https://blog.51cto.com/u_16110906/6950568

相关文章

  • 集群与分布式
    集群:多个人干同样一件事情分布式:多个人干不同的事情,合起来时一件事情两个人都是做饭,洗菜,切菜 所以他两个是一个集群雇人洗菜,切菜,自己做饭,三个人共同干做饭一件事,这就是分布式集群分布式系统的有点:......
  • SpringBoot 整合 redis-阿里云发送短信
    pom<dependencies><!--阿里云短信--><dependency><groupId>com.aliyun</groupId><artifactId>aliyun-java-sdk-core</artifactId><version>4.5.16</version>......
  • Redis从入门到放弃(7):主从复制
    1、概念主从复制是Redis的一项重要特性,用于将一个Redis服务器(Master主节点)的数据复制到其他Redis服务器(Slave从节点),以实现数据的高可用性和读写分离。数据的复制是单向的,只能由主节点到从节点。2、作用主从复制在Redis中扮演着重要的角色,带来了以下几个主要的作用:高可用性:......
  • redis stream做轻量级消息队列的可行性
    背景对于消息数量很少的场景,尝试使用redisstream来做消息队列.为什么要用redis的stream,redis的其他数据结构可以吗?参考文章1:https://www.zhihu.com/question/43688764?sort=created参考文章2:https://www.cnblogs.com/williamjie/p/11201654.htmlredis有一些机制有队......
  • redis 集群重启问题
    redis出现链接不稳定或者链接不上的时候,需要重新启动1.找到集群的位置上 下面的两个启动和停止命令进行停止和启动但停止时候启动的时候可能会出现下面的问题 这是要删除appendonly.aof和dump.rdb文件,也可以选先将这两个先备份一下,然后启动后重新恢复,才能启动成功end........
  • redis版本升级方法
    wget -P /tmp https://download.redis.io/releases/redis-6.2.6.tar.gz         #下载最新版本tar -C /opt/redis/ -xf /tmp/redis-6.2.6.tar.gz #解压cd redis-6.2.6 #进入新版本目录下yum -y install centos-release-scldevtoolset-9-gcc devtoo......
  • redis远程代码执行CVE-2016-8339
       Redis3.2.x<3.2.4版本存在缓冲区溢出漏洞,可导致任意代码执行。Redis数据结构存储的CONFIGSET命令中client-output-buffer-limit选项处理存在越界写漏洞。构造的CONFIGSET命令可导致越界写,代码执行。漏洞利用:修改配置文件redis.confcpredis.conf./src/......
  • Redis基础
    Redis是一个基于内存的key-value结构数据库。Redis是互联网技术领域使用最为广泛的存储中间件,它是【RemoteDictionaryService】的首字母缩写,也就是【远程字典服务】。基于内存存储,读写性能高适合存储热点数据(热点商品、资讯、新闻)企业应用广泛Redis入门Redis简介......
  • Redis宕机 快速恢复数据
    学习记录下 Redis中如何实现无畏宕机快速恢复和持久化-Redis-PHP中文网......
  • Redis-Predis 扩展介绍
    PredisPredis适用于PHP5.3以上版本在Redis使用,其中包括了集群的使用。主要功能支持各个版本的Redis(从2.0到3.0以及unstable)使用哈希方式或用户自定义方式进行集群中节点的客户端分片支持Redis-cluster(集群)(Redis>=3.0).支持主/从结构的读写分离支持已知的所有R......