首页 > 数据库 >基于 RedisTemplate + 线程池 实现 Redis分布式锁

基于 RedisTemplate + 线程池 实现 Redis分布式锁

时间:2024-07-08 17:52:00浏览次数:13  
标签:return String Redis redis 线程 lock RedisTemplate 分布式

分布式锁需求

往往部署的 后台服务 不会只是 单机部署 了,而是通过集群的方式运行在两个甚至多个部署的服务器上(即便是同一个服务器的两个端口上,也会出现同样的问题)等架构来进行部署。
在用户所发送的请求中,每个请求将会通过负载均衡发送到不同的服务器中。如果我们还想对集群中的某个代码片段进行加锁,那么就需要我们的分布式锁出场了。
如果使用传统的加锁方式,我们会对该代码片段加上 synchronized 关键字。如下代码所示

synchronized (this){
    // todo 业务逻辑
}

synchronized 关键字,只能控制该 JVM 进程中的锁资源控制,这一方法有着很大的局限性。主要也是完成 单体架构 或者 进程内 需要加锁的需求。
synchronized 底层也是通过获取 某个对象的 对象头,来获取一个 监听器锁,而我们知道对象是存储在 JVM 的堆区 的。
所以,synchronized 只是 JVM 层面的 锁,不能使用在 集群中。
分布式锁的实现方式也有多种如:Redis分布式锁、zookeeper分布式锁等,本篇主要介绍 Redis 分布式锁。

redis为什么能实现分布式锁?

单线程模型介绍

我们知道 Redis 是一个以 键值对 存储的 nosql,所以使用 Redis 实现的分布式锁将以数据的形式作为 锁资源 存入redis。作为 “锁” 就要求在某一时刻,只会有一个线程在执行该片段。即串行执行加锁片段

Redis主线程(读写线程)模型 就是 单线程 的。也就是说在用户的请求到来时的同一时刻只会有一个线程在执行 Redis数据 相关的操作。

如图:

redis 中存入锁数据之后,第二个操作 redis 的线程(即便是从另外一个服务器来请求的线程)能够立刻得到 锁的状态(已存在该锁)。从而实现对集群的指定代码片段进行加锁

如何实现redis分布式锁?

前置知识:

  • redis 的命令,平时使用的最多的就是 set | get
  • 为实现分布式锁的特性,我们需要保证原子性,一般redis会使用 setnx 来实现
  • setnx 在redis中,如果本来有该缓存数据,则不会更新数据,否则反之
  • 在使用 java 的 api中如:RedisTemplate,该命令会根据更新状态返回一个 布尔值,如果插入成功则返回 true
  • setnx key value

Redis分布式锁的实现主要模型步骤:

  1. 在第一个线程访问时在 Redis 中添加一项缓存数据作为 锁资源
  2. 每个线程在执行该片段开始时,就会执行 setnx 命令进行缓存锁资源更新
  3. 如果更新失败,也会时返回值为 false,则说明有线程正在执行该片段。这时可以选择阻塞线程或给用户反馈一些提示。(如:系统繁忙之类的提示)
  4. 在线程结束时,需要主动删除该锁资源,让接下来的还未执行的线程进行争夺。

代码演示:

try{
    // 获取分布式锁
    Boolean lock = redisTemplate.opsForValue().setIfPresent("lock", "resource");
    // 如果锁资源未正常更新,则返回提示
    if(!lock){
        return "系统繁忙";
    }
    // 如果正常更新,则进行业务逻辑代码
    // todo 业务逻辑
    
}finally {
    // 执行完成后,删除锁
    redisTemplate.delete("lock");
}

setIfPrefent() 方法是 RedisTemplate 中的api,相当于 setnx命令。

在执行业务逻辑代码时该服务挂掉了怎么办?

finally 只能处理 异常 出现的错误,如果执行业务逻辑时挂掉,说明锁已经加上,但是却没有删除。
这个时候说明 锁永远的留在了 Redis 中。那么所有的用户线程就都进行了阻塞。这种情况在我们的 生产环境 肯定是不允许出现的。

