首页 > 数据库 >Redis—分布式锁

Redis—分布式锁

时间:2024-04-06 22:33:48浏览次数:31  
标签:lock return Redis redis long 获取 threadId 分布式

单实例的正确实现方式

获取锁的正确操作为: SET resource_name my_random_value NX PX 30000,它限定了只有当锁空闲且持有锁的时间为30000ms,并且锁资源对应的 value 为一个随机值。设置随机值是为了在释放锁时,确保当前线程能够释放该锁,避免出现操作超时的线程释放了其它线程的锁。

释放锁时查询对比 value 和具体删除锁操作必须是一个原子操作,可以结合 LUA脚本实现,对应代码如下:

if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end

设置的 value 可以是任意随机值,最简单的就是 UNIX timestamp with microsecond precision

代码实例

参考:https://juejin.cn/post/7020803797211414558

// 锁接口,提供释放锁、尝试加锁、重复尝试加锁
public interface RedisLock {
    long TIMEOUT_MILLIS = 30000;

    int RETRY_MILLIS = 30000;

    long SLEEP_MILLIS = 10;

    boolean tryLock(String key);

    boolean lock(String key);

    boolean lock(String key, long expire);

    boolean lock(String key, long expire, long retryTimes);

    boolean unlock(String key);
}

// 简单抽象类
public abstract class AbstractRedisLock implements RedisLock{
    @Override
    public boolean lock(String key) {
        return lock(key, TIMEOUT_MILLIS);
    }

    @Override
    public boolean lock(String key, long expire) {
        return lock(key, TIMEOUT_MILLIS, RETRY_MILLIS);
    }
}

// 具体实现类
@Component
public class RedisLockImpl extends AbstractRedisLock {
    private Logger logger = LoggerFactory.getLogger(RedisLockImpl.class);

    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    private ThreadLocal<String> threadLocal = new ThreadLocal<String>();
    private static final String UNLOCK_LUA;     // Lua脚本保证释放锁操作原子性

    static {
        StringBuilder builder = new StringBuilder();
        builder.append("if redis.call(\"get\",KEYS[1]) == ARGV[1]");
        builder.append("then ");
        builder.append("    return redis.call(\"del\",KEYS[1]) ");
        builder.append("else ");
        builder.append("    return 0 ");
        builder.append("end ");
        UNLOCK_LUA = builder.toString();
    }

    @Override
    public boolean tryLock(String key) {
        return tryLock(key, TIMEOUT_MILLIS);
    }

    // unblocking lock
    public boolean tryLock(String key, long expire) {
        try {
            return !StringUtils.isEmpty(redisTemplate.execute((RedisCallback<String>) connection-> {
                JedisCommands commands = (JedisCommands) connection.getNativeConnection();
                String uuid = UUID.randomUUID().toString();
                threadLocal.set(uuid);
                return commands.set(key, uuid, SetParams.setParams().nx().px(expire));
            }));
        } catch (Exception e) {
            logger.error("set redis occured an exception", e);
        }
        return false;
    }

