首页 > 其他分享 >Go语言精进之路读书笔记第33条——掌握Go并发模型和常见并发模式

Go语言精进之路读书笔记第33条——掌握Go并发模型和常见并发模式

时间:2024-02-24 14:55:07浏览次数:22  
标签:... args 读书笔记 goroutine 并发 func time Go channel

不要通过共享内存来通信,而应该通过通信来共享内存。——Rob Pike

33.1 Go并发模型

CSP(Communicating Sequential Process,通信顺序进程)模型。一个符合CSP模型的并发程序应该是一组通过输入/输出原语连接起来的P的集合。

Go始终推荐以CSP模型风格构建并发程序。Go针对CSP模型提供了三种并发原语:

  • goroutine:对应CSP模型中的P,封装了数据的处理逻辑,是Go运行时调度的基本执行单元
  • channel:对应CSP模型中的输入/输出原语,用于goroutine之间的通信和同步
  • select:用于应对多路输入/输出,可以让goroutine同时协调处理多个channel操作

33.2 Go常见的并发模式

1.创建模式

  • 简单场景:使用go关键字+函数/方法创建goroutine
go fmt.Println("I am a goroutine")
  • 复杂场景:通过CSP模型输入/输出原语的承载体channel在goroutine之间建立联系
  • 创建模式:在内部创建一个goroutine并返回一个channel类型变量的函数
type T struct {...}

func spawn(f func()) chan T {
    c := make(chan T)
    go func() {
        // 使用channel变量c(通过闭包方式)与调用spawn的goroutine通信
        ...
        f()
        ...
    }()

    return c
}

func main() {
    c := spawn(func() {})
    // 使用channel变量c与新创建的goroutine通信
}

2.退出模式

(1) 分离模式

借助了线程模型中的术语,分离(detached)模式。对于分离模式的goroutine,创建它的goroutine不需要关心它的退出,这类goroutine在启动后即与其创建者彻底分离,其生命周期与其执行的主函数相关,函数返回即goroutine退出

用途:

    1. 一次性任务,新创建的goroutine用来执行一个简单的任务,执行后立即退出
    1. 常驻后台执行一些特定任务,如监视(monitor)、观察(watch),通常采用for {...}for { select {...} }代码段形式,并多以定时器(timer)或事件(event)驱动执行
(2) join模式

在线程模型中,父线程可以通过pthread_join来等待子线程结束并获取子线程的结束状态。在Go中,也有类似需求:goroutine的创建者需要等待新goroutine结束。

①等待一个goroutine退出

  • spawn函数使用典型的goroutine创建模式创建了一个goroutine,main goroutine作为创建者通过spawn函数返回的channel与新goroutine建立联系
  • main goroutine在创建完新goroutine后便在该channel上阻塞等待,直到新goroutine退出前向该channel发送了一个信息
func worker(args ...interface{}) {
    if len(args) == 0 {
        return
    }
    interval, ok := args[0].(int)
    if !ok {
        return
    }

    time.Sleep(time.Second * (time.Duration(interval)))
}

func spawn(f func(args ...interface{}), args ...interface{}) chan struct{} {
    c := make(chan struct{})
    go func() {
        f(args...)
        c <- struct{}{}
    }()
    return c
}

func main() {
    done := spawn(worker, 5)
    println("spawn a worker goroutine")
    <-done
    println("worker done")
}

②获取goroutine的退出状态

将channel中承载的类型由struct{}改为了error,这样channel承载的信息就不只是一个信号了,还携带了有价值的信息:新goroutine的结束状态

var OK = errors.New("ok")

func worker(args ...interface{}) error {
    if len(args) == 0 {
        return errors.New("invalid args")
    }
    interval, ok := args[0].(int)
    if !ok {
        return errors.New("invalid interval arg")
    }

    time.Sleep(time.Second * (time.Duration(interval)))
    return OK
}

func spawn(f func(args ...interface{}) error, args ...interface{}) chan error {
    c := make(chan error)
    go func() {
        c <- f(args...)
    }()
    return c
}

func main() {
    done := spawn(worker, 5)
    println("spawn worker1")
    err := <-done
    fmt.Println("worker1 done:", err)
    done = spawn(worker)
    println("spawn worker2")
    err = <-done
    fmt.Println("worker2 done:", err)
}