解决方案:利用 Redis 的 过期策略,为该锁资源添加 过期时间。

代码参考:

try{
    // 获取分布式锁
    Boolean lock = redisTemplate.opsForValue().setIfPresent("lock", "resource", 10, TimeUnit.SECONDS);
    // 如果锁资源未正常更新,则返回提示
    if(!lock){
        return "系统繁忙";
    }
    // 如果正常更新,则进行业务逻辑代码
    // todo 业务逻辑
    
}finally {
    // 执行完成后,删除锁
    redisTemplate.delete("lock");
}

这样即便服务挂掉了,在到了过期时间之后,该锁资源也会自动释放

如果运行时间超过了过期时间怎么办?

运行时间超过了过期时间,在第一个线程没有全部执行完时,第二个线程就开始执行了。如下图模拟的场景所示:假设线程一共需要执行 15s,但是 Redis锁 过期时间只有 10s

这样就违背了分布式锁的作用。而因为 线程1 的锁已经被过期了,线程2马上就能得到锁。

出现的新问题有:

  1. 原本应该串行的两个线程,有了并发的情况。这可能违背我们所设想的情况,而出现不可预料的错误。
  2. 由于线程1 还没结束,线程2重新加了锁。而不久之后 线程1 结束了,又执行了删除锁的操作,导致线程2 刚加的锁 就被释放了

解决方案:

问题1:创建出分线程对过期时间进行 “续命”, 即延长过期时间

问题2:对每个线程存入值时创建一个线程标识,在执行删除操作时,核对自己的标识,如果是自己当时创建的锁,才执行删除操作。

代码参考:

String clientID = UUID.randomUUID().toString();// 问题2:创建线程标识,并存入redis;
try{
  // 获取分布式锁
  Boolean lock = redisTemplate.opsForValue().setIfPresent("lock", clientID, 10, TimeUnit.SECONDS);
  // 如果锁资源未正常更新,则返回提示
  if(!lock){
      return "系统繁忙";
  }
  // 问题1 创建线程续命
  new Thread(new Runnable() {
      @Override
      public void run() {
          // 对 redis的锁过期时间进行续命
      }
  }).start();
  // 如果正常更新,则进行业务逻辑代码
  // todo 业务逻辑

}finally {
  // 执行完成后,判断为自己创建的锁,则删除锁
  if(clientID.equals(redisTemplate.opsForValue().get("lock"))){
      redisTemplate.delete("lock");
  }

}

创建出分线程的时机应该在判断是否已存在后 立刻 创建,避免因前面代码执行时间过长而导致来不及续命。
现在看来好像分布式锁已经是一个比较完善了,但仍然有待优化也需要根据自己的业务逻辑代码进行修改和设计

代码模板

在实际开发中还是不建议直接通过 Thread 类来进行创建线程,这里模板使用 JUC 提供的,ScheduledThreadPoolExecutor 类来实现线程管理

	// 该线程池能够轻松帮助我们实现有关时间控制的任务
    @Resource
	ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
    
    
// ----------- 业务方法分布式锁片段 --------------
 
 
    ScheduledFuture<?> addLockLifeThread = null;
    try{
        // 创建线程id, 用作判断
    	String clientId = UUID.randomUUID().toString();
        // 设置分布式锁
	    Boolean lock = redisTemplate.opsForValue().setIfPresent(LOCK_KEY, clientId, LOCK_TTL, TimeUnit.SECONDS);
	    if (lock == null || !lock) {
	        // todo 如果没有拿到锁优化为阻塞,不要直接返回
		    return false;
	    }
	    // 使用线程池创建定时任务线程
	    addLockLifeThread = scheduledThreadPoolExecutor.scheduleAtFixedRate(() -> {
		    // lock锁续命
		    lengthenLockLife(clientId);
	    }, ADD_LOCK_TTL, ADD_LOCK_TTL, TimeUnit.SECONDS); 
        // 后面的参数表示,ADD_LOCK_TTL秒后,开始第1次执行,每隔ADD_LOCK_TTL秒在执行一次
 
    // ===== todo 完成需要进行加锁的业务逻辑 ==========
 
    } catch (Exception e){
        log.info("执行出错:{}", e.getMessage());
    }finally{
        // 关闭续命线程,释放锁资源
        if(addLockLifeThread != null){
	        addLockLifeThread.cancel(true);
        }
	    redisTemplate.delete(LOCK_KEY);
    }
 
 
 