    // blocking lock
    @Override
    public boolean lock(String key, long expire, long retryTimes) {
        boolean result = tryLock(key, expire);
        while (!result && retryTimes-- > 0) {
            try {
                logger.debug("lock failed, retrying {}......", retryTimes);
                Thread.sleep(SLEEP_MILLIS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            result = tryLock(key, expire);
        }
        return result;
    }

    @Override
    public boolean unlock(String key) {
        try {
            List<String> keys = Collections.singletonList(key);
            List<String> args = Collections.singletonList(threadLocal.get());
            Long result = redisTemplate.execute((RedisCallback<Long>) connection -> {
                Object nativeConnection = connection.getNativeConnection();
                if (nativeConnection instanceof JedisCluster) {
                    return (Long) ((JedisCluster) nativeConnection).eval(UNLOCK_LUA, keys, args);
                }
                if (nativeConnection instanceof Jedis) {
                    return (Long) ((Jedis) nativeConnection).eval(UNLOCK_LUA, keys, args);
                }
                return 0L;
            });
            return result != null && result > 0;
        } catch (Exception e) {
            logger.error("unlock occurred an exception", e);
        }
        return false;
    }
}

项目依赖的资源文件 pom.xml,注意 redis.version 需要和 spring-data-redis 中的版本保持一致:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.15</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.demo</groupId>
    <artifactId>distributed_lock</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>distributed_lock</name>
    <description>distributed_lock</description>
    <properties>
        <java.version>1.8</java.version>
        <redis.version>3.8.0</redis.version>
        <spring-test.version>5.0.7</spring-test.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>${redis.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>log4j-over-slf4j</artifactId>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

测试:

@SpringBootTest
@RunWith(SpringRunner.class)
class DistributedLockApplicationTests {
    private Logger logger = LoggerFactory.getLogger(DistributedLockApplicationTests.class);

    @Autowired
    private RedisLock redisLock;
    @Autowired
    private StringRedisTemplate redisTemplate;

    private ExecutorService executors = Executors.newScheduledThreadPool(8);

    @Test
    void contextLoads() {
    }

    @Test
    public void lock() {
        redisTemplate.opsForValue().set("goods-seckill", "10");
        List<Future> futureList = new ArrayList<>();

        // 100 threads
        for (int i = 0; i < 100; i++) {
            futureList.add(executors.submit(this::seckill));
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        // waiting for result blocked
        futureList.forEach(action -> {
            try {
                action.get();
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }

    public int seckill() {
        String key = "goods";
        try {
            redisLock.lock(key);
            int num = Integer.valueOf(Objects.requireNonNull(redisTemplate.opsForValue().get("goods-seckill")));
            if (num > 0) {
                redisTemplate.opsForValue().set("goods-seckill", String.valueOf(String.valueOf(--num)));
                logger.info("秒杀成功,剩余库存{}", num);
            } else {
                logger.error("秒杀失败,剩余库存{}", num);
            }
            return num;
        } catch (NumberFormatException e) {
            logger.error("seckill exception", e);
        } finally {
            redisLock.unlock(key);
        }
        return 0;
    }
}

单实例实现起来方便,但是不具备实际可用性,实际使用往往是 redis 集群,所以考虑 Redlock 算法。


官方文档解释

https://redis.io/docs/manual/patterns/distributed-locks/

为什么基于故障转移的实现是不够的?

实现主从的 Redis架构,但是仍不能保证在主从复制过程中锁资源的互斥性,存在以下情况:

  • Client A acquires the lock in the master.
  • The master crashes before the write to the key is transmitted to the replica.
  • The replica gets promoted to master.
  • Client B acquires the lock to the same resource A already holds a lock for. SAFETY VIOLATION!

Redlock 算法

在分布式版本中,我们假设我们有N个redis 的matser,各自独立,没有任何关系(不在一个cluster下),可以部署在不同的服务器或是虚拟机上,假设N设置成5。

某个客户端获取锁的操作:

  1. 客户端 A 获取当前时间戳;
  2. 客户端 A 尝试在所有的 N 个 master 中有序地获取锁,使用相同的 key、value;获取锁的过程由于需要遍历多个 redis 服务,可能导致阻塞,需要设置超时时间,假设锁自动释放的时间是 10s,那么这个超时时间可以设置在 5~50ms 的范围内,防止客户端在获取锁期间由于 redis 节点的崩溃导致获取锁的操作超时,如果服务器端没有在规定时间内响应,客户端应该尽快尝试去另外一个Redis实例请求获取锁;
  3. 客户端 A 获取到锁,再获取当前的时间戳,和步骤1 的时间相减,得到获取锁消耗的时间。在锁有效期内,当且仅当客户端A拿到大部分( N/2 + 1,这里至少3)的锁时,分布式锁才可以被正式获取;
  4. 每次当锁被获取到时(从每个master获取),有效时间可以被设置成初始有效时间减去获取锁消耗的时间;
  5. 假设步骤2中获取锁失败,没有拿到大部分锁,那么它需要把自己在少部分 redis 上拿到的锁释放掉。

失败重试

当客户端获取不到锁,它应该在之后一个随机时间点重试,这为了避免多个客户端尝试同时获取同一个资源(类似脑裂的情况,大概意思就是竞争了一堆,却发现,没人拿到锁),客户端拿到锁越快(早),脑裂的情况越小(或者重试的需要越小),所以理想情况下,客户端可以同时(多路复用)发送set命令给各个master。

释放锁

锁释放步骤很简单,就是把所有master实例上的锁释放,并不需要关心客户端在该实例上有没有成功得到锁。


Redisson 分析和使用

使用(一)、搭建多个 Redis Master

利用 docker 创建三个容器,使用 redis:5.0.7 镜像搭建多个 Redis-server,再利用端口映射实现各个 Redis 服务端 6379端口分别映射到主服务器 6379、6380、6381端口。

详细搭建步骤如下,这里以 6379的 redis服务搭建为例:

# 创建文件挂载目录
mkdir -p /opt/docker/redis/data
mkdir -p /opt/docker/redis/conf

# 配置文件也需要映射(注意修改配置文件配置才能进行远程连接)
cp redis.conf /opt/docker/redis/conf/redis.conf

# 创建容器启动 redis01-server
docker run -p 6379:6379 --name redis -v /opt/docker/redis/conf/redis.conf:/etc/redis/redis.conf -v /opt/docker/redis/data:/data -d redis:5.0.7 redis-server /etc/redis/redis.conf --appendonly yes
# 同理创建 redis02-server
docker run -p 6380:6379 --name redis02 -v /opt/docker/redis02/conf/redis.conf:/etc/redis/redis.conf -v /opt/docker/redis02/data:/data -d redis:5.0.7 redis-server /etc/redis/redis.conf --appendonly yes
# 创建 redis03-server
docker run -p 6381:6379 --name redis03 -v /opt/docker/redis03/conf/redis.conf:/etc/redis/redis.conf -v /opt/docker/redis03/data:/data -d redis:5.0.7 redis-server /etc/redis/redis.conf --appendonly yes

使用(二)、redisson使用

  1. 引入pom.xml
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.3.2</version>
        </dependency>
  1. 获取多个服务端连接
@Test
public void testSentinel() {
    // 创建到多个 master redis 的客户端连接
    Config config1 = new Config();
    config1.useSingleServer().setAddress("xxxxxxxx:6379").setPassword("xjx123456").setDatabase(0);
    RedissonClient redissonClient1 = Redisson.create(config1);

    Config config2 = new Config();
    config2.useSingleServer().setAddress("xxxxxxxx:6380").setPassword("xjx123456").setDatabase(0);
    RedissonClient redissonClient2 = Redisson.create(config2);

    Config config3 = new Config();
    config3.useSingleServer().setAddress("xxxxxxxx:6381").setPassword("xjx123456").setDatabase(0);
    RedissonClient redissonClient3 = Redisson.create(config3);

    String resourceName = "REDLOCK_KEY";
    RLock lock1 = redissonClient1.getLock(resourceName);
    RLock lock2 = redissonClient2.getLock(resourceName);
    RLock lock3 = redissonClient3.getLock(resourceName);
    // 向三个实例拿锁
    RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
    boolean isLock;
    // 500ms 拿锁,拿到锁有效期10s
    try {
        isLock = redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS);
        System.out.println("isLock = " + isLock);
        if (isLock) {
            System.out.println("成功获取到 RedLock 锁");
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        redLock.unlock();
    }
}

Redisson源码分析(一)、非公平锁获取锁

上述代码中 tryLock() 方法实际调用了 RedissonMultiLocktryLock() 方法,其中 waitTime(获取锁的最长时间)、leaseTime(锁生效的最长时间)、unit(时间单位)

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long newLeaseTime = -1;
    if (leaseTime != -1) {
        newLeaseTime = waitTime*2;
    }
    
    long time = System.currentTimeMillis();
    long remainTime = -1;
    if (waitTime != -1) {
        remainTime = unit.toMillis(waitTime);
    }
    int failedLocksLimit = failedLocksLimit();
    List<RLock> lockedLocks = new ArrayList<RLock>(locks.size());
    
    // 循环遍历每把锁的实例集合,获取 RLock 对象
    for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
        RLock lock = iterator.next();
        boolean lockAcquired;
        try {
            // 对每一把锁都尝试获取锁,如果设置了获取锁过程的超时时间,则超时等待获取锁
            if (waitTime == -1 && leaseTime == -1) {
                lockAcquired = lock.tryLock();
            } else {
                long awaitTime = unit.convert(remainTime, TimeUnit.MILLISECONDS);
                lockAcquired = lock.tryLock(awaitTime, newLeaseTime, unit);
            }
        } catch (Exception e) {
            lockAcquired = false;
        }
         
        // 获取成功,添加到 lockedLocks 集合中
        if (lockAcquired) {
            lockedLocks.add(lock);
        } else {
            // 如果不能保证获取至少 N/2 + 1 把锁,就结束
            if (locks.size() - lockedLocks.size() == failedLocksLimit()) {
                break;
            }
            // 获取锁失败超过了一定次数
            if (failedLocksLimit == 0) {
                unlockInner(lockedLocks);
                if (waitTime == -1 && leaseTime == -1) {
                    return false;
                }
                failedLocksLimit = failedLocksLimit();
                lockedLocks.clear();
                // reset iterator
                while (iterator.hasPrevious()) {
                    iterator.previous();
                }
            } else {
                // 否则 failedLocksLimit 减一,切换获取下一把锁
                failedLocksLimit--;
            }
        }
        
        if (remainTime != -1) {
            // 获取完每一把锁,用总超时时间减去该把锁获取的时间间隔,判断是否有效
            remainTime -= (System.currentTimeMillis() - time);
            time = System.currentTimeMillis();

            // 获取超时,释放所有锁资源
            if (remainTime <= 0) {
                unlockInner(lockedLocks);
                return false;
            }
        }
    }
    
    // 分布式锁有设置超时时间,针对每一把锁设置超时时间
    if (leaseTime != -1) {
        List<RFuture<Boolean>> futures = new ArrayList<RFuture<Boolean>>(lockedLocks.size());
        for (RLock rLock : lockedLocks) {
            RFuture<Boolean> future = rLock.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
            futures.add(future);
        }
        
        for (RFuture<Boolean> rFuture : futures) {
            rFuture.syncUninterruptibly();
        }
    }
    
    return true;
}

REDLOCK 加锁逻辑如下:

  • REDLOCK 加锁实际上对应了 RedissonMultiLock 的加锁方法;
  • 首先获取到当前系统时间,以毫秒为单位;
  • 依次尝试从 NMaster 实例中获取相同 hash-Key(比如 LOCK_KEY)和 inner-Key(UUID+ThreadId)的锁资源。在针对每个 Redis-Master 实例加锁时,设置一个超时时间并且这个超时时间需要小于锁的失效时间(比如总的获取锁时间为10s,每个Master获取锁的失效时间为5~50ms内)。这样可以避免服务器端获取锁资源时超时而导致整个过程阻塞;
  • 客户端使用当前时间减去开始获取锁时间(步骤1记录的时间)就得到获取锁使用的时间。当且仅当从大多数的Redis节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功。
  • 如果取到了锁,key的真正有效时间等于有效时间减去获取锁所使用的时间(上一步)。
  • 如果因为某些原因,获取锁失败(没有在至少N/2+1个Redis实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的Redis实例上进行解锁(即便某些Redis实例根本就没有加锁成功)。

上面分析了 REDLOCK 加锁算法的步骤,其中 RedissonLocktryLock() 方法是实际单把锁的加锁逻辑:

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long time = unit.toMillis(waitTime);
    long current = System.currentTimeMillis();
    final long threadId = Thread.currentThread().getId();
    // 通过 LUA 脚本的加锁过程,若成功获取锁则返回空;否则返回锁剩余的有效期时间
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    // 成功获取到锁,返回
    if (ttl == null) {
        return true;
    }
    
    // 计算当前线程等待获取锁剩余有效时间
    time -= (System.currentTimeMillis() - current);
    // 小于有效时间,快速返回失败
    if (time <= 0) {
        acquireFailed(threadId);
        return false;
    }
    
    current = System.currentTimeMillis();
    // 订阅解锁 channel,返回一个 Future 对象 
    final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
    // 超时阻塞地监听 Future 对象,如果有效期时间内没有收到,就取消订阅消息,获取锁失败
    if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
        if (!subscribeFuture.cancel(false)) {
            subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
                @Override
                public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
                    if (subscribeFuture.isSuccess()) {
                        unsubscribe(subscribeFuture, threadId);
                    }
                }
            });
        }
        acquireFailed(threadId);
        return false;
    }

    // 直到获取锁成功或者获取锁超时失败,执行以下逻辑
    try {
        time -= (System.currentTimeMillis() - current);
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }

        // 自旋尝试获取锁资源
        while (true) {
            long currentTime = System.currentTimeMillis();
            ttl = tryAcquire(leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                return true;
            }

            time -= (System.currentTimeMillis() - currentTime);
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }

            // 使用信号量机制,保证一个客户端中多个线程竞争分布式锁场景下,只会有一个线程被唤醒去和集群争夺锁,这样避免了大量的无效请求。
            currentTime = System.currentTimeMillis();
            if (ttl >= 0 && ttl < time) {
                getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
            }

            time -= (System.currentTimeMillis() - currentTime);
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }
        }
    } finally {
        unsubscribe(subscribeFuture, threadId);
    }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
}

