首页 > 数据库 >redis 实现分布式锁原理

redis 实现分布式锁原理

时间:2023-08-25 20:32:43浏览次数:37  
标签:return err redis 写入 ctx key 原理 分布式

通过原子操作实现 redis 锁

redis 内部是通过 key/value 的形式存储的,核心原理是设置一个唯一的 key,如果这个 key 存在,说明有服务在使用

具体实现方式:

  1. 首先判断 redis 中是否存在某个 key,并且为某个值
  2. 如果这个 key 不存在,说明当前没有服务在使用,设置 key
  3. 如果这个 key 存在,说明当前有服务在使用,就等待一段时间,然后再次判断这个 key 是否存在

如下图所示

redis 实现分布式锁原理_lua

这种情况有没有问题呢?

如果在单体应用的场景下,这种方式是可行的;但是在分布式场景下,这种方式就不可行了

因为在分布式场景下,redis 是多个服务共享的,如果多个服务同时判断 key 不存在,那么就会同时设置 key,就会导致多个服务同时执行,这不是我们想要的结果

为什么这样做会有问题?

因为 getset 操作不是原子操作,你先要做操作 get,然后在操作 set,这个过程中

这就会导致当第一台服务在执行 get 时,发现 key 不存在,然后进行 set,这个时候 set 可能还没有完成,第二台服务执行了 get,发现 key 不存在,然后进行 set,这个时候就会导致多个服务同时执行,这就不是原子操作了

原子操作的意思是:一次性执行,不会被打断

这个怎么做呢?

redis 提供了一个 setnx 的方法,作用是如果 key 不存在,就设置 key,设置成功返回 1,设置失败返回 0

这就将 getset 的逻辑合二为一了,保证原子性了

如下图所示:

redis 实现分布式锁原理_初始化_02

当我们了解了原理之后,看下人家是不是这样实现的,以 redsync 为例,先来看它使用,从入口函数一步步往下追

rs := redsync.New(pool)
mutexname := "my-global-mutex"
mutex := rs.NewMutex(mutexname)

if err := mutex.Lock(); err != nil {
  panic(err)
}
if ok, err := mutex.Unlock(); !ok || err != nil {
  panic("unlock failed")
}

从上面代码可以看到,它先调用 NewMutex 创建了一个 mutex,然后调用 mutex.Lock() 方法

NewMutex 是初始化函数,用来初始化一系列的参数,

比较重要的有:

  • nameredis 中的 key
  • genValueFunc:生成 key 的函数,保证唯一性
  • expirykey 过期的时间
  • tries:尝试的次数,可能会拿不到锁,所以要尝试多次
  • delayFunc:延迟时间(睡眠时间),可能会拿不到锁,就需要等一会再尝试
  • quorum:大多数节点,这个是用来做分布式锁的,如果有 5 个节点,那么这里的大多数是 3 个节点
m := &Mutex{
  name:   name,
  expiry: 8 * time.Second,
  tries:  32,
  delayFunc: func(tries int) time.Duration {
    return time.Duration(rand.Intn(maxRetryDelayMilliSec-minRetryDelayMilliSec)+minRetryDelayMilliSec) * time.Millisecond
  },
  genValueFunc:  genValue,
  driftFactor:   0.01,
  timeoutFactor: 0.05,
  quorum:        len(r.pools)/2 + 1,
  pools:         r.pools,
}

初始化结束之后,调用 m.Lock() 上锁,m.Lock() 方法中调用 m.LockContext() 方法,

LockContext 是核心方法,里面会做很多事情,这一步我们关心它是怎么上锁的,通过搜索发现,上锁的方法是 m.acquire(),其源码是:

func (m *Mutex) acquire(ctx context.Context, pool redis.Pool, value string) (bool, error) {
  conn, err := pool.Get(ctx)
  if err != nil {
    return false, err
  }
  defer conn.Close()
  reply, err := conn.SetNX(m.name, value, m.expiry)
  if err != nil {
    return false, err
  }
  return reply, nil
}

在这里我们清晰的看到调用 SetNX 方法

通过过期时间防止死锁

这样做完之后,还有一个问题需要解决

如果正在操作 redis 的服务挂了,那么这个 key 就会一直存在,其他服务就会等待,这样就造成了死锁

解决这个问题就是设置过期时间,如果服务挂了,过期时间到了,key 就会自动删除,其他服务就可以继续使用了

通过源代码我们可以看到它设置了一个过期时间 expiry

reply, err := conn.SetNX(m.name, value, m.expiry)

