atomic
和sema
是实现go
中锁的基础,简单看下他们的实现原理。
atomic
`atomic 常用来作为保证原子性的操作。
当多个协程,同时一个数据进行操作时候,如果不加锁,最终的很难得到想要的结果。
var p int64 = 0
func add() {
p = p + 1
}
func main() {
for i := 0; i < 1000; i++ {
go add()
}
time.Sleep(time.Second * 5)
fmt.Println(p) //982
}
这种情况下,最终打印的 都不会是1000,每次不固定。
改成atomic 能解决
var p int64 = 0
func add() {
atomic.AddInt64(&p, 1)
}
func main() {
for i := 0; i < 1000; i++ {
go add()
}
time.Sleep(time.Second * 5)
fmt.Println(p)
}
atomic 为什么能做到?
TEXT sync∕atomic·AddInt64(SB), NOSPLIT, $0-24
GO_ARGS
MOVD $__tsan_go_atomic64_fetch_add(SB), R9
BL racecallatomic<>(SB)
MOVD add+8(FP), R0 // convert fetch_add to add_fetch
MOVD ret+16(FP), R1
ADD R0, R1, R0
MOVD R0, ret+16(FP)
RET
老的版本中是能见到,lock 这种操作系统级别的锁,新版的go已经改写了这块逻辑,但是能猜想到效果肯定一样。 如果有理清楚的,评论区可以交流下。
小结:
原子操作是一种硬件层面加锁的机制
保证操作一个变量的时候,其他协程/线程无法访问
sema
几乎在go的每个锁的定义都能看到sema
的身影,理解了sema
再看 互斥锁、读写锁就会很好理解。
信号量锁/信号锁
核心是一个uint32值,含义是同时可并发的数量
每一个sema 锁都对应一个SemaRoot结构体
SemaRoot中有一个平衡二叉树用于协程排队
例如:
type Mutex struct {
state int32
sema uint32
}
sema的uint32中的 每一个数,背后都对应一个 semaRoot的结构体
type semaRoot struct {
lock mutex
treap *sudog // root of balanced tree of unique waiters.
nwait atomic.Uint32 // Number of waiters. Read w/o the lock.
}
type sudog struct {
g *g // 包含了 协程 g
next *sudog // 下一个
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
}
结构如下:
这里可以先讲下,当这个 sema uint32
值,初始化时候,大于0 比如 赋值5,就代表着,并发时候,有5个协程可以获取锁。其他协程需要等待前面5个释放了,才能进入。
sema 大于0
// 获取sema锁。大于0的情况
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int, reason waitReason) {
gp := getg()
if gp != gp.m.curg {
throw("semacquire not on the G stack")
}
// Easy case. // 容易的情况
if cansemacquire(addr) {
return
}
// 方法很长,先看简单的部分。
}
func cansemacquire(addr *uint32) bool {
for {
v := atomic.Load(addr) // 根据sema的地址,获取int32的值
if v == 0 { // 如果未0了,就获取失败了
return false
}
// 大于0 ,则把 sema的值减去1
if atomic.Cas(addr, v, v-1) { // cas 就是 CompareAndSwapInt 的底层实现
return true
}
}
}
到此,对sema为什么只是定义为一个 uint32的值有了大致理解,就是一个控制能有多少个协程同时获取锁的值。
看下释放:
func semrelease1(addr *uint32, handoff bool, skipframes int) {
root := semtable.rootFor(addr)
atomic.Xadd(addr, 1) // 给sema的值 加上1
// Easy case: no waiters?
// This check must happen after the xadd, to avoid a missed wakeup
// (see loop in semacquire).
if root.nwait.Load() == 0 { // 如果没有 nwait在等待,就直接结束。
return
}
}
nwait 就是等待协程的个数。
小结, 当sema的值大于0 :
获取锁:uint32减1 ,获取成功
释放锁:uint32加1,释放成功
sema值等于0
再看 semacquire1
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int, reason waitReason) {
/ / Harder case:
// increment waiter count
s := acquireSudog()
root := semtable.rootFor(addr) // 根据sema的地址,获取了包含 sudog的队列
for {
root.queue(addr, s, lifo) // 将新的协程放入这个等待队列中
goparkunlock(&root.lock, reason, traceBlockSync, 4+skipframes)
// 主动调用协程的gopark,让它休眠,gopark的说明看 go GMP中有讲
}
releaseSudog(s)
}
//再看释放
func semrelease1(addr *uint32, handoff bool, skipframes int) {
root := semtable.rootFor(addr)
atomic.Xadd(addr, 1)
if root.nwait.Load() == 0 {
return
}
// 如果等待队列不是0 ,就需要释放一个
// Harder case: search for a waiter and wake it.
lockWithRank(&root.lock, lockRankRoot)
s, t0 := root.dequeue(addr) // 从全局的队列中,取出一个
if s != nil {
root.nwait.Add(-1) // 把等待的数量减一
}
unlock(&root.lock) //操作全局队列都需要加锁
}
小结,当sema的值等于0时候:
获取锁:协程休眠,进入堆树等待
释放锁:从堆树中取出一个协程,唤醒
sema 锁退化成一个专用休眠队列
有没有可能sema的值,小于0 ?
看看sema的定义 `uint32` 所以 不可能。
总结下:
atomic原子操作是一种硬件层面的加锁机制。
sema 背后是一整套锁的管理和等待的机制,开发者在使用时候,感知不到。
sema的值就是能同时获取锁协程的个数。sema的地址作为了休眠等待队列(平衡树)的key。
标签:协程,addr,sema,atomic,go,root,uint32
From: https://www.cnblogs.com/studyios/p/17869424.html