Mutex
- 饥饿模式的判定: 等待时间超过
1ms
- woken 标志的作用:
- 通知
unlock
有人在自旋, 这样unlock
时,我们不会去唤醒队列中阻塞的G - 保证公平性,也就是当我们还剩最后一个在获取锁,如果我们这时候有一个人来
Lock()
如果没有这个标志位的话,新的竞争者 会 通过fast-path
获取锁,但是如果有这个标志位,我们可以让他们进入lockslow()
一起竞争.
- 通知
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { // 如果直接可以获取到锁
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}
func sync_runtime_canSpin(i int) bool {
// sync.Mutex is cooperative, so we are conservative with spinning.
// Spin only few times and only if running on a multicore machine and
// GOMAXPROCS>1 and there is at least one other running P and local runq is empty.
// As opposed to runtime mutex we don't do passive spinning here,
// because there can be work on global runq or on other Ps.
if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
return false
}
if p := getg().m.p.ptr(); !runqempty(p) {
return false
}
return true
}
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
for {
// 如果当前是正常模式 并且 锁已经被获取了,判断是否可以自旋
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 小于 4 次,并且 cpu >= 1 , 至少有一个运行的 P
// 并且当前绑定的 P 中本地runq是空的,
// 设置 woken, 避免让 unlock 去唤醒其他锁住的 G
// 如果当前有等待的人,并且当前我们可以设置 woken 标志(这里与 unlockSlow 相对应)
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
new := old
// 不是饥饿模式, 可以争夺锁
if old&mutexStarving == 0 {
new |= mutexLocked
}
// 是饥饿模式,那么就 waiter + 1
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// 如果当前锁锁住的情况下,切换模式.
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
// 被唤醒
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
// 如果更新成功了
if atomic.CompareAndSwapInt32(&m.state, old, new) {
/// 获取到锁了(当前是正常模式,并且抢到锁了)
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// 加入等待队列中(如果我们之前等待过,我们就加入队头)
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo, 1) // // If lifo is true, queue waiter at the head of wait queue.
// 被唤醒了,如果 > 1ms ,那么就设置模式为 饥饿模式
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
// 原来就是饥饿模式,说明我们在队列中被唤醒了,我们当前拿到锁了
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 将等待者 - 1
delta := int32(mutexLocked - 1<<mutexWaiterShift)
// 如果当前不是饥饿模式,或者是最后一个等待者,退出饥饿模式.
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
// 正常模式,被唤醒就要去争抢.
awoke = true
iter = 0
} else {
// 更新失败, 重新争抢
old = m.state
}
}
}
func (m *Mutex) Unlock() {
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// Outlined slow path to allow inlining the fast path.
// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
m.unlockSlow(new)
}
}
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 {
old := new
for {
// 此时 old 不是饥饿模式,并且不是locked的状态. old&(mutexLocked|mutexWoken|mutexStarving) = old & mutexStarving
// 如果是最后一个人, 或者当前被设置了 woken 标志,那么说明有人正在自旋,那么我们就会让它去争抢锁.
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 此时, old 就只有 waiter 标志位有值
// 为什么要打上 woken 标记呢? 考虑最极端的一种情况, 当前 等待者只有一个人, 那么我们如果此时在更新完状态后,
// 有人来 Lock(), 会进入 fast-path 尝试获取锁,此时如果没有 Woken 标志,会直接成功.
// 所以 woken 的第二个作用就已经显现出来了, 保证公平性.
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
// 直接唤醒队头
runtime_Semrelease(&m.sema, true, 1)
}
}
RWMutex
- 核心: 依靠 readerCount 的最高位,代表我们是否有 写着等待.
- 利用 readerWait 唤醒写者(或者没有读者了,写者直接拿锁跑路)
package main
import (
"sync/atomic"
)
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount int32 // number of pending readers
readerWait int32 // number of departing readers
}
const rwmutexMaxReaders = 1 << 30
func (rw *RWMutex) RLock() {
// 如果发现读者 + 1 后小于 0, 说明有写者(写者加锁会 将 readCount - max)
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// 读者等待
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
}
/*
其实最有趣的是 (ra,rb) (wa,wb) 这几个操作的顺序性
假设
1. ra , wa
这种情况 ra 发生的时候, 读到的值 >= 0, 不会进入unlockSlow, 也就和 w 操作没有关系了
2. wa, ra, rb, wb
首先 wa 结束, readerCount < 0
wa -> r = (readerCount -= Max) + Max
ra -> readerCount + 1 < 0
rb -> readerWait - 1 < 0 // 不会唤醒
wb -> readerWait + r // 符合 判断的情况,如果此时 readerWait = 0,等于当前之前没有读者
3. wa, ra, wb, rb
首先 wa 结束, readerCount < 0
wa -> r = (readerCount -= Max) + Max
ra -> readerCount + 1 < 0
wb -> readerWait + r // 睡眠, 唤醒操作给 reader
rb ->
readerWait - 1 < 0 // 不会唤醒
readerWait - 1 = 0 // 唤醒读者
*/
func (rw *RWMutex) RUnlock() {
// 如果是 < 0 说明可能需要唤醒写者
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 { // ra
rw.rUnlockSlow(r)
}
}
func (rw *RWMutex) rUnlockSlow(r int32) {
// 离开的读者 + 1, 是最后一个读者 就唤醒写者
if atomic.AddInt32(&rw.readerWait, -1) == 0 { // rb
// 唤醒写者
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
func (rw *RWMutex) Lock() {
// 只有一个写者更改 reader 相关字段
rw.w.Lock()
// 通知 读者, 有等待的写者
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders // wa
// Wait for active readers.
// 如果发现 r !=0 也就是有读者,并且离开的读者和
// 截止到当前, 一共有 r 个读者, 然后如果这 r 个读者离开了, 你将我唤醒
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 { // wb
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
}
// TryLock tries to lock rw for writing and reports whether it succeeded.
//
// Note that while correct uses of TryLock do exist, they are rare,
// and use of TryLock is often a sign of a deeper problem
// in a particular use of mutexes.
func (rw *RWMutex) TryLock() bool {
if !rw.w.TryLock() {
return false
}
if !atomic.CompareAndSwapInt32(&rw.readerCount, 0, -rwmutexMaxReaders) {
rw.w.Unlock()
return false
}
return true
}
func (rw *RWMutex) Unlock() {
// 通知 读者,写者结束
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
throw("sync: Unlock of unlocked RWMutex")
}
// 唤醒阻塞的读者
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
rw.w.Unlock()
}
WaitGroup
- 一共三个字段 依次是
waiter, counter, sema
wait()
只会判断当前是否有工人- 如果有,
waiter+1
-> 休眠 -> 等待唤醒 - 没有,直接返回
- add() counter+=delta (如果
counter<0
panic)- 如果
counter>0
| 没有等待的人-> 返回 (counter=0 & waiter = 0)
如果没有工人正在工作 并且有等待的人, 将其唤醒
- 如果
Once
核心就是 二次判断
- 开始时判断
- 做之前判断
type Once struct {
done uint32 // 判断是否做过
m Mutex // 加锁 保证 只有一个人做
}
func (o *Once) Do(f func()) {
if atomic.LoadUint32(&o.done) == 0 { // 1. 判断是否做过
o.doSlow(f)
}
}
func (o *Once) doSlow(f func()) {
o.m.Lock() // 保证只有一个人做
defer o.m.Unlock()
if o.done == 0 { // 2. 判断是否做过
defer atomic.StoreUint32(&o.done, 1) // 标记做过了
f()
}
}
Cond
队指的是 notifyList
核心 利用ticket 标记入队顺序
- Wait时通过分配 队的
wait
分配ticket
,然后入队 - Signal, 唤醒ticket为
notify+1
的 G,然后将notify+1
- Broadcast, 将
notify = l.wait
然后改变G的状态为_Gwaiting->_Grunnable,然后放在当前P的next
中,唤醒空闲的P,空闲的P因为本地队列没有G,所以会偷,可能会偷到当前改变状态后的G
type Cond struct {
noCopy noCopy
// L is held while observing or changing the condition
L Locker
notify notifyList
checker copyChecker // 用于禁止运行期间发生的拷贝;
}
func NewCond(l Locker) *Cond {
return &Cond{L: l}
}
func (c *Cond) Wait() {
c.checker.check() //这就是检查是否发生运行时的拷贝
t := runtime_notifyListAdd(&c.notify) // 加一个等待的人 t 就是他的编号
c.L.Unlock()
runtime_notifyListWait(&c.notify, t) //等待被唤醒 编号是 t
// 0. 如果发现通知的下标 已经大于 当前的 t(自己的票号) 那么我们就返回(有可能是有人广播了)
// 1. 获取 Sudog 绑定一些信息
// 2. 放在通知链表里面
// 3. 休眠
c.L.Lock()
}
func (c *Cond) Signal() {
runtime_notifyListNotifyOne(&c.notify) // 遍历 notify 链表 并唤醒 中 票号为 l.notify 的 G
}
func (c *Cond) Broadcast() {
runtime_notifyListNotifyAll(&c.notify) // 广播,见上面分析.
}