// -----------------------------------------------
 
/**
 * 分布式锁进行续命
 *
 * @param clientId 创建的线程id
 */
public void lengthenLockLife(String clientId) {
	String redisLock = redisTemplate.opsForValue().get(LOCK_KEY);
	if (clientId.equals(redisLock)) {
		// 如果是此线程加的锁,进行续命操作
		redisTemplate.expire(LOCK_KEY, LOCK_TTL, TimeUnit.SECONDS);
		log.info("线程id {},进行续命", clientId);
	}
}

创建线程池时,需要 合理配置线程池参数。如:最多允许并发线程为 5 时,可将线程池 核心线程数 配置为 5等。
尽量避免线程添加到 阻塞队列 中,甚至是使用 非核心线程。当然具体情况需要根据业务情况而定。毕竟线程池相关的资源在使用过程中不容易被垃圾回收

redis分布式锁工具类

package com.common.util;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Component;

import java.util.Collections;
import java.util.Objects;

/**
 * @title  用于实现分布式锁
 * @author Created by wql on 2020/5/29.
 */
@Component
public class RedisUtil {


    public static final String LOCK_PREFIX = "redis_lock_";

    private static final Long SUCCESS = 1L;
    // ms  默认10分钟
    public static final int LOCK_EXPIRE = 60 * 10;

    @Autowired
    RedisTemplate redisTemplate;



    /**
     * 最终加强分布式锁
     *
     * @param key key值
     * @return 是否获取到
     */
    public boolean lock(String key) {
        String lock = LOCK_PREFIX + key;
        // 利用lambda表达式
        return (Boolean) redisTemplate.execute((RedisCallback) connection -> {

            long expireAt = System.currentTimeMillis() + LOCK_EXPIRE + 1;
            Boolean acquire = connection.setNX(lock.getBytes(), String.valueOf(expireAt).getBytes());
            
            if (acquire) {
                return true;
            } else {

                byte[] value = connection.get(lock.getBytes());

                if (Objects.nonNull(value) && value.length > 0) {

                    long expireTime = Long.parseLong(new String(value));

                    if (expireTime < System.currentTimeMillis()) {
                        // 如果锁已经过期
                        byte[] oldValue = connection.getSet(lock.getBytes(), String.valueOf(System.currentTimeMillis() + LOCK_EXPIRE + 1).getBytes());
                        // 防止死锁
                        return Long.parseLong(new String(oldValue)) < System.currentTimeMillis();
                    }
                }
            }
            return false;
        });
    }


    /**
     * 获取锁
     * @param lockKey
     * @param value
     * @param expireTime:单位-秒
     * @return
     */
    public  boolean getLock(String lockKey, String value, Integer expireTime){
        if(StringUtils.isTrimBlank(expireTime)){
            expireTime = LOCK_EXPIRE;
        }
        boolean ret = false;
        try{
            String script = "if redis.call('setNx',KEYS[1],ARGV[1]) then if redis.call('get',KEYS[1])==ARGV[1] then return redis.call('expire',KEYS[1],ARGV[2]) else return 0 end end";
            RedisScript<String> redisScript = new DefaultRedisScript<>(script, String.class);
            Object result = redisTemplate.execute(redisScript, Collections.singletonList(LOCK_PREFIX+lockKey),value,expireTime);
            if(SUCCESS.equals(result)){
                return true;
            }
        }catch(Exception e){
            return false;
        }
        return ret;
    }

    /**
     * 释放锁
     * @param lockKey
     * @param value
     * @return
     */
    public  boolean releaseLock(String lockKey, String value) {

        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";

        RedisScript<String> redisScript = new DefaultRedisScript<>(script, String.class);

        Object result = redisTemplate.execute(redisScript, Collections.singletonList(LOCK_PREFIX+lockKey), value);
        if (SUCCESS.equals(result)) {
            return true;
        }
        return false;

    }





