首页 > 其他分享 >go中的并发学习

go中的并发学习

时间:2023-06-01 09:55:25浏览次数:51  
标签:string boring 学习 并发 Joe func time go main

代码源自于https://github.com/lotusirous/go-concurrency-patterns

自此对各个示例代码进行调试。

1-boring

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func boring(msg string) {
	for i := 0; ; i++ {
		fmt.Println(msg, i)
		time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
	}
}

func main() {
	// after run this line, the main goroutine is finished.
	// main goroutine is a caller. It doesn't wait for func boring finished
	// Thus, we don't see anything
	go boring("boring!") // spawn a goroutine. (1)

	// To solve it, we can make the main go routine run forever by `for {}` statement.

	// for {
	// }

	// A little more interesting is the main goroutine exit. the program also exited
	// This code hang
	fmt.Println("I'm listening")
	time.Sleep(2 * time.Second)
	fmt.Println("You're boring. I'm leaving")

	// However, the main goroutine and boring goroutine does not communicate each other.
	// Thus, the above code is cheated because the boring goroutine prints to stdout by its own function.
	// the line `boring! 1` that we see on terminal is the output from boring goroutine.

	// real conversation requires a communication
}

第一个示例,引入go中的协程。使用go关键字即可,如果主协程不睡眠2s,控制台就不会显示调用的协程输出了。

2-chan

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// now, the boring function additional parametter
// `c chan string` is a channel
func boring(msg string, c chan string) {
	for i := 0; ; i++ {
		// send the value to channel
		// it also waits for receiver to be ready
		c <- fmt.Sprintf("%s %d", msg, i)
		time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
	}
}

func main() {

	// init our channel
	c := make(chan string)
	// placeholdering our goroutine
	go boring("boring!", c)

	for i := 0; i < 5; i++ {
		// <-c read the value from boring function
		// <-c waits for a value to be sent
		fmt.Printf("You say: %q\n", <-c)
	}
	fmt.Println("You're boring. I'm leaving")

}

输出:

You say: "boring! 0"
You say: "boring! 1"
You say: "boring! 2"
You say: "boring! 3"
You say: "boring! 4"
You're boring. I'm leaving

示例2初步使用了chan,可以实现协程间通信。无缓存的chan,没有数据就会阻塞读取,有数据就阻塞写入,所以协程写数据,主协程才能读取数据。交替执行

3-generator

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// boring is a function that returns a channel to communicate with it.
// <-chan string means receives-only channel of string.
func boring(msg string) <-chan string {
	c := make(chan string)
	// we launch goroutine inside a function
	// that sends the data to channel
	go func() {
		// The for loop simulate the infinite sender.
		for i := 0; i < 10; i++ {
			c <- fmt.Sprintf("%s %d", msg, i)
			time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
		}

		// The sender should close the channel
		close(c)

	}()
	return c // return a channel to caller.
}

func main() {

	joe := boring("Joe")
	ahn := boring("Ahn")

	// This loop yields 2 channels in sequence
	for i := 0; i < 10; i++ {
		fmt.Println(<-joe)
		fmt.Println(<-ahn)
	}

	// or we can simply use the for range
	// for msg := range joe {
	// 	fmt.Println(msg)
	// }
	fmt.Println("You're both boring. I'm leaving")

}

输出:

Joe 0
Ahn 0
Joe 1
Ahn 1
Joe 2
Ahn 2
Joe 3
Ahn 3
Joe 4
Ahn 4
Joe 5
Ahn 5
Joe 6
Ahn 6
Joe 7
Ahn 7
Joe 8
Ahn 8
Joe 9
Ahn 9
You're both boring. I'm leaving

示例3和示例2貌似没有太多的区别,只是把chan作为函数返回值了,虽然,返回的时候chan的数据还没有准备好。

4-fanin

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// the boring function return a channel to communicate with it.
func boring(msg string) <-chan string { // <-chan string means receives-only channel of string.
	c := make(chan string)
	go func() { // we launch goroutine inside a function.
		for i := 0; ; i++ {
			c <- fmt.Sprintf("%s %d", msg, i)
			time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
		}

	}()
	return c // return a channel to caller.
}