③等待多个goroutine退出

  • 通过sync.WaitGroup实现等待多个goroutine退出
  • 在所有新创建的goroutine退出后,Wait方法返回,该监视goroutine会向done这个channel写入一个信号,这时main goroutine才会从阻塞在done channel上的状态中恢复,继续往下执行
func worker(args ...interface{}) {
    if len(args) == 0 {
        return
    }

    interval, ok := args[0].(int)
    if !ok {
        return
    }

    time.Sleep(time.Second * (time.Duration(interval)))
}

func spawnGroup(n int, f func(args ...interface{}), args ...interface{}) chan struct{} {
    c := make(chan struct{})
    var wg sync.WaitGroup

    for i := 0; i < n; i++ {
        wg.Add(1)
        go func(i int) {
            name := fmt.Sprintf("worker-%d:", i)
            f(args...)
            println(name, "done")
            wg.Done() // worker done!
        }(i)
    }

    go func() {
        wg.Wait()
        c <- struct{}{}
    }()

    return c
}

func main() {
    done := spawnGroup(5, worker, 3)
    println("spawn a group of workers")
    <-done
    println("group workers done")
}

④支持超时机制的等待

  • 不想无限阻塞等待所有新创建goroutine的退出,而是仅等待一个段合理的时间
  • 通过select原语同时监听timer.C和done这两个channel,哪个先返回数据就执行哪个case分支
func worker(args ...interface{}) {
    if len(args) == 0 {
        return
    }

    interval, ok := args[0].(int)
    if !ok {
        return
    }

    time.Sleep(time.Second * (time.Duration(interval)))
}

func spawnGroup(n int, f func(args ...interface{}), args ...interface{}) chan struct{} {
    c := make(chan struct{})
    var wg sync.WaitGroup

    for i := 0; i < n; i++ {
        wg.Add(1)
        go func(i int) {
            name := fmt.Sprintf("worker-%d:", i)
            f(args...)
            println(name, "done")
            wg.Done() // worker done!
        }(i)
    }

    go func() {
        wg.Wait()
        c <- struct{}{}
    }()

    return c
}

func main() {
    done := spawnGroup(5, worker, 30)
    println("spawn a group of workers")

    timer := time.NewTimer(time.Second * 5)
    defer timer.Stop()
    select {
    case <-timer.C:
        println("wait group workers exit timeout!")
    case <-done:
        println("group workers done")
    }
}
(3) notify-and-wait模式

①通知并等待一个goroutine退出

使用创建模式创建goroutine的spawn函数返回的channel的作用发生了变化,从原先的只是用于新goroutine发送退出信号给创建者,变成了一个双向的数据通道:既承载创建者发送给新goroutine的退出信号,也承载新goroutine返回给创建者的退出状态

func worker(j int) {
    time.Sleep(time.Second * (time.Duration(j)))
}

func spawn(f func(int)) chan string {
    quit := make(chan string)
    go func() {
        var job chan int // 模拟job channel
        for {
            select {
            case j := <-job:
                f(j)
            case <-quit:
                quit <- "ok"
            }
        }
    }()
    return quit
}

func main() {
    quit := spawn(worker)
    println("spawn a worker goroutine")

    time.Sleep(5 * time.Second)

    // notify the child goroutine to exit
    println("notify the worker to exit...")
    quit <- "exit"

    timer := time.NewTimer(time.Second * 10)
    defer timer.Stop()
    select {
    case status := <-quit:
        println("worker done:", status)
    case <-timer.C:
        println("wait worker exit timeout")
    }
}

②通知并等待多个goroutine退出

  • 利用了当使用close函数关闭channel时,所有阻塞到该channel上的goroutine都会得到通知这一特性
  • 通过close(job)来实现广播,各个监听job channel的worker goroutine,通过“comma ok”模式获取的ok值为false,也就表明该channel已关闭,于是worker goroutine执行退出逻辑
func worker(j int) {
    time.Sleep(time.Second * (time.Duration(j)))
}

