首页 > 其他分享 >Go 锁的实现与应用

Go 锁的实现与应用

时间:2024-07-20 15:19:39浏览次数:12  
标签:return func err 实现 redis ctx nil 应用 Go

Go 锁的实现与应用

原创 蔡蔡蔡菜 蔡蔡蔡云原生Go    2024年07月20日 08:30 广东 1人听过

在说锁的实现之前,我们先了解一下业务中的并发问题。

并发是由于多个线程在多个CPU上执行,此时CPU 之间的缓存并不可见。从磁盘或者内存获取数据后会保存在CPU中进行执行,此时如果内存和磁盘的数据被更新了,CPU 并不会发现。这个时候就出现了我们平时经常说的并发。

我们说到CPU 在执行的过程中,磁盘或者内存的数据已经发生改变,这个时候CPU在执行程序的时候,数据属于旧数据,此时计算出来的结果是不符合预期的。

这个时候再将不符合预期的数据向内存和磁盘进行写入时就会造成数据错乱

反映到实际业务上,由于数据的错乱,有可能用户会由于旧数据而支付了两次金额,这对现实情况来说是完全不允许的。

对不同的业务场景,解决并发的方式各有不同,需要用的解决方式也不相同。

我们短时间内创建微信金额相同的两个订单的时候,也会提醒我们是否存在问题,这种是在业务逻辑上进行规避,但是这种规避不是强制的,因为用户本身就有可能存在短时间内创建两个相同的订单。

但是这种处理方式也给了我们一种启发,不一定完全需要依靠技术的手段来规避并发,也可以从一定业务语义进行处理。

因此为了解决业务中的并发问题,我们就需要“锁住”要操作的资源对象来避免并发带来的影响。

sync.Mutex

sync.Mutex 是单个应用程序上的锁实现,我们先通过计数的例子看它如何进行使用。

package main

import (
 "fmt"
 "sync"
 "time"
)

type Counter struct {
 mu    sync.Mutex
 value int
}

func (c *Counter) Inc() {
 c.mu.Lock()
 defer c.mu.Unlock()
 c.value++
}

func (c *Counter) Value() int {
 c.mu.Lock()
 defer c.mu.Unlock()
 return c.value
}

func main() {
 counter := Counter{}

 for i := 0; i < 10; i++ {
  go func() {
   for j := 0; j < 10000; j++ {
    counter.Inc()
   }
  }()
 }

 time.Sleep(time.Second)
 fmt.Println(counter.Value())
}

Lock 的时候会通过 CAS 尝试看是否可以获得锁,如果不可以的话,则会进入自旋来等待获取

func (m *Mutex) Lock() {
 // Fast path: grab unlocked mutex.
 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
  if race.Enabled {
   race.Acquire(unsafe.Pointer(m))
  }
  return
 }
 m.lockSlow()
}

我们对 lockSlow 的代码进行拆解,首先是进入 for 自旋循环前变量的声明

func (m *Mutex) lockSlow() {
 /**
 `waitStartTime`记录当前 goroutine 开始等待的时间。
 `starving`表示当前 goroutine 是否处于饥饿状态。
 `awoke`表示当前 goroutine 是否从睡眠中被唤醒。
 `iter`自旋迭代计数器。
 `old`:保存当前锁的状态。
  */
 var waitStartTime int64
 starving := false
 awoke := false
 iter := 0
 old := m.state
}

开始自旋进行锁的获取

func (m *Mutex) lockSlow() {
 for{
  if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
        if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
            atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
            awoke = true
        }
        runtime_doSpin()
        iter++
        continue
    }
 }
}

如果锁已经被占用(mutexLocked),且不处于饥饿模式(mutexStarving),则进行自旋操作。

在自旋期间,尝试设置 mutexWoken 标志,通知 Unlock 方法不要唤醒其他被阻塞的 goroutine。

执行自旋操作(runtime_doSpin),增加自旋计数器 iter 。

这些状态的维护都是为了能够让锁尽可能高效的分配给需要的地方,不让某些协程等待出现饥饿的情况。

如果锁已经被释放则通过 CAS 获取锁并对状态进行更新。

func (m *Mutex) lockSlow() {
 for{
    if atomic.CompareAndSwapInt32(&m.state, old, new) {
    // 上锁成功则退出循环
        if old&(mutexLocked|mutexStarving) == 0 {
            break 
        }
        //...
    }
 }
}