这个过期时间是怎么来的呢?

刚刚在入口函数中,我们看到了 NewMutex 函数,它初始化了一个 expiry,这个 expiry 就是过期时间:expiry: 8 * time.Second,它默认设置的是 8

到这里就有疑问了,如果我的服务执行时间超过 8 秒怎么办?,不就达不到锁的效果了?

我们很快就会想到,在过期前刷新下过期时间不就行了?

确实 redsync 也考虑到了这个问题,它提供了一个 Extend 方法,用来刷新过期时间

m.Extent() 方法调用 m.ExtendContext() 方法,在 m.ExtendContext() 方法中调用 m.touch() 方法

func (m *Mutex) Extend() (bool, error) {
  return m.ExtendContext(nil)
}
func (m *Mutex) ExtendContext(ctx context.Context) (bool, error) {
  // ... 省略其他代码
  m.touch(ctx, pool, m.value, int(m.expiry/time.Millisecond))
  // ... 省略其他代码
}
func (m *Mutex) touch(ctx context.Context, pool redis.Pool, value string, expiry int) (bool, error) {
  // ... 省略其他代码
	conn, err := pool.Get(ctx)
  conn.Eval(touchScript, m.name, value, expiry)
}

m.touch() 方法中我们看到它调用 redis 提供的 Eval 方法,可以执行一段 lua 脚本,脚本的内容如下:

var touchScript = redis.NewScript(1, `
  if redis.call("GET", KEYS[1]) == ARGV[1] then
    return redis.call("PEXPIRE", KEYS[1], ARGV[2])
  else
    return 0
  end
`)

它为什么要这样做呢?

不就是把过期时间刷新下吗?为什么要写 lua

这里我们需要了解下 redislua 脚本,redislua 脚本是原子性的,它可以保证一段脚本的执行是原子性的

这样就可以保证刷新过期时间的操作是原子性的,不会出现刷新过期时间失败的情况

如果我们用 go 语言去续期的需要三步:

  1. 先获取到 key 的值
  2. 判断 redis 中的值是不是你传进来的值
  3. 如果是的话,续期

这样的话,这样的话就不具备原子性了,任何一步都有失败的可能,所以 redsync 选择了 lua 脚本

我们在使用 m.Extend() 续期时,需要用协程去做

redsync 为什么不自动续期呢?

如果做自动续期的话,当前正在操作的服务如果 hung 住了,那么就会不停的续期,造成其他服务无法进来,所以 redsync 将续期的功能交给了使用者

防止被其他服务删除

锁只能被持有该锁的服务删除,不能被其他服务删除

如果保证锁只能被持有该锁的服务删除,那么就需要在 setnx 的时候,给 key 设置一个唯一的值,这个值可以是 uuid,这样就可以保证锁只能被持有该锁的服务删除

我们看下 redsync 源码是如何做的,初始化时就生成了一个唯一的值,它是使用 base64 编码的

func genValue() (string, error) {
  b := make([]byte, 16)
  _, err := rand.Read(b)
  if err != nil {
    return "", err
  }
  return base64.StdEncoding.EncodeToString(b), nil
}

删除的时候,调用 m.Unlock() 方法,m.Unlock() 方法调用 m.UnlockContext() 方法,在在 m.release() 方法

func (m *Mutex) release(ctx context.Context, pool redis.Pool, value string) (bool, error) {
  // ... 省略其他代码
  conn, err := pool.Get(ctx)
  conn.Eval(deleteScript, m.name, value)
}

m.release() 方法中我们看的也是在执行 lua 脚本,脚本的内容如下:

var deleteScript = redis.NewScript(1, `
  if redis.call("GET", KEYS[1]) == ARGV[1] then
    return redis.call("DEL", KEYS[1])
  else
    return 0
  end
`)

这也是为了保证在删除锁的时候,保证原子性

redlock

通过我们上面讲解的已经能满足一般的使用场景,但是在大型项目中,不会只搭建一个 redis,而是搭建 redis 集群

这样又会出现一个新的问题:redlock

redlock 是什么呢?我们先来看下 redis 集群

一般 redis 集群有一个 master 节点,多个 slave 节点

如下图所示:

redis 实现分布式锁原理_初始化_03

当我在加锁时,如果 master 节点会自动同步到 slave 节点,那么就不会有问题

如果这时 master 节点出问题了(或者说在同步过程中出问题,还没有同步完),slave 节点会选举出一个 master 节点,这个过程中会有一段时间,这时如果有一个服务进来写,发现是能写入的,这就出现了问题