func spawnGroup(n int, f func(int)) chan struct{} {
    quit := make(chan struct{})
    job := make(chan int)
    var wg sync.WaitGroup

    for i := 0; i < n; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done() // 保证wg.Done在goroutine退出前被执行
            name := fmt.Sprintf("worker-%d:", i)
            for {
                j, ok := <-job
                if !ok {
                    println(name, "done")
                    return
                }
                // do the job
                worker(j)
            }
        }(i)
    }

    go func() {
        <-quit
        close(job) // 广播给所有新goroutine
        wg.Wait()
        quit <- struct{}{}
    }()

    return quit
}

func main() {
    quit := spawnGroup(5, worker)
    println("spawn a group of workers")

    time.Sleep(5 * time.Second)
    // notify the worker goroutine group to exit
    println("notify the worker group to exit...")
    quit <- struct{}{}

    timer := time.NewTimer(time.Second * 5)
    defer timer.Stop()
    select {
    case <-timer.C:
        println("wait group workers exit timeout!")
    case <-quit:
        println("group workers done")
    }
}
(4) 退出模式的应用
  • 一组goroutine的退出总体上有两种情况。一种是并发退出,各个goroutine的退出先后次序对数据处理无影响;另一种是串行退出,各个goroutine按照一定次序逐个进行,次序若错了可能会导致错误
  • 并发退出:
    • 通过sync.WaitGroup在外层等待每个goroutine的退出
    • 通过select监听一个退出通知channel和一个timerchannel,决定到底是正常退出还是超时退出
  • 串行退出:
    • 将每次的left(剩余时间)传入下一个要执行的goroutine的Shutdown方法中
    • select同样使用这个left作为timeout的值,并通过timer.Reset重新设置timer定时器周期
type GracefullyShutdowner interface {
    Shutdown(waitTimeout time.Duration) error
}

type ShutdownerFunc func(time.Duration) error

func (f ShutdownerFunc) Shutdown(waitTimeout time.Duration) error {
    return f(waitTimeout)
}

func ConcurrentShutdown(waitTimeout time.Duration, shutdowners ...GracefullyShutdowner) error {
    c := make(chan struct{})

    go func() {
        var wg sync.WaitGroup
        for _, g := range shutdowners {
            wg.Add(1)
            go func(shutdowner GracefullyShutdowner) {
                defer wg.Done()
                shutdowner.Shutdown(waitTimeout)
            }(g)
        }
        wg.Wait()
        c <- struct{}{}
    }()

    timer := time.NewTimer(waitTimeout)
    defer timer.Stop()

    select {
    case <-c:
        return nil
    case <-timer.C:
        return errors.New("wait timeout")
    }
}

func SequentialShutdown(waitTimeout time.Duration, shutdowners ...GracefullyShutdowner) error {
    start := time.Now()
    var left time.Duration
    timer := time.NewTimer(waitTimeout)

    for _, g := range shutdowners {
        elapsed := time.Since(start)
        left = waitTimeout - elapsed

        c := make(chan struct{})
        go func(shutdowner GracefullyShutdowner) {
            shutdowner.Shutdown(left)
            c <- struct{}{}
        }(g)

        timer.Reset(left)
        select {
        case <-c:
            //continue
        case <-timer.C:
            return errors.New("wait timeout")
        }
    }

    return nil
}

对应的测试代码

func shutdownMaker(processTm int) func(time.Duration) error {
    return func(time.Duration) error {
        time.Sleep(time.Second * time.Duration(processTm))
        return nil
    }
}

func TestConcurrentShutdown(t *testing.T) {
    f1 := shutdownMaker(2)
    f2 := shutdownMaker(6)

    err := ConcurrentShutdown(10*time.Second, ShutdownerFunc(f1), ShutdownerFunc(f2))
    if err != nil {
        t.Errorf("want nil, actual: %s", err)
        return
    }

    err = ConcurrentShutdown(4*time.Second, ShutdownerFunc(f1), ShutdownerFunc(f2))
    if err == nil {
        t.Error("want timeout, actual nil")
        return
    }
}