Unlock 的相对来说比较简单,释放锁失败则会进入 unlockSlow 流程中,有饥饿的等待者则直接交给下一个等待者。

func (m *Mutex) Unlock() {
 if race.Enabled {
  _ = m.state
  race.Release(unsafe.Pointer(m))
 }

 new := atomic.AddInt32(&m.state, -mutexLocked)
 if new != 0 {
  m.unlockSlow(new)
 }
}

可以看到 sync.Mutex 通过复杂的状态管理和自旋等待机制,确保 在多 goroutine 的情况下,仍能高效且安全地提供互斥访问。

处理了锁的饥饿状态、自旋等待、信号量阻塞等多个方面的内容。

但是这仅仅只能保证单个程序临界区不受并发影响,如果多个Pod之间对临界资源进行访问,则需要借助分布式锁来实现。

分布式锁

我们可以借助存储中间件,如Redis 和ETCD 等来实现分布式锁的存储和使用。

那如何做到高可用?

首先要对Redis 、ETCD 的集群做高可用策略。Redis、ETCD集群失效时,可以通过Mysql进行存储来对资源加分布式锁。

此时数据库的压力也会增大,可以将需要高可用的数据库进行单独的部署,避免出现影响线上业务库的情况。

服务的各个实例在操作之前先要获取相应的锁后再进行相应的操作

Redis故障后如何转移?

图片

故障转移流程

存储状态的定义,用于来判定获取锁当时的存储类型

type storageStatus int64const (
 // redis处于健康状态
 redisWorking storageStatus = iota + 1
 // redis故障,mysql 处于运行状态
 mysqlWorking
 // redis处于恢复状态
 redisRecovering
)

type storageWatcher struct {
 // 存储的状态
 status storageStatus
 // Redis 连接池
 redisConn *redis.Pool
 // 恢复的时间,用于判断是否完全恢复
 recoveringTime time.Time

 *sync.RWMutex
}

先判断是否已经切换成 Mysql的存储状态,是则直接返回,如果不是,则对修改状态进行修改

func (s *storageWatcher) failover() bool {
 // 已经转移完成,则直接返回
 if s.status == mysqlWorking {
  return true}
 // 修改状态
 s.Lock()
 defer s.Unlock()
 s.status = mysqlWorking
 return true}

Redis 故障后如何恢复

图片

故障转移恢复

每次获取存储状态会进行存储状态维护

func (s *storageWatcher) getStorageStatus(ctx context.Context) storageStatus {
 s.Lock()
 defer s.Unlock()
 // 1.redis 正常工作
 if s.status == redisWorking {
  return redisWorking
 }

 // 2.redis处于故障状态
 if s.status == mysqlWorking {
  _, err := s.redisConn.Get().Do("PING")
  // redis仍然处于故障状态
  if err != nil {
   return mysqlWorking
  }
  // redis开始恢复
  s.status = redisRecovering
  s.recoveringTime = time.Now()
 }

 // 3.判断redis是否完全恢复
 if s.status == redisRecovering {
  // mysql 仍然未空,则仍然处于恢复状态
  if !s.isMysqlLockEmpty(ctx) {
   return redisRecovering
  }
  // 处于十分钟恢复则认为恢复完成
  recoveringDoneSeconds := 60 * 10
  recoveringSeconds := int(s.recoveringTime.Sub(s.recoveringTime).Seconds())
  if recoveringSeconds > recoveringDoneSeconds {
   s.status = redisWorking
   return redisWorking
  }
 }
 // 4.没有恢复,则仍然处于recovering 状态
 status := s.status
 return status
}

最终Lock实现

高可用锁定义

type haLock struct {
 key string// 标识上锁进程
 requestId string// 上锁失败是否进行等待
 needWaiting bool// 该可重入锁存储的类型
 storageStatus storageStatus
 // 最多的续约次数
 maxRenewCnt int// 锁的过期时间
 expireTime time.Duration

 *sync.Once
}

上锁方法的实现