// <-chan string only get the receive value
// fanIn spawns 2 goroutines to reads the value from 2 channels
// then it sends to value to result channel( `c` channel)
func fanIn(c1, c2 <-chan string) <-chan string {
	c := make(chan string)
	go func() {
		for { // infinite loop to read value from channel.
			v1 := <-c1 // read value from c2. This line will wait when receiving value.
			c <- v1
            //这里和下面的那个一样,改成 c<- <-c1也是没问题的
		}
	}()
	go func() {
		for {
			c <- <-c2 // read value from c2 and send it to c
		}
	}()
	return c
}

func fanInSimple(cs ...<-chan string) <-chan string {
	c := make(chan string)
	for _, ci := range cs { // spawn channel based on the number of input channel

		go func(cv <-chan string) { // cv is a channel value
			for {
				c <- <-cv
			}
		}(ci) // send each channel to

	}
	return c
}

func main() {
	// merge 2 channels into 1 channel
	// c := fanIn(boring("Joe"), boring("Ahn"))
	c := fanInSimple(boring("Joe"), boring("Ahn"))

	for i := 0; i < 5; i++ {
		fmt.Println(<-c) // now we can read from 1 channel
	}
	fmt.Println("You're both boring. I'm leaving")
}

输出:(多次运行,结果不一定相同,也和协程的调度有关系。)

Joe 0
Ahn 0
Ahn 1
Joe 1
Joe 2
You're both boring. I'm leaving

分析:这个在之前的基础上,又变复杂了很多。一看这架子,就知道从chan里面读取5次数据。因为调用了2次boring,所以fanInSimple里面循环了两次,也就是Joe和Ahn都可以往C里面写数据,发生了争抢,所以每次执行的结果会有不同。最后,两个协程都阻塞了,等主协程结束再一起被结束。

5-restore-sequence

package main

import (
	"fmt"
	"math/rand"
	"time"
)

type Message struct {
	str  string
	wait chan bool
}

func fanIn(inputs ...<-chan Message) <-chan Message {
	c := make(chan Message)
	for i := range inputs {
		input := inputs[i]
		go func() {
			for {
				c <- <-input
			}
		}()
	}
	return c
}

// the boring function return a channel to communicate with it.
func boring(msg string) <-chan Message { // <-chan Message means receives-only channel of Message.
	c := make(chan Message)
	waitForIt := make(chan bool) // share between all messages
	go func() {                  // we launch goroutine inside a function.
		for i := 0; ; i++ {
			c <- Message{
				str:  fmt.Sprintf("%s %d", msg, i),
				wait: waitForIt,
			}
			time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)

			// every time the goroutine send message.
			// This code waits until the value to be received.
			<-waitForIt
		}

	}()
	return c // return a channel to caller.
}

func main() {
	// merge 2 channels into 1 channel
	c := fanIn(boring("Joe"), boring("Ahn"))

	for i := 0; i < 5; i++ {
		msg1 := <-c // wait to receive message
		fmt.Println(msg1.str)
		msg2 := <-c
		fmt.Println(msg2.str)

		// each go routine have to wait
		msg1.wait <- true // main goroutine allows the boring goroutine to send next value to message channel.
		msg2.wait <- true
	}
	fmt.Println("You're both boring. I'm leaving")
}

输出:

Joe 0
Ahn 0
Joe 1
Ahn 1
Ahn 2
Joe 2
Ahn 3
Joe 3
Ahn 4
Joe 4
You're both boring. I'm leaving

分析:这次把chan封装成了一个结构体。chan本身是一个指针。首先调用两次boring函数,相当于创建了Joe和Ahn对象,然后以此创建了C对象,让Joe和Ahn都可以写数据给C。在main的循环前,C就已经有数据了,所以进入循环后可以直接输出,Joe输出后,Joe进入阻塞,所以Ahn写入数据,再阻塞,又输出到了控制台。然后循环里面按照顺序写入数据到Joe和Ahn的wait里面,通知他们继续写数据给C。由此,实现了交替输出。

6-select-timeout

代码:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// the boring function return a channel to communicate with it.
func boring(msg string) <-chan string { // <-chan string means receives-only channel of string.
	c := make(chan string)
	go func() { // we launch goroutine inside a function.
		for i := 0; ; i++ {
			c <- fmt.Sprintf("%s %d", msg, i)
			time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
		}

	}()
	return c // return a channel to caller.
}

