首页 > 数据库 >Go每日一库之168:redsync(redis分布式锁)

Go每日一库之168:redsync(redis分布式锁)

时间:2023-09-29 21:26:36浏览次数:46  
标签:加锁 err redis value 一库 168 key redsync

今天给大家推荐的是基于redis的Go版本的分布式锁工具:redsync。该工具也是redis官网上推荐的。redsync 基于redis的高可用、高性能、防死锁、防误删的分布式锁实现,具有高性能、高可用、防死锁、防误删的特点。

一、分布式锁基础知识

什么是分布式锁

锁,在编程语言中就是一个变量,该变量在同一时刻只能有一个线程拥有,以便保护共享数据在同一时刻只有一个线程去操作。而分布式锁也是锁,即分布式系统中的锁。该锁是用于解决在分布式系统中控制共享资源访问的问题的。

分布式锁常见使用场景

1.最常见扣减库存 2.缓存击穿/缓存雪崩(也可以采用分布式锁) 3.在高并发的场景下,阻止流量打到后边等等

二、redsync包从使用到原理

安装

go get github.com/go-redsync/redsync/v4

** 基本使用**

该包的使用也很简单。首先创建一个redis的客户端连接。然后将该客户端连接加入到redis的Pool中。最后,redsync基于该redisPool进行实例化。然后通过redsync实例的NewMutex就可以基于一个具体的key新建一个分布式锁。然后进行加锁和解锁操作。

该包进行实例化时有基于redis的单机模式和集群模式两种使用方式。在使用上主要有以下两点区别:

  • • 连接redis的客户端是以集群模式创建还是以单机模式创建
  • • 在导入redsync包时,集群模式需要导入goredis/v8的版本

我们看下具体的两种模式下的基本使用。以下示例代码是基于redis单机模式的使用。初始化客户端连接时使用NewClient创建一个连接。如下:

package main

import (
    goredislib "github.com/go-redis/redis/v8"
    "github.com/go-redsync/redsync/v4"
    "github.com/go-redsync/redsync/v4/redis/goredis/v8"
)

func main() {
    // 创建一个redis的客户端连接
    client := goredislib.NewClient(&goredislib.Options{
        Addr: "localhost:6379",
    })
    // 创建redsync的客户端连接池
    pool := goredis.NewPool(client) // or, pool := redigo.NewPool(...)

    // 创建redsync实例
    rs := redsync.New(pool)

    // 通过相同的key值名获取同一个互斥锁.
    mutexname := "my-global-mutex"
    //创建基于key的互斥锁
    mutex := rs.NewMutex(mutexname)

    // 对key进行
    if err := mutex.Lock(); err != nil {
        panic(err)
    }

    // 获取锁后的业务逻辑处理.

    // 释放互斥锁
    if ok, err := mutex.Unlock(); !ok || err != nil {
        panic("unlock failed")
    }
}

如果要想基于redis的集群模式,则在创建redis的客户端连接时使用NewClusterClient函数,如下:

    // 创建一个redis集群模式的客户端连接
    client := goredislib.NewClusterClient(&goredislib.ClusterOptions{
        Addr: []string{"localhost:6379"},
    })

实现分析

从上面的示例代码中可以看到,该包的使用流程就是创建redis客户端连接、实例化redsync对象、创建一个互斥锁、加锁、解锁。接下来我们一步步分析其实现过程。

1、创建redsync对象

在该包中创建redsync对象是通过以下函数实现的:

redsync.NewPool(pool ...redis.Pool) *Redsync

首先,我们看到该NewPool函数接收多个redis.Pool参数,我们再看Redsync的结构体,结构体中只有一个pool属性,并且是一个redis连接池的切片,说明可以有多个redis客户端连接池。同时通过注释可以得知,Redsync可以使用多个Redis连接池创建分布式锁。

// Redsync provides a simple method for creating distributed mutexes using multiple Redis connection pools.
type Redsync struct {
    pools []redis.Pool
}

问题:为什么要这里要用一个redis连接池的切片呢?暂且我们先认为这里只传入了一个redis的客户端连接池。带着这个问题往下看。

2、创建互斥锁

创建完Redsync实例后,就可以通过该实例中的NewMutex方法创建一个互斥锁了。这里就是实例化了一个Mutex对象。如下:

// NewMutex returns a new distributed mutex with given name.
func (r *Redsync) NewMutex(name string, options ...Option) *Mutex {
    m := &Mutex{
        name:   name,
        expiry: 8 * time.Second,
        tries:  32,
        delayFunc: func(tries int) time.Duration {
            return time.Duration(rand.Intn(maxRetryDelayMilliSec-minRetryDelayMilliSec)+minRetryDelayMilliSec) * time.Millisecond
        },
        genValueFunc:  genValue,
        driftFactor:   0.01,
        timeoutFactor: 0.05,
        quorum:        len(r.pools)/2 + 1,
        pools:         r.pools,
    }
    for _, o := range options {
        o.Apply(m)
    }
    return m
}

这里,我们先关注name、genValueFunc、quorum以及pools即可。其他的我们稍后分析。

  • • name属性:用于redis的key。一个name代表一个锁。
  • • genValueFunc:用于生成key的value。该value值会在删除锁时用到。其作用是防止被误删锁。稍后在释放锁会做分析。
  • • quorum:我们看实例化代码中赋值是len(r.pools)/2+1,也就是redis连接池数的一半+1。作用是用于高可用性。
  • • pools:即Redsync中的pools切片,本质上是redis的客户端连接,通过该连接进行redis的具体操作。

3、加锁

创建了互斥锁对象后,就可以通过互斥锁对象的Lock方法进行加锁操作了。加锁的本质就是使用setnx操作。因为setnx它会先判断key是否已经存在,如果key不存在,那么就设置key的值为value,并返回1;如果key已经存在,则不更新key的值,直接返回0。利用该特性我们就可以实现一个最简单的分布式锁了。

  • null
    image.png

该包也是通过setnx,将mutex对象中的name作为key,通过genValueFunc函数生成的随机值作为value,并且将mutex对象中的expiry属性作为过期时间。如下:

func (m *Mutex) acquire(ctx context.Context, pool redis.Pool, value string) (bool, error) {
    conn, err := pool.Get(ctx)
    if err != nil {
        return false, err
    }
    defer conn.Close()
    reply, err := conn.SetNX(m.name, value, m.expiry)
    if err != nil {
        return false, err
    }
    return reply, nil
}

这里给setnx设置过期时间是为了防止该所永远得不到释放的产生。假设没有给key设置过期时间,万一程序在发送delete命令释放锁之前宕机了,那么这个key就会永久的存储在Redis中了,其他客户端也永远获取不到这把锁了。

null

image.png

该包中的value值是通过genValueFunc函数随机生成的,该函数默认是生成一个随机值,在一定程度上保证value值的唯一性。保证value值的唯一性是为了锁在释放时被误删。这里在释放锁进行delete操作时,会对要删除的值进行判断是否是当前锁中锁持有的value。当然在NewMutex的时候可以指定生成value值的函数,但必须保证该value值的唯一性。

在初始化Redsync时,我们提到有一个pools的切片,存储的是redis的连接池。有一个问题是为什么要用一个切片呢?答案就是为了高可用性。在进行加锁操作时,该包会循环该pools,让每一个客户端连接都尝试进行setnx操作,如果操作成功的数量多余所有连接的一半,那么才认为是加锁成功。否则,加锁失败。

我们提到,为了防止锁永远得不到释放,我们给key设置了有效期。那么,在进行加锁过程的处理时间已经接近过期时间了,即使setnx成功了,也会很快到过期时间了,那这剩余的一点时间根本来不及处理加锁后的业务逻辑,导致所自动释放。这时就可能被别的线程获取该锁,那么就会造成并发问题。所以,这里判断是否加锁成功不仅要判断有几个redis的setnx操作成功了,而且还要判断加锁成功后剩余的时间是否能够处理后面的业务逻辑,以防止加锁成功后,锁又立即过期的情况。

所以在该包中判断加锁是否成功有以下条件:

now := time.Now()
until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)*m.driftFactor)))
if n >= m.quorum && now.Before(until) {
    m.value = value
    m.until = until
    return nil
}

这里until的计算就是用当前时间加上剩余时间。剩余时间是用有效期时间m.expiry,减去加锁处理时间now.Sub(start),再减去一个预估的剩余值,(用有效期时间乘以一个driftFactro因子,该因子默认值是0.01,当然可以根据业务设置)。

