首页 > 其他分享 >GO 同步原语

GO 同步原语

时间:2023-01-01 15:55:21浏览次数:56  
标签:同步 old 原语 rw atomic func GO 唤醒 readerCount

Mutex

  1. 饥饿模式的判定: 等待时间超过 1ms
  2. woken 标志的作用:
    1. 通知unlock有人在自旋, 这样unlock时,我们不会去唤醒队列中阻塞的G
    2. 保证公平性,也就是当我们还剩最后一个在获取锁,如果我们这时候有一个人来 Lock()如果没有这个标志位的话,新的竞争者 会 通过fast-path获取锁,但是如果有这个标志位,我们可以让他们进入 lockslow()一起竞争.
func (m *Mutex) Lock() {
    // Fast path: grab unlocked mutex.
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { // 如果直接可以获取到锁
       if race.Enabled {
          race.Acquire(unsafe.Pointer(m))
      }
       return
   }
    // Slow path (outlined so that the fast path can be inlined)
    m.lockSlow()
}

func sync_runtime_canSpin(i int) bool {
    // sync.Mutex is cooperative, so we are conservative with spinning.
    // Spin only few times and only if running on a multicore machine and
    // GOMAXPROCS>1 and there is at least one other running P and local runq is empty.
    // As opposed to runtime mutex we don't do passive spinning here,
    // because there can be work on global runq or on other Ps.
    if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
       return false
   }
    if p := getg().m.p.ptr(); !runqempty(p) {
       return false
   }
    return true
}

func (m *Mutex) lockSlow() {
    var waitStartTime int64
    starving := false
    awoke := false
    iter := 0
    old := m.state
    for {
       // 如果当前是正常模式 并且 锁已经被获取了,判断是否可以自旋
       if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
          // 小于 4 次,并且 cpu >= 1 , 至少有一个运行的 P
          // 并且当前绑定的 P 中本地runq是空的,
          // 设置 woken, 避免让 unlock 去唤醒其他锁住的 G
          // 如果当前有等待的人,并且当前我们可以设置 woken 标志(这里与 unlockSlow 相对应)
          if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
          atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
          runtime_doSpin()
          iter++
          old = m.state
          continue
      }
       new := old

       // 不是饥饿模式, 可以争夺锁
       if old&mutexStarving == 0 {
          new |= mutexLocked
      }
       // 是饥饿模式,那么就 waiter + 1
       if old&(mutexLocked|mutexStarving) != 0 {
          new += 1 << mutexWaiterShift
      }
       // 如果当前锁锁住的情况下,切换模式.
       if starving && old&mutexLocked != 0 {
          new |= mutexStarving
      }
       // 被唤醒
       if awoke {
          if new&mutexWoken == 0 {
             throw("sync: inconsistent mutex state")
         }
          new &^= mutexWoken
      }
       // 如果更新成功了
       if atomic.CompareAndSwapInt32(&m.state, old, new) {
          /// 获取到锁了(当前是正常模式,并且抢到锁了)
          if old&(mutexLocked|mutexStarving) == 0 {
             break // locked the mutex with CAS
         }
          // 加入等待队列中(如果我们之前等待过,我们就加入队头)
          queueLifo := waitStartTime != 0
          if waitStartTime == 0 {
             waitStartTime = runtime_nanotime()
         }
          runtime_SemacquireMutex(&m.sema, queueLifo, 1) // // If lifo is true, queue waiter at the head of wait queue.
          // 被唤醒了,如果 > 1ms ,那么就设置模式为 饥饿模式
          starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
          old = m.state
          if old&mutexStarving != 0 {
             // 原来就是饥饿模式,说明我们在队列中被唤醒了,我们当前拿到锁了
             if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                throw("sync: inconsistent mutex state")
            }
             // 将等待者 - 1
             delta := int32(mutexLocked - 1<<mutexWaiterShift)
             // 如果当前不是饥饿模式,或者是最后一个等待者,退出饥饿模式.
             if !starving || old>>mutexWaiterShift == 1 {
                delta -= mutexStarving
            }
             atomic.AddInt32(&m.state, delta)
             break
         }
          // 正常模式,被唤醒就要去争抢.
          awoke = true
          iter = 0
      } else {
          // 更新失败, 重新争抢
          old = m.state
      }
   }
}

  func (m *Mutex) Unlock() {

    // Fast path: drop lock bit.
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if new != 0 {
       // Outlined slow path to allow inlining the fast path.
       // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
       m.unlockSlow(new)
   }
}

  func (m *Mutex) unlockSlow(new int32) {
    if (new+mutexLocked)&mutexLocked == 0 {
       throw("sync: unlock of unlocked mutex")
   }
    if new&mutexStarving == 0 {
       old := new
       for {
          // 此时 old 不是饥饿模式,并且不是locked的状态. old&(mutexLocked|mutexWoken|mutexStarving) = old & mutexStarving
          // 如果是最后一个人, 或者当前被设置了 woken 标志,那么说明有人正在自旋,那么我们就会让它去争抢锁.
          if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
             return
         }

          // 此时, old 就只有 waiter 标志位有值
          // 为什么要打上 woken 标记呢? 考虑最极端的一种情况, 当前 等待者只有一个人, 那么我们如果此时在更新完状态后,
          // 有人来 Lock(), 会进入 fast-path 尝试获取锁,此时如果没有 Woken 标志,会直接成功.
          // 所以 woken 的第二个作用就已经显现出来了, 保证公平性.
          new = (old - 1<<mutexWaiterShift) | mutexWoken
          if atomic.CompareAndSwapInt32(&m.state, old, new) {
             runtime_Semrelease(&m.sema, false, 1)
             return
         }
          old = m.state
      }
   } else {
       // 直接唤醒队头
       runtime_Semrelease(&m.sema, true, 1)
   }
}

