在并行编程中,处理循环迭代时常用的并发模型有几种:
-
Worker Pool(工作池):
- 描述:创建固定数量的工作 goroutine,这些 goroutine 从共享的任务队列中获取任务并执行。
- 优点:控制并发量,避免过多 goroutine 导致资源耗尽。
- 示例:
package main import ( "fmt" "sync" ) func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) { defer wg.Done() for job := range jobs { fmt.Printf("Worker %d processing job %d\n", id, job) results <- job * 2 } } func main() { const numWorkers = 3 jobs := make(chan int, 10) results := make(chan int, 10) var wg sync.WaitGroup // Start worker goroutines for i := 1; i <= numWorkers; i++ { wg.Add(1) go worker(i, jobs, results, &wg) } // Send jobs for j := 1; j <= 10; j++ { jobs <- j } close(jobs) // Wait for all workers to finish go func() { wg.Wait() close(results) }() // Collect results for result := range results { fmt.Println("Result:", result) } }
-
Map/Reduce:
- 描述:将数据集合分成多个部分并行处理,然后将结果合并。
- 优点:适合大规模数据处理任务。
- 示例:
package main import ( "fmt" "sync" ) func mapWorker(data []int, results chan<- int, wg *sync.WaitGroup) { defer wg.Done() sum := 0 for _, v := range data { sum += v } results <- sum } func main() { data := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} chunkSize := 2 var wg sync.WaitGroup results := make(chan int, len(data)/chunkSize) // Divide data into chunks and process for i := 0; i < len(data); i += chunkSize { end := i + chunkSize if end > len(data) { end = len(data) } wg.Add(1) go mapWorker(data[i:end], results, &wg) } // Wait for all map workers to finish go func() { wg.Wait() close(results) }() // Collect results total := 0 for result := range results { total += result } fmt.Println("Total:", total) }
-
Fan-Out/Fan-In:
- 描述:多个 goroutine 处理来自同一通道的数据(Fan-Out),然后将结果汇总到一个通道中(Fan-In)。
- 优点:可以有效利用 CPU 核心,实现数据流的处理。
- 示例:
package main import ( "fmt" "sync" ) func generateNumbers(n int, out chan<- int) { for i := 0; i < n; i++ { out <- i } close(out) } func squareNumbers(in <-chan int, out chan<- int, wg *sync.WaitGroup) { defer wg.Done() for num := range in { out <- num * num } } func main() { numbers := make(chan int, 10) squared := make(chan int, 10) var wg sync.WaitGroup // Generate numbers go generateNumbers(10, numbers) // Start worker goroutines for i := 0; i < 3; i++ { wg.Add(1) go squareNumbers(numbers, squared, &wg) } // Close squared channel once all workers are done go func() { wg.Wait() close(squared) }() // Print squared numbers for result := range squared { fmt.Println(result) } }
这些模型可以根据任务的性质和要求选择合适的实现,来优化并发处理的性能和效率。
经典案例:生成图像缩略图并计算这些缩略图的总大小
func main() { images := []string{ "/Users/xxx/Pictures/avatar/1.jpeg", "/Users/xxx/Pictures/avatar/2.jpeg", "/Users/xxx/Pictures/avatar/3.jpeg", } ch := make(chan string) go func() { defer close(ch) for _, filename := range images { ch <- filename log.Println(filename + " sent .") } }() result := makeThumbnails6(ch) log.Printf("Total size of thumbnails: %d bytes\n", result) } func makeThumbnails6(filenames <-chan string) int64 { sizes := make(chan int64) var wg sync.WaitGroup for f := range filenames { log.Printf("%v receive .", f) wg.Add(1) go func(f string) { defer wg.Done() thumb, err := thumbnail.ImageFile(f) if err != nil { log.Println(err) return } info, _ := os.Stat(thumb) sizes <- info.Size() }(f) } go func() { wg.Wait() close(sizes) }() var total int64 for size := range sizes { total += size } return total }
这里面有几点需要注意的: 1: 闭包的变量: 这里的闭包函数把f 传递进去了,防止改变外部的f
go func(f string) { defer wg.Done() thumb, err := thumbnail.ImageFile(f) if err != nil { log.Println(err) return } info, _ := os.Stat(thumb) sizes <- info.Size() }(f)
2:为什么使用阻塞的size channel?( 外部启动第2个goroutine的时候, makeThumbnails6里面的goroutine执行比较快就结束返回了,关闭了 size channel 怎么办
有一个点很好奇:在makeThumbnails6中如果两次for循环间隔太大,是否导致该函数提前退出。
比如说filenames chan每隔1分钟发送一个消息。但 for f := range filenames 中goroutine只需要半分钟就执行完,看起来会在第二次循环前导致wg.Wait()解除,sizes chan关闭,从而makeThumbnails6未等待filenames chan关闭就返回了)
answer:不会;for f := range filenames 中启动的 goroutine 向 sizes 中发送消息会阻塞(无缓存channel),这样只有循环结束才会开始接收size信息(主goroutine),wg.done之后才会执行,
即wg等待的goroutine数量只会一直增加到最大,然后开始减少,直到wg.wait.
函数中,
makeThumbnails6sizes
channel 用于接收处理后的文件大小。由于 sizes
是一个阻塞的 channel,发送操作(sizes <- info.Size()
)会阻塞直到有 goroutine 从 channel 中接收数据,
这样可以避免在未处理完数据的情况下继续添加更多数据。
3: 这里为什么都是go func() { wg.Wait() close(squared) }() 不能直接在主进程中执行吗?
go func() { wg.Wait() close(sizes) }() var total int64 for size := range sizes { total += size }在并发编程中,使用
go func() { wg.Wait(); close(squared) }()
的主要目的是在主 goroutine 外部执行一些操作,以避免阻塞主进程。以下是具体原因:
-
避免阻塞主 goroutine:
- 在主 goroutine 中直接执行
wg.Wait(); close(squared)
会阻塞主进程,直到所有工作 goroutines 完成。如果这些工作 goroutines 处理的是耗时操作,主进程会因此停滞。 - 使用
go func()
可以异步执行这个操作,让主 goroutine 继续执行其他操作,例如接收结果或处理其他逻辑,而不会被wg.Wait()
阻塞。
- 在主 goroutine 中直接执行
-
确保关闭通道时所有 goroutines 已完成:
wg.Wait()
确保在关闭通道之前,所有的工作 goroutines 都已完成任务。关闭通道操作需要在所有数据处理完成后执行,以避免数据丢失或读取错误。- 通过将关闭通道的操作放在一个新的 goroutine 中,确保在所有工作 goroutines 完成后才关闭通道,这样做可以避免主 goroutine 等待过程中的复杂性。
-
分析代码结构:
- 使用
go func()
可以将通道关闭的逻辑与主程序的其他逻辑分开,使代码结构更清晰和简洁。主 goroutine 可以专注于接收和处理结果,而关闭通道的逻辑在一个独立的 goroutine 中完成。 -
wg.Wait()
: 这个调用会阻塞当前 goroutine,直到WaitGroup
的计数器减到 0,表示所有添加到WaitGroup
中的 goroutines 都已完成。(相当于一个常驻进场一直判断WaitGroup
的计数器是否减到 0,到0 才会关闭channel) - channel关闭了, 下面的for range 循环才会退出.( for range channel是 Go 程序中的一个循环结构,它在当前的 goroutine 中运行 ,但是这个循环退出的条件要等channel关闭才会退出)
- 使用
标签:wg,goroutine,模型,常见,channel,func,go,并非,Wait From: https://www.cnblogs.com/-cyh/p/18412119