func main() {
	c := boring("Joe")

	// timeout for the whole conversation
	timeout := time.After(5 * time.Second)
	for {
		select {
		case s := <-c:
			fmt.Println(s)
		case <-timeout:
			fmt.Println("You talk too much.")
			return
		}
	}
}

输出:多次执行,输出可能不同,因为延迟是随机的。

Joe 0
Joe 1
Joe 2
Joe 3
Joe 4
Joe 5
Joe 6
Joe 7
Joe 8
You talk too much.

分析:这个比较简单,主要介绍了select的用法。select会从分支里面选择一个当前可以执行的分支来运行。

本段代码就是在延迟5s前,每次选择C分支,从C读取。由于C所在协程会随机睡眠,所以写入次数是不确定的。

7-quit-signal

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// the boring function return a channel to communicate with it.
func boring(msg string, quit chan string) <-chan string { // <-chan string means receives-only channel of string.
	c := make(chan string)
	go func() { // we launch goroutine inside a function.
		for i := 0; ; i++ {
			select {
			case c <- fmt.Sprintf("%s %d", msg, i):
				// do nothing
			case <-quit:
				fmt.Println("clean up")
				quit <- "See you!"
				return
			}
			time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
		}

	}()
	return c // return a channel to caller
}

func main() {
	quit := make(chan string)
	c := boring("Joe", quit)
	for i := 3; i >= 0; i-- {
		fmt.Println(<-c)
	}
	quit <- "Bye"
	fmt.Println("Joe say:", <-quit)
}

输出:

Joe 0
Joe 1
Joe 2
Joe 3
clean up
Joe say: See you!

分析:这个仍然是在前面的基础上变复杂了一些。main决定输出次数,boring决定输出内容。仍然是介绍select的用法。由于quit最后才写入,所以boring里面的select只能写入C。就有了main循环的输出

8-daisy-chan

package main

import (
	"fmt"
)

func f(left, right chan int) {
	left <- 1 + <-right // get the value from the right and add 1 to it
}

func main() {
	const n = 1000
	leftmost := make(chan int)
	left := leftmost
	right := leftmost

	for i := 0; i < n; i++ {
		right = make(chan int)
		go f(left, right)
		left = right
	}

	go func(c chan int) { c <- 1 }(right)
	fmt.Println(<-leftmost)

}

输出:1001。

分析:修改n的值,输出内容也随之变化。最后的输出结果就是n+1的值。过程如下:

首先是创建了3个int的chan。随后是循环,循环里面创建了n个chan,并且创建了n个协程,重点是,n个协程都是阻塞态,因为不能从right读取数据。直到循环结束,第1000个协程可以执行,然后是999...1。第一个协程执行完毕,left里面的值就是1001。随后输出。可以理解成一种类似于栈的效果,只有后面的协程执行完毕,才能执行前面的协程(可以对每个协程的参数进行分析,上一个协程的right,就是下一个协程的left)。第一个协程,实质就是写入数据到leftmost.

9-google1.0

package main

import (
	"fmt"
	"math/rand"
	"time"
)

type Result string
type Search func(query string) Result

var (
	Web   = fakeSearch("web")
	Image = fakeSearch("image")
	Video = fakeSearch("video")
)

func fakeSearch(kind string) Search {
	return func(query string) Result {
		time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
		return Result(fmt.Sprintf("%s result for %q\n", kind, query))
	}
}

// it invokes serially Web, Image and Video appending them to the Results
func Google(query string) (results []Result) {
	results = append(results, Web(query))
	results = append(results, Image(query))
	results = append(results, Video(query))
	return
}

func main() {
	rand.Seed(time.Now().UnixNano())
	start := time.Now()
	results := Google("golang")
	elapsed := time.Since(start)
	fmt.Println(results)
	fmt.Println(elapsed)
}

输出:由于使用了随机数,所以执行结果每次不同。

[web result for "golang"  
 image result for "golang"
 video result for "golang"
]
233.6904ms

分析:就是3次休眠时间的总和。

10-google2.0

package main

import (
	"fmt"
	"math/rand"
	"time"
)

type Result string
type Search func(query string) Result

var (
	Web   = fakeSearch("web")
	Image = fakeSearch("image")
	Video = fakeSearch("video")
)