RWMutex

  1. 核心: 依靠 readerCount 的最高位,代表我们是否有 写着等待.
  2. 利用 readerWait 唤醒写者(或者没有读者了,写者直接拿锁跑路)
package main

import (
	"sync/atomic"
)

type RWMutex struct {
	w           Mutex  // held if there are pending writers
	writerSem   uint32 // semaphore for writers to wait for completing readers
	readerSem   uint32 // semaphore for readers to wait for completing writers
	readerCount int32  // number of pending readers
	readerWait  int32  // number of departing readers
}


const rwmutexMaxReaders = 1 << 30

func (rw *RWMutex) RLock() {
	// 如果发现读者 + 1 后小于 0, 说明有写者(写者加锁会 将 readCount - max)
	if atomic.AddInt32(&rw.readerCount, 1) < 0 {
		// 读者等待
		runtime_SemacquireMutex(&rw.readerSem, false, 0)
	}

}

/*
	其实最有趣的是 (ra,rb) (wa,wb) 这几个操作的顺序性
	假设
		1. ra , wa
			这种情况 ra 发生的时候, 读到的值 >= 0, 不会进入unlockSlow, 也就和 w 操作没有关系了
		2. wa, ra, rb, wb
			首先 wa 结束, readerCount < 0
			wa -> r = (readerCount -= Max) + Max
			ra -> readerCount + 1 < 0
			rb -> readerWait - 1 < 0 // 不会唤醒
			wb -> readerWait + r // 符合 判断的情况,如果此时 readerWait = 0,等于当前之前没有读者
		3. wa, ra, wb, rb
			首先 wa 结束, readerCount < 0
			wa -> r = (readerCount -= Max) + Max
			ra -> readerCount + 1 < 0
			wb -> readerWait + r  // 睡眠, 唤醒操作给 reader
			rb -> 
					readerWait - 1 < 0 // 不会唤醒
					readerWait - 1 = 0 // 唤醒读者
*/

func (rw *RWMutex) RUnlock() {
	// 如果是 < 0 说明可能需要唤醒写者
	if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 { // ra
		rw.rUnlockSlow(r)
	}
}

func (rw *RWMutex) rUnlockSlow(r int32) {
	// 离开的读者 + 1, 是最后一个读者 就唤醒写者
	if atomic.AddInt32(&rw.readerWait, -1) == 0 { // rb
		// 唤醒写者
		runtime_Semrelease(&rw.writerSem, false, 1)
	}
}

func (rw *RWMutex) Lock() {
	// 只有一个写者更改 reader 相关字段
	rw.w.Lock()

	// 通知 读者, 有等待的写者
	r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders // wa
	// Wait for active readers.
	// 如果发现 r !=0 也就是有读者,并且离开的读者和
	// 截止到当前, 一共有 r 个读者, 然后如果这 r 个读者离开了, 你将我唤醒
	if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 { // wb
		runtime_SemacquireMutex(&rw.writerSem, false, 0)
	}
}

// TryLock tries to lock rw for writing and reports whether it succeeded.
//
// Note that while correct uses of TryLock do exist, they are rare,
// and use of TryLock is often a sign of a deeper problem
// in a particular use of mutexes.
func (rw *RWMutex) TryLock() bool {
	if !rw.w.TryLock() {
		return false
	}
	if !atomic.CompareAndSwapInt32(&rw.readerCount, 0, -rwmutexMaxReaders) {
		rw.w.Unlock()
		return false
	}

	return true
}

func (rw *RWMutex) Unlock() {

	// 通知 读者,写者结束
	r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
	if r >= rwmutexMaxReaders {
		throw("sync: Unlock of unlocked RWMutex")
	}
	// 唤醒阻塞的读者
	for i := 0; i < int(r); i++ {
		runtime_Semrelease(&rw.readerSem, false, 0)
	}
	rw.w.Unlock()
}

WaitGroup

  1. 一共三个字段 依次是 waiter, counter, sema
  2. wait() 只会判断当前是否有工人
  3. 如果有, waiter+1 -> 休眠 -> 等待唤醒
  4. 没有,直接返回
  5. add() counter+=delta (如果counter<0 panic)
    1. 如果counter>0 | 没有等待的人-> 返回
    2. (counter=0 & waiter = 0)如果没有工人正在工作 并且有等待的人, 将其唤醒

Once

核心就是 二次判断

  1. 开始时判断
  2. 做之前判断
