首页 > 其他分享 >同步协程的必备工具: WaitGroup

同步协程的必备工具: WaitGroup

时间:2023-03-17 21:47:02浏览次数:44  
标签:wg WaitGroup 调用 协程 必备 任务 Done

1. 简介

本文将介绍 Go 语言中的 WaitGroup 并发原语,包括 WaitGroup 的基本使用方法、实现原理、使用注意事项以及常见的使用方式。能够更好地理解和应用 WaitGroup 来协调多个 Goroutine 的执行,提高 Go 并发编程的效率和稳定性。

2. 基本使用

2.1 定义

WaitGroup是Go语言标准库中的一个结构体,它提供了一种简单的机制,用于同步多个协程的执行。适用于需要并发执行多个任务并等待它们全部完成后才能继续执行后续操作的场景。

2.2 使用方式

首先主协程创建WaitGroup实例,然后在每个协程的开始处,调用Add(1)方法,表示需要等待一个任务执行完成,然后协程在任务执行完成之后,调用Done方法,表示任务已经执行完成了。

主协程中,需要调用Wait()方法,等待所有协程完成任务,示例如下:

func main(){
    //首先主协程创建WaitGroup实例
    var wg sync.WaitGroup
    // 开始时调用Add方法表示有个任务开始执行
    wg.Add(1)
    go func() {
        // 开始执行...
        //完成之后,调用Done方法
        wg.Done()
    }()
    // 调用Wait()方法,等待所有协程完成任务
    wg.Wait()
    // 执行后续逻辑
}

2.3 使用例子

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 5; i++ {
       wg.Add(1)
       go func(i int) {
          defer wg.Done()
          fmt.Printf("任务%d开始执行\n", i)
          // 模拟协程任务执行一段时间
          time.Sleep(time.Duration(rand.Int() % 100))
          // 线程任务执行完成
          fmt.Printf("任务%d执行完毕\n", i)
       }(i)
    }
    fmt.Println("主协程开始等待所有任务执行完成...")
    wg.Wait()
    fmt.Println("所有协程已经执行完毕...")
}

在这个例子中,我们使用了sync.WaitGroup来等待5个协程执行完毕。在循环中,每创建一个任务,我们调用一次wg.Add(1)方法,然后启动一个协程去执行任务,当协程完成任务后,调用wg.Done方法,告知主协程任务已经执行完毕。然后主协程会在5个协程任务全部执行完毕之后,才会继续向下执行。

3.实现原理

3.1 设计初衷

WaitGroup的设计初衷就是为了等待一组操作完成后再执行下一步操作,通常会在一组协程中使用。

3.2 实现

sync.WaitGroup 结构体中的 state1state2 字段是用于实现 WaitGroup 功能的重要变量。

type WaitGroup struct {
   noCopy noCopy

   state1 uint64
   state2 uint32
}

由于 WaitGroup 需要等待一组操作完成之后再执行,因此需要等待所有操作完成之后才能继续执行。为了实现这个功能,WaitGroup 使用了一个计数器 counter 来记录还有多少个操作没有完成,如果 counter 的值为 0,则表示所有操作已经完成。

同时,WaitGroup 在所有任务都完成之后,需要唤醒所有处于等待的协程,此时需要知道有多少个协程处于等待状态。为了实现这个功能,WaitGroup 使用了一个等待计数器 waiter 来记录当前有多少个协程正在等待操作完成。

这里WaitGroup对于计数器和等待计数器的实现,是通过一个64位无符号整数来实现的,也就是WaitGroup结构体中的state1,其中高32位保存了任务计数器counter的值,低32位保存了等待计数器waiter的值。当我们创建一个 WaitGroup 实例时,该实例的任务计数器等待计数器都被初始化为 0。

而且,等待协程需要等待所有任务完成之后才能继续执行,所以等待协程在任务未完成时会被阻塞,当任务全部完成后,自动被唤醒。WaitGroup使用 state2 用于实现信号量机制。通过调用 runtime_Semacquire()runtime_Semrelease() 函数,可以在不阻塞线程的情况下进行等待和通知操作。下面是AddDoneWait方法的具体实现:

调用 Add() 方法增加/减小counter的值,delta的值可以是正数,也可以是负数,下面是Add方法的源码实现:

func (wg *WaitGroup) Add(delta int) {
   // delta 的值可以为负数,Done方法便是通过Add(-1)来实现的
   // statep: 为state1的地址  semap: 为state2的地址
   statep, semap := wg.state()
   // 高32位的值 加上 delta,增加任务计数器的值
   state := atomic.AddUint64(statep, uint64(delta)<<32)
   // v: 取高32位数据,获取到待完成任务数
   v := int32(state >> 32)
   // 取低32位数据,获取到等待线程的值
   w := uint32(state)
   // v > 0: 说明还有待完成的任务数,此时不应该唤醒等待协程
   // w = 0: 说明没有协程在等待,此时可以直接退出
   if v > 0 || w == 0 {
      return
   }     
   // 此时v = 0,所有任务都完成了,唤醒等待协程
   *statep = 0
   for ; w != 0; w-- {
      runtime_Semrelease(semap, false, 0)
   }
}

调用 Done() 方法表示完成了一个任务,通过调用Add方法,delta值为-1,减少任务计数器counter的值,当其归为0时,便自动唤醒所有处于等待的协程。

// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
   wg.Add(-1)
}

调用Wait方法,等待任务执行完成,增加等待计数器Waiter的值:

func (wg *WaitGroup) Wait() {
   // statep: 为state1的地址  semap: 为state2的地址
   statep, semap := wg.state()
   for {
      // 加载state1的值
      state := atomic.LoadUint64(statep)
      // v: 取高32位数据,获取到待完成任务数
      v := int32(state >> 32)
      // 没有任务待执行,全部都完成了
      if v == 0 {
         return
      }
      // 增加waiter计数器的值
      if atomic.CompareAndSwapUint64(statep, state, state+1) {
          // 等待被唤醒
         runtime_Semacquire(semap)
         return
      }
   }
}

3.3 实现补充

Add方法,Done方法以及Wait方法实现中,有一些异常场景的验证逻辑被我删除掉了。当出现异常场景时,说明用户使用方式和WaitGroup的设计初衷相违背了,此时WaitGroup就会直接panic。

下面通过说明使用的注意事项,来间接介绍WaitGroup的异常验证逻辑。

4.使用注意事项

4.1 Add方法和Done方法需要成对出现

下面是一个Add方法和Done方法没有成对出现的例子,此时Add方法调多了,此时计数器永远大于0,Wait 方法会一直阻塞等待。

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup

    wg.Add(2)

    go func() {
        defer wg.Done()
        fmt.Println("Goroutine 1")
    }()

    go func() {
        fmt.Println("Goroutine 2")
    }()

    wg.Wait()

    fmt.Println("All goroutines finished")
}

在上述代码中,我们调用了wg.Add(2),但只调用了一次wg.Done()。这会导致counter的值大于0,因此调用wg.Wait()会被永久阻塞,不会继续向下继续执行。

还有另外一种情况时Done方法调用多了,此时任务计数器counter的值为负数,从WaitGroup设计的语意来看,就是需要等待完成的任务数为负数,这个不符合预期,此时将会直接panic

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    wg.Add(1)

    go func() {
        fmt.Println("Goroutine 1 started")
        wg.Done() // 第一次调用Done方法
        wg.Done() // 第二次调用Done方法
        fmt.Println("Goroutine 1 completed")
    }()

    wg.Wait()
    fmt.Println("All goroutines completed")
}

在上面的例子中,我们启动了一个goroutine,第一次调用Add方法,counter的值变为1,在第14行调用Done,此时计数器的值变为0,此时等待中的goroutine将会被唤醒。在第15行又调用了一次Done方法,当counter减小为0时,再次调用Done方法会导致panic,因为此时waitGroup的计数器已经为0,再次减少将导致负数计数,这是不被允许的。