func fakeSearch(kind string) Search {
	return func(query string) Result {
		time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
		return Result(fmt.Sprintf("%s result for %q\n", kind, query))
	}
}

func Google(query string) []Result {
	c := make(chan Result)

	// each search performs in a goroutine
	go func() {
		c <- Web(query)
	}()
	go func() {
		c <- Image(query)
	}()
	go func() {
		c <- Video(query)
	}()

	var results []Result
	for i := 0; i < 3; i++ {
		results = append(results, <-c)
	}

	return results
}

func main() {
	rand.Seed(time.Now().UnixNano())
	start := time.Now()
	results := Google("golang")
	elapsed := time.Since(start)
	fmt.Println(results)
	fmt.Println(elapsed)
}

输出:由于使用了随机数,所以执行结果每次不同。

[web result for "golang"
 video result for "golang"
 image result for "golang"
]
53.0727ms

分析:使用协程,让3次搜索同时进行,然后抢占式的写入数据。搜索时间大大缩短。

11-google2.1

package main

import (
	"fmt"
	"math/rand"
	"time"
)

type Result string
type Search func(query string) Result

var (
	Web   = fakeSearch("web")
	Image = fakeSearch("image")
	Video = fakeSearch("video")
)

func fakeSearch(kind string) Search {
	return func(query string) Result {
		time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
		return Result(fmt.Sprintf("%s result for %q\n", kind, query))
	}
}

// I don't want to wait for slow server
func Google(query string) []Result {
	c := make(chan Result)

	// each search performs in a goroutine
	go func() {
		c <- Web(query)
	}()
	go func() {
		c <- Image(query)
	}()
	go func() {
		c <- Video(query)
	}()

	var results []Result

	// the global timeout for 3 queries
	// it means after 50ms, it ignores the result from the server that taking response greater than 50ms
	//
	timeout := time.After(50 * time.Millisecond)
	for i := 0; i < 3; i++ {
		select {
		case r := <-c:
			results = append(results, r)
		// this line ignore the slow server.
		case <-timeout:
			fmt.Println("timeout")
			return results
		}
	}

	return results
}

func main() {
	rand.Seed(time.Now().UnixNano())
	start := time.Now()
	results := Google("golang")
	elapsed := time.Since(start)
	fmt.Println(results)
	fmt.Println(elapsed)
}

输出:由于使用了随机数,所以执行结果每次不同。

[video result for "golang"
 image result for "golang"
 web result for "golang"
]
30.0635ms

分析:使用了select,以控制搜索时间,增加用户体验。

12-google3.0

package main

import (
	"fmt"
	"math/rand"
	"time"
)

type Result string
type Search func(query string) Result

var (
	Web1   = fakeSearch("web1")
	Web2   = fakeSearch("web2")
	Image1 = fakeSearch("image1")
	Image2 = fakeSearch("image2")
	Video1 = fakeSearch("video1")
	Video2 = fakeSearch("video2")
)

func fakeSearch(kind string) Search {
	return func(query string) Result {
		time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
		return Result(fmt.Sprintf("%s result for %q\n", kind, query))
	}
}

// How do we avoid discarding result from the slow server.
// We duplicates to many instance, and perform parallel request.
func First(query string, replicas ...Search) Result {
	c := make(chan Result)
	for i := range replicas {
		go func(idx int) {
			c <- replicas[idx](query)
		}(i)
	}
	// the magic is here. First function always waits for 1 time after receiving the result
	return <-c
}

// I don't want to wait for slow server
func Google(query string) []Result {
	c := make(chan Result)

	// each search performs in a goroutine
	go func() {
		c <- First(query, Web1, Web2)
	}()
	go func() {
		c <- First(query, Image1, Image2)
	}()
	go func() {
		c <- First(query, Video1, Video2)
	}()

	var results []Result

	// the global timeout for 3 queries
	// it means after 50ms, it ignores the result from the server that taking response greater than 50ms
	timeout := time.After(50 * time.Millisecond)
	for i := 0; i < 3; i++ {
		select {
		case r := <-c:
			results = append(results, r)
		// this line ignore the slow server.
		case <-timeout:
			fmt.Println("timeout")
			return results
		}
	}

	return results
}