type Once struct {
    done uint32 // 判断是否做过
    m    Mutex // 加锁 保证 只有一个人做
}


func (o *Once) Do(f func()) {
    if atomic.LoadUint32(&o.done) == 0 { // 1. 判断是否做过
        o.doSlow(f)
    }
}

func (o *Once) doSlow(f func()) {
    o.m.Lock() // 保证只有一个人做
    defer o.m.Unlock() 
    if o.done == 0 { // 2. 判断是否做过
        defer atomic.StoreUint32(&o.done, 1) // 标记做过了
        f()
    }
}

Cond

队指的是 notifyList
核心 利用ticket 标记入队顺序

  1. Wait时通过分配 队的wait 分配 ticket,然后入队
  2. Signal, 唤醒ticket为 notify+1 的 G,然后将notify+1
  3. Broadcast, 将 notify = l.wait 然后改变G的状态为_Gwaiting->_Grunnable,然后放在当前P的next中,唤醒空闲的P,空闲的P因为本地队列没有G,所以会偷,可能会偷到当前改变状态后的G

type Cond struct {
	noCopy noCopy

	// L is held while observing or changing the condition
	L Locker

	notify  notifyList
	checker copyChecker // 用于禁止运行期间发生的拷贝;
}

func NewCond(l Locker) *Cond {
	return &Cond{L: l}
}

func (c *Cond) Wait() {
	c.checker.check()                     //这就是检查是否发生运行时的拷贝
	t := runtime_notifyListAdd(&c.notify) // 加一个等待的人 t 就是他的编号
	c.L.Unlock()

	runtime_notifyListWait(&c.notify, t) //等待被唤醒 编号是 t
	// 0. 如果发现通知的下标 已经大于 当前的 t(自己的票号) 那么我们就返回(有可能是有人广播了)
	// 1. 获取 Sudog 绑定一些信息
	// 2. 放在通知链表里面
	// 3. 休眠
	c.L.Lock()
}
func (c *Cond) Signal() {
	runtime_notifyListNotifyOne(&c.notify) // 遍历 notify 链表 并唤醒 中 票号为 l.notify 的 G
}
func (c *Cond) Broadcast() {
	runtime_notifyListNotifyAll(&c.notify) // 广播,见上面分析.
}


引用

  1. https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-sync-primitives/

标签:同步,old,原语,rw,atomic,func,GO,唤醒,readerCount
From: https://www.cnblogs.com/jgjg/p/17018147.html

相关文章

  • Algorithm 3 - 数据结构
    数据结构是该好好补补拉。1.线段树2.平衡树3.莫队3.1普通莫队莫队解决的问题一般是区间查询,并且可以离线。利用一种排序区间的方式,保证暴力移动最有指针的复杂度......
  • Django——全局配置settings详解
    Django设置文件包含你所有的Django安装配置。这个文件一般在你的项目文件夹里。比如我们创建了一个名为mysite的项目,那么这个配置文件setting.py就在项目里的mysite文件夹......
  • BundleFusion_Ubuntu_Pangolin 安装的一些error
    /usr/bin/ld:找不到-lEigen3::Eigen解决方法:find_package(Eigen3REQUIRED)为list(APPENDCMAKE_INCLUDE_PATH"/usr/local/include")find_package(Eigen33.3RE......
  • Django模板层
    目录Django模板层一、关于模板语法二、模板层之标签二、自定义过滤器、标签三、模板的继承与导入四、模板层前期准备Django模板层一、关于模板语法针对需要加括号调用的......
  • MongoDB从入门到实战之MongoDB工作常用操作命令
    前言:上一章节我们快速的在Docker容器中安装了MongoDB,并且通过NavicatMongoDB可视化管理工具快速的连接、创建数据库、集合以及添加了文档数据源。这一章节我们主要是......
  • Vulnhub之Jangow:1.0.1靶机完整详细测试过程
    Jangow作者:jason_huawen靶机信息名称:Jangow:1.0.1地址:识别目标主机IP地址(kali㉿kali)-[~/Vulnhub/jangow_2]└─$sudonetdiscover-ieth1-r192.168.56.0/......
  • Django视图层
    目录Django视图层一、视图层之必会三板斧二、JsonResponse对象三、request对象四、视图层之FBV与CBV五、CBV源码剖析六、虚拟环境Django视图层一、视图层之必会三板斧用......
  • 使用自己的数据集训练GoogLenet InceptionNet V1 V2 V3模型(TensorFlow)
    使用自己的数据集训练GoogLenetInceptionNetV1V2V3模型(TensorFlow)新增博客《​​使用自己的数据集训练MobileNet、ResNet图像识别(TensorFlow)​​》一、前言1、网上已有......
  • Django中关于Manager的使用
    首先介绍一下manager的使用场景,比如我们有一些表级别的,需要重复使用的功能,都可以使用manager来实现。比如我们在前面的笔记中介绍的model的create()、update()等......
  • Never Gonna Give You Up
    \[来到这个网页,等同于你被骗了。\]\[\color{red}\text{NeverGonnaGiveYouUp}\]......