在加锁过程中,考虑到性能问题,如果一次加锁不成功,可以进行重试。但在重试过程中需要考虑时间间隔的问题,为了体现公平性,会在最小等待时间基础上再增加一个随机值。如下是该包的实现:

for i := 0; i < m.tries; i++ {
        if i != 0 {
            select {
                case <-ctx.Done():
                // Exit early if the context is done.
                return ErrFailed
                case <-time.After(m.delayFunc(i)):
                // Fall-through when the delay timer completes.
            }
        }
        // 其他加锁逻辑
    }

这里,m.delayFunc函数的实现如下:

   delayFunc: func(tries int) time.Duration {
       return time.Duration(rand.Intn(maxRetryDelayMilliSec-minRetryDelayMilliSec)+minRetryDelayMilliSec) * time.Millisecond
   },

4、释放锁

释放锁的本质就是将对应的key从redis中删除。使用delete操作即可。但在删除时要防止误删的情况。例如,client1获得锁之后开始执行业务处理,但业务处理耗时较长,超过了锁的过期时间,导致业务处理还没结束时,锁却过期自动删除了(相当于属于client1的锁被释放了),此时,client2就会获取到这把锁,然后执行自己的业务处理,也就在此时,client1的业务处理结束了,然后向Redis发送了delete key的命令来释放锁,Redis接收到命令后,就直接将key删掉了,但此时这个key是属于client2的,所以,相当于client1把client2的锁给释放掉了:

null

image.png

所以,在加锁时我们给key设置了一个唯一的value值,在删除所时进行判断,该value值是否是当前线程的。当业务处理还没结束的时候,key自动过期了,也可以正常释放自己的锁,不影响其他线程

null

image.png

这里还有一个问题就是判断锁是否属于当前线程和释放锁两个步骤并不是原子操作。正常来说,如果线程1通过get操作从Redis中得到的value是123,那么就会执行删除锁的操作,但假如在执行删除锁的动作之前,系统卡顿了几秒钟,恰好在这几秒钟内,key自动过期了,线程2就顺利获取到锁开始执行自己的逻辑了,此时,线程1卡顿恢复了,开始继续执行删除锁的动作,那么此时删除的还是线程2的锁

null

image.png

这里的解决方案就是使用lua脚本,保证查询和删除是原子操作。我们看下Redsync包的实现:

var deleteScript = redis.NewScript(1, `
    if redis.call("GET", KEYS[1]) == ARGV[1] then
        return redis.call("DEL", KEYS[1])
    else
        return 0
    end
`)

func (m *Mutex) release(ctx context.Context, pool redis.Pool, value string) (bool, error) {
    conn, err := pool.Get(ctx)
    if err != nil {
        return false, err
    }
    defer conn.Close()
    status, err := conn.Eval(deleteScript, m.name, value)
    if err != nil {
        return false, err
    }
    return status != int64(0), nil
}

null

image.png

5、程序中的函数式模式

我们再回过头来看下创建互斥锁时的函数:

func (r *Redsync) NewMutex(name string, options ...Option) *Mutex {
    m := &Mutex{
        name:   name,
        expiry: 8 * time.Second,
        tries:  32,
        delayFunc: func(tries int) time.Duration {
            return time.Duration(rand.Intn(maxRetryDelayMilliSec-minRetryDelayMilliSec)+minRetryDelayMilliSec) * time.Millisecond
        },
        genValueFunc:  genValue,
        driftFactor:   0.01,
        timeoutFactor: 0.05,
        quorum:        len(r.pools)/2 + 1,
        pools:         r.pools,
    }
    for _, o := range options {
        o.Apply(m)
    }
    return m
}

函数的第二个签名中的Option是一个切片,可以给Mutex变量的选项设置自定义的值,比如重试次数、生成值的函数等。我们看到在实现中会有一个循环:

for _, o := range options {
    o.Apply(m)
}

type Option interface {
    Apply(*Mutex)
}

// OptionFunc is a function that configures a mutex.
type OptionFunc func(*Mutex)

// Apply calls f(mutex)
func (f OptionFunc) Apply(mutex *Mutex) {
    f(mutex)
}

每个Option都实现实现了Apply接口,其实这里利用的的是函数式选项模式。比如我们要自定义Mutex的重试次数,就可以通过如下函数:

func WithTries(tries int) Option {
    return OptionFunc(func(m *Mutex) {
        m.tries = tries
    })
}

