The code comes from https://github.com/lotusirous/go-concurrency-patterns
Below I run and debug each of the example programs in turn.
1-boring
package main
import (
"fmt"
"math/rand"
"time"
)
func boring(msg string) {
for i := 0; ; i++ {
fmt.Println(msg, i)
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
}
}
func main() {
// The go statement below returns immediately: main is only the caller and
// does not wait for boring to finish. If main returned right away,
// the program would exit before boring printed anything.
go boring("boring!") // spawn a goroutine. (1)
// One workaround is to keep the main goroutine alive forever with a `for {}` statement.
// for {
// }
// More interesting: when the main goroutine exits, the whole program exits too,
// so here main stays around for two seconds before leaving.
fmt.Println("I'm listening")
time.Sleep(2 * time.Second)
fmt.Println("You're boring. I'm leaving")
// However, the main goroutine and the boring goroutine do not communicate with each other.
// The "conversation" above is faked: the boring goroutine prints to stdout on its own.
// Lines such as `boring! 1` on the terminal come straight from the boring goroutine.
// A real conversation requires communication.
}
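The first example introduces goroutines in Go. Starting one only takes the go keyword. If the main goroutine did not sleep for 2s, the program would exit first and the console would never show the output of the spawned goroutine.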
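Sleeping in main only works when we can guess how long the goroutine needs. A minimal sketch of the usual alternative, waiting with sync.WaitGroup, is shown below. The helper name collect and the finite loop are assumptions made for illustration; they are not part of the original example.

```go
package main

import (
	"fmt"
	"sync"
)

// collect starts one goroutine that produces n messages and waits for it
// to finish with a sync.WaitGroup instead of sleeping.
func collect(msg string, n int) []string {
	var wg sync.WaitGroup
	out := make([]string, 0, n)

	wg.Add(1)
	go func() {
		defer wg.Done() // signal completion when the goroutine returns
		for i := 0; i < n; i++ {
			out = append(out, fmt.Sprintf("%s %d", msg, i))
		}
	}()

	wg.Wait() // the caller blocks here until the goroutine calls Done
	return out
}

func main() {
	for _, line := range collect("boring!", 3) {
		fmt.Println(line)
	}
}
```

Reading out after wg.Wait() is safe because Wait returns only after Done has been called.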
2-chan
package main
import (
"fmt"
"math/rand"
"time"
)
// boring now takes an additional parameter:
// `c chan string` is a channel
func boring(msg string, c chan string) {
for i := 0; ; i++ {
// send the value to channel
// it also waits for receiver to be ready
c <- fmt.Sprintf("%s %d", msg, i)
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
}
}
func main() {
// init our channel
c := make(chan string)
// launch the boring goroutine
go boring("boring!", c)
for i := 0; i < 5; i++ {
// <-c read the value from boring function
// <-c waits for a value to be sent
fmt.Printf("You say: %q\n", <-c)
}
fmt.Println("You're boring. I'm leaving")
}
Output:
You say: "boring! 0"
You say: "boring! 1"
You say: "boring! 2"
You say: "boring! 3"
You say: "boring! 4"
You're boring. I'm leaving
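Example 2 introduces channels, which let goroutines communicate. On an unbuffered channel a receive blocks until a sender is ready and a send blocks until a receiver is ready, so every value is handed over directly: main can only read once the goroutine writes, and the goroutine can only continue once main has read. The two therefore run in lockstep.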
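The blocking rule above can be observed without hanging the program by attempting a non-blocking send. The sketch below assumes a hypothetical helper trySend built on select with a default branch; it is only meant to contrast an unbuffered channel with a buffered one.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether it succeeded.
func trySend(c chan int, v int) bool {
	select {
	case c <- v:
		return true
	default: // nobody can take the value right now
		return false
	}
}

func main() {
	unbuffered := make(chan int)
	buffered := make(chan int, 1)

	fmt.Println(trySend(unbuffered, 1)) // false: no receiver is waiting
	fmt.Println(trySend(buffered, 1))   // true: the buffer has room
	fmt.Println(trySend(buffered, 2))   // false: the buffer is full
}
```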
3-generator
package main
import (
"fmt"
"math/rand"
"time"
)
// boring is a function that returns a channel to communicate with it.
// <-chan string means receives-only channel of string.
func boring(msg string) <-chan string {
c := make(chan string)
// we launch goroutine inside a function
// that sends the data to channel
go func() {
// The for loop simulates a sender that stops after 10 messages.
for i := 0; i < 10; i++ {
c <- fmt.Sprintf("%s %d", msg, i)
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
}
// The sender should close the channel
close(c)
}()
return c // return a channel to caller.
}
func main() {
joe := boring("Joe")
ahn := boring("Ahn")
// This loop yields 2 channels in sequence
for i := 0; i < 10; i++ {
fmt.Println(<-joe)
fmt.Println(<-ahn)
}
// or we can simply use the for range
// for msg := range joe {
// fmt.Println(msg)
// }
fmt.Println("You're both boring. I'm leaving")
}
Output:
Joe 0
Ahn 0
Joe 1
Ahn 1
Joe 2
Ahn 2
Joe 3
Ahn 3
Joe 4
Ahn 4
Joe 5
Ahn 5
Joe 6
Ahn 6
Joe 7
Ahn 7
Joe 8
Ahn 8
Joe 9
Ahn 9
You're both boring. I'm leaving
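Example 3 does not look very different from example 2: the channel simply becomes the function's return value, even though no data is ready on it at the moment it is returned. The producing goroutine is started inside boring, and each receive in main blocks until that goroutine sends.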
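The commented-out for range loop in the listing relies on the sender closing the channel. A minimal sketch of that generator shape follows; the names count and drain are made up for this illustration.

```go
package main

import "fmt"

// count returns a receive-only channel that yields 0..n-1 and is then closed.
func count(n int) <-chan int {
	c := make(chan int)
	go func() {
		defer close(c) // closing lets the consumer's range loop end
		for i := 0; i < n; i++ {
			c <- i
		}
	}()
	return c
}

// drain reads from c until it is closed.
func drain(c <-chan int) []int {
	var out []int
	for v := range c {
		out = append(out, v)
	}
	return out
}

func main() {
	fmt.Println(drain(count(3))) // [0 1 2]
}
```

Without the close, the range loop in drain would block forever after the last value.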
package main
import (
"fmt"
"math/rand"
"time"
)
// the boring function return a channel to communicate with it.
func boring(msg string) <-chan string { // <-chan string means receives-only channel of string.
c := make(chan string)
go func() { // we launch goroutine inside a function.
for i := 0; ; i++ {
c <- fmt.Sprintf("%s %d", msg, i)
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
}
}()
return c // return a channel to caller.
}
// <-chan string is a receive-only channel.
// fanIn spawns 2 goroutines that read values from the 2 input channels
// and forward them to the result channel `c`.
func fanIn(c1, c2 <-chan string) <-chan string {
c := make(chan string)
go func() {
for { // infinite loop to read value from channel.
v1 := <-c1 // read value from c1. This line blocks until a value arrives.
c <- v1
// Same as the goroutine below; this could equally be written as c <- <-c1.
}
}()
go func() {
for {
c <- <-c2 // read value from c2 and send it to c
}
}()
return c
}
func fanInSimple(cs ...<-chan string) <-chan string {
c := make(chan string)
for _, ci := range cs { // spawn channel based on the number of input channel
go func(cv <-chan string) { // cv is a channel value
for {
c <- <-cv
}
}(ci) // pass each input channel to its own goroutine
}
return c
}
func main() {
// merge 2 channels into 1 channel
// c := fanIn(boring("Joe"), boring("Ahn"))
c := fanInSimple(boring("Joe"), boring("Ahn"))
for i := 0; i < 5; i++ {
fmt.Println(<-c) // now we can read from 1 channel
}
fmt.Println("You're both boring. I'm leaving")
}
Output (results can differ between runs, since they depend on the random delays and on goroutine scheduling):
Joe 0
Ahn 0
Ahn 1
Joe 1
Joe 2
You're both boring. I'm leaving
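Analysis: This builds on the previous examples and is considerably more complex. The structure of main shows that it reads from the channel 5 times. boring is called twice, so the loop in fanInSimple runs twice and starts two forwarding goroutines. Joe and Ahn can both write to c and compete for it, which is why the result differs from run to run. In the end the goroutines are all left blocked on a send, and they are terminated together when the main goroutine exits.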
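In the listing the merged channel is never closed, because the inputs never end. When the inputs are finite, a common variation is to close the merged channel once every input has been drained. The following is a sketch under that assumption; emit, merge and collectSorted are hypothetical helpers, not functions from the repository.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// emit returns a channel that yields vals and is then closed.
func emit(vals ...string) <-chan string {
	c := make(chan string)
	go func() {
		defer close(c)
		for _, v := range vals {
			c <- v
		}
	}()
	return c
}

// merge fans several input channels into one and closes the output
// after all inputs have been closed and drained.
func merge(cs ...<-chan string) <-chan string {
	out := make(chan string)
	var wg sync.WaitGroup
	wg.Add(len(cs))
	for _, c := range cs {
		go func(in <-chan string) {
			defer wg.Done()
			for v := range in {
				out <- v
			}
		}(c)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// collectSorted drains c and sorts the values, because arrival order is not deterministic.
func collectSorted(c <-chan string) []string {
	var got []string
	for v := range c {
		got = append(got, v)
	}
	sort.Strings(got)
	return got
}

func main() {
	fmt.Println(collectSorted(merge(emit("Joe 0", "Joe 1"), emit("Ahn 0", "Ahn 1"))))
}
```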
package main
import (
"fmt"
"math/rand"
"time"
)
type Message struct {
str string
wait chan bool
}
func fanIn(inputs ...<-chan Message) <-chan Message {
c := make(chan Message)
for i := range inputs {
input := inputs[i]
go func() {
for {
c <- <-input
}
}()
}
return c
}
// the boring function return a channel to communicate with it.
func boring(msg string) <-chan Message { // <-chan Message means receives-only channel of Message.
c := make(chan Message)
waitForIt := make(chan bool) // share between all messages
go func() { // we launch goroutine inside a function.
for i := 0; ; i++ {
c <- Message{
str: fmt.Sprintf("%s %d", msg, i),
wait: waitForIt,
}
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
// every time the goroutine send message.
// This code waits until the value to be received.
<-waitForIt
}
}()
return c // return a channel to caller.
}
func main() {
// merge 2 channels into 1 channel
c := fanIn(boring("Joe"), boring("Ahn"))
for i := 0; i < 5; i++ {
msg1 := <-c // wait to receive message
fmt.Println(msg1.str)
msg2 := <-c
fmt.Println(msg2.str)
// each go routine have to wait
msg1.wait <- true // main goroutine allows the boring goroutine to send next value to message channel.
msg2.wait <- true
}
fmt.Println("You're both boring. I'm leaving")
}
Output:
Joe 0
Ahn 0
Joe 1
Ahn 1
Ahn 2
Joe 2
Ahn 3
Joe 3
Ahn 4
Joe 4
You're both boring. I'm leaving
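Analysis: This time a channel is wrapped in a struct. A channel value is a reference, so every Message sent by one boring goroutine carries the same wait channel. boring is called twice, which creates the Joe and Ahn producers, and fanIn merges them into a single channel c so that both can write to it. c is unbuffered and holds no data; the producers simply block on their sends until main is ready to receive. After Joe's message has been received, Joe ends up blocked on its wait channel, so the next message can only come from Ahn, who then blocks as well. main prints both messages and then sends true on each message's wait channel, which tells that producer to send its next value. As a result every round prints exactly one message from Joe and one from Ahn, although the order inside a round can vary, as rounds 3 to 5 of the output show.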
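The property described above (one message per producer per round) can be checked in a few lines. This is a simplified sketch: it skips fanIn and lets both producers send on one shared channel, and the names producer and lockstep as well as the who and seq fields are assumptions made for the illustration.

```go
package main

import "fmt"

// Message carries the payload plus the channel the sender waits on.
type Message struct {
	who  string
	seq  int
	wait chan bool
}

// producer sends one message, then blocks until the receiver acknowledges it.
func producer(who string, out chan<- Message) {
	wait := make(chan bool) // shared by all messages of this producer
	for i := 0; ; i++ {
		out <- Message{who: who, seq: i, wait: wait}
		<-wait
	}
}

// lockstep reports whether every round delivered exactly one message
// from each producer, both with the round number as sequence number.
func lockstep(rounds int) bool {
	c := make(chan Message)
	go producer("Joe", c)
	go producer("Ahn", c)
	for r := 0; r < rounds; r++ {
		m1, m2 := <-c, <-c
		if m1.who == m2.who || m1.seq != r || m2.seq != r {
			return false
		}
		m1.wait <- true // let the first sender continue
		m2.wait <- true // let the second sender continue
	}
	return true
}

func main() {
	fmt.Println(lockstep(5)) // true
}
```

The two producer goroutines are left blocked when lockstep returns, just as in the original program, and end when main exits.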
Code:
package main
import (
"fmt"
"math/rand"
"time"
)
// the boring function return a channel to communicate with it.
func boring(msg string) <-chan string { // <-chan string means receives-only channel of string.
c := make(chan string)
go func() { // we launch goroutine inside a function.
for i := 0; ; i++ {
c <- fmt.Sprintf("%s %d", msg, i)
time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
}
}()
return c // return a channel to caller.
}
func main() {
c := boring("Joe")
// timeout for the whole conversation
timeout := time.After(5 * time.Second)
for {
select {
case s := <-c:
fmt.Println(s)
case <-timeout:
fmt.Println("You talk too much.")
return
}
}
}
Output (it can differ between runs because the delays are random):
Joe 0
Joe 1
Joe 2
Joe 3
Joe 4
Joe 5
Joe 6
Joe 7
Joe 8
You talk too much.
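Analysis: This one is fairly simple; it mainly introduces select. select picks one branch that is currently able to proceed and runs it (if several are ready it chooses one at random, and if none is ready it blocks).
Until the 5s timeout fires, every iteration takes the c branch and reads from c. Since the goroutine behind c sleeps for a random time between sends, the number of messages received within 5s is not fixed.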
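The listing creates the timeout channel once, outside the loop, so it limits the whole conversation. Calling time.After inside the select instead limits the wait for each single message. A minimal sketch of that per-message variant, with the hypothetical helper recvWithTimeout taking the limit in milliseconds:

```go
package main

import (
	"fmt"
	"time"
)

// recvWithTimeout waits at most ms milliseconds for one value from c.
// The boolean result is false when the timeout fired first.
func recvWithTimeout(c <-chan string, ms int) (string, bool) {
	select {
	case s := <-c:
		return s, true
	case <-time.After(time.Duration(ms) * time.Millisecond):
		return "", false
	}
}

func main() {
	ready := make(chan string, 1)
	ready <- "Joe 0"
	s, ok := recvWithTimeout(ready, 50)
	fmt.Println(s, ok) // Joe 0 true

	silent := make(chan string) // nobody ever sends on this channel
	s, ok = recvWithTimeout(silent, 50)
	fmt.Println(s, ok) // (empty string) false
}
```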
package main
import (
"fmt"
"math/rand"
"time"
)
// the boring function return a channel to communicate with it.
func boring(msg string, quit chan string) <-chan string { // <-chan string means receives-only channel of string.
c := make(chan string)
go func() { // we launch goroutine inside a function.
for i := 0; ; i++ {
select {
case c <- fmt.Sprintf("%s %d", msg, i):
// do nothing
case <-quit:
fmt.Println("clean up")
quit <- "See you!"
return
}
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
}
}()
return c // return a channel to caller
}
func main() {
quit := make(chan string)
c := boring("Joe", quit)
for i := 3; i >= 0; i-- {
fmt.Println(<-c)
}
quit <- "Bye"
fmt.Println("Joe say:", <-quit)
}
Output:
Joe 0
Joe 1
Joe 2
Joe 3
clean up
Joe say: See you!
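Analysis: Again a step up in complexity from the previous example. main decides how many messages are printed, boring decides their content, and select is once more the key. Because main sends on quit only after its loop, the select in boring can only take the send-to-c branch until then, which produces the four lines printed by main's loop. Once main sends "Bye", the quit branch runs: boring prints "clean up", replies "See you!" on the same channel, and returns.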
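A quit channel that carries a value reaches exactly one goroutine per send. When several goroutines must stop, a frequently used variation is to close a channel instead, since a close unblocks every receiver. The sketch below assumes a hypothetical stopAll helper and uses a buffered channel as a stand-in for real clean-up work.

```go
package main

import (
	"fmt"
	"sync"
)

// stopAll starts n workers that block until done is closed, closes done,
// and returns how many workers reported that they cleaned up.
func stopAll(n int) int {
	done := make(chan struct{})
	cleaned := make(chan int, n) // buffered so workers never block on reporting
	var wg sync.WaitGroup

	wg.Add(n)
	for id := 0; id < n; id++ {
		go func(id int) {
			defer wg.Done()
			<-done        // a closed channel unblocks every receiver
			cleaned <- id // stand-in for real clean-up work
		}(id)
	}

	close(done) // a single close signals all workers at once
	wg.Wait()
	return len(cleaned)
}

func main() {
	fmt.Println(stopAll(3)) // 3
}
```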
package main
import (
"fmt"
)
func f(left, right chan int) {
left <- 1 + <-right // get the value from the right and add 1 to it
}
func main() {
const n = 1000
leftmost := make(chan int)
left := leftmost
right := leftmost
for i := 0; i < n; i++ {
right = make(chan int)
go f(left, right)
left = right
}
go func(c chan int) { c <- 1 }(right)
fmt.Println(<-leftmost)
}
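Output: 1001.
Analysis: Changing n changes the output; the result is always n+1. The process is as follows:
Only one channel is created before the loop: leftmost, with left and right starting out as two more names for it. Each loop iteration creates one new channel and one goroutine, so after the loop there are n goroutines, all blocked because nothing can be received from their right channel yet. Once the loop has ended, the final goroutine sends 1 into the rightmost channel. The 1000th goroutine can then run, followed by the 999th, and so on down to the 1st, each adding 1 on the way. When the first goroutine finishes, the value it sends to leftmost is 1001, which main prints. The effect resembles a stack: a goroutine can only finish after the one to its right has finished (look at the arguments of each goroutine: the right of one goroutine is the left of the next). The first goroutine is the one that actually writes to leftmost.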
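To check the n+1 claim for several values of n, the chain can be wrapped in a function. This is a sketch only; daisy is a hypothetical name and the body follows the listing above with n as a parameter.

```go
package main

import "fmt"

// daisy builds a chain of n goroutines, injects 1 at the right end,
// and returns the value that comes out at the left end.
func daisy(n int) int {
	leftmost := make(chan int)
	left, right := leftmost, leftmost
	for i := 0; i < n; i++ {
		right = make(chan int)
		// each link adds 1 to what it receives from its right neighbour
		go func(l, r chan int) { l <- 1 + <-r }(left, right)
		left = right
	}
	go func() { right <- 1 }() // start the chain at the rightmost channel
	return <-leftmost
}

func main() {
	fmt.Println(daisy(3))    // 4
	fmt.Println(daisy(1000)) // 1001
}
```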
package main
import (
"fmt"
"math/rand"
"time"
)
type Result string
type Search func(query string) Result
var (
Web = fakeSearch("web")
Image = fakeSearch("image")
Video = fakeSearch("video")
)
func fakeSearch(kind string) Search {
return func(query string) Result {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
return Result(fmt.Sprintf("%s result for %q\n", kind, query))
}
}
// it invokes serially Web, Image and Video appending them to the Results
func Google(query string) (results []Result) {
results = append(results, Web(query))
results = append(results, Image(query))
results = append(results, Video(query))
return
}
func main() {
rand.Seed(time.Now().UnixNano())
start := time.Now()
results := Google("golang")
elapsed := time.Since(start)
fmt.Println(results)
fmt.Println(elapsed)
}
Output (the sleeps are random, so the result differs on every run):
[web result for "golang"
image result for "golang"
video result for "golang"
]
233.6904ms
Analysis: The elapsed time is simply the sum of the three sleeps, because the searches run one after another.
package main
import (
"fmt"
"math/rand"
"time"
)
type Result string
type Search func(query string) Result
var (
Web = fakeSearch("web")
Image = fakeSearch("image")
Video = fakeSearch("video")
)
func fakeSearch(kind string) Search {
return func(query string) Result {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
return Result(fmt.Sprintf("%s result for %q\n", kind, query))
}
}
func Google(query string) []Result {
c := make(chan Result)
// each search performs in a goroutine
go func() {
c <- Web(query)
}()
go func() {
c <- Image(query)
}()
go func() {
c <- Video(query)
}()
var results []Result
for i := 0; i < 3; i++ {
results = append(results, <-c)
}
return results
}
func main() {
rand.Seed(time.Now().UnixNano())
start := time.Now()
results := Google("golang")
elapsed := time.Since(start)
fmt.Println(results)
fmt.Println(elapsed)
}
Output (the sleeps are random, so the result differs on every run):
[web result for "golang"
video result for "golang"
image result for "golang"
]
53.0727ms
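Analysis: With goroutines the three searches run at the same time, and each one writes its result to the channel as soon as it finishes, so results arrive in completion order. The search time drops sharply, to roughly the time of the slowest search.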
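The difference between the serial and the concurrent version is easier to see with fixed delays instead of random ones. The sketch below assumes a hypothetical fakeCall that sleeps for a given number of milliseconds; the printed durations vary slightly from run to run, so no exact output is given.

```go
package main

import (
	"fmt"
	"time"
)

// fakeCall stands in for one backend request that takes ms milliseconds.
func fakeCall(ms int) {
	time.Sleep(time.Duration(ms) * time.Millisecond)
}

// serial runs the calls one after another: total time is about calls*ms.
func serial(calls, ms int) time.Duration {
	start := time.Now()
	for i := 0; i < calls; i++ {
		fakeCall(ms)
	}
	return time.Since(start)
}

// concurrent runs every call in its own goroutine: total time is about ms.
func concurrent(calls, ms int) time.Duration {
	start := time.Now()
	done := make(chan struct{})
	for i := 0; i < calls; i++ {
		go func() {
			fakeCall(ms)
			done <- struct{}{}
		}()
	}
	for i := 0; i < calls; i++ {
		<-done // wait for every call to report back
	}
	return time.Since(start)
}

func main() {
	fmt.Println("serial:    ", serial(3, 40))
	fmt.Println("concurrent:", concurrent(3, 40))
}
```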
package main
import (
"fmt"
"math/rand"
"time"
)
type Result string
type Search func(query string) Result
var (
Web = fakeSearch("web")
Image = fakeSearch("image")
Video = fakeSearch("video")
)
func fakeSearch(kind string) Search {
return func(query string) Result {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
return Result(fmt.Sprintf("%s result for %q\n", kind, query))
}
}
// I don't want to wait for slow server
func Google(query string) []Result {
c := make(chan Result)
// each search performs in a goroutine
go func() {
c <- Web(query)
}()
go func() {
c <- Image(query)
}()
go func() {
c <- Video(query)
}()
var results []Result
// The global timeout for all 3 queries:
// after 50ms, results from any server that has not answered yet are ignored.
timeout := time.After(50 * time.Millisecond)
for i := 0; i < 3; i++ {
select {
case r := <-c:
results = append(results, r)
// this line ignore the slow server.
case <-timeout:
fmt.Println("timeout")
return results
}
}
return results
}
func main() {
rand.Seed(time.Now().UnixNano())
start := time.Now()
results := Google("golang")
elapsed := time.Since(start)
fmt.Println(results)
fmt.Println(elapsed)
}
Output (the sleeps are random, so the result differs on every run):
[video result for "golang"
image result for "golang"
web result for "golang"
]
30.0635ms
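Analysis: select with a timeout bounds the total search time at about 50ms, which improves the user experience: a slow backend cannot hold up the response, and its late result is simply dropped. In the run above all three results arrived within the limit, so the timeout branch was not taken.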
package main
import (
"fmt"
"math/rand"
"time"
)
type Result string
type Search func(query string) Result
var (
Web1 = fakeSearch("web1")
Web2 = fakeSearch("web2")
Image1 = fakeSearch("image1")
Image2 = fakeSearch("image2")
Video1 = fakeSearch("video1")
Video2 = fakeSearch("video2")
)
func fakeSearch(kind string) Search {
return func(query string) Result {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
return Result(fmt.Sprintf("%s result for %q\n", kind, query))
}
}
// How do we avoid discarding the result from a slow server?
// We replicate each server and send the same request to all replicas in parallel.
func First(query string, replicas ...Search) Result {
c := make(chan Result)
for i := range replicas {
go func(idx int) {
c <- replicas[idx](query)
}(i)
}
// The magic is here: First receives only once, so it returns as soon as the fastest replica answers.
return <-c
}
// I don't want to wait for slow server
func Google(query string) []Result {
c := make(chan Result)
// each search performs in a goroutine
go func() {
c <- First(query, Web1, Web2)
}()
go func() {
c <- First(query, Image1, Image2)
}()
go func() {
c <- First(query, Video1, Video2)
}()
var results []Result
// The global timeout for all 3 queries:
// after 50ms, results from any server that has not answered yet are ignored.
timeout := time.After(50 * time.Millisecond)
for i := 0; i < 3; i++ {
select {
case r := <-c:
results = append(results, r)
// this line ignore the slow server.
case <-timeout:
fmt.Println("timeout")
return results
}
}
return results
}
func main() {
rand.Seed(time.Now().UnixNano())
start := time.Now()
results := Google("golang")
elapsed := time.Since(start)
fmt.Println(results)
fmt.Println(elapsed)
}
Output (the sleeps are random, so the result differs on every run):
timeout
[web1 result for "golang"
image1 result for "golang"
]
62.1344ms
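Analysis: Each kind of search now has more than one source. First queries the replicas concurrently and returns whichever one answers first, which lowers the chance of running into the timeout.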
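One detail of First is worth a note: its channel is unbuffered and only one value is ever received, so the goroutine of the slower replica stays blocked on its send. A common way around this is to give the channel room for every answer. The sketch below illustrates that variation with hypothetical helpers replica and first; it is not the code from the repository.

```go
package main

import (
	"fmt"
	"time"
)

// replica returns a fake search function that answers with name after ms milliseconds.
func replica(name string, ms int) func() string {
	return func() string {
		time.Sleep(time.Duration(ms) * time.Millisecond)
		return name
	}
}

// first runs all replicas concurrently and returns the earliest answer.
func first(replicas ...func() string) string {
	// One buffer slot per replica: slower replicas can still complete
	// their send and exit instead of blocking forever.
	c := make(chan string, len(replicas))
	for _, r := range replicas {
		go func(r func() string) { c <- r() }(r)
	}
	return <-c
}

func main() {
	fmt.Println(first(replica("slow", 200), replica("fast", 5))) // fast
}
```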
Code:
package main
import (
"fmt"
"time"
)
type Ball struct{ hits int }
func player(name string, table chan *Ball) {
for {
ball := <-table // player grabs the ball
ball.hits++
fmt.Println(name, ball.hits)
time.Sleep(100 * time.Millisecond)
table <- ball // pass the ball
}
}
func main() {
table := make(chan *Ball)
go player("ping", table)
go player("pong", table)
table <- new(Ball) // game on; toss the ball
time.Sleep(1 * time.Second)
<-table // game over, grab the ball
panic("show me the stack")
}
Output (this depends on the machine; sometimes 12 lines are printed):
pong 1
ping 2
pong 3
ping 4
pong 5
ping 6
pong 7
ping 8
pong 9
ping 10
pong 11
panic: show me the stack
goroutine 1 [running]:
main.main()
D:/Git/myTest/main.go:28 +0xd2
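Analysis: table is an unbuffered channel of *Ball. The ping and pong goroutines are created and both block on receiving from it. Only after main sends a ball to table can the two take turns printing: each player receives the ball, increments hits, sleeps 100ms and passes the ball back. After one second main takes the ball off the table and panics to show the stack.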
Code and output omitted: this example is too long to include here.
Code:
package main
import (
"crypto/md5"
"errors"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sort"
"sync"
)
// walkFiles starts a goroutine to walk the directory tree at root and send the
// path of each regular file on the string channel. It sends the result of the
// walk on the error channel. If done is closed, walkFiles abandons its work.
func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
paths := make(chan string)
errc := make(chan error, 1)
go func() { // HL
// Close the paths channel after Walk returns.
defer close(paths) // HL
// No select needed for this send, since errc is buffered.
errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error { // HL
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
select {
case paths <- path: // HL
case <-done: // HL
return errors.New("walk canceled")
}
return nil
})
}()
return paths, errc
}
// A result is the product of reading and summing a file using MD5.
type result struct {
path string
sum [md5.Size]byte
err error
}
// digester reads path names from paths and sends digests of the corresponding
// files on c until either paths or done is closed.
func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
for path := range paths { // HLpaths
data, err := ioutil.ReadFile(path)
select {
case c <- result{path, md5.Sum(data), err}:
case <-done:
return
}
}
}
// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents. If the directory walk
// fails or any read operation fails, MD5All returns an error. In that case,
// MD5All does not wait for inflight read operations to complete.
func MD5All(root string) (map[string][md5.Size]byte, error) {
// MD5All closes the done channel when it returns; it may do so before
// receiving all the values from c and errc.
done := make(chan struct{})
defer close(done)
paths, errc := walkFiles(done, root)
// Start a fixed number of goroutines to read and digest files.
c := make(chan result) // HLc
var wg sync.WaitGroup
const numDigesters = 20
wg.Add(numDigesters)
for i := 0; i < numDigesters; i++ {
go func() {
digester(done, paths, c) // HLc
wg.Done()
}()
}
go func() {
wg.Wait()
close(c) // HLc
}()
// End of pipeline. OMIT
m := make(map[string][md5.Size]byte)
for r := range c {
if r.err != nil {
return nil, r.err
}
m[r.path] = r.sum
}
// Check whether the Walk failed.
if err := <-errc; err != nil { // HLerrc
return nil, err
}
return m, nil
}
func main() {
// Calculate the MD5 sum of all files under the specified directory,
// then print the results sorted by path name.
m, err := MD5All(os.Args[1])
if err != nil {
fmt.Println(err)
return
}
var paths []string
for path := range m {
paths = append(paths, path)
}
sort.Strings(paths)
for _, path := range paths {
fmt.Printf("%x %s\n", m[path], path)
}
}
Output:
D:\Git\myTest>main.exe main.go
27a334717b87c9c6fd9a3993010740f2 main.go
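Analysis: This program has to be compiled and then run with a path as its command-line argument. It is a good pattern to study for computing the MD5 sums of files with a bounded number of goroutines.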
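The core of the pattern, a fixed number of goroutines pulling from one channel, can be tried without touching the file system. The sketch below hashes in-memory strings instead of files; sumAll is a hypothetical helper and the cancellation through done is left out to keep it short.

```go
package main

import (
	"crypto/md5"
	"fmt"
	"sync"
)

// sumAll hashes every input with at most `workers` goroutines running at once
// and returns a map from input to its hex-encoded MD5 sum.
func sumAll(inputs []string, workers int) map[string]string {
	type result struct{ in, sum string }
	jobs := make(chan string)
	results := make(chan result)

	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for in := range jobs { // each worker pulls until jobs is closed
				results <- result{in, fmt.Sprintf("%x", md5.Sum([]byte(in)))}
			}
		}()
	}
	go func() { // feed the jobs, then signal that no more will come
		for _, in := range inputs {
			jobs <- in
		}
		close(jobs)
	}()
	go func() { // close results once every worker has finished
		wg.Wait()
		close(results)
	}()

	sums := make(map[string]string)
	for r := range results {
		sums[r.in] = r.sum
	}
	return sums
}

func main() {
	sums := sumAll([]string{"hello", ""}, 2)
	fmt.Println(sums["hello"]) // 5d41402abc4b2a76b9719d911017c592
	fmt.Println(sums[""])      // d41d8cd98f00b204e9800998ecf8427e
}
```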
Code:
// This implemented a sample from the video
// https://www.youtube.com/watch?v=LSzR0VEraWw
package main
import (
"context"
"log"
"time"
)
func sleepAndTalk(ctx context.Context, d time.Duration, msg string) {
select {
case <-time.After(d):
log.Println(msg)
case <-ctx.Done():
log.Println(ctx.Err())
}
}
func main() {
log.Println("started")
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
time.AfterFunc(time.Second, cancel)
sleepAndTalk(ctx, 5*time.Second, "hello")
}
Output:
2023/06/01 08:34:23 started
2023/06/01 08:34:24 context canceled
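Analysis:
I am taking Bing's explanation on trust here instead of verifying it myself, and in this case I think trusting it is right. time.AfterFunc calls cancel after one second, so ctx.Done() becomes ready long before the 5-second timer in sleepAndTalk, and the function logs ctx.Err(), which is "context canceled".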
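The combination of context.WithCancel and time.AfterFunc in the listing behaves much like a deadline. The standard library also offers context.WithTimeout for that case. A minimal sketch, assuming a hypothetical helper run that takes both durations in milliseconds:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// run simulates workMs milliseconds of work but gives up once the context's
// timeoutMs deadline has passed. It returns "done" or the context error text.
func run(timeoutMs, workMs int) string {
	ctx, cancel := context.WithTimeout(context.Background(),
		time.Duration(timeoutMs)*time.Millisecond)
	defer cancel() // always release the resources held by the context

	select {
	case <-time.After(time.Duration(workMs) * time.Millisecond):
		return "done"
	case <-ctx.Done():
		return ctx.Err().Error()
	}
}

func main() {
	fmt.Println(run(10, 500)) // context deadline exceeded
	fmt.Println(run(500, 10)) // done
}
```

Note the different error text: a context cancelled by hand reports "context canceled", while one that ran past its deadline reports "context deadline exceeded".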
Code:
package main
import "log"
// A channel-based ring buffer removes the oldest item when the queue is full
// Ref:
// https://tanzu.vmware.com/content/blog/a-channel-based-ring-buffer-in-go
func NewRingBuffer(inCh, outCh chan int) *ringBuffer {
return &ringBuffer{
inCh: inCh,
outCh: outCh,
}
}
// ringBuffer throttle buffer for implement async channel.
type ringBuffer struct {
inCh chan int
outCh chan int
}
func (r *ringBuffer) Run() {
for v := range r.inCh {
select {
case r.outCh <- v:
default:
<-r.outCh // pop one item from outchan
r.outCh <- v
}
}
close(r.outCh)
}
func main() {
inCh := make(chan int)
outCh := make(chan int, 4) // try to change outCh buffer to understand the result
rb := NewRingBuffer(inCh, outCh)
go rb.Run()
for i := 0; i < 10; i++ {
inCh <- i
}
close(inCh)
for res := range outCh {
log.Println(res)
}
}
Output:
2023/06/01 08:44:20 5
2023/06/01 08:44:20 6
2023/06/01 08:44:20 7
2023/06/01 08:44:20 8
2023/06/01 08:44:20 9
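Analysis: This program is interesting. I was only skimming, but this example caught my attention. Looking back, I did not understand it at first because I did not understand how ranging over a channel works.
in is unbuffered, so both its sends and receives block, while out has a buffer of 4. The Run goroutine can only make progress once main has written to in. The order of execution should be as follows: main writes 0 to in and Run writes 0 to out; main then keeps writing to in, and Run keeps taking the first select branch until out is full, after which the default branch is taken, until the loop ends. Either way, out ends up holding the last few values. The odd part is that the number of lines logged at the end is always the capacity of out plus 1.
My only explanation is scheduling: in has already been closed (which does not affect the value v that Run has already received), main has already logged one value, and at just that moment Run goes through its loop body and can write to out once more, which gives the capacity of out plus 1. Put differently, the last send on in only hands 9 over to Run; whether main reads the oldest value from out before Run evicts it is a race between the two goroutines. Is it really such a coincidence? And it is the same on every run, also after changing the size of out. Another curious thing: when the loop count in main is changed to an odd number, only as many lines as the capacity of out are logged.
Is it a different Bing every time? Sometimes it will not write code for me, sometimes it tells me it has no Go compiler installed, and now it tells me it ran the program.
My guess is that it used an online compiler. But with the same compiler I still cannot get as many as 7 lines of output.
This explanation is sound; it is just a more professional version of our guess.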
package main
import (
"log"
"sync"
)
// A channel-based ring buffer removes the oldest item when the queue is full
// Ref:
// https://tanzu.vmware.com/content/blog/a-channel-based-ring-buffer-in-go
func NewRingBuffer(inCh, outCh chan int) *ringBuffer {
return &ringBuffer{
inCh: inCh,
outCh: outCh,
}
}
// ringBuffer throttle buffer for implement async channel.
type ringBuffer struct {
inCh chan int
outCh chan int
}
func (r *ringBuffer) Run(wg *sync.WaitGroup) {
defer wg.Done() // notify the wait group that this goroutine is done
for v := range r.inCh {
select {
case r.outCh <- v: // out has room: forward the value read from in
default:
<-r.outCh // pop one item from outchan
r.outCh <- v // out was full: the oldest item is dropped, then the new value is stored
}
}
close(r.outCh)
}
func main() {
inCh := make(chan int)
outCh := make(chan int, 4) // try to change outCh buffer to understand the result
rb := NewRingBuffer(inCh, outCh)
var wg sync.WaitGroup // create a wait group to synchronize goroutines
wg.Add(1) // add one goroutine to the wait group
go rb.Run(&wg) // pass the wait group pointer to the Run method
for i := 0; i < 10; i++ {
inCh <- i
}
close(inCh)
wg.Wait() // wait for all goroutines in the wait group to finish
for res := range outCh {
log.Println(res)
}
}
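This is the improvement Bing suggested, and it works: main now waits on the WaitGroup until Run has finished before it reads, so the output is always the last 4 values. Closing out before main ranges over it does no harm, because values still in the buffer can be received after the close.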
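The drop-oldest behaviour itself can be tried in a single goroutine, where no scheduling is involved and the result is fully deterministic. The following sketch uses two hypothetical helpers, push and lastN, and assumes a capacity of at least 1.

```go
package main

import "fmt"

// push stores v in ch, evicting the oldest element when the buffer is full.
// It is only safe while a single goroutine uses ch, as in this sketch,
// and it requires cap(ch) >= 1.
func push(ch chan int, v int) {
	select {
	case ch <- v: // there was room
	default:
		<-ch    // drop the oldest value
		ch <- v // now there is room
	}
}

// lastN pushes 0..n-1 through a buffer of the given capacity
// and returns what is left in it.
func lastN(capacity, n int) []int {
	ch := make(chan int, capacity)
	for i := 0; i < n; i++ {
		push(ch, i)
	}
	close(ch) // buffered values remain readable after close
	var kept []int
	for v := range ch {
		kept = append(kept, v)
	}
	return kept
}

func main() {
	fmt.Println(lastN(4, 10)) // [6 7 8 9]
}
```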
Code:
// Credit:
// https://gobyexample.com/worker-pools
// Worker pool benefits:
// - Efficiency because it distributes the work across threads.
// - Flow control: Limit work in flight
// Disadvantage of worker:
// Lifetimes complexity: clean up and idle worker
// Principles:
// Start goroutines whenever you have the concurrent work to do.
// The goroutine should exit as soon as possible once the work is done. This helps us
// to clean up the resources and manage the lifetimes correctly.
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs {
fmt.Println("worker", id, "started job", j)
time.Sleep(time.Second)
fmt.Println("worker", id, "fnished job", j)
results <- j * 2
}
}
func workerEfficient(id int, jobs <-chan int, results chan<- int) {
// sync.WaitGroup helps us to manage the job
var wg sync.WaitGroup
for j := range jobs {
wg.Add(1)
// we start a goroutine to run the job
go func(job int) {
// start the job
fmt.Println("worker", id, "started job", job)
time.Sleep(time.Second)
fmt.Println("worker", id, "fnished job", job)
results <- job * 2
wg.Done()
}(j)
}
// With a help to manage the lifetimes of goroutines
// we can add more handler when a goroutine finished
wg.Wait()
}
func main() {
const numbJobs = 8
jobs := make(chan int, numbJobs)
results := make(chan int, numbJobs)
// 1. Start the worker
// it is a fixed pool of goroutines receive and perform tasks from a channel
// In this example, we define a fixed 3 workers
// they receive the `jobs` from the channel jobs
// we also naming the worker name with `w` variable.
for w := 1; w <= 3; w++ {
go workerEfficient(w, jobs, results)
}
// 2. send the work
// other goroutine sends the work to the channels
// in this example, the `main` goroutine sends the work to the channel `jobs`
for j := 1; j <= numbJobs; j++ {
jobs <- j
}
close(jobs)
fmt.Println("Closed job")
for a := 1; a <= numbJobs; a++ {
<-results
}
close(results)
}
Output (the goroutines run concurrently and compete for the jobs, so the order is not deterministic):
Closed job
worker 2 started job 8
worker 2 started job 7
worker 1 started job 3
worker 1 started job 6
worker 1 started job 1
worker 2 started job 2
worker 1 started job 4
worker 1 started job 5
worker 1 fnished job 3
worker 2 fnished job 7
worker 2 fnished job 8
worker 1 fnished job 1
worker 2 fnished job 2
worker 1 fnished job 6
worker 1 fnished job 4
worker 1 fnished job 5
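In the output above all 8 jobs are started before any of them finishes, because workerEfficient launches one goroutine per job, so the 3 workers do not limit how many jobs run at once. With the plain worker function the limit does hold. The sketch below measures this for a fixed pool; peakInFlight is a hypothetical helper and the 5ms sleep merely simulates work.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// peakInFlight runs numJobs jobs on a fixed pool of workers and returns
// the largest number of jobs that were being processed at the same time.
func peakInFlight(numJobs, workers int) int {
	jobs := make(chan int, numJobs)
	for j := 0; j < numJobs; j++ {
		jobs <- j
	}
	close(jobs)

	var (
		mu       sync.Mutex // protects inFlight and peak
		inFlight int
		peak     int
		wg       sync.WaitGroup
	)
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func() {
			defer wg.Done()
			for range jobs { // one job at a time per worker
				mu.Lock()
				inFlight++
				if inFlight > peak {
					peak = inFlight
				}
				mu.Unlock()

				time.Sleep(5 * time.Millisecond) // simulated work

				mu.Lock()
				inFlight--
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	// With 3 workers the peak can never exceed 3.
	fmt.Println("peak jobs in flight:", peakInFlight(8, 3))
}
```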