func main() {
	rand.Seed(time.Now().UnixNano())
	start := time.Now()
	results := Google("golang")
	elapsed := time.Since(start)
	fmt.Println(results)
	fmt.Println(elapsed)

}

输出:由于使用了随机数,所以执行结果每次不同。

timeout
[web1 result for "golang"  
 image1 result for "golang"
]
62.1344ms

分析:增加了搜索来源,抢占式的返回数据,优先返回先查询的数据。

13-adv-pingpong

代码:

package main

import (
	"fmt"
	"time"
)

type Ball struct{ hits int }

func player(name string, table chan *Ball) {
	for {
		ball := <-table // player grabs the ball
		ball.hits++
		fmt.Println(name, ball.hits)
		time.Sleep(100 * time.Millisecond)
		table <- ball // pass the ball
	}
}

func main() {
	table := make(chan *Ball)

	go player("ping", table)
	go player("pong", table)
	table <- new(Ball) // game on; toss the ball
	time.Sleep(1 * time.Second)
	<-table // game over, grab the ball
	panic("show me the stack")

}

输出:和电脑系统有关系,有时候能输出12个。

pong 1
ping 2
pong 3
ping 4
pong 5
ping 6
pong 7
ping 8
pong 9
ping 10
pong 11
panic: show me the stack

goroutine 1 [running]:  
main.main()
        D:/Git/myTest/main.go:28 +0xd2

分析:创建了ball chan的指针,协程创建ping和pong,都阻塞。直到main里面写入给table,然后才可以交替输出数据。

14-adv-subscription

代码:


输出:


此段代码太长。

15-bounded-parallelism

代码:

package main

import (
	"crypto/md5"
	"errors"
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"sort"
	"sync"
)

// walkFiles starts a goroutine to walk the directory tree at root and send the
// path of each regular file on the string channel.  It sends the result of the
// walk on the error channel.  If done is closed, walkFiles abandons its work.
func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
	paths := make(chan string)
	errc := make(chan error, 1)
	go func() { // HL
		// Close the paths channel after Walk returns.
		defer close(paths) // HL
		// No select needed for this send, since errc is buffered.
		errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error { // HL
			if err != nil {
				return err
			}
			if !info.Mode().IsRegular() {
				return nil
			}
			select {
			case paths <- path: // HL
			case <-done: // HL
				return errors.New("walk canceled")
			}
			return nil
		})
	}()
	return paths, errc
}

// A result is the product of reading and summing a file using MD5.
type result struct {
	path string
	sum  [md5.Size]byte
	err  error
}

// digester reads path names from paths and sends digests of the corresponding
// files on c until either paths or done is closed.
func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
	for path := range paths { // HLpaths
		data, err := ioutil.ReadFile(path)
		select {
		case c <- result{path, md5.Sum(data), err}:
		case <-done:
			return
		}
	}
}

// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents.  If the directory walk
// fails or any read operation fails, MD5All returns an error.  In that case,
// MD5All does not wait for inflight read operations to complete.
func MD5All(root string) (map[string][md5.Size]byte, error) {
	// MD5All closes the done channel when it returns; it may do so before
	// receiving all the values from c and errc.
	done := make(chan struct{})
	defer close(done)

	paths, errc := walkFiles(done, root)

	// Start a fixed number of goroutines to read and digest files.
	c := make(chan result) // HLc
	var wg sync.WaitGroup
	const numDigesters = 20
	wg.Add(numDigesters)
	for i := 0; i < numDigesters; i++ {
		go func() {
			digester(done, paths, c) // HLc
			wg.Done()
		}()
	}
	go func() {
		wg.Wait()
		close(c) // HLc
	}()
	// End of pipeline. OMIT

	m := make(map[string][md5.Size]byte)
	for r := range c {
		if r.err != nil {
			return nil, r.err
		}
		m[r.path] = r.sum
	}
	// Check whether the Walk failed.
	if err := <-errc; err != nil { // HLerrc
		return nil, err
	}
	return m, nil
}

func main() {
	// Calculate the MD5 sum of all files under the specified directory,
	// then print the results sorted by path name.
	m, err := MD5All(os.Args[1])
	if err != nil {
		fmt.Println(err)
		return
	}
	var paths []string
	for path := range m {
		paths = append(paths, path)
	}
	sort.Strings(paths)
	for _, path := range paths {
		fmt.Printf("%x  %s\n", m[path], path)
	}
}