在初始化Mutex时,通过该函数就能设置Mutex的尝试次数。更多的函数式选项模式内容可以参考我之前写的常见错误系列的一篇文章:Go常见错误集锦之函数式选项模式

标签:加锁,err,redis,value,一库,168,key,redsync
From: https://www.cnblogs.com/arena/p/17737359.html

相关文章

  • Go每日一库之167:emoji(emoji表情)
    大家在使用微信或钉钉聊天时,一定使用过表情符号。今天就给大家介绍一个能够在终端上显示emoji表情符号的包:emoji。实现原理:emoji表情符号实际上就是在unicode编码表中有定义的一个编码。通过将符号的文字表示和对应的unicode编码进行一一对应,在使用时对文字符号进行替换成rune字......
  • Go每日一库之187:singleflight(合并重复调用)
    本文主要介绍Go语言中的singleflight包,包括什么是singleflight以及如何使用singleflight合并请求解决缓存击穿问题。singleflight目前(Go1.20)还属于Go的准标准库,它提供了重复函数调用抑制机制,使用它可以避免同时进行相同的函数调用。第一个调用未完成时后续的重复调用会等待,当第......
  • Go每日一库之186:sonic(高性能JSON库)
    介绍我们在日常开发中,常常会对JSON进行序列化和反序列化。Golang提供了encoding/json包对JSON进行Marshal/Unmarshal操作。但是在大规模数据场景下,该包的性能和开销确实会有点不够看。在生产环境下,JSON序列化和反序列化会被频繁的使用到。在测试中,CPU使用率接近10%,其中极端情况......
  • Go每日一库之184:katana(新一代爬虫框架)
    项目链接https://github.com/projectdiscovery/katana项目简介katana是一个使用golang编写的新一代爬虫框架,支持HTTP和headless抓取网页信息不仅可以作为库集成到Golang项目,还可以通过命令行直接抓取,对于有一些轻量级的抓取任务的开发者配合jq一起使用简直就是福......
  • Go每日一库之183:vegeta(http压力测试工具库)
    项目地址:https://github.com/tsenart/vegetahttps://mp.weixin.qq.com/s/J0PiqTifr_rs_S2CzMRoWg......
  • Go每日一库之182:RuleGo(轻量级高性能嵌入式规则引擎)
    ◆ 一、开源项目简介RuleGo是一个基于Go语言的轻量级、高性能、嵌入式的规则引擎。也一个灵活配置和高度定制化的事件处理框架。可以对输入消息进行过滤、转换、丰富和执行各种动作。◆ 二、开源协议使用Apache-2.0开源协议◆ 三、界面展示规则链规则链是规则节点及其关......
  • Go每日一库之181:conc(并发库)
    来自公司sourcegraph的conc**(https://github.com/sourcegraph/conc)并发库,目标是betterstructuredconcurrencyforgo,简单的评价一下每个公司都有类似的轮子,与以往的库比起来,多了泛型,代码写起来更优雅,不需要interface,不需要运行时assert,性能肯定更好我们在写通......
  • Go每日一库之180:fastcache(协程安全且支持大量数据存储的高性能缓存库)
    fastcache是一个线程安全并且支持大量数据存储的高性能缓存组件库。这是官方Github主页上的项目介绍,和fasthttp名字一样以fast打头,作者对项目代码的自信程度可见一斑。此外该库的核心代码非常轻量,笔者本着学习的目的分析下内部的代码实现。基准测试官方给出了fastca......
  • Go每日一库之179:env(将系统环境变量解析到结构体的库)
    该包的实现是基于标准库os/env包中的相关函数(比如Getenv)来获取系统的环境变量的。获取到环境变量值后,再通过结构体中的tag,将值映射到对应的结构体字段上。使用示例下面是将系统的一些环境变量映射到config结构体的示例。如下:我们可以像以下这样运行该代码:$PRODUCTION=trueHO......
  • Go每日一库之178:chromedp(一个基于Chrome DevTools协议的库,支持数据采集、截取网页长
    该库提供了一种简单、高效、可靠的方式来控制Chrome浏览器进行自动化测试和爬取数据。项目地址:https://github.com/chromedp/chromedp它可以模拟用户在浏览器中执行各种操作,如点击、输入文本、截取网页长图、将网页内容转换成pdf文档、下载图片等,从而获取到需要采集的数据。基......