func (l *haLock) Lock(ctx context.Context) (unlockFunc, error) {
 // 1.已经是数据库存储运行,直接上锁,没有兜底方案
 if l.storageStatus == mysqlWorking {
  unlockFunc, err := l.mysqlLock(ctx)
  if err != nil {
   log_util.LogInfoWithParams(ctx, "", err)
   return emptyUnlockFunc, errors.WithStack(err)
  }
  return unlockFunc, nil}

 // 2.如果redis完全健康,则直接对redis上锁
 if l.storageStatus == redisWorking {
  unlockFunc, err := l.redisLock(ctx)
  if err == nil {
   return unlockFunc, nil}
  // 故障转移,如果失败则直接返回
  err = l.failover(ctx)
  if err != nil {
   log_util.LogErrorWithParams(ctx, "failover err ", err)
   return emptyUnlockFunc, errors.WithStack(err)
  }

  // 故障转移成功,则通过数据库上锁
  unlockFunc, err = l.mysqlLock(ctx)
  if err != nil {
   return emptyUnlockFunc, errors.WithStack(err)
  }
 }

 // 3.redis 还未完全恢复健康,则对数据库预先加锁
 if l.storageStatus == redisRecovering {
  unlockFunc, err := l.redisLockWithPreLockMysql(ctx)
  if err != nil {
   log_util.LogErrorWithParams(ctx, "redisLockWithPreLockMysql", err)
   return emptyUnlockFunc, errors.WithStack(err)
  }
  return unlockFunc, nil}

 return emptyUnlockFunc, errors.New("unknow status")
}

为什么要对数据库预先加锁?因为通过 select for update 可以防止 mysql 和 redis同时加上锁导致并发问题

Mysql 锁

CREATE TABLE `lock_tab` (
    `id` INT ( 11 ) NOT NULL AUTO_INCREMENT COMMENT '主键',
    `lock_key` VARCHAR ( 64 ) NOT NULL DEFAULT '' COMMENT '锁定的key',
    `desc` VARCHAR ( 1024 ) NOT NULL DEFAULT '备注信息',
    `lock_cnt` INT ( 11 ) NOT NULL COMMENT '锁定的次数,用于实现可重入',
    `expire_time` INT ( 11 ) NOT NULL COMMENT '过期的时间',
    `ctime` INT ( 11 ) NOT NULL COMMENT '创建时间',
    `mtime` INT ( 11 ) NOT NULL COMMENT '修改时间',
    PRIMARY KEY ( `id` ),
    UNIQUE KEY `uniq_key` (`key`) USING BTREE 
) ENGINE = INNODB DEFAULT CHARSET = utf8 COMMENT = '锁定的表';

通过将key insert 数据库中即可完成上锁,结束后则 delete 进行解锁,通过 update 过期时间来实现锁的续约

Redis 锁的实现

通过 SETEXNX 来实现对redis的锁实现,如果处于恢复状态,则提供了对 mysql 的 key 进行 select for update

func (l *haLock) redisLockWithPreLockMysql(ctx context.Context) (unlockFunc, error) {
 *// 需要先锁住mysql 对应的key,避免有机器还在使用mysql上锁导致并发*
 unlockPreLockMysql, err := l.preLockMysql(ctx)
 if err != nil {
  log_util.LogErrorWithParams(ctx, "", err)
  return emptyUnlockFunc, errors.WithStack(err)
 }
 unlockFunc, err := l.redisLock(ctx)
 if err != nil {
  log_util.LogErrorWithParams(ctx, "redis lock fail", err)
  return emptyUnlockFunc, errors.WithStack(err)
 }

 return func(ctx context.Context) (e error) {
  e = unlockPreLockMysql(ctx)
  if e != nil {
   log_util.LogErrorWithParams(ctx, "unlockPreLockMysql ", e)
  }
  e = unlockFunc(ctx)
  if e != nil {
   log_util.LogErrorWithParams(ctx, "redis unlock ", e)
   return errors.WithStack(e)
  }
  return nil
 }, nil
}

装饰器使用

为了方便调用方使用,通过为调用方法包装加锁和解锁的方法,在调用结束后调用 unlock 方法,

func ReentrantLockDecorator(ctx context.Context, decoPtr, fn interface{}, key string, opts ...LockDecoratorParamsOpt) {
 var decoratedFunc, targetFunc reflect.Value
 decoratedFunc = reflect.ValueOf(decoPtr).Elem()
 targetFunc = reflect.ValueOf(fn)
 rLock := newHALock(ctx)
 v := reflect.MakeFunc(
  targetFunc.Type(),
  func(in []reflect.Value) (out []reflect.Value) {
   log_util.LogInfoWithParams(ctx, "LockDecorator locker", rLock)
   unlockFunc, err := rLock.Lock(ctx)
   if err != nil {
    panic(err)
   }
   defer func() {
    err := unlockFunc(ctx)
    if err != nil {
     log_util.LogErrorWithParams(ctx, "reentrant unlock err", err, rLock)
     return}
    log_util.LogInfoWithParams(ctx, "LockDecorator locker unlock", rLock)
   }()
   out = targetFunc.Call(in)
   return},
 )
 decoratedFunc.Set(v)
 return}