输出:

D:\Git\myTest>main.exe main.go
27a334717b87c9c6fd9a3993010740f2  main.go

分析:这段代码需要编译运行。可以好好学习这种方法以计算文件的md5了。

16-context

代码:

// This implemented a sample from the video
// https://www.youtube.com/watch?v=LSzR0VEraWw
package main

import (
	"context"
	"log"
	"time"
)

func sleepAndTalk(ctx context.Context, d time.Duration, msg string) {
	select {
	case <-time.After(d):
		log.Println(msg)
	case <-ctx.Done():
		log.Println(ctx.Err())
	}
}

func main() {
	log.Println("started")
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)

	time.AfterFunc(time.Second, cancel)
	sleepAndTalk(ctx, 5*time.Second, "hello")
}

输出:

2023/06/01 08:34:23 started
2023/06/01 08:34:24 context canceled

分析:

image-20230601083457354

直接相信bing,而不自己求证。在这里,我觉得相信他是对的。

17-ring-buffer-channel

代码:

package main

import "log"

// A channel-based ring buffer removes the oldest item when the queue is full
// Ref:
// https://tanzu.vmware.com/content/blog/a-channel-based-ring-buffer-in-go

func NewRingBuffer(inCh, outCh chan int) *ringBuffer {
	return &ringBuffer{
		inCh:  inCh,
		outCh: outCh,
	}
}

// ringBuffer throttle buffer for implement async channel.
type ringBuffer struct {
	inCh  chan int
	outCh chan int
}

func (r *ringBuffer) Run() {
	for v := range r.inCh {
		select {
		case r.outCh <- v:
		default:
			<-r.outCh // pop one item from outchan
			r.outCh <- v
		}
	}
	close(r.outCh)
}

func main() {
	inCh := make(chan int)
	outCh := make(chan int, 4) // try to change outCh buffer to understand the result
	rb := NewRingBuffer(inCh, outCh)
	go rb.Run()

	for i := 0; i < 10; i++ {
		inCh <- i
	}

	close(inCh)

	for res := range outCh {
		log.Println(res)
	}

}

输出:

2023/06/01 08:44:20 5
2023/06/01 08:44:20 6
2023/06/01 08:44:20 7
2023/06/01 08:44:20 8
2023/06/01 08:44:20 9

分析:本段代码很有意思,本来只是随便看看的,没想到看到这一篇,令人很感兴趣。现在看来,因为当时不理解对chan进行遍历的过程,所以不懂。

in是无缓冲的,读写都会阻塞,out则有4个缓冲。只有in在主协程写入了数据,协程才可以运行。执行顺序应该是这样的。in写入0,out写入0,而后in随意写入,子协程可以任意挑选,直到out被写满,才改变select挑选的分支,或者循环结束。无论如何,out里面都是最后几个数据。令人奇怪的是,最后输出的log总是out的长度+1.

我只能认为,由于调度的关系,in已经关闭(不影响循环的v读取),主协程已经输出了一次log,然后恰好子协程进行了循环,可以再写入out,所以才有了out的长度+1,真的有这么巧合吗,并且,每次执行都一样。修改out的大小也是一样。还要一点神奇的是,main里面的循环次数改成奇数,就只能输出out大小的log

image-20230601090335465

难道每次的bing不是同一个,有时候不帮我写代码,有时候告诉我没有装go的编译器,现在又告诉我运行了一下。

image-20230601091556688

image-20230601091957844

我猜也是在线编译器。但是我用一样的编译器,也不能输出7条之多。

image-20230601092449658

这个解释还是靠谱的,只是比我们的猜测更专业一些。

package main

import (
	"log"
	"sync"
)

// A channel-based ring buffer removes the oldest item when the queue is full
// Ref:
// https://tanzu.vmware.com/content/blog/a-channel-based-ring-buffer-in-go

func NewRingBuffer(inCh, outCh chan int) *ringBuffer {
	return &ringBuffer{
		inCh:  inCh,
		outCh: outCh,
	}
}

// ringBuffer throttle buffer for implement async channel.
type ringBuffer struct {
	inCh  chan int
	outCh chan int
}