如下图所示:

redis 实现分布式锁原理_redis_04

面对这种问题如何解决,引入了 redlock 的这个概念

redlock 的核心思想是:在 redis 集群中,大多数节点都能写入成功,那么就认为写入成功,而不是只向一台 redis 写入

当第一个服务写入时,同时向 5redis 写入,这时如果第二个服务写入,写同时向 5redis 写入,谁先成功写入大多数 redis,谁就认为写入成功,锁就交给谁

这里的大多数就是比一半多 1 台,也就是 n / 2 + 1,所以 redis 应该准备奇数台,同时也无需关心这 5redis 的主从关系了

如下图所示:

redis 实现分布式锁原理_lua_05

我们通过 redsync 源码来学习 redlock,是如何实现的:

  1. 通过 select 实现超时控制
  2. 核心代码是 actOnPoolsAsync 方法
  • pools:表示向多台 redis 写入
  • async:表示异步写入多台 redis,同步写入的话,效率偏低,使用 goroutine(具体可以查看下面 actOnPoolsAsync 方法的分析)
  1. 判断是否拿到锁
  • 如果拿到锁,更新 m.valuem.until
  • 如果没有拿到锁,需要释放已经写入的 rediskey
func (m *Mutex) LockContext(ctx context.Context) error {
  if ctx == nil {
    ctx = context.Background()
  }

  value, err := m.genValueFunc()
  if err != nil {
    return err
  }

  // 如果没有拿到锁,等待一段时间在去拿
  for i := 0; i < m.tries; i++ {
    if i != 0 {
      // 使用 select 实现超时控制
      select {
      case <-ctx.Done():
        return ErrFailed
      case <-time.After(m.delayFunc(i)):
      }
    }

    // 记录拿锁开始时间
    start := time.Now()

    n, err := func() (int, error) {
      ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor)))
      defer cancel()
      // 异步写入多台 redis
      return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
        return m.acquire(ctx, pool, value)
      })
    }()
    // 记录拿锁结束时间
    now := time.Now()
    // 计算还剩多少时间:过期时间 - 拿锁花费的时间 - 时间偏移
    // 这段代码是为了防止 `redis` 节点时间不同步,导致锁过期时间不准确,所以在过期时间上加上一个 `driftFactor`,这个值是 `0.01`,也就是 `1%` 的误差
    until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)*m.driftFactor)))
    // 判断是否竞争成功
    if n >= m.quorum && now.Before(until) {
      m.value = value
      m.until = until
      return nil
    }
    // 如果竞争失败,释放已经写入的 redis 的 key
    func() (int, error) {
      ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor)))
      defer cancel()
      return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
        return m.release(ctx, pool, value)
      })
    }()
    if i == m.tries-1 && err != nil {
      return err
    }
  }

  return ErrFailed
}

为什么要使用异步写入多台 redis 呢?

如果采用同步写入的多台的话,如果写入的 redis 比较多,就会很耗时,可能写到最后一台 redis 时,前面的 redis 已经过期了,这样就会出现问题

启用 goroutine 去写入的话,可以一瞬间都拿到 lock,调用 setnx 方法去写入

然后再统计成功写入的台数,返回出去

func (m *Mutex) actOnPoolsAsync(actFn func(redis.Pool) (bool, error)) (int, error) {
  type result struct {
    Node   int
    Status bool  // 成功写入的台数
    Err    error // 未成功写入的错误
  }

  // 启用 goroutine 去调用 setnx 写入
  // 用 channel 来接收结果
  ch := make(chan result)
  for node, pool := range m.pools {
    go func(node int, pool redis.Pool) {
      r := result{Node: node}
      r.Status, r.Err = actFn(pool)
      ch <- r
    }(node, pool)
  }
  n := 0
  var taken []int
  var err error
  for range m.pools {
    r := <-ch
    // 写入成功,n++;写入失败,记录错误
    if r.Status {
      n++
    } else if r.Err != nil {
      err = multierror.Append(err, &RedisError{Node: r.Node, Err: r.Err})
    } else {
      taken = append(taken, r.Node)
      err = multierror.Append(err, &ErrNodeTaken{Node: r.Node})
    }
  }

  // 将写入的台数和错误返回出去
  if len(taken) >= m.quorum {
    return n, &ErrTaken{Nodes: taken}
  }
  return n, err
}

总结