所以在调用Done方法时,需要保证每次调用都与Add方法的调用一一对应,否则会导致程序出现错误。

4.2 在所有任务都已经添加之后,才调用Wait方法进行等待

WaitGroup的设计初衷就是为了等待一组操作完成后再执行下一步操作。所以,如果在所有任务添加之前,便调用Wait方法进行等待,此时有可能会导致等待协程提前被唤醒,执行下一步操作,而尚未添加的任务则不会被等待,这违反了WaitGroup的设计初衷,也不符合预期。下面是一个简单的例子:

package main

import (
        "fmt"
        "sync"
        "time"
)

func main() {
        var wg sync.WaitGroup
        for i := 1; i <= 3; i++ {
           go func(id int) {
              wg.Add(1)
              defer wg.Done()
              fmt.Printf("Goroutine %d started\n", id)
              time.Sleep(time.Duration(id) * time.Second) 
              fmt.Printf("Goroutine %d finished\n", id)
           }(i)
        }
        
        // 不等待所有任务添加,就开始等待
        wg.Wait()
        fmt.Println("All goroutines finished")
        time.Sleep(10 * time.Second)
}

代码执行结果如下,等待协程被提前唤醒,执行之后的操作,而子任务在等待协程唤醒后才开始执行:

All goroutines finished
Goroutine 1 started
Goroutine 3 started
Goroutine 2 started
Goroutine 1 finished
Goroutine 2 finished
Goroutine 3 finished

在这个例子中,我们创建了三个协程并打印出它们开始和结束的消息。但是,我们没有在任务开始前调用Add方法添加任务,而是在任务开始之后再调用Add方法添加任务。

这可能会导致某些任务未被加入到WaitGroup中,等待协程就调用了wg.Wait方法,这样就会导致一些任务未被加入WaitGrou,从而导致等待协程不会等待这些任务执行完成。如果这种情况发生了,我们会看到"All goroutines finished"被输出,但实际上有一些协程还没有完成。

因此,我们应该在所有任务添加完毕之后再调用Wait方法,以保证等待的正确性。

5. WaitGroup常见使用方式

在函数或方法中使用,如果一个大任务可以拆分为多个独立的子任务,此时会将其进行拆分,并使用多个协程来并发执行这些任务,提高执行效率,同时使用WaitGroup等待所有子任务执行完成,完成协程间的同步。

使用方式也比较简单,先创建一个 sync.WaitGroup,在函数/方法中启动多个协程,每个协程执行一个任务。然后在协程开始执行任务前,调用 WaitGroup.Add(1),表示有一个任务要执行,然后在任务执行完成后调用 WaitGroup.Done(),表示这个任务执行完成了。

最后,在函数/方法返回之前,需要调用 WaitGroup.Wait(),等待所有的任务执行完成。大概示例如下:

func funcName() {
    var wg sync.WaitGroup
    for _, 任务 := range 任务列表 {
      wg.Add(1)
      go func() {
        defer wg.Done()
        //执行任务
      }
   }
   // 调用wait方法等待所有任务完成
   wg.Wait()
}

下面来看go-redis中ClusterClient结构体中ForEachMaster方法中对于WaitGroup的使用。ForEachMaster方法通常用于在 Redis 集群中执行针对所有主节点的某种操作,例如在集群中添加或删除键,或者执行一些全局的诊断操作,具体执行的操作由传入参数fn指定。

这里ForEachMaster方法会对所有主节点执行某种操作,这里的实现是对所有主节点执行某种操作这个大任务,拆分为多个独立的子任务,每个子任务完成对一个Master节点执行指定操作,然后每个子任务启动一个协程去执行,主协程使用WaitGroup等待所有协程完成指定子任务,ForEachMaster也就完成了对所有主节点执行某种操作的任务。具体实现如下:

func (c *ClusterClient) ForEachMaster(
   ctx context.Context,
   fn func(ctx context.Context, client *Client) error,
) error {
   // 重新加载集群状态,以确保状态信息是最新的
   state, err := c.state.ReloadOrGet(ctx)
   if err != nil {
      return err
   }
   var wg sync.WaitGroup
   // 用于协程间通信
   errCh := make(chan error, 1)
    // 获取到redis集群中所有的master节点
   for _, master := range state.Masters {
      // 启动一个协程来执行该任务
      wg.Add(1)
      go func(node *clusterNode) {
         // 任务完成时,调用Done告知WaitGroup任务已完成
         defer wg.Done()
         err := fn(ctx, node.Client)
         if err != nil {
            select {
            case errCh <- err:
            default:
            }
         }
      }(master)
   }
   // 主协程等待所有任务完成
   wg.Wait()
   return nil
 }

6.总结

本文介绍了 Go 语言中的 WaitGroup 并发原语,它提供了一种简单且强大的机制来协调多个 Goroutine 的执行。我们首先学习了 WaitGroup 的基本使用方法,包括如何创建 WaitGroup、如何向计数器中添加值、如何等待所有 Goroutine 完成以及如何在 Goroutine 中通知 WaitGroup 完成。

接着,我们了解了 WaitGroup 的实现原理,包括计数器和等待计数器的实现。了解了实现原理之后,我们可以更好地理解 WaitGroup 的内部机制以及如何更好地使用它来实现我们的需求。

在接下来的部分中,我们介绍了一些使用 WaitGroup 的注意事项,以及常见的使用方式。基于此,我们完成了对WaitGroup的介绍。

标签:wg,WaitGroup,调用,协程,必备,任务,Done
From: https://www.cnblogs.com/chenjiazhan/p/17228255.html

相关文章

  • 【并发编程二十】协程(coroutine)_协程库
     【并发编程二十】协程(coroutine)一、线程的缺点二、协程三、优点四、个人理解五、协程库1、window系统2、unix系统(包括linux的各个版本)2.1、makecontext2......
  • 前端常考vue面试题(必备)
    computed的实现原理computed本质是一个惰性求值的观察者。computed内部实现了一个惰性的watcher,也就是computedwatcher,computedwatcher不会立刻求值,同时持有......
  • Nginx:轻松搭建高性能Web服务的必备利器
    一、Nginx简介1.1Nginx的特点和优点高性能:Nginx采用了事件驱动、异步非阻塞的处理方式,可以处理大量并发连接请求,同时减少服务器资源的占用。它的吞吐量比传统的Web服务器高......
  • 协程方法
    示例1usingSystem.Collections;usingUnityEngine; publicclassTest:MonoBehaviour{    privatevoidStart()    {        StartCoroutine......
  • C++协程
    参考C++那些事之C++20协程#include<iostream>#include<coroutine>structgenerator{structpromise_type{intcurrent_value;std::suspend......
  • 情侣间必备的责任
    情侣间必备的责任〈了解、包容、照顾、帮助、信任、体贴、〉    了解   两个人一旦真心恋爱了,这是最关键两个字,人往往都要在一起接触时间久了,彼此才能慢慢感觉......
  • go 协程收集 不同函数结果和error
    需要用到"golang.org/x/sync/errgroup"这个库改成并行的方式这个总共执行4s就可以......
  • python异步编程asyncio实现协程
    importasyncio@asyncio.coroutinedeff1():print(1)yieldfromasyncio.sleep(2)print(2)@asyncio.coroutinedeff2():print(3)yieldfromasyncio.s......
  • 推荐收藏!10大程序员必备生产力工具
    作为程序员,提高生产力是我们一直追求的目标。随着技术的发展,越来越多的工具和应用程序被开发出来,帮助程序员们更好地完成工作。在本文中,我将介绍一些程序员必备的生产力工......
  • 【经验交流】一些地编工作的必备常用网站
    说明:加粗斜体标记的须使用一定*技术上网*(敏感词)一.灵感类Artstation:国外最大的CG艺术平台,原话/分镜/3D/动效GGAC:国内比较大的CG艺术平台站酷:更偏向平面设计B......