func TestSequentialShutdown(t *testing.T) {
    f1 := shutdownMaker(2)
    f2 := shutdownMaker(6)

    err := SequentialShutdown(10*time.Second, ShutdownerFunc(f1), ShutdownerFunc(f2))
    if err != nil {
        t.Errorf("want nil, actual: %s", err)
        return
    }

    err = SequentialShutdown(5*time.Second, ShutdownerFunc(f1), ShutdownerFunc(f2))
    if err == nil {
        t.Error("want timeout, actual nil")
        return
    }
}

3.管道模式

  • 管道模式:每个数据处理环节都由一组功能相同的goroutine完成,在每个数据处理环节,goroutine都要从数据输入channel获取前一个环节生产的数据,然后对这些数据进行处理,并将处理后的结果数据通过数据输出channel发往下一个环节
func newNumGenerator(start, count int) <-chan int {
    c := make(chan int)
    go func() {
        for i := start; i < start+count; i++ {
            c <- i
        }
        close(c)
    }()
    return c
}

func filterOdd(in int) (int, bool) {
    if in%2 != 0 {
        return 0, false
    }
    return in, true
}

func square(in int) (int, bool) {
    return in * in, true
}

func spawn(f func(int) (int, bool), in <-chan int) <-chan int {
    out := make(chan int)

    go func() {
        for v := range in {
            r, ok := f(v)
            if ok {
                out <- r
            }
        }
        close(out)
    }()

    return out
}

func main() {
    in := newNumGenerator(1, 20)
    out := spawn(square, spawn(filterOdd, in))

    for v := range out {
        println(v)
    }
}
  • 扩展:扇入/扇出模式

    • 扇入模式:在某个处理环节,处理程序面对不止一个输入channel,我们把所有输入channel的数据汇聚到一个统一的输入channel,然后处理程序再从这个channel中读取数据并处理,直到该channel因所有输入channel关闭而关闭
    • 扇出模式:在某个处理环节,多个功能相同的goroutine从同一个channel读取数据并处理,直到该channel关闭。可以在一组goroutine中均衡分配工作量,从而更均衡地使用CPU
  • 我们通过spawnGroup函数实现了扇出模式,针对每个输入channel,我们都建立多个功能相同的goroutine,让它们从这个共同的输入channel读取数据并处理,直到channel被关闭

  • 在spawnGroup函数的结尾处,我们将多个goroutine的输出channel聚合到一个groupOut channel中,这就是扇入模式的实现

func newNumGenerator(start, count int) <-chan int {
    c := make(chan int)
    go func() {
        for i := start; i < start+count; i++ {
            c <- i
        }
        close(c)
    }()
    return c
}

func filterOdd(in int) (int, bool) {
    if in%2 != 0 {
        return 0, false
    }
    return in, true
}

func square(in int) (int, bool) {
    return in * in, true
}

func spawnGroup(name string, num int, f func(int) (int, bool), in <-chan int) <-chan int {
    groupOut := make(chan int)
    var outSlice []chan int
    for i := 0; i < num; i++ {
        out := make(chan int)
        go func(i int) {
            name := fmt.Sprintf("%s-%d:", name, i)
            fmt.Printf("%s begin to work...\n", name)

            for v := range in {
                r, ok := f(v)
                if ok {
                    out <- r
                }
            }
            close(out)
            fmt.Printf("%s work done\n", name)
        }(i)
        outSlice = append(outSlice, out)
    }

    // Fan-in
    //
    // out --\
    //        \
    // out ---- --> groupOut
    //        /
    // out --/
    //
    go func() {
        var wg sync.WaitGroup
        for _, out := range outSlice {
            wg.Add(1)
            go func(out <-chan int) {
                for v := range out {
                    groupOut <- v
                }
                wg.Done()
            }(out)
        }
        wg.Wait()
        close(groupOut)
    }()

    return groupOut
}

func main() {
    in := newNumGenerator(1, 20)
    out := spawnGroup("square", 2, square, spawnGroup("filterOdd", 3, filterOdd, in))

    time.Sleep(3 * time.Second)

    for v := range out {
        fmt.Println(v)
    }
}

4.超时与取消模式

  • 第一版实现,理性的网络状况
type result struct {
    value string
}