分布式锁的实现需要考虑的问题:

  1. 原子性(互斥性):锁只能被一个服务持有
  • 使用 setnx 命令,将 setget 变成原子性
  • 使用 lua 搅拌
  1. 死锁:设置过期时间,防止服务挂了变成死锁
  • 续期操作需要保证原子性,使用 lua 脚本
  1. 安全性:锁只能被持有该锁的服务删除,不能被其他服务删除
  • setnx 的时候,给 key 设置一个唯一的值

标签:return,err,redis,写入,ctx,key,原理,分布式
From: https://blog.51cto.com/u_16205813/7234938

相关文章

  • Redis使用细节(持续更新中)
    Redis使用细节分布式锁因为Redis是单线程的,所以可以用setnx来模拟锁的获取释放从而实现分布式锁在用setnx实现分布式锁时,会出现一些问题业务超时解锁,导致并发问题。业务执行时间超过了锁超时的时间redis主从切换临界点问题,主从切换后,A持有的锁还没有同步到新的主节点,B在新的......
  • php基于Redis实现播放量增加功能
    开发思路就是用redis得有序组合功能1.先自增到reids中$redisGetKey='ciku_promoter_click_number';$jyId=2;//文章id$redis=RedisService::build()->getInstance();$videoPlayNumber=$redis->zScore($redisGetKey,$jyId);//如果该文章id存在,则新增,否则自增1if......
  • HTTP代理什么是反向HTTP代理?反向HTTP代理的工作原理及使用场景
    反向HTTP代理是一种网络服务,它可以帮助服务器绕过一些访问限制,如IP封锁、访问频率限制等,以便服务器可以正常提供服务。本文将介绍反向HTTP代理的定义、工作原理、应用场景以及注意事项。反向HTTP代理是一种网络服务,它充当了服务器和客户端之间的中介,通过代理服务器来转发客户端的请......
  • Redis-持久化的学习
    持久化-rdbredis.conf中已经自动配置好了持久化设置,但我们可以改为自己需要的设置。当条件触发时会在同级文件夹内生成dump.rdb文件(快照)。 触发条件:1:满足config中设置的触发条件2:使用flushall命令3:退出redis,也会自动生成dump.rdb  如何打开rdb文件?在redis中输入conf......
  • Redis学习笔记
    1.Redis安装配置及开启自启1.1安装Redis依赖Redis是基于C语言编写的,因此首先需要安装Redis所需要的gcc依赖:yuminstall-ygcctcl1.2安装Redis获取Rediscd/opt/wgethttp://download.redis.io/releases/redis-6.2.6.tar.gz解压Reids#解压到/usr/local/src/......
  • Redis开启过期监听
    1.开启过期通知配置默认notify-keyspace-events""修改为:notify-keyspace-eventsEx2.增加监听类publicclassRedisKeyExpirationListenerextendsKeyExpirationEventMessageListener{publicRedisKeyExpirationListener(RedisMessageListenerContainerlisten......
  • [javase高级] HashMap实现原理
    HashMap是数组+链表实现的,既然用到hash散列,那么肯定不可避免的会出现冲突问题,HashMap解决冲突的方法是拉链法,因为这里有用到数组,那么当容量不足的时候就需要进行扩容操作了,在HashMap中有个术语叫冲突,当冲突几率越来越高的时候就需要进行扩容操作了那什么情况就叫冲突几率高呢?就是......
  • Redis如何批量删除指定前缀的key
    批量删除指定前缀的Key有两中方法,一种是借助redis-cli,另一种是通过SCAN命令来遍历所有匹配前缀的key,并使用DEL命令逐个删除它们。redis-cli使用Redis自带的redis-cli命令行工具,你可以通过以下方式批量删除指定前缀的key:redis-cliKEYS"your_prefix*"|xargsredis......
  • DNSLOG平台搭建及其原理分析
    一、DNS基本概念0x1:DNS中不同域名类型概念DNS的全称是DomainNameSystem(网络名称系统),它作为将域名和IP地址相互映射,使人更方便地访问互联网。当用户输入某一网址如littlehann.com,网络上的DNSServer会将该域名解析,并找到对应的真实IP如101.37.97.51,使用户可以访问这台服务器上......
  • springboot整合redis回滚
    1:添加依赖2:yml中配置连接,如:host,password,port3:@autowired注解注入Redistemplate调用方法生成对象 为了方便公司开发,一般会对数据进行序列化存储,这时需要创建配置类进行全局设置packagecom.example.config;importcom.fasterxml.jackson.annotation.JsonAutoDetect;importco......