// 接上部分
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
    return get(tryAcquireAsync(leaseTime, unit, threadId));
}


// 接上部分
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
    if (leaseTime != -1) {
        // 锁生效时间不为 -1 时,详细的加锁逻辑
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.addListener(new FutureListener<Long>() {
        @Override
        public void operationComplete(Future<Long> future) throws Exception {
            if (!future.isSuccess()) {
                return;
            }

            Long ttlRemaining = future.getNow();
            // lock acquired
            if (ttlRemaining == null) {
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}

获取锁 redLock.tryLock() 的核心源码如下,其中默认的租约时间(leaseTime)是 LOCK_EXPIRATION_INTERVAL_SECONDS,即 30s。获取锁时会执行如下 LUA 指令:

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);

    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
    // 分布式key不存在,不存在就 hset创建(hset REDLOCK_KEY uuid+threadId 1),并且 pexpire 设置锁资源失效时间
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                "end; " +
    // 分布式Key已经存在,value也能对应上,可重入,hincrby 添加计数,重置锁失效时间
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                "end; " +
    // 已经有线程持有分布式锁,返回锁对应Key失效的毫秒数
                "return redis.call('pttl', KEYS[1]);",
    // 三个参数分别对应:
    // REDLOCK_KEY ——> KEYS[1]
    // leaseTime ——> ARGV[1]
    // uuid+threaId ——> ARGV[2]
                Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

总的来说,获取锁的逻辑为:

  1. 总共奇数个 Redis_Master 实例,每个实例对应一个 RedissonClient ,通过 getLock() 方法获取到对应的 RedissonLock 对象,所有对象存储在一个 ArrayList中;
  2. 遍历每个 RedissonLock 对象,通过 tryLock() 方法尝试获取锁,该方法底层通过 LUA 指令实现。锁在 redis 中以 hash 结构存储,哈希表名为 REDLOCK_KEY字符串、keyuuid+threadIdvalue 为整数值,记录锁重入的次数。
  3. 根据 tryLock() 返回的结果判断是否成功获取锁:
    1. 若成功返回结果为空;
    2. 若失败返回锁剩余有效时间,并订阅释放锁的 channel,然后 await() 阻塞避免无效的空自旋。
      1. 如果此时有其它线程释放锁,会向释放锁的 channel 发送消息,所有阻塞等待的客户端线程会被唤醒。争夺过程是基于 Semaphore 信号量机制,注意这里只会唤醒一个线程去和集群中其它节点争夺锁资源,因为在一个客户端进程中多个线程竞争分布式锁的场景下,没有必要唤醒所有线程去争夺锁,锁在同一时刻只会由一个线程持有。
      2. 如果解锁的消息没有成功发送到 channel 中(PubSub 是不可靠的,可能发布订阅失效了),阻塞等待超时后进入死循环不断获取锁。

Redisson源码分析(二)、非公平锁释放锁

释放锁参考 RedissonMultiLock 类的 unlock() 方法,该方法返回 RFuture对象,然后异步的方式获取结果,如下:

public void unlock() {
    List<RFuture<Void>> futures = new ArrayList<RFuture<Void>>(locks.size());

    for (RLock lock : locks) {
        futures.add(lock.unlockAsync());
    }

    for (RFuture<Void> future : futures) {
        future.syncUninterruptibly();
    }
}

方法底层调用了 unlockAsnyc() 方法,如下:

public RFuture<Void> unlockAsync() {
    long threadId = Thread.currentThread().getId();
    return unlockAsync(threadId);
}
public RFuture<Void> unlockAsync(final long threadId) {
    final RPromise<Void> result = newPromise();
    RFuture<Boolean> future = unlockInnerAsync(threadId);

    future.addListener(new FutureListener<Boolean>() {
        @Override
        public void operationComplete(Future<Boolean> future) throws Exception {
            if (!future.isSuccess()) {
                result.tryFailure(future.cause());
                return;
            }

            Boolean opStatus = future.getNow();
            if (opStatus == null) {
                IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                        + id + " thread-id: " + threadId);
                result.tryFailure(cause);
                return;
            }
            if (opStatus) {
                cancelExpirationRenewal();
            }
            result.trySuccess(null);
        }
    });

    return result;
}

底层释放锁资源的 LUA 指令:

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
    // 如果 Key 不存在,向 channel 发布一条消息
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end;" +
    // 判断锁是否当前线程持有,不为自己持有则不允许解锁
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
    // 当前线程就是持有分布式锁的线程,重入计数减1
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
    // 减去后租约值仍然大于0,重新设置锁有效时间
            "if (counter > 0) then " +
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            "else " +
    // 否则删除Key,发布解锁消息
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;",
    // 四个参数对应:
    // KEYS[1]——>REDLOCK_KEY、KEYS[2]——>通道名称
    // ARGV[1]——>解锁消息
    // ARGV[2]——>锁生效时间
    // ARGV[3]——>uuid+threadId
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}

总的来说释放锁步骤如下:

  1. 遍历每一个 RedissonLock 对象,异步方式调用 unlockAsync() 方法,该方法底层依赖于 unlockInnerAsync() 的一系列 LUA 指令;
  2. LUA 指令先判断 key 是否存在,不存在就向 channel 发布消息;
  3. 若分布式锁存在,但是 value 值不匹配,说明锁已经被占用,返回;
  4. 若当前线程持有的就是分布式锁,重入计数减1;若减去后值仍然大于0,重新设置锁生效时间,否则删除对应的 key,发布解锁消息。

Redisson源码分析(三)、看门狗

如果业务执行时间比较长,可能会出现业务未执行完而分布式锁提前释放。针对这个问题,可以通过设置一个守护线程,定时检查锁的状态,如果锁快要过期了,就自动给锁续期。

private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, final long threadId) {
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
    }
    // 执行 LUA 脚本抢锁
    RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);

    // 执行完后注册监听器
    ttlRemainingFuture.addListener(new FutureListener<Boolean>() {
        @Override
        public void operationComplete(Future<Boolean> future) throws Exception {
            if (!future.isSuccess()) {
                return;
            }

            Boolean ttlRemaining = future.getNow();
            // 如果加锁成功,开启守护线程定时给锁续期
            if (ttlRemaining) {
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}



// 给锁续期方法
private void scheduleExpirationRenewal(final long threadId) {
    if (expirationRenewalMap.containsKey(getEntryName())) {
        return;
    }

    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            
            RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return 1; " +
                    "end; " +
                    "return 0;",
                        Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
            
            future.addListener(new FutureListener<Boolean>() {
                @Override
                public void operationComplete(Future<Boolean> future) throws Exception {
                    expirationRenewalMap.remove(getEntryName());
                    if (!future.isSuccess()) {
                        log.error("Can't update lock " + getName() + " expiration", future.cause());
                        return;
                    }
                    
                    if (future.getNow()) {
                        // reschedule itself
                        scheduleExpirationRenewal(threadId);
                    }
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    // 默认每 10 秒进行一次续期

    if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
        task.cancel();
    }
}

在执行完抢锁 LUA 脚本后,会返回一个 Future 对象,并给该对象注册监听器。监听器会检查是否成功获取锁,若成功获取锁,开启看门狗线程定时给锁续期。

if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then 
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return 1;
end;
return 0;

Redisson源码分析(三)、公平锁获取锁

Redisson源码分析(四)、公平锁释放锁

参考

https://juejin.cn/post/7168802584684134413

https://juejin.cn/post/6961380552519712798

https://www.zhihu.com/question/456788102

标签:lock,return,Redis,redis,long,获取,threadId,分布式
From: https://www.cnblogs.com/istitches/p/18118042

相关文章

  • Redis未授权漏洞复现
    目录Redis漏洞的产生条件及利用Redis环境搭建漏洞复现利用Redis写入Webshell利用Redis写入SSH公钥利用Redis写入计划任务Redis安全防护Redisredis(remotedictionaryserver)是一个key-value存储系统,是跨平台的非关系型数据库。redis默认情况下,会绑定在0.0.0.0:6379,如果没有采用......
  • Redis过期删除策略和内存淘汰机制
    过期删除策略1、惰性删除就是过期之后下一次取数据时,发现过期了,就删除它。2、定期删除定期删除一些过期的key。redis采用的时惰性删除+过期删除。问题:可能会漏掉一些key,从而导致OOM。内存淘汰机制3*2+2volatile-lru:从过期数据集中选择最近最少使用的数据淘汰。allKe......
  • Redis数据库的入门学习
     关系型数据库和非关系型数据库的区别:简介Redis数据库和MySql数据库的区别:Redis数据库是基于内存的key-value结构的数据库。本质上是内存存储。而MySql数据库是通过数据文件的方式存在磁盘当中,本质上是磁盘存储。且MySql当中是通过二维表存储数据。注:Redis数据库并不......
  • go~在阿里mse上使用redis.call
    相关依赖github.com/higress-group/proxy-wasm-go-sdkgithub.com/alibaba/higress/plugins/wasm-go标准的redis下面是一个读取redis指定key的方法,使用了higress的wasm-go组件实现的 err:=config.Client.SMembers("online",func(responseresp.Value){ for_,item:=......
  • Redis缓存三兄弟
    Redis缓存的问题都是因为缓存过期,导致大量请求打到数据库,给数据库添加了压力。以下是典型的三个缓存问题。缓存穿透概念缓存穿透:频繁请求缓存和数据库中没有的数据,导致数据库的压力过大解决方案规则校验:增加对key的规则校验,防止恶意请求设默认值:数据库中没有数据时,给该......
  • Redis从入门到精通(七)Redis实战(四)库存超卖、一人一单与Redis分布式锁
    ↑↑↑请在文章开头处下载测试项目源代码↑↑↑文章目录前言4.3优惠券秒杀4.3.4库存超卖问题及其解决4.3.4.1问题分析4.3.4.2问题解决4.3.5一人一单需求4.3.5.1需求分析4.3.5.2代码实现4.3.5.3并发问题4.3.5.4悲观锁解决并发问题4.3.5.5集群环境下的并发问题......
  • 中间件 ZK分布式专题与Dubbo微服务入门 8-2 dubbo 入门简介
    0课程地址https://coding.imooc.com/lesson/201.html#mid=12740 1重点关注1.1本节内容dubbo环境搭建版本及入门简介 1.2环境版本要求dubbo2.5.3及其以上jdk6及其以上maven3及其以上 1.3为什么要用dubbo......
  • 详解 Redis 在 Ubuntu 系统上的安装
    在Ubuntu20.04安装Redis1.先切换到root用户在Ubuntu20.04中,可以通过以下步骤切换到root用户:输入以下命令,以root用户身份登录:sudosu-按回车键,并输入当前用户的密码(即具有sudo权限的用户的密码)如果密码正确,将会切换到root用户,并且提示符会变为以r......
  • Redis中惰性策略的启发和流量包应用设计
    引言    在技术领域,许多中间件之所以获得巨大成功,部分原因在于它们所采用的思想之先进。这些思想解决了一个个世纪难题,接下来我将讲述一个我学习到的思想,并将其应用至工作中的案例。        惰性策略在日常编码中随处可见,但究竟什么是惰性策略呢?简而言之,惰性......
  • Redis各个方面入门详解
    目录一、Redis介绍二、分布式缓存常见的技术选型方案三、Redis和Memcached的区别和共同点四、缓存数据的处理流程五、Redis作为缓存的好处六、Redis常见数据结构以及使用场景七、Redis单线程模型八、Redis给缓存数据设置过期时间九、Redis判断数据过期的原理十......