Golang如何优雅地关闭 channel
萧瑟 golang面试经典讲解 2023-05-31 21:00 发表于上海一、介绍
想必听说过 go 的,应该都知道 go 的最大的特性 goroutine 并发编程,而说到并发编程,使用 channel 进行数据传输是 go 中的必修课。
go 的并发哲学:不要通过共享内存来通信,而要通过通信来实现内存共享。
channel 的坑不少,本篇简单聊聊关闭 channel 的方法。
二、关闭channel原则
坊间流传的关闭 channel
的原则:
不要从接收端关闭
channel
,也不要在有多个发送端时,主动关闭channel
这个原则的来源就因为:
不能向已关闭的
channel
发送数据会导致panic不能重复关闭已关闭的
channel 会导致panic
一个比较粗糙的检查 channel 是否关闭的函数:
package main
import "fmt"
func IsClosed(ch <-chan int) bool {
select {
case <-ch:
return true
default:
}
return false
}
func main() {
c := make(chan int)
fmt.Println(IsClosed(c)) // false
close(c)
fmt.Println(IsClosed(c)) // true
}
看一下代码,其实存在很多问题。首先,IsClosed 函数是一个有副作用的函数。每调用一次,都会读出 channel 里的一个元素,改变了 channel 的状态。这不是一个好的函数,干活就干活,还顺手牵羊!
其次,IsClosed 函数返回的结果仅代表调用那个瞬间,并不能保证调用之后会不会有其他 goroutine 对它进行了一些操作,改变了它的这种状态。例如,IsClosed 函数返回 true,但这时有另一个 goroutine 关闭了 channel,而你还拿着这个过时的 “channel 未关闭”的信息,向其发送数据,就会导致 panic 的发生。当然,一个 channel 不会被重复关闭两次,如果 IsClosed 函数返回的结果是 true,说明 channel 是真的关闭了。
有两个不那么优雅地关闭 channel 的方法:
-
使用 defer-recover 机制,放心大胆地关闭 channel 或者向 channel 发送数据。即使发生了 panic,有 defer-recover 在兜底。
-
使用 sync.Once 来保证只关闭一次。
三、如何优雅关闭channel
根据 sender 和 receiver 的个数,分下面几种情况:
-
一个 sender,一个 receiver
-
一个 sender, M 个 receiver
-
N 个 sender,一个 reciver
-
N 个 sender, M 个 receiver
3.1 1和2的情况
只有一个 sender 的情况就不用说了,直接从 sender 端关闭就好了,没有问题。func main() {
dataCh := make(chan int, 100)
// sender
go func() {
for i := 0; i < 1000; i++ {
dataCh <- i + 1
}
log.Println("send complete")
close(dataCh)
}()
// receiver
for i := 0; i < 5; i++ {
go func() {
for {
data, ok := <-dataCh
if !ok { // 已关闭
return
}
_ = data
}
}()
}
select {
case <-time.After(time.Second * 5):
fmt.Println(runtime.NumGoroutine())
}
}
3.1 3的情况
优雅关闭 channel 的方法是:the only receiver says "please stop sending more" by closing an additional signal channel。
解决方案就是增加一个传递关闭信号的 channel,receiver 通过信号 channel 下达关闭数据 channel 指令。senders 监听到关闭信号后,停止接收数据。代码如下:
package main
import (
"log"
"math/rand"
"sync"
"time"
)
func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)
const Max = 100000
const NumSenders = 1000
wgReceivers := sync.WaitGroup{}
wgReceivers.Add(1)
dataCh := make(chan int)
stopCh := make(chan struct{})
// senders
for i := 0; i < NumSenders; i++ {
go func() {
for {
select {
case <-stopCh:
return
default:
}
select {
case <-stopCh:
return
case dataCh <- rand.Intn(Max):
}
}
}()
}
// receiver
go func() {
defer wgReceivers.Done()
for value := range dataCh {
if value == Max-1 {
close(stopCh)
return
}
log.Println(value)
}
}()
wgReceivers.Wait()
}
这里的 stopCh 就是信号 channel,它本身只有一个 sender,因此可以直接关闭它。senders 收到了关闭信号后,select 分支 “case <- stopCh” 被选中,退出函数,不再发送数据。
需要说明的是,上面的代码并没有明确关闭 dataCh。在 Go 语言中,对于一个 channel,如果最终没有任何 goroutine 引用它,不管 channel 有没有被关闭,最终都会被 gc 回收。所以,在这种情形下,所谓的优雅地关闭 channel 就是不关闭 channel,让 gc 代劳。
3.1 4的情况
优雅关闭 channel 的方法是:any one of them says "let's end the game" by notifying a moderator to close an additional signal channel。
和第 3 种情况不同,这里有 M 个 receiver,如果直接还是采取第 3 种解决方案,由 receiver 直接关闭 stopCh 的话,就会重复关闭一个 channel,导致 panic。因此需要增加一个中间人,M 个 receiver 都向它发送关闭 dataCh 的“请求”,中间人收到第一个请求后,就会直接下达关闭 dataCh 的指令(通过关闭 stopCh,这时就不会发生重复关闭的情况,因为 stopCh 的发送方只有中间人一个)。另外,这里的 N 个 sender 也可以向中间人发送关闭 dataCh 的请求。
package main
import (
"fmt"
"math/rand"
"strconv"
"time"
)
func main() {
rand.Seed(time.Now().UnixNano())
const Max = 100000
const NumReceivers = 10
const NumSenders = 1000
dataCh := make(chan int, 100)
stopCh := make(chan struct{})
// It must be a buffered channel.
toStop := make(chan string, 1)
var stoppedStr string
// moderator
go func() {
stoppedStr = <-toStop
fmt.Println(stoppedStr)
close(stopCh)
}()
// senders
for i := 0; i < NumSenders; i++ {
go func(id string) {
for {
value := rand.Intn(Max)
if value == 0 {
select {
case toStop <- "sender#" + id:
default:
}
return
}
select {
case <-stopCh:
return
case dataCh <- value:
}
}
}(strconv.Itoa(i))
}
// receivers
for i := 0; i < NumReceivers; i++ {
go func(id string) {
for {
select {
case <-stopCh:
return
case value := <-dataCh:
if value == Max-1 {
select {
case toStop <- "receiver#" + id:
default:
}
return
}
fmt.Println(value)
}
}
}(strconv.Itoa(i))
}
select {
case <-time.After(time.Second):
}
}
代码里 toStop 就是中间人的角色,使用它来接收 senders 和 receivers 发送过来的关闭 dataCh 请求。
这个例子可以在 sender 和 receiver 端都发送关闭信号,通过 toStop 这个中间人来传递关闭信号,接收到之后关闭 stopCh。这里需要注意将 toStop 定义为带缓冲的 channel,若是不带缓冲,可能会出现 <-toStop 这个接收协程还未跑起来时,就已经有其他协程向其发送了 toStop<-xx 关闭信号。
这时在 sender 或 receiver 的 select 分支就可能走 default 语句,导致逻辑错误。
这个例子中,简单点的做法可以给 toStop
设置缓存为 sender
与 receiver
的和,就可以简写为如下:
...
toStop := make(chan string, NumReceivers + NumSenders)
...
value := rand.Intn(Max)
if value == 0 {
toStop <- "sender#" + id
return
}
...
if value == Max-1 {
toStop <- "receiver#" + id
return
}
...
直接向 toStop 发送请求,因为 toStop 容量足够大,所以不用担心阻塞,自然也就不用 select 语句再加一个 default case 来避免阻塞。
可以看到,这里同样没有真正关闭 dataCh,原样同第 3 种情况。
channel 的注意点
channel 的声明必须使用 make 关键字,不能直接 var c chan int,这样得到的是 nil channel
不能向 nil channel 发送数据
var c chan int
c <- 1 // panic
四、总结
关闭 channel 的基本法则:
-
单 sender 的情况下,都可以直接在 sender 端关闭 channel。
-
多 sender 的情况下,可以增加一个传递关闭信号的 channel 专门用于关闭数据传输的 channel。
原则:不要从接收端关闭 channel,也不要在有多个发送端时,主动关闭 channel。
本质:已关闭的 channel 不能再关闭(或者再向其发送数据)。
标签:case,sender,优雅,Golang,关闭,receiver,dataCh,channel From: https://www.cnblogs.com/cheyunhua/p/17497224.html