1 原理
默认情况下,读写未就绪的channel(读没有数据的channel,或者写缓冲区已满的channel)时,协程会被阻塞。
但是当读写channel操作和select搭配使用时,即使channel未就绪,也可以执行其它分支,当前协程不会被阻塞。
ch := make(chan int)
select{
case <- ch:
default:
}
本文主要介绍channel的阻塞模式,和select搭配使用的非阻塞模式,后续会另起一篇介绍。
1.1 数据结构
channel涉及到的核心数据结构包含3个。
hchan
// channel
type hchan struct {
// 循环队列
qcount uint // 通道中数据个数
dataqsiz uint // buf长度
buf unsafe.Pointer // 数组指针
sendx uint // send index
recvx uint // receive index
elemsize uint16 // 元素大小
elemtype *_type // 元素类型
closed uint32 // 通道关闭标志
recvq waitq // 由双向链表实现的recv waiters队列
sendq waitq // 由双向链表实现的send waiters队列
lock mutex
}
hchan是channel底层的数据结构,其核心是由数组实现的一个环形缓冲区:
-
qcount 通道中数据个数
-
dataqsiz 数组长度
-
buf 指向数组的指针,数组中存储往channel发送的数据
-
sendx 发送元素到数组的index
-
recvx 从数组中接收元素的index
-
elemsize channel中元素类型的大小
-
elemtype channel中的元素类型
-
closed 通道关闭标志
-
recvq 因读取channel而陷入阻塞的协程等待队列
-
sendq 因发送channel而陷入阻塞的协程等待队列
-
lock 锁
waitq
// 等待队列(双向链表)
type waitq struct {
first *sudog
last *sudog
}
waitq是因读写channel而陷入阻塞的协程等待队列。
-
first 队列头部
-
last 队列尾部
sudog
// sudog represents a g in a wait list, such as for sending/receiving
// on a channel.
type sudog struct {
g *g // 等待send或recv的协程g
next *sudog // 等待队列下一个结点next
prev *sudog // 等待队列前一个结点prev
elem unsafe.Pointer // data element (may point to stack)
success bool // 标记协程g被唤醒是因为数据传递(true)还是channel被关闭(false)
c *hchan // channel
}
sudog是协程等待队列的节点:
-
g 因读写而陷入阻塞的协程
-
next 等待队列下一个节点
-
prev 等待队列前一个节点
-
elem 对于写channel,表示需要发送到channel的数据指针;对于读channel,表示需要被赋值的数据指针。
-
success 标记协程被唤醒是因为数据传递(true)还是channel被关闭(false)
-
c 指向channel的指针
1.2 通道创建
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// buf数组所需分配内存大小
mem := elem.size*uintptr(size)
var c *hchan
switch {
case mem == 0:// Unbuffered channels,buf无需内存分配
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.ptrdata == 0: // Buffered channels,通道元素类型非指针
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Buffered channels,通道元素类型是指针
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
return c
}
通道创建主要是分配内存并构建hchan对象。
1.3 通道写入
3种异常情况处理
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 1.channel为nil
if c == nil {
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
lock(&c.lock) //加锁
// 2.如果channel已关闭,直接panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// Block on the channel.
mysg := acquireSudog()
c.sendq.enqueue(mysg) // 入sendq等待队列
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
closed := !mysg.success // 协程被唤醒的原因是因为数据传递还是通道被关闭
// 3.因channel被关闭导致阻塞写协程被唤醒并panic
if closed {
panic(plainError("send on closed channel"))
}
}
-
对 nil channel写入,会死锁
-
对被关闭的channel写入,会panic
-
对因写入而陷入阻塞的协程,如果channel被关闭,阻塞协程会被唤醒并panic
写时有阻塞读协程
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
lock(&c.lock) //加锁
// 1、当存在等待接收的Goroutine
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3) // 直接把正在发送的值发送给等待接收的Goroutine,并将此接收协程放入可调度队列等待调度
return true
}
}
// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked. send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 将ep写入sg中的elem
if sg.elem != nil {
t:=c.elemtype
dst := sg.elem
// memmove copies n bytes from "from" to "to".
memmove(dst, ep, t.size)
sg.elem = nil // 数据已经被写入到<- c变量,因此sg.elem指针可以置空了
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
// 唤醒receiver协程gp
goready(gp, skip+1)
}
// 唤醒receiver协程gp,将其放入可运行队列中等待调度执行
func goready(gp *g, traceskip int) {
systemstack(func() {
ready(gp, traceskip, true)
})
}
// Mark gp ready to run.
func ready(gp *g, traceskip int, next bool) {
status := readgstatus(gp)
// Mark runnable.
_g_ := getg()
mp := acquirem() // disable preemption because it can be holding p in a local var
// status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
casgstatus(gp, _Gwaiting, _Grunnable)
runqput(_g_.m.p.ptr(), gp, next)
wakep()
releasem(mp)
}
-
加锁
-
从阻塞读协程队列取出sudog节点
-
在send方法中,调用memmove方法将数据拷贝给sudog.elem指向的变量。
-
goready方法唤醒接收到数据的阻塞读协程g,将其放入协程可运行队列中等待调度
-
解锁
写时无阻塞读协程但环形缓冲区仍有空间
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
lock(&c.lock) //加锁
// 当缓冲区未满时
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx) // 获取指向缓冲区数组中位于sendx位置的元素的指针
typedmemmove(c.elemtype, qp, ep) // 将当前发送的值拷贝到缓冲区
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0 // 因为是循环队列,sendx等于队列长度时置为0
}
c.qcount++
unlock(&c.lock)
return true
}
}
-
加锁
-
将数据放入环形缓冲区
-
解锁
写时无阻塞读协程且环形缓冲区无空间
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
lock(&c.lock) //加锁
// Block on the channel.
// 将当前的Goroutine打包成一个sudog节点,并加入到阻塞写队列sendq里
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
mysg.g = gp
mysg.c = c
gp.waiting = mysg
c.sendq.enqueue(mysg) // 入sendq等待队列
// 调用gopark将当前Goroutine设置为等待状态并解锁,进入休眠等待被唤醒,触发协程调度
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// 被唤醒之后执行清理工作并释放sudog结构体
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success // gp被唤醒的原因是因为数据传递还是通道被关闭
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
// 因关闭被唤醒则panic
if closed {
panic(plainError("send on closed channel"))
}
// 数据成功传递
return true
}
-
加锁。
-
将当前协程gp封装成sudog节点,并加入channel的阻塞写队列sendq。
-
调用gopark将当前协程设置为等待状态并解锁,触发调度其它协程运行。
-
因数据被读或者channel被关闭,协程从park中被唤醒,清理sudog结构。
-
因channel被关闭导致协程唤醒,panic
-
返回
整体写流程
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 1.channel为nil
if c == nil {
// 当前Goroutine阻塞挂起
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// 2.加锁
lock(&c.lock)
// 3.如果channel已关闭,直接panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// 4、存在阻塞读协程
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3) // 直接把正在发送的值发送给等待接收的Goroutine,并将此接收协程放入可调度队列等待调度
return true
}
// 5、缓冲区未满时
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx) // 获取指向缓冲区数组中位于sendx位置的元素的指针
typedmemmove(c.elemtype, qp, ep) // 将当前发送的值拷贝到缓冲区
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0 // 因为是循环队列,sendx等于队列长度时置为0
}
c.qcount++
unlock(&c.lock)
return true
}
// Block on the channel.
// 6、将当前协程打包成一个sudog结构体,并加入到channel的阻塞写队列sendq
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg) // 入sendq等待队列
atomic.Store8(&gp.parkingOnChan, 1)
// 7.调用gopark将当前协程设置为等待状态并解锁,进入休眠,等待被唤醒,并触发协程调度
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// 8. 被唤醒之后执行清理工作并释放sudog结构体
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success // g被唤醒的原因是因为数据传递还是通道被关闭
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
// 9.因关闭被唤醒则panic
if closed {
panic(plainError("send on closed channel"))
}
// 10.数据成功传递
return true
}
-
channel为nil检查。为空则死锁。
-
加锁
-
如果channel已关闭,直接panic。
-
当存在阻塞读协程,直接把数据发送给读协程,唤醒并将其放入协程可运行队列中等待调度运行。
-
当缓冲区未满时,将当前发送的数据拷贝到缓冲区。
-
当既没有阻塞读协程,缓冲区也没有剩余空间时,将协程加入阻塞写队列sendq。
-
调用gopark将当前协程设置为等待状态,进入休眠等待被唤醒,触发协程调度。
-
被唤醒之后执行清理工作并释放sudog结构体
-
唤醒之后检查,因channel被关闭导致协程唤醒则panic。
-
返回。
1.4 通道读
2种异常情况处理
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 1.channel为nil
if c == nil {
// 否则,当前Goroutine阻塞挂起
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
lock(&c.lock)
// 2.如果channel已关闭,并且缓冲区无元素,返回(true,false)
if c.closed != 0 {
if c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
//根据channel元素的类型清理ep对应地址的内存,即ep接收了channel元素类型的零值
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
}
-
channel未初始化,读操作会死锁
-
channel已关闭且缓冲区无数据,给读变量赋零值。
读时有阻塞写协程
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
lock(&c.lock)
// Just found waiting sender with not closed.
// 等待发送的队列sendq里存在Goroutine
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
// 如果无缓冲区,那么直接从sender接收数据;否则,从buf队列的头部接收数据,并把sender的数据加到buf队列的尾部
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true // 接收成功
}
}
// recv processes a receive operation on a full channel c.
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// channel无缓冲区,直接从sender读
if c.dataqsiz == 0 {
if ep != nil {
// copy data from sender
t := c.elemtype
src := sg.elem
typeBitsBulkBarrier(t, uintptr(ep), uintptr(src), t.size)
memmove(dst, src, t.size)
}
} else {
// 从队列读,sender再写入队列
qp := chanbuf(c, c.recvx)
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
// 唤醒sender队列协程sg
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
// 唤醒协程
goready(gp, skip+1)
}
-
加锁
-
从阻塞写队列取出sudog节点
-
假如channel为无缓冲区通道,则直接读取sudog对应写协程数据,唤醒写协程。
-
假如channel为缓冲区通道,从channel缓冲区头部(recvx)读数据,将sudog对应写协程数据,写入缓冲区尾部(sendx),唤醒写协程。
-
解锁
读时无阻塞写协程且缓冲区有数据
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
lock(&c.lock)
// 缓冲区buf中有元素,直接从buf拷贝元素到当前协程(在已关闭的情况下,队列有数据依然会读)
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)// 将从buf中取出的元素拷贝到当前协程
}
typedmemclr(c.elemtype, qp) // 同时将取出的数据所在的内存清空
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true // 接收成功
}
}
-
加锁
-
从环形缓冲区读数据。在channel已关闭的情况下,缓冲区有数据依然可以被读。
-
解锁
读时无阻塞写协程且缓冲区无数据
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
lock(&c.lock)
// no sender available: block on this channel.
// 阻塞模式,获取当前Goroutine,打包一个sudog,并加入到channel的接收队列recvq里
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
gp.waiting = mysg
mysg.g = gp
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg) // 入接收队列recvq
// 挂起当前Goroutine,设置为_Gwaiting状态,进入休眠等待被唤醒
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
// 因通道关闭或者读到数据被唤醒
gp.waiting = nil
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success // 10.返回成功
}
-
加锁。
-
将当前协程gp封装成sudog节点,加入channel的阻塞读队列recvq。
-
调用gopark将当前协程设置为等待状态并解锁,触发调度其它协程运行。
-
因读到数据或者channel被关闭,协程从park中被唤醒,清理sudog结构。
-
返回
整体读流程
// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 1.channel为nil
if c == nil {
// 否则,当前Goroutine阻塞挂起
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// 2.加锁
lock(&c.lock)
// 3.如果channel已关闭,并且缓冲区无元素,返回(true,false)
if c.closed != 0 {
if c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
//根据channel元素的类型清理ep对应地址的内存,即ep接收了channel元素类型的零值
typedmemclr(c.elemtype, ep)
}
return true, false
}
// The channel has been closed, but the channel's buffer have data.
} else {
// Just found waiting sender with not closed.
// 4.存在阻塞写协程
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
// 如果无缓冲区,那么直接从sender接收数据;否则,从buf队列的头部接收数据,并把sender的数据加到buf队列的尾部
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true // 接收成功
}
}
// 5.缓冲区buf中有元素,直接从buf拷贝元素到当前协程(在已关闭的情况下,队列有数据依然会读)
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)// 将从buf中取出的元素拷贝到当前协程
}
typedmemclr(c.elemtype, qp) // 同时将取出的数据所在的内存清空
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true // 接收成功
}
// no sender available: block on this channel.
// 6.获取当前Goroutine,封装成sudog节点,加入channel阻塞读队列recvq
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg) // 入接收队列recvq
atomic.Store8(&gp.parkingOnChan, 1)
// 7.挂起当前Goroutine,设置为_Gwaiting状态,进入休眠等待被唤醒
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
// 8.因通道关闭或者可读被唤醒
gp.waiting = nil
gp.activeStackChans = false
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
// 9.返回
return true, success
}
通道读流程如下:
-
channel为nil检查。空则死锁。
-
加锁。
-
如果channel已关闭,并且缓冲区无数据,读变量赋零值,返回。
-
当存在阻塞写协程,如果缓冲区已满,则直接从sender接收数据;否则,从环形缓冲区头部接收数据,并把sender的数据加到环形缓冲区尾部。唤醒sender,将其放入协程可运行队列中等待调度运行,返回。
-
如果缓冲区中有数据,直接从缓冲区拷贝数据到当前协程,返回。
-
当既没有阻塞写协程,缓冲区也没有数据时,将协程加入阻塞读队列recvq。
-
调用gopark将当前协程设置为等待状态,进入休眠等待被唤醒,触发协程调度。
-
因通道关闭或者可读被唤醒。
-
返回。
1.5 通道关闭
func closechan(c *hchan) {
// // 1.channel为nil则panic
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
// 2.已关闭的channel再次关闭则panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
// 设置关闭标记
c.closed = 1
var glist gList
// 遍历recvq和sendq中的协程放入glist
// release all readers
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
glist.push(gp)
}
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
glist.push(gp)
}
unlock(&c.lock)
// 3.将glist中所有Goroutine的状态置为_Grunnable,等待调度器进行调度
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
-
channel为nil检查。为空则panic
-
已关闭channel再次被关闭,panic
-
将sendq和recvq所有Goroutine的状态置为_Grunnable,放入协程调度队列等待调度器调度