func first(servers ...*httptest.Server) (result, error) {
    c := make(chan result, len(servers))
    queryFunc := func(server *httptest.Server) {
        url := server.URL
        resp, err := http.Get(url)
        if err != nil {
            log.Printf("http get error: %s\n", err)
            return
        }
        defer resp.Body.Close()
        body, _ := ioutil.ReadAll(resp.Body)
        c <- result{
            value: string(body),
        }
    }
    for _, serv := range servers {
        go queryFunc(serv)
    }
    return <-c, nil
}

func fakeWeatherServer(name string) *httptest.Server {
    return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        log.Printf("%s receive a http request\n", name)
        time.Sleep(1 * time.Second)
        w.Write([]byte(name + ":ok"))
    }))
}

func main() {
    result, err := first(fakeWeatherServer("open-weather-1"),
        fakeWeatherServer("open-weather-2"),
        fakeWeatherServer("open-weather-3"))
    if err != nil {
        log.Println("invoke first error:", err)
        return
    }

    log.Println(result)
}

  • 增加超时控制
type result struct {
    value string
}

func first(servers ...*httptest.Server) (result, error) {
    c := make(chan result, len(servers))
    queryFunc := func(server *httptest.Server) {
        url := server.URL
        resp, err := http.Get(url)
        if err != nil {
            log.Printf("http get error: %s\n", err)
            return
        }
        defer resp.Body.Close()
        body, _ := ioutil.ReadAll(resp.Body)
        c <- result{
            value: string(body),
        }
    }
    for _, serv := range servers {
        go queryFunc(serv)
    }

    select {
    case r := <-c:
        return r, nil
    case <-time.After(500 * time.Millisecond):
        return result{}, errors.New("timeout")
    }
}

func fakeWeatherServer(name string) *httptest.Server {
    return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        log.Printf("%s receive a http request\n", name)
        time.Sleep(1 * time.Second)
        w.Write([]byte(name + ":ok"))
    }))
}

func main() {
    result, err := first(fakeWeatherServer("open-weather-1"),
        fakeWeatherServer("open-weather-2"),
        fakeWeatherServer("open-weather-3"))
    if err != nil {
        log.Println("invoke first error:", err)
        return
    }

    log.Println(result)
}
  • 利用context包实现取消模式
    • 利用context.WithCancel创建了一个可以被取消的context.Context变量
    • 通过defer cancel()设定cancel函数在first函数返回前被执行,那些尚未返回的goroutine都将收到cancel事件并退出(http包支持利用context.Context的超时和cancel机制)
type result struct {
    value string
}

func first(servers ...*httptest.Server) (result, error) {
    c := make(chan result)
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    queryFunc := func(i int, server *httptest.Server) {
        url := server.URL
        req, err := http.NewRequest("GET", url, nil)
        if err != nil {
            log.Printf("query goroutine-%d: http NewRequest error: %s\n", i, err)
            return
        }
        req = req.WithContext(ctx)

        log.Printf("query goroutine-%d: send request...\n", i)
        resp, err := http.DefaultClient.Do(req)
        if err != nil {
            log.Printf("query goroutine-%d: get return error: %s\n", i, err)
            return
        }
        log.Printf("query goroutine-%d: get response\n", i)
        defer resp.Body.Close()
        body, _ := ioutil.ReadAll(resp.Body)

        c <- result{
            value: string(body),
        }
        return
    }

    for i, serv := range servers {
        go queryFunc(i, serv)
    }

    select {
    case r := <-c:
        return r, nil
    case <-time.After(500 * time.Millisecond):
        return result{}, errors.New("timeout")
    }
}

func fakeWeatherServer(name string, interval int) *httptest.Server {
    return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        log.Printf("%s receive a http request\n", name)
        time.Sleep(time.Duration(interval) * time.Millisecond)
        w.Write([]byte(name + ":ok"))
    }))
}

func main() {
    result, err := first(fakeWeatherServer("open-weather-1", 200),
        fakeWeatherServer("open-weather-2", 1000),
        fakeWeatherServer("open-weather-3", 600))
    if err != nil {
        log.Println("invoke first error:", err)
        return
    }

    fmt.Println(result)
    time.Sleep(10 * time.Second)
}

