多线程与多进程本质的区别在于,多线程的内存空间是共享的,多进程是每一个进程都会独立开辟一块内存空间。如果我们运行的多个任务是完全独立的,那么在资源足够的情况下并发还是并行方案都无所谓了。但如果我们的多个任务之间有内在联系,那任务间的通信就是个问题了。由于groutine兼具多线程与多进程的特性,所以groutine也具备了多进程内存独立的特点。这时候我们就需要channel工具,来帮助groutine实现任务之间的通信。
以典型的生产者和消费者模型为例。一家餐馆: 1个厨师,2个食客
package main
import (
"fmt"
"sync"
"time"
)
// 声明WaitGroup,用于确保主groutine一定晚于子groutine结束
var wg sync.WaitGroup
// 将通道类型作为参数传递给Chef函数
func Chef(ch chan<- int) {
// defer函数的执行顺序是反向的,先出现的defer后运行
defer wg.Done()
// 函数结束前关闭通道的使用,避免因通道一直开启hold住代码
// 确定写完数据就关闭通道,通道只能被关闭一次
defer close(ch)
for i := 1; i < 6; i++ {
// 向通道中推送数据
ch <- i
fmt.Printf("厨师制作第 < %d > 盘菜品\n", i)
time.Sleep(time.Second * 1)
}
}
// 单向chan只允许取出数据
func Comsumer_1(ch <-chan int) {
defer wg.Done()
// 获取通道的内数据的方式一
for {
dish, ok := <-ch
// 当通道数据内没有数据时,则ok返回false
if !ok {
break
}
fmt.Printf("食客 [ %d ] 号吃了第 < %d > 盘菜品>\n", 1, dish)
}
}
func Comsumer_2(ch <-chan int) {
defer wg.Done()
// 获取通道的内数据的方式二
// 只返回一个通道重点数据值,当通道为空时自动结束for循环
for dish := range ch {
fmt.Printf("食客 [ %d ] 号 已经吃了第 < %d > 盘菜品>\n", 2, dish)
time.Sleep(time.Second * 2)
}
}
func main() {
// 声明一个可以缓存3个数据的channel, 允许传输的数据类型是int
ch := make(chan int, 3)
wg.Add(3)
go Chef(ch)
go Comsumer_1(ch)
go Comsumer_2(ch)
wg.Wait()
fmt.Println("程序执行结束")
}
执行结果,2个食客任务交替消费厨师任务推送进通道的数据。
% go run main.go
厨师制作第 < 1 > 盘菜品
食客 [ 2 ] 号 已经吃了第 < 1 > 盘菜品>
厨师制作第 < 2 > 盘菜品
食客 [ 1 ] 号吃了第 < 2 > 盘菜品>
厨师制作第 < 3 > 盘菜品
食客 [ 1 ] 号吃了第 < 3 > 盘菜品>
厨师制作第 < 4 > 盘菜品
食客 [ 2 ] 号 已经吃了第 < 4 > 盘菜品>
厨师制作第 < 5 > 盘菜品
食客 [ 1 ] 号吃了第 < 5 > 盘菜品>
程序执行结束
groutine池
虽然创建和销毁groutine的开销很小,但是我们依然希望可以在任务很多的时候,把groutine数量控制在一定范围内,循环使用避免无意义的性能开销。根据上面餐厅的案例,我们让厨师产生大量的食物放到通道内,而食客大数量保持与cpu内核数相等。代码如下:
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
// 声明WaitGroup,用于确保主groutine一定晚于子groutine结束
var wg sync.WaitGroup
// 将通道类型作为参数传递给Chef函数
func Chef(ch chan<- int) {
defer close(ch)
// 厨师制作10盘磁盘,放入通道
for i := 1; i < 11; i++ {
// 向通道中推送数据
ch <- i
fmt.Printf("厨师制作第 < %d > 盘菜品\n", i)
time.Sleep(time.Second * 1)
}
}
// 单向chan只允许取出数据
func Comsumer(cost_num int, ch <-chan int) {
defer wg.Done()
// 循环从通道中获取数据,直到取完
for {
dish, ok := <-ch
// 当通道数据内没有数据时,则ok返回false
if !ok {
break
}
fmt.Printf("食客 [ %d ] 号吃了第 < %d > 盘菜品>\n", cost_num+1, dish)
time.Sleep(time.Second * 2)
}
}
func main() {
// 声明一个可以缓存3个数据的channel, 允许传输的数据类型是int
ch := make(chan int, 3)
// 获取当前电脑的CPU个书
cpu_num := runtime.NumCPU()
fmt.Printf("当前电脑CPU内核数为: %v\n", cpu_num)
// 主goroutine不能操作通道,必须使用子groutine才能读写通道
// 启动一个负责生产的子groutine
go Chef(ch)
// 启动groutine数目等于cpu内核数
wg.Add(cpu_num)
for num := 0; num < cpu_num; num++ {
// 启动多个食客groutine,并发从通道拿去食物。
// 通道取完后,才停止各个食客任务
go Comsumer(num, ch)
}
wg.Wait()
fmt.Println("程序执行结束")
}
执行结果,最多4个食客争相从通道中消费了10个菜品
% go run main.go
当前电脑CPU内核数为: 4
厨师制作第 < 1 > 盘菜品
食客 [ 4 ] 号吃了第 < 1 > 盘菜品>
厨师制作第 < 2 > 盘菜品
食客 [ 2 ] 号吃了第 < 2 > 盘菜品>
厨师制作第 < 3 > 盘菜品
食客 [ 1 ] 号吃了第 < 3 > 盘菜品>
厨师制作第 < 4 > 盘菜品
食客 [ 3 ] 号吃了第 < 4 > 盘菜品>
厨师制作第 < 5 > 盘菜品
食客 [ 4 ] 号吃了第 < 5 > 盘菜品>
厨师制作第 < 6 > 盘菜品
食客 [ 2 ] 号吃了第 < 6 > 盘菜品>
厨师制作第 < 7 > 盘菜品
食客 [ 1 ] 号吃了第 < 7 > 盘菜品>
厨师制作第 < 8 > 盘菜品
食客 [ 3 ] 号吃了第 < 8 > 盘菜品>
厨师制作第 < 9 > 盘菜品
食客 [ 4 ] 号吃了第 < 9 > 盘菜品>
厨师制作第 < 10 > 盘菜品
食客 [ 2 ] 号吃了第 < 10 > 盘菜品>
程序执行结束
总结:
- 通道只能被关闭一次,并且关闭的通道依然可以被其他任务获取数据。所以写操作完成就可以关闭隧道了
- Comsumer_1(ch <-chan int) 这种方式声明的是只能单向读取的通道,若声明为 Comsumer_1(ch chan int) 没有箭头指向的话,就是个双向通道,在函数中可以读写
- Chef(ch chan<- int) 这种方式就是只能写数据单向通道,数据类型是int
- ch <- i 是向通道中写数据。 i, ok <- ch 是从通道中读数据,通道为空的时候ok的值返回false
- ch := make(chan int, 3) 生命都是带3个数据缓存的通道,即在通道未达到缓存上限之前,生产者把数据放到通道既可进行下一步运行。若声明为ch := make(chan int) 则表示声明的是无缓存的同步通道。即生产者把消息放入通道后,生产者流程会hold住,直到有消费者来取走消息。
- 只有子groutine可以操作通道