func (r *ringBuffer) Run(wg *sync.WaitGroup) {
	defer wg.Done() // notify the wait group that this goroutine is done
	for v := range r.inCh {
		select {
		case r.outCh <- v: //读取in,写入out
		default:
			<-r.outCh    // pop one item from outchan
			r.outCh <- v //读取out,再读取in,写入out
		}
	}
	close(r.outCh)
}

func main() {
	inCh := make(chan int)
	outCh := make(chan int, 4) // try to change outCh buffer to understand the result
	rb := NewRingBuffer(inCh, outCh)
	var wg sync.WaitGroup // create a wait group to synchronize goroutines
	wg.Add(1)             // add one goroutine to the wait group
	go rb.Run(&wg)        // pass the wait group pointer to the Run method

	for i := 0; i < 10; i++ {
		inCh <- i
	}

	close(inCh)

	wg.Wait() // wait for all goroutines in the wait group to finish

	for res := range outCh {
		log.Println(res)
	}

}

这是bing提出的优化,还是有效的。关闭out以后,不影响遍历。

18-worker-pool

代码:

// Credit:
// https://gobyexample.com/worker-pools

// Worker pool benefits:
// - Efficiency because it distributes the work across threads.
// - Flow control: Limit work in flight

// Disadvantage of worker:
// Lifetimes complexity: clean up and idle worker

// Principles:
// Start goroutines whenever you have the concurrent work to do.
// The goroutine should exit as soon as posible the work is done. This helps us
// to clean up the resources and manage the lifetimes correctly.
package main

import (
	"fmt"
	"sync"
	"time"
)

func worker(id int, jobs <-chan int, results chan<- int) {
	for j := range jobs {
		fmt.Println("worker", id, "started job", j)
		time.Sleep(time.Second)
		fmt.Println("worker", id, "fnished job", j)
		results <- j * 2
	}
}

func workerEfficient(id int, jobs <-chan int, results chan<- int) {
	// sync.WaitGroup helps us to manage the job
	var wg sync.WaitGroup
	for j := range jobs {

		wg.Add(1)
		// we start a goroutine to run the job
		go func(job int) {
			// start the job
			fmt.Println("worker", id, "started job", job)
			time.Sleep(time.Second)
			fmt.Println("worker", id, "fnished job", job)
			results <- job * 2
			wg.Done()

		}(j)

	}
	// With a help to manage the lifetimes of goroutines
	// we can add more handler when a goroutine finished
	wg.Wait()
}
func main() {
	const numbJobs = 8
	jobs := make(chan int, numbJobs)
	results := make(chan int, numbJobs)

	// 1. Start the worker
	// it is a fixed pool of goroutines receive and perform tasks from a channel

	// In this example, we define a fixed 3 workers
	// they receive the `jobs` from the channel jobs
	// we also naming the worker name with `w` variable.
	for w := 1; w <= 3; w++ {
		go workerEfficient(w, jobs, results)
	}

	// 2. send the work
	// other goroutine sends the work to the channels

	// in this example, the `main` goroutine sends the work to the channel `jobs`
	for j := 1; j <= numbJobs; j++ {
		jobs <- j
	}
	close(jobs)
	fmt.Println("Closed job")
	for a := 1; a <= numbJobs; a++ {
		<-results
	}
	close(results)

}

输出:涉及到互斥,所以执行结果是不确定的。

Closed job
worker 2 started job 8
worker 2 started job 7
worker 1 started job 3
worker 1 started job 6
worker 1 started job 1
worker 2 started job 2
worker 1 started job 4
worker 1 started job 5
worker 1 fnished job 3
worker 2 fnished job 7
worker 2 fnished job 8
worker 1 fnished job 1
worker 2 fnished job 2
worker 1 fnished job 6
worker 1 fnished job 4
worker 1 fnished job 5

分析:go workerEfficient(w, jobs, results)。给人的感觉就和创建了几个工人的对象一般。本来是创建3个工人,由于协程调度的关系,任务都被1和2抢占了。chan就是这么神奇,里面没有数据,就会阻塞遍历。直到主协程写完数据,关闭jobs,才结束循环,进而结束协程。本例主要介绍了sync.WaitGroup的用法。就像监工一样,不停的催促协程完成任务,而不退出。

看了这么多,感觉自己加深了理解,但是要自己写出来,还是很有难度的。