为什么Go不支持可重入锁

https://stackoverflow.com/questions/14670979/recursive-locking-in-go#14671462

func F() {
 mu.Lock()
 //... do some stuff ...
 G()
 //... do some more stuff ...
 mu.Unlock()
}

func G() {
 mu.Lock()
 //... do some stuff ...
 mu.Unlock()
}

函数F()G()使用了相同的互斥锁,并且都在各自函数内部进行了加锁,此时就会出现死锁,使用可重入锁可以解决问题,但是首先应该考虑的还是先看能否改变代码解耦

func call(){
  F()
  G()
}

func F() {
      mu.Lock()
      ... do some stuff
      mu.Unlock()
}

func g() {
     ... do some stuff ...
}

func G() {
     mu.Lock()
     g()
     mu.Unlock()
}

这样不仅避免了死锁,也对代码进行解耦,每一层有自己的锁,不会相互争抢导致死锁。

如果需要用到可重入锁,可能需要先考虑代码结构是否存在问题,但有时候业务场景复杂时会有许多递归调用,这时如果难以解决就可以使用可重入的分布式锁。

到这里我们学习到了高可用分布式锁的故障转移和上锁功能的实现,可以在 Redis 故障后是比较大的,而且充斥大量分布式锁也会让代码增加额外的性能开销,由于分布式锁是锁一段代码,有可能在内部做了耗时的操作而不容易察觉,最终导致了整体程序的性能问题。

所以减少分布式锁的依赖是我们需要在业务开发中需要时时刻刻去注意的问题,这也要求我们在上锁的时候应该尽可能最小粒度的去锁。

数据库唯一索引

在创建数据库时对字段增加唯一索引,此时当并发操作数据时出现唯一主键冲突,在数据库层则只会成功插入一条,避免其他的数据进行插入。

例如在用户表的可以把手机号设置为唯一索引,这样用户重复注册的时候,只要直接捕获数据库的错误,就可以避免同个手机号被多个用户同时注册。同样的将用户相互关注时 user_id 和 follow_user_id 作为唯一索引,可以避免重复关注操作。

这种做法是通过数据库对唯一索引的碰撞来解决并发问题。

但是这种做法也会存在一个问题,就是每一次重复的操作我们都会透穿到数据库,如果用户频繁会有这种操作,实际上对我们的性能也是一种损耗,比较适用于在用户注册这种QPS较小的场景下。

当唯一资源已经被插入之后,实际上也就是“锁住”了其他资源插入的可能性。

乐观锁

修改数据库的时候通过将原来的值也传入作为筛选的条件,如果此时数据已经被修改,那么WHERE 条件则不满足,会更新失败。

存在ABA问题,这个时候可以通过版本号单调递增的方式来解决该问题。

UPDATE tab SET a = 'new', version = version + 1 WHERE a = 'old' AND version = ?

适用场景是对同一资源进行操作并且有明确的前后修改状态。

比如支付订单的场景下,创建待支付订单是必须在支付订单的前面,并且已支付订单不允许再次被支付。

不适用于需要对不同资源进行更新时,乐观锁无法锁住同一个对象,因为乐观锁的字段需要跟在对象上。并且也不适用于并发数较高频率出现时不适合,会出现较多的更新失败,导致大量请求无效操作,增加了整个系统的负载。

悲观锁

通过mysql 的悲观锁 select for update,在需要更新前先将数据进行上锁,此时能做到防止并发。

缺点是悲观锁对性能的损耗较大,容易造成死锁,所以在用的时候需要谨慎。

但是它在实际场景中也是最安全最不容易出问题的一种锁。

上面三种方式都是通过对象来进行处理,但是有些操作是代码临界区需要上锁,而不能依托于具体的对象,这个时候就需要引入分布式锁。

感谢你读到这里,如果喜欢云原生、Go、个人成长的内容可以关注我,让我们一起进步。

 

蔡蔡蔡菜

赞赏二维码喜欢作者

Go14 Go · 目录 上一篇项目重构为Go实现后,对 DDD 实践的思考 阅读 129 ​   作者已设置关注后才可以留言     蔡蔡蔡云原生Go                

人划线

 

标签:return,func,err,实现,redis,ctx,nil,应用,Go
From: https://www.cnblogs.com/cheyunhua/p/18313144

相关文章

  • html5实现摄像头扫码的实践
    使用的技术:原生html+vue.js+zxing.js;测试时附加需要的技术:iis10+ca证书制作。实现在安卓手机、安卓平板、pc上,实现浏览器上摄像头扫码功能。苹果的设备没测试过。第一步:写好测试的网页并部署。部署时使用iis发布。因为这个打开摄像头被限制为localhost和127.0.0.1和https协议下......
  • 【攻防技术系列+ARP协议】Kali实现断网攻击
    什么是ARP欺骗攻击文❓ARP(AddressResolutionProtocol)是地址解析协议,是一种将IP地址转化成物理地址的协议。ARP具体说来就是将网络层(也就是相当于OSI的第三层)地址解析为数据链路层(也就是相当于OSI的第二层)的物理地址(注:此处物理地址并不一定指MAC地址)。ARP缓存是个用来储存IP地......
  • Python中4种方法实现 xls 文件转 xlsx
    在Python中,可以采用pandas、pyexcel、win32com和xls2xlsx这四个模块,实现xls转xlsx格式。以Excel示例文件test_Excel.xls为例,具体内容如下图所示:1.pandas安装命令pipinstallpandas-ihttps://mirrors.aliyun.com/pypi/simple具体使用方法importpandasas......
  • vue3 - 最新详细实现 “日历课程表“ 上课时间表功能组件,教务系统专用老师排课表插件
    效果图在vue3、nuxt3项目开发中,详解实现学生每周“动态课程表(日历表展现)”功能实现,对学期的每周课程进行排课和准备工作,可自由切换本月的每周上课表情况、也可通过日期范围选择器进行筛选指定周的教学排班表、相同的课成可以合并(可不开启),课表数据结构支持调用后端服......
  • 离散数学——6.命题逻辑的应用
    命题逻辑的应用自然语言命题的符号化为什么要将自然语言命题符号化?自然语言命题转换为逻辑公式的过程也称为自然语言命题的符号化是将命题逻辑知识(等值演算和推理理论)用于求解应用问题的第一步$p→q的逆命题是q→p$$p→q的否命题是¬p→¬q$$p→q的逆否命题是¬q→¬p$......
  • Nacos原理和应用
    文章目录Nacos安装与启动快速入门服务分级存储模型环境隔离Nacos原理Nacos安装与启动首先安装Nacos服务,Windows安装地址:https://github.com/alibaba/nacos/releases之后进入bin目录,使用命令行输入:startup.cmd-mstandalone此时出现下图就代表启动成功默......
  • 将 .NET Framework 应用程序更新到 .NET Core。这是一个 VB.NET Windows 服务
    我有一个用VB.NET编写的有点旧的.NET4.7Windows服务。我想将其升级到.NETCore8.0,但升级向导似乎不想处理大量工作如果.NETCore支持VB.NETWindows服务,我找不到任何真正的内容....我可以找到大量创建控制台应用程序等的C#示例,但没有找到VB.NET的任何内......
  • Java计算机毕业设计秒杀系统实现(开题报告+源码+论文)
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景随着互联网技术的飞速发展和电子商务的蓬勃兴起,秒杀活动作为一种高效促销手段,在各大电商平台中屡见不鲜。秒杀活动以其时间紧迫、价格诱人的特点,迅速......
  • CSS技巧专栏:一日一例 7 - 纯CSS实现炫光边框按钮特效
    CSS技巧专栏:一日一例7-纯CSS实现炫光边框按钮特效本例效果图案例分析相信你可能已经在网络见过类似这样的流光的按钮,在羡慕别人做的按钮这么酷的时候,你有没有扒一下它的源代码的冲动?或者你当时有点冲动,却转眼忘却了。今天,刚巧,你又看到它了,今天跟我一起扒一扒它的源代码......
  • 观察者模式实战:Spring Boot中联动更新机制的优雅实现
    引言在许多应用系统中,我们经常需要处理多个表之间的关联更新问题。例如,在教育管理系统中,当学生的基本信息表中的年龄字段发生更改时,我们可能还需要同步更新学生档案表和学生成绩表中的相关信息。本文将通过一个具体的案例,介绍如何在SpringBoot项目中利用观察者模式来优雅地解......