最近面试遇到问锁的问题,答得不是很好,重新做一下总结梳理
go
中的sync
包提供了两种锁的类型,分别是互斥锁sync.Mutex
和读写锁sync.RWMutex
,这两种锁都属于悲观锁
饥饿模式与正常模式
在下面的内容会经常涉及到一个概念,饥饿模式,这里先简单说一下
1. 正常模式(非公平锁)
正常模式下,所有等待的 goroutine 按照先进先出的顺序等待。唤醒的 goroutine 不会直接拥有锁,而是和新请求锁的 goroutine 竞争锁。新请求的 goroutine 更容易获取锁,因为他在 CPU 上执行,而被唤醒的goroutine 需要重新获取CPU权限,再进行竞争。如此进行下去,就会导致 goroutine 长时间不能获取到锁。
2.饥饿模式(公平锁)
饥饿模式下,维持一个队列,所有的 goroutine 不再竞争,直接把锁交给队列中排在第一位的 goroutine;同时,饥饿模式下,新进来的 goroutine 不会进入到自旋状态,直接放在等待队列的尾部。饥饿模式的触发条件:
-
一个 goroutine 等待锁唤醒的超过1ms;
-
当前队列只剩下一个 goroutine 的时候,mutex 切换到饥饿模式。
sync.Mutex
提供方法
提供了三个方法:
-
Lock()
:进行加锁操作,在同一个goroutine中必须在锁释放之后才能进行再次上锁,不然会panic -
Unlock()
: 进行解锁操作,如果这个时候未加锁会panic,mutex和goroutine不关联,也就是说对于mutex的加锁解锁操作可以发生在多个goroutine间 -
tryLock()
:用得少,尝试获取锁,获取成功返回true,否则返回false,返回false可能有几种情况-
当锁被其他goroutine占有,无法获取,将立刻返回false,
-
锁处于饥饿模式,此时要让饥饿队列先获取锁,将立刻返回false,
-
当锁可用时尝试获取锁,获取失败也返回false
-
底层数据结构
底层就两个变量,分别说明锁的状态,以及一个信号量,用于唤醒等待goroutine
type Mutex struct { state int32 // 表示当前互斥锁的状态,复合型字段 sema uint32 // 信号量变量,用来控制等待goroutine的阻塞休眠和唤醒 }
state
是复合字段,不止说明了是否上锁,还说明了锁的模式,等待goroutine的数量等
const ( mutexLocked = 1 << iota // mutex is locked mutexWoken mutexStarving mutexWaiterShift = iota }
-
mutexLocked
**:是否被锁定,由于iota
的初始值是 0,因此mutexLocked
的值为1 << 0
,即1
(二进制为0001
)。 -
mutexWoken
**:这一位被设置时,表示有一个等待的 Goroutine 已经被唤醒或正在被唤醒。这个状态用于避免不必要地唤醒多个 Goroutine。值为1 << 1
**,即2
**(二进制为0010
**) -
mutexStarving
**:这一位被设置时,表示Mutex
处于“饥饿模式”,这在某些特定条件下会被触发,下文将说到,值为1 << 2
,即4
(二进制为0100
) -
mutexWaiterShift
**:表示等待获取锁的goroutine数量,从偏移量4开始计算
加锁过程
通过底层的atomic
包中提供的CAS方法修改锁的状态,
-
如果锁的状态是0,则改变其状态,然后直接返回
-
否则进入 Slow path,看是否自旋或者在饥饿场景下获取锁
func (m *Mutex) Lock() { // Fast path: grab unlocked mutex. if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { if race.Enabled { race.Acquire(unsafe.Pointer(m)) } return } // Slow path (outlined so that the fast path can be inlined) m.lockSlow() }
进入lockSlow
函数后,会发现有一个for循环,这个时候实际上就是在自旋获取锁, 即:乐观的认为当前正在持有锁的g能在短时间内归还锁,所以需要一些条件来判断:到底能不能短时间归还
主要工作就是:
-
判断锁是否是饥饿状态,不是则自旋(即进行此次循环),否则直接进入队列。
-
**如果自旋过程中等待时间过长,让锁自动变为饥饿模式**(网上说的1ms,没找到源码具体体现在哪)
下面贴部分代码,注释写的很详细,没截下来的地方,才是自旋,下面这部分图只是在判断某些条件
总结
mutex底层实际上就是一个状态和信号量,信号量用于通知goroutine,状态用于判断是否加锁。同时状态也是个复合变量,还记录了锁的模式(正常,饥饿模式),等待锁的goroutine数量。
获取锁就是通过atomic的CAS方法修改状态,修改失败则进入自旋,长时间获取不到锁自动转为饥饿模式
sync.RWMutex
读写锁是为了在读写并发的场景下提高锁的效率而使用,分为读锁和写锁
-
读锁:读数据时添加,与其他读锁可以共享
-
写锁:写数据时添加,与读锁,写锁互斥。也就是加写锁时需要保证既没有读锁,也没有写锁
底层数据结构
RWMutex
底层也是基于mutex
实现的,并包含了多个字段,这些字段在控制锁的行为时起着关键作用。下面是每个字段的含义:
type RWMutex struct { w Mutex // 当 Goroutine 进行写操作时,上锁 writerSem uint32 // 写信号量,有读锁存在时,阻塞写操作,并在所有读锁释放后唤醒写操作。 readerSem uint32 // 读信号量,有写锁存在时,阻塞读操作,并在写锁释放后唤醒读操作。 readerCount atomic.Int32 // 当前持有读锁的 goroutine 数量,0时才能获取写锁 readerWait atomic.Int32 // 正在等待的读 goroutine 数量,锁正在写操作时有值 }
提供方法
-
RLock()/RUnlock()
:加读锁,释放读锁。在没有写锁时才能成功加锁 -
Lock()/Unlock()
:加写锁,需要保证readerCount=0
且w.state=0
才能成功上锁
加锁过程
RLock:
直接增加readerCount
数量
func (rw *RWMutex) RLock() { // 保证w未被上锁 if race.Enabled { _ = rw.w.state race.Disable() } // 使用atomic.Add,为读锁持有数量+1 if rw.readerCount.Add(1) < 0 { // A writer is pending, wait for it. runtime_SemacquireRWMutexR(&rw.readerSem, false, 0) } if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(&rw.readerSem)) } }
Lock():
底层还是调用的mutex.Lock()
方法,不过需要判断此时是否有读锁
func (rw *RWMutex) Lock() { // 保证w未被上锁 if race.Enabled { _ = rw.w.state race.Disable() } // First, resolve competition with other writers. rw.w.Lock() // Announce to readers there is a pending writer. // 判断读锁数量 r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders // Wait for active readers. if r != 0 && rw.readerWait.Add(r) != 0 { runtime_SemacquireRWMutex(&rw.writerSem, false, 0) } if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(&rw.readerSem)) race.Acquire(unsafe.Pointer(&rw.writerSem)) } }
标签:rw,goroutine,饥饿,golang,读锁,race,mutex,原理 From: https://www.cnblogs.com/hackcaixukun/p/18378855