Go 锁的实现与应用
原创 蔡蔡蔡菜 蔡蔡蔡云原生Go 2024年07月20日 08:30 广东 1人听过在说锁的实现之前,我们先了解一下业务中的并发问题。
并发是由于多个线程在多个CPU上执行,此时CPU 之间的缓存并不可见。从磁盘或者内存获取数据后会保存在CPU中进行执行,此时如果内存和磁盘的数据被更新了,CPU 并不会发现。这个时候就出现了我们平时经常说的并发。
我们说到CPU 在执行的过程中,磁盘或者内存的数据已经发生改变,这个时候CPU在执行程序的时候,数据属于旧数据,此时计算出来的结果是不符合预期的。
这个时候再将不符合预期的数据向内存和磁盘进行写入时就会造成数据错乱。
反映到实际业务上,由于数据的错乱,有可能用户会由于旧数据而支付了两次金额,这对现实情况来说是完全不允许的。
对不同的业务场景,解决并发的方式各有不同,需要用的解决方式也不相同。
我们短时间内创建微信金额相同的两个订单的时候,也会提醒我们是否存在问题,这种是在业务逻辑上进行规避,但是这种规避不是强制的,因为用户本身就有可能存在短时间内创建两个相同的订单。
但是这种处理方式也给了我们一种启发,不一定完全需要依靠技术的手段来规避并发,也可以从一定业务语义进行处理。
因此为了解决业务中的并发问题,我们就需要“锁住”要操作的资源对象来避免并发带来的影响。
sync.Mutex
sync.Mutex
是单个应用程序上的锁实现,我们先通过计数的例子看它如何进行使用。
package main
import (
"fmt"
"sync"
"time"
)
type Counter struct {
mu sync.Mutex
value int
}
func (c *Counter) Inc() {
c.mu.Lock()
defer c.mu.Unlock()
c.value++
}
func (c *Counter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.value
}
func main() {
counter := Counter{}
for i := 0; i < 10; i++ {
go func() {
for j := 0; j < 10000; j++ {
counter.Inc()
}
}()
}
time.Sleep(time.Second)
fmt.Println(counter.Value())
}
Lock
的时候会通过 CAS
尝试看是否可以获得锁,如果不可以的话,则会进入自旋来等待获取
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
m.lockSlow()
}
我们对 lockSlow
的代码进行拆解,首先是进入 for
自旋循环前变量的声明
func (m *Mutex) lockSlow() {
/**
`waitStartTime`记录当前 goroutine 开始等待的时间。
`starving`表示当前 goroutine 是否处于饥饿状态。
`awoke`表示当前 goroutine 是否从睡眠中被唤醒。
`iter`自旋迭代计数器。
`old`:保存当前锁的状态。
*/
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
}
开始自旋进行锁的获取
func (m *Mutex) lockSlow() {
for{
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
continue
}
}
}
如果锁已经被占用(mutexLocked
),且不处于饥饿模式(mutexStarving
),则进行自旋操作。
在自旋期间,尝试设置 mutexWoken
标志,通知 Unlock
方法不要唤醒其他被阻塞的 goroutine。
执行自旋操作(runtime_doSpin
),增加自旋计数器 iter
。
这些状态的维护都是为了能够让锁尽可能高效的分配给需要的地方,不让某些协程等待出现饥饿的情况。
如果锁已经被释放则通过 CAS
获取锁并对状态进行更新。
func (m *Mutex) lockSlow() {
for{
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 上锁成功则退出循环
if old&(mutexLocked|mutexStarving) == 0 {
break
}
//...
}
}
}
Unlock
的相对来说比较简单,释放锁失败则会进入 unlockSlow
流程中,有饥饿的等待者则直接交给下一个等待者。
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
m.unlockSlow(new)
}
}
可以看到 sync.Mutex
通过复杂的状态管理和自旋等待机制,确保 在多 goroutine 的情况下,仍能高效且安全地提供互斥访问。
处理了锁的饥饿状态、自旋等待、信号量阻塞等多个方面的内容。
但是这仅仅只能保证单个程序临界区不受并发影响,如果多个Pod之间对临界资源进行访问,则需要借助分布式锁来实现。
分布式锁
我们可以借助存储中间件,如Redis 和ETCD 等来实现分布式锁的存储和使用。
那如何做到高可用?
首先要对Redis 、ETCD 的集群做高可用策略。Redis、ETCD集群失效时,可以通过Mysql进行存储来对资源加分布式锁。
此时数据库的压力也会增大,可以将需要高可用的数据库进行单独的部署,避免出现影响线上业务库的情况。
服务的各个实例在操作之前先要获取相应的锁后再进行相应的操作。
Redis故障后如何转移?
故障转移流程存储状态的定义,用于来判定获取锁当时的存储类型
type storageStatus int64const (
// redis处于健康状态
redisWorking storageStatus = iota + 1
// redis故障,mysql 处于运行状态
mysqlWorking
// redis处于恢复状态
redisRecovering
)
type storageWatcher struct {
// 存储的状态
status storageStatus
// Redis 连接池
redisConn *redis.Pool
// 恢复的时间,用于判断是否完全恢复
recoveringTime time.Time
*sync.RWMutex
}
先判断是否已经切换成 Mysql
的存储状态,是则直接返回,如果不是,则对修改状态进行修改
func (s *storageWatcher) failover() bool {
// 已经转移完成,则直接返回
if s.status == mysqlWorking {
return true}
// 修改状态
s.Lock()
defer s.Unlock()
s.status = mysqlWorking
return true}
Redis 故障后如何恢复
故障转移恢复每次获取存储状态会进行存储状态维护
func (s *storageWatcher) getStorageStatus(ctx context.Context) storageStatus {
s.Lock()
defer s.Unlock()
// 1.redis 正常工作
if s.status == redisWorking {
return redisWorking
}
// 2.redis处于故障状态
if s.status == mysqlWorking {
_, err := s.redisConn.Get().Do("PING")
// redis仍然处于故障状态
if err != nil {
return mysqlWorking
}
// redis开始恢复
s.status = redisRecovering
s.recoveringTime = time.Now()
}
// 3.判断redis是否完全恢复
if s.status == redisRecovering {
// mysql 仍然未空,则仍然处于恢复状态
if !s.isMysqlLockEmpty(ctx) {
return redisRecovering
}
// 处于十分钟恢复则认为恢复完成
recoveringDoneSeconds := 60 * 10
recoveringSeconds := int(s.recoveringTime.Sub(s.recoveringTime).Seconds())
if recoveringSeconds > recoveringDoneSeconds {
s.status = redisWorking
return redisWorking
}
}
// 4.没有恢复,则仍然处于recovering 状态
status := s.status
return status
}
最终Lock实现
高可用锁定义
type haLock struct {
key string// 标识上锁进程
requestId string// 上锁失败是否进行等待
needWaiting bool// 该可重入锁存储的类型
storageStatus storageStatus
// 最多的续约次数
maxRenewCnt int// 锁的过期时间
expireTime time.Duration
*sync.Once
}
上锁方法的实现
func (l *haLock) Lock(ctx context.Context) (unlockFunc, error) {
// 1.已经是数据库存储运行,直接上锁,没有兜底方案
if l.storageStatus == mysqlWorking {
unlockFunc, err := l.mysqlLock(ctx)
if err != nil {
log_util.LogInfoWithParams(ctx, "", err)
return emptyUnlockFunc, errors.WithStack(err)
}
return unlockFunc, nil}
// 2.如果redis完全健康,则直接对redis上锁
if l.storageStatus == redisWorking {
unlockFunc, err := l.redisLock(ctx)
if err == nil {
return unlockFunc, nil}
// 故障转移,如果失败则直接返回
err = l.failover(ctx)
if err != nil {
log_util.LogErrorWithParams(ctx, "failover err ", err)
return emptyUnlockFunc, errors.WithStack(err)
}
// 故障转移成功,则通过数据库上锁
unlockFunc, err = l.mysqlLock(ctx)
if err != nil {
return emptyUnlockFunc, errors.WithStack(err)
}
}
// 3.redis 还未完全恢复健康,则对数据库预先加锁
if l.storageStatus == redisRecovering {
unlockFunc, err := l.redisLockWithPreLockMysql(ctx)
if err != nil {
log_util.LogErrorWithParams(ctx, "redisLockWithPreLockMysql", err)
return emptyUnlockFunc, errors.WithStack(err)
}
return unlockFunc, nil}
return emptyUnlockFunc, errors.New("unknow status")
}
为什么要对数据库预先加锁?因为通过 select for update
可以防止 mysql
和 redis
同时加上锁导致并发问题
Mysql 锁
CREATE TABLE `lock_tab` (
`id` INT ( 11 ) NOT NULL AUTO_INCREMENT COMMENT '主键',
`lock_key` VARCHAR ( 64 ) NOT NULL DEFAULT '' COMMENT '锁定的key',
`desc` VARCHAR ( 1024 ) NOT NULL DEFAULT '备注信息',
`lock_cnt` INT ( 11 ) NOT NULL COMMENT '锁定的次数,用于实现可重入',
`expire_time` INT ( 11 ) NOT NULL COMMENT '过期的时间',
`ctime` INT ( 11 ) NOT NULL COMMENT '创建时间',
`mtime` INT ( 11 ) NOT NULL COMMENT '修改时间',
PRIMARY KEY ( `id` ),
UNIQUE KEY `uniq_key` (`key`) USING BTREE
) ENGINE = INNODB DEFAULT CHARSET = utf8 COMMENT = '锁定的表';
通过将key insert
数据库中即可完成上锁,结束后则 delete
进行解锁,通过 update
过期时间来实现锁的续约
Redis 锁的实现
通过 SETEXNX
来实现对redis的锁实现,如果处于恢复状态,则提供了对 mysql
的 key 进行 select for update
func (l *haLock) redisLockWithPreLockMysql(ctx context.Context) (unlockFunc, error) {
*// 需要先锁住mysql 对应的key,避免有机器还在使用mysql上锁导致并发*
unlockPreLockMysql, err := l.preLockMysql(ctx)
if err != nil {
log_util.LogErrorWithParams(ctx, "", err)
return emptyUnlockFunc, errors.WithStack(err)
}
unlockFunc, err := l.redisLock(ctx)
if err != nil {
log_util.LogErrorWithParams(ctx, "redis lock fail", err)
return emptyUnlockFunc, errors.WithStack(err)
}
return func(ctx context.Context) (e error) {
e = unlockPreLockMysql(ctx)
if e != nil {
log_util.LogErrorWithParams(ctx, "unlockPreLockMysql ", e)
}
e = unlockFunc(ctx)
if e != nil {
log_util.LogErrorWithParams(ctx, "redis unlock ", e)
return errors.WithStack(e)
}
return nil
}, nil
}
装饰器使用
为了方便调用方使用,通过为调用方法包装加锁和解锁的方法,在调用结束后调用 unlock
方法,
func ReentrantLockDecorator(ctx context.Context, decoPtr, fn interface{}, key string, opts ...LockDecoratorParamsOpt) {
var decoratedFunc, targetFunc reflect.Value
decoratedFunc = reflect.ValueOf(decoPtr).Elem()
targetFunc = reflect.ValueOf(fn)
rLock := newHALock(ctx)
v := reflect.MakeFunc(
targetFunc.Type(),
func(in []reflect.Value) (out []reflect.Value) {
log_util.LogInfoWithParams(ctx, "LockDecorator locker", rLock)
unlockFunc, err := rLock.Lock(ctx)
if err != nil {
panic(err)
}
defer func() {
err := unlockFunc(ctx)
if err != nil {
log_util.LogErrorWithParams(ctx, "reentrant unlock err", err, rLock)
return}
log_util.LogInfoWithParams(ctx, "LockDecorator locker unlock", rLock)
}()
out = targetFunc.Call(in)
return},
)
decoratedFunc.Set(v)
return}
为什么Go不支持可重入锁
https://stackoverflow.com/questions/14670979/recursive-locking-in-go#14671462
func F() {
mu.Lock()
//... do some stuff ...
G()
//... do some more stuff ...
mu.Unlock()
}
func G() {
mu.Lock()
//... do some stuff ...
mu.Unlock()
}
函数F()
和G()
使用了相同的互斥锁,并且都在各自函数内部进行了加锁,此时就会出现死锁,使用可重入锁可以解决问题,但是首先应该考虑的还是先看能否改变代码解耦
func call(){
F()
G()
}
func F() {
mu.Lock()
... do some stuff
mu.Unlock()
}
func g() {
... do some stuff ...
}
func G() {
mu.Lock()
g()
mu.Unlock()
}
这样不仅避免了死锁,也对代码进行解耦,每一层有自己的锁,不会相互争抢导致死锁。
如果需要用到可重入锁,可能需要先考虑代码结构是否存在问题,但有时候业务场景复杂时会有许多递归调用,这时如果难以解决就可以使用可重入的分布式锁。
到这里我们学习到了高可用分布式锁的故障转移和上锁功能的实现,可以在 Redis
故障后是比较大的,而且充斥大量分布式锁也会让代码增加额外的性能开销,由于分布式锁是锁一段代码,有可能在内部做了耗时的操作而不容易察觉,最终导致了整体程序的性能问题。
所以减少分布式锁的依赖是我们需要在业务开发中需要时时刻刻去注意的问题,这也要求我们在上锁的时候应该尽可能最小粒度的去锁。
数据库唯一索引
在创建数据库时对字段增加唯一索引,此时当并发操作数据时出现唯一主键冲突,在数据库层则只会成功插入一条,避免其他的数据进行插入。
例如在用户表的可以把手机号设置为唯一索引,这样用户重复注册的时候,只要直接捕获数据库的错误,就可以避免同个手机号被多个用户同时注册。同样的将用户相互关注时 user_id 和 follow_user_id 作为唯一索引,可以避免重复关注操作。
这种做法是通过数据库对唯一索引的碰撞来解决并发问题。
但是这种做法也会存在一个问题,就是每一次重复的操作我们都会透穿到数据库,如果用户频繁会有这种操作,实际上对我们的性能也是一种损耗,比较适用于在用户注册这种QPS较小的场景下。
当唯一资源已经被插入之后,实际上也就是“锁住”了其他资源插入的可能性。
乐观锁
修改数据库的时候通过将原来的值也传入作为筛选的条件,如果此时数据已经被修改,那么WHERE 条件则不满足,会更新失败。
存在ABA问题,这个时候可以通过版本号单调递增的方式来解决该问题。
UPDATE tab SET a = 'new', version = version + 1 WHERE a = 'old' AND version = ?
适用场景是对同一资源进行操作并且有明确的前后修改状态。
比如支付订单的场景下,创建待支付订单是必须在支付订单的前面,并且已支付订单不允许再次被支付。
不适用于需要对不同资源进行更新时,乐观锁无法锁住同一个对象,因为乐观锁的字段需要跟在对象上。并且也不适用于并发数较高频率出现时不适合,会出现较多的更新失败,导致大量请求无效操作,增加了整个系统的负载。
悲观锁
通过mysql 的悲观锁 select for update
,在需要更新前先将数据进行上锁,此时能做到防止并发。
缺点是悲观锁对性能的损耗较大,容易造成死锁,所以在用的时候需要谨慎。
但是它在实际场景中也是最安全最不容易出问题的一种锁。
上面三种方式都是通过对象来进行处理,但是有些操作是代码临界区需要上锁,而不能依托于具体的对象,这个时候就需要引入分布式锁。
感谢你读到这里,如果喜欢云原生、Go、个人成长的内容可以关注我,让我们一起进步。
蔡蔡蔡菜 Go14 Go · 目录 上一篇项目重构为Go实现后,对 DDD 实践的思考 阅读 129 作者已设置关注后才可以留言 蔡蔡蔡云原生Go
人划线
标签:return,func,err,实现,redis,ctx,nil,应用,Go From: https://www.cnblogs.com/cheyunhua/p/18313144