标签:string,boring,学习,并发,Joe,func,time,go,main
From: https://www.cnblogs.com/dayq/p/17448077.html

相关文章

  • 各个语言运行100万个并发任务需要多少内存?
    译者注:原文链接:https://pkolaczk.github.io/memory-consumption-of-async/Github项目地址:https://github.com/pkolaczk/async-runtimes-benchmarks正文在这篇博客文章中,我深入探讨了异步和多线程编程在内存消耗方面的比较,跨足了如Rust、Go、Java、C#、Python、Node.js和Elix......
  • 贝叶斯学习及共轭先验
    今天的主要任务是来理解共轭先验以及贝叶斯学习。最近在研究主题模型,里面用到了一些,另外在机器学习中,贝叶斯学习是重要的一个方向,所以有必要学习和掌握。Contents  1.贝叶斯学习  2.Beta分布及共轭先验  1.贝叶斯学习   首先,我从最简单的硬币投掷开始。现在给你一个......
  • 5.5. Java并发工具类(如CountDownLatch、CyclicBarrier等)
    5.5.1CountDownLatchCountDownLatch是一个同步辅助类,它允许一个或多个线程等待,直到其他线程完成一组操作。CountDownLatch有一个计数器,当计数器减为0时,等待的线程将被唤醒。计数器只能减少,不能增加。示例:使用CountDownLatch等待所有线程完成任务假设我们有一个任务需要三个子......
  • Canvas API初步学习
    1.字体 在canvas中最常见的字体表现形式有填充字体和漏空字体。   漏空字体用方法:strokeText(Text,left,top,[maxlength]);  填充字体用方法:fillText(Text,left,top,[maxlength]);上面的两个方法的最后一个参数是可选的,四个参数的含义分为是:需绘制的字符串,绘制到画布中时......
  • Python进行多输出(多因变量)回归:集成学习梯度提升决策树GRADIENT BOOSTING,GBR回归训练
    原文链接: http://tecdat.cn/?p=25939最近我们被客户要求撰写关于多输出(多因变量)回归的研究报告,包括一些图形和统计输出。在之前的文章中,我们研究了许多使用多输出回归分析的方法。在本教程中,我们将学习如何使用梯度提升决策树GRADIENTBOOSTINGREGRESSOR拟合和预测多输出回归......
  • 在学习分布式系统时遇到的五个常见误解
    哈喽大家好,我是咸鱼我们知道,随着企业规模或者说业务规模的不断扩大,为了应对不断增长的业务需求和提高系统的可伸缩性、可靠性和性能,计算机系统由一开始的单体系统逐渐发展成分布式系统那么今天咸鱼给大家介绍一些关于小白在学习分布式系统遇到的一些常见误解误解1.网络是可靠的......
  • golang vscode开发环境配置
    1.下载go安装包并安装官网下载地址2.下载vscode并安装官网下载地址3.安装vscodego语言开发扩展(插件)4.切换国内下载源,cmd输入如下代码goenv-wGO111MODULE=ongoenv-wGOPROXY=https://goproxy.cn,direct5.安装vscodego开发工具包windows下vscodeCtrl+Shift+P找......
  • mongocxx c++ 14标准,进行多表联合查询
     #include<mongocxx/client.hpp>#include<mongocxx/instance.hpp>#include<mongocxx/uri.hpp>#include<bsoncxx/builder/stream/document.hpp>#include<bsoncxx/json.hpp>#include<bsoncxx/types.hpp>usingbsoncxx::builder::s......
  • mongodb安装
    一、YUM安装MongoDB1、添加一个yum源创建一个/etc/yum.repos.d/mongodb-org-5.0.repo文件[mongodb-org-5.0]name=MongoDBRepositorybaseurl=https://repo.mongodb.org/yum/redhat/$releasever/mongodb-org/5.0/x86_64/gpgcheck=1enabled=1gpgkey=https://www.mongodb.or......
  • 什么是非监督学习
    非监督学习(UnsupervisedLearning)是一种机器学习任务,其中算法从未标记的数据中学习模式、结构和关系,以发现数据中的隐藏信息和有意义的结构。与监督学习不同,非监督学习中没有标签或输出变量来指导学习过程,算法需要自行发现数据的内在模式。在非监督学习中,算法的目标是对数据进行......