标签:...,args,读书笔记,goroutine,并发,func,time,Go,channel
From: https://www.cnblogs.com/brynchen/p/18031087

相关文章

  • [数据库] 使用索引(2): mongoDB
    mongoDB的索引mongodb的索引和mysql基本类似,也是默认主键(相当于mongo中的_id字段)为索引,进行索引排序etc.索引分类单键索引将一个字段作为索引,默认_id,也可以将其他字段作为索引db.collection.createIndex({year:1})其中value为1则是正序,为-1则是倒序复合索引......
  • SciTech-Mathmatics-Trigonometric Identities you must remember: 需要记住的三角函
    TrigonometricIdentities(Revision:1.4)TrigonometricIdentitiesyoumustrememberThe“bigthree”trigonometricidentitiesare\(\large\begin{equation}\sin^{2}t+cos^{2}t=1\tag{1}\end{equation}\)\(\large\begin{equation}\sin(......
  • kettle从入门到精通 第四十七课 ETL之kettle mongo output 写入
    1、上一节课我们学习了mongoinput读取步骤,本节课我们一起学习下mongoout写入步骤,该步骤可以将数据写入到mongo中,如下图所示。 2、 配置mongo连接,有两种方式,如截图所示。ConnectionString:如StringconnectionString="mongodb://username:password@localhost:27017/myda......
  • linux(ubuntu22.04)+PicGo(gui版)+阿里云oss搭建图床教程
    linux(ubuntu22.04)+PicGo(gui版)+阿里云oss搭建图床教程资源库PicGo下载链接:山东镜像源github原版阿里云oss链接linux下PicGo(gui版)的安装从资源库链接里下载后缀为.AppImage的安装包,版本可以选择稳定版2.3.1也可以用更新的beta版。修改文件权限,打开文......
  • 《程序是怎么跑起来的》第5章读书笔记
    第4张介绍了内存那么第5张就是磁盘。在开篇告诉了我们内存只主存而磁盘主要指硬盘。计算机中的储存器包括内存和磁盘储存在磁盘中的程序需要先加载到内存才能运行,不能在磁盘上直接运行。内存与磁盘的联系是非常密切的。第1个体现是磁盘缓存。磁盘缓存是一块内存空间,用于临时存放从......
  • 《程序是怎么跑起来的》第4章读书笔记
    计算机是处理数据的机器,而处理对象的数据储存在内存和磁盘中。内存本质上是一种名为内存芯片的装置,内存芯片分为ram,rom等不同类型,但从外部来看,它们的基本原理是相同的内存芯片外部有引脚负责连接电源以及输入地址信号等等。内存芯片内部有很多能储存巴比特数据的容器,只要指定容器......
  • 《程序是怎么跑起来的》第3章读书笔记
    经过前两章对计算机内容最基本的理解之后,就迎来了对计算机的计算,而计算机也不是万能的,它也会出现错误,那么就涉及到计算机在计算小数时会出现错误的原因,首先课题通过一个问题将0.1累加100次的结果不是10这一话题成功将读者引入进去。然后告诉了我们为什么在计算机中会这样子出错的......
  • Go 100 mistakes - Expecting deterministic behavior using select and channels
      funcmain(){messageCh:=make(chanint,10)disconnectCh:=make(chanstruct{},1)fori:=0;i<10;i++{messageCh<-i}gofunc(){for{select{casev:=<-messageCh:......
  • golang中协程&管道&锁
    进程和线程进程(Process)就是程序在操作系统中的一次执行过程,是系统进行资源分配和调度的基本单位,进程是一个动态概念,是程序在执行过程中分配和管理资源的基本单位,每一个进程都有一个自己的地址空间。一个进程至少有5种基本状态,它们是:初始态,执行态,等待状态,就绪状态,终止状态,通......
  • mysql面试高频问题---事务-MVCC多版本并发控制(难)
    MVCC多版本并发控制1.问题锁:排他锁(如一个事务获取了一个数据行的排他锁,其他事务就不能再获取该行的其他锁)mvcc:多版本并发控制2.MVCC多版本并发控制MVCC全称Multi-VersionConcurrencyControl,多版本并发控制。指维护一个数据的多个版本,使得读写操作没有冲突MVCC的具体......