    /**
     * 删除锁
     *
     * @param key
     */
    public void delete(String key) {
        redisTemplate.delete(LOCK_PREFIX+key);
    }
    

}

原文章地址:
https://blog.csdn.net/programming132/article/details/129196336

标签:return,String,Redis,redis,线程,lock,RedisTemplate,分布式
From: https://www.cnblogs.com/1399z3blog/p/18290473

相关文章

  • 【RT-Thread基础教程】线程的基本使用
    前言在嵌入式系统开发中,RTOS(Real-TimeOperatingSystem)扮演着至关重要的角色,而RT-Thread作为一款开源的实时操作系统,在嵌入式领域中备受欢迎。线程是RTOS中的基本执行单元,其良好的多任务处理能力使得嵌入式系统能够更有效地响应各种事件和任务。本文将着重介绍在RT-Thread......
  • Redis基础教程(十六):Redis Stream
    ......
  • 为啥 两个线程同时执行 ++i,可能会导致其中一个线程的自增操作被覆盖
    这是一个典型的并发编程问题,涉及到线程的同步和共享资源的访问问题。让我们详细来看一下为什么会这样:自增操作的细节++i 看起来是一个简单的操作,但实际上它包含了多个步骤:读取变量 i 的当前值。将读取到的值加1。将结果写回变量 i。在单线程环境中,这些步骤会顺序执......
  • Docker部署Django+MySQL+Redis+Nginx+uWSGI+Celery(超详细)
    一、知识储备经过我们之前学习的Docker相关知识,现在我们来进行实战,以下介绍如何通过DockerCompose部署Django项目:先前知识:Docker学习笔记(一)概念理解-CSDN博客Docker学习笔记(二)镜像、容器、仓库相关命令操作-CSDN博客Docker学习笔记(三)Dockerfile-CSDN博客DockerCompose......
  • nginx作为反向代理服务器:代理MySQL、Postgresql、Redis及多个TCP服务
    使用Nginx作为反向代理服务器,可以代理MySQL、PostgreSQL、Redis及多个TCP服务。这需要配置Nginx的stream模块。以下是详细的配置步骤:1.确保Nginx支持stream模块首先,确保Nginx已经编译并支持stream模块。运行以下命令检查:nginx-V在输出中查找--with-str......
  • 《框架封装 · 线程装饰器》
    ......
  • Redis 超全面试题及答案整理,最新面试题
    Redis面试题及答案整理,最新面试题Redis持久化机制有哪些?Redis支持两种主要的持久化机制:RDB(快照)和AOF(追加文件)。1、RDB(RedisDatabase):在指定的时间间隔内,执行快照存储,将内存中的所有数据保存到磁盘上的一个快照文件中。这个机制可以通过在redis.conf配置文件中设置不同......
  • Spring Boot Redis 集群性能优化(基于 Redisson)
    1.SpringBootRedis集群性能优化(基于Redisson)1.1.版本说明1.2.为什么是Redisson1.3.参数优化1.3.1.Redisson配置参数1.3.1.1.通用参数1.3.1.2.集群参数1.3.1.3.最终参数配置1.4.从Nacos获取Redisson配置1.SpringBootRedis集群性能优化(......
  • SpringBoot使用线程池实现异步批量处理任务
    模拟批处理大量数据@Slf4j@ComponentpublicclassTestFutureService{@AutowiredprivateTestFutureServiceImpltestFutureServiceImpl;/***多线程的优势:多核CPU使用多线程可以提高CPU的利用率(单核CPU不行,反而降低),可以实现异步调用。**......
  • Redisson锁误删除
    1、目标本文的主要目标是探究Redisson分布式锁在设置过期时间的情况下多线程是否会误删除的问题,首先分析单线程执行的完整过程,然后分析多线程锁误删除的现象,接着进行源码分析,理解Redisson如何保证多线程场景下当前线程不会误删除其他线程id的锁,最后是总结2、单线程执行的......