注:本文所有函数名为中文名,并不符合代码规范,仅供读者理解参考。
Goroutine
Go程不是OS线程,也不是绿色线程(语言运行时管理的线程),而是更高级别的抽象,一种特殊的协程。是一种非抢占式的简单并发子goroutine(函数、闭包、方法)。不能被中断,但有多个point可以暂停或重新进入。
goroutine 在它们所创建的相同地址空间内执行,特别是在循环创建go程的时候,推荐将变量显式映射到闭包(引用外部作用域变量的函数)中。
fork-join 并发模型
Fork 在程序中的任意节点,子节支可以与父节点同时运行。join 在将来某个时候这些并发分支会合并在一起,这是保持程序正确性和消除竞争条件的关键。Go语言遵循 fork-join并发模型。
使用 go func 其实就是在创建 fork point,为了创建 join point,我们需要解决竞争条件。
sync.WaitGroup
func 竞争条件_解决() {
var wg sync.WaitGroup
var data int
wg.Add(1)
go func() {
defer wg.Done()
data++
}()
wg.Wait()
if data == 0 {
fmt.Println("Value", data)
} else {
fmt.Println("Value 不是 0")
}
}
通过 sync.WaitGroup 我们阻塞 main 直到 go 程退出后再让 main 继续执行,实现了 join point。可以理解为并发-安全计数器,经常配合循环使用。
这是一个同步访问共享内存的例子。使用前提是你不关心并发操作的结果,或者你有其他方法来收集它们的结果。
wg.Add(1) 是在帮助跟踪的goroutine之外完成的,如果放在匿名函数内部,会产生竞争条件。因为你不知道go程什么时候被调度。
sync.Mutex 互斥锁
type state struct {
lock sync.Mutex
count int
}
func 结构体修改状态_互斥锁() {
s := state{}
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
// s.lock.Lock()
defer wg.Done()
// defer s.lock.Unlock()
s.count++
}()
}
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
// s.lock.Lock()
defer wg.Done()
// defer s.lock.Unlock()
s.count--
}()
}
wg.Wait()
fmt.Println(s.count)
}
没有互斥锁的时候,会导致发生竞争现象,取消互斥锁的注释,最终结果为理想的0。
进入和退出一个临界区是有消耗的,所以一般人会尽量减少在临界区的时间。
sync.RWMutex 读写锁
本质和普通的互斥锁相同,但是可以保证在未锁的情况允许多个读消费者持有一个读锁,在读消费者非常多的情况下可以提高性能。
在多个读消费者的情况下,通常使用 RWMutex ,读消费者较少时,Mutex和RWMutex两者都可用。
Cond 同步多个go程
cond : 一个goroutine的集合点,等待或发布一个event。
多个go程暂停在某个point上,等待一个事件信号再继续执行。没有cond的时候是怎么做的,当然是for循环,但是这有个大问题。
func 无cond() {
isOK := false
go func() {
for isOK == false {
// time.Sleep(time.Microsecond) // bad method
// do something
}
fmt.Println("OK I finished")
}()
go func() {
for isOK == false {
// time.Sleep(time.Microsecond) // bad method
// do something
}
fmt.Println("OK I finished")
}()
time.Sleep(time.Second * 5)
isOK = true
select {}
}
这会消耗一整个CPU核心的所有周期,有些人会引入 time.Sleep 实际上这会让算法低效,这时候我们可以使用 cond。
func 有cond() {
var wg sync.WaitGroup
cond := sync.NewCond(&sync.Mutex{})
test := func() {
defer wg.Done()
defer cond.L.Unlock()
cond.L.Lock()
cond.Wait()
fmt.Println("something work...OK finished")
}
wg.Add(2)
go test()
go test()
time.Sleep(time.Second * 5)
cond.Broadcast() // 通知所有go程
// cond.Signal() // 通知等待时间最久的一个go程
wg.Wait()
}
cond运行时内部维护一个FIFO列表。与利用channel相比,cond类型性能要高很多。
Once 只允许一次
可以配合单例模式使用,将判断对象是否为null改为sync.Once用于创建唯一对象。
sync.Once只计算调用Do方法的次数,而不是多少次唯一调用Do方法。所以在必要情况下声明多个sync.Once变量而不是用一个。下面的例子输出 1
func 只调用一次() {
var once sync.Once
count := 0
once.Do(func() {
count++
})
once.Do(func() {
count--
})
fmt.Println(count)
}
Pool 池子
对象池模式是一种创建和提供可供使用的固定数量实例或Pool实例的方法。通常用于约束创建昂贵的场景,比如数据库连接,以便只创建固定数量的实例,但不确定数量的操作仍然可以请求访问这些场景。
使用pool的另一个原因是实例化的对象会被GC自动清理,而pool不会
- 可以通过限制创建的对象数量来节省主机内存。
- 提前加载获取引用到另一个对象所需的时间,比如建立服务器连接。
你的并发进程需要请求一个对象,但是在实例化之后很快地处理它们,或者在这些对象的构造可能会对内存产生负面影响,这时最好使用Pool设计模式。但是必须确保pool中对象是同质的,否则性能大打折扣。
注意事项
- 实例化 sync.Pool ,调用 New 方法创建成员变量是线程安全的。
- 收到来自Get的实例,不要对所接受的对象的状态做出任何假设。(同质,不需要做if判断)
- 当你用完了一个从Pool取出的对象时,一定要调用put,否则无法复用这个实例。通常情况下用defer完成。
- Pool内的分布必须大致均匀
type conn struct{}
func 对象池() {
pool := &sync.Pool{New: func() any {
time.Sleep(time.Millisecond * 250)
fmt.Println("创建连接对象")
return &conn{}
}}
for i := 0; i < 10; i++ {
pool.Put(pool.New())
}
fmt.Println("初始化结束")
c1 := pool.Get()
c2 := pool.Get()
pool.Put(c1)
pool.Put(c2)
}
Channel 通道
channel也可以用来同步内存访问,但最好用于在goroutine之间传递消息(channel是将goroutine绑定在一起的粘合剂)。双向 chan 变量名后缀加 Stream
带缓存的channel和不带缓存的channel声明是一样的
var dataStream chan interface{}
双向channel可以隐式转换成单向channel,这对函数返回单向通道很有用
var receiveChan <-chan interface{}
var sendChan chan<- interface{}
dataStream := make(chan interface{})
receiveChan = dataStream
sendChan = datraStream
go语言中channel是阻塞的,意味着channel内的数据被消费后,新的数据才可以写入。通过 <- 操作符的接受形式可以选择返回两个值。
salutation,ok := <-dataStream
当channel未关闭时,ok返回true,关闭后返回false。即使channel关闭了,也能读取到默认值,为了支持一个channel有单个上游写入,有多个下游读取。
模拟之前WaitGroup的例子
func 竞争条件_通道() {
var data int
var Stream chan interface{} = make(chan interface{})
go func() {
data++
Stream <- struct{}{}
}()
<-Stream
if data == 0 {
fmt.Println("Value", data)
} else {
fmt.Println("Value 不是 0")
}
}
模拟之前cond同步多个go程的例子
func channel代替cond() {
var wg sync.WaitGroup
Stream := make(chan interface{})
test := func() {
defer wg.Done()
<-Stream
fmt.Println("something work...OK finished")
}
wg.Add(1)
go test()
go test()
time.Sleep(time.Second * 5)
close(Stream)
wg.Wait()
}
在同一时间打开或关闭多个goroutine可以考虑用channel。
channel操作结果
操作 | Channel状态 | 结果 |
---|---|---|
Read | nil | 阻塞 |
打开且非空 | 输出值 | |
打开但空 | 阻塞 | |
关闭的 | 默认值,false | |
只写 | 编译错误 | |
Write | nil | 阻塞 |
打开但填满 | 阻塞 | |
打开但不满 | 写入 | |
关闭的 | panic | |
只读 | 编译错误 | |
close | nil | panic |
打开且非空 | 关闭Channel;仍然能读取通道数据,直到读取完毕返回默认值 | |
打开但空 | 关闭Channel;返回默认值 | |
关闭的 | panic | |
只读 | 编译错误 |
Channel 使用哲学
在正确的环境中配置Channel,分配channel的所有权。这里的所有权被定义为 实例化、写入和关闭channel的goroutine。重要的是弄清楚哪个goroutine拥有channel。
单向channel声明的是一种工具,允许我们区分所有者和使用者。一旦我们将channel所有者和非channel所有者区分开来,前面的表的结果会非常清晰。可以开始讲责任分配给哪些拥有channel的goroutine和不拥有channel的goroutine。
拥有channel的goroutine
- 实例化channel
- 执行写操作,或将所有权传递个另一个goroutine
- 关闭channel
- 执行这三件事,并通过只读channel把它们暴露出来。
使用channel的goroutine
- 知道channel是何时关闭的 => 检查第二个返回值
- 正确处理阻塞 =>取决于你的算法
尽量保持channel的所有权很小,消费者函数只能执行channel的读取方法,因此只需要知道它应该如何处理阻塞和channel的关闭。
func 通道使用哲学() {
// 所有权范围足够小,职责明确
chanOwner := func() <-chan int {
resultStream := make(chan int, 5)
go func() {
defer close(resultStream)
for i := 0; i < 5; i++ {
resultStream <- i
}
}()
return resultStream // 传递单向通道给另一个 goroutine
}
resultStream := chanOwner()
for result := range resultStream {
fmt.Println(result)
}
fmt.Println("Done")
}
Select 选择语句
Go语言运行时将在一组case语句中执行伪随机选择。
var c<-chan int // 注意是 nil,永远阻塞
select{
case <-c:
case <- time.After(1 * time.Second):
fmt.Println("Timed out.")
}
time.After函数通过传入time.Duration参数返回一个数值并写入channel。select允许加default语句,通常配合for-select循环一起使用,允许go程在等待另一个go程结果的同时,自己干一些事情。
GOMAXPROCS
通过修改 runtime.GOMAXPROCS 允许你修改OS线程的数量。一般是为了调试,添加OS线程来更频繁触发竞争条件。
参考资料
-
《Go语言并发之道》Katherine CoxBuday
-
《Go语言核心编程》李文塔
-
《Go语言高级编程》柴树彬、曹春辉