首页 > 编程语言 >从案例中详解go-errgroup-源码

从案例中详解go-errgroup-源码

时间:2023-05-04 18:22:25浏览次数:70  
标签:err errgroup 源码 func error go 执行 true

一、背景

某次会议上发表了error group包,一个g失败,其他的g会同时失败的错误言论(看了一下源码中的一句话The first call to return a non-nil error cancels the group,没进一步看其他源码,片面理解了)。

// The first call to return a non-nil error cancels the group's context, if the
// group was created by calling WithContext.

后面想想好像不对,就再次深入源码,并进行验证!

二、源码解析

官方介绍:包 errgroup 为处理公共任务的子任务的 goroutine 组提供同步、错误传播和上下文取消。

Package errgroup provides synchronization, error propagation, and Context cancelation for groups of goroutines working on subtasks of a common task.

如何理解官方介绍中国呢说的,同步,错误传播,上下文取消?
errgroup源码就132行,短小精悍,下面就看下每个函数的功能。

// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package errgroup provides synchronization, error propagation, and Context
// cancelation for groups of goroutines working on subtasks of a common task.
package errgroup

import (
	"context"
	"fmt"
	"sync"
)

type token struct{}

// A Group is a collection of goroutines working on subtasks that are part of
// the same overall task.
//
// A zero Group is valid, has no limit on the number of active goroutines,
// and does not cancel on error.
type Group struct {
	cancel func()   // 持有ctx cancel func

	wg sync.WaitGroup    

	sem chan token   // g数量限制的 chan

	errOnce sync.Once   // 单例保存第一次出现的err
	err     error
}

func (g *Group) done() {
	if g.sem != nil {
		<-g.sem
	}
	g.wg.Done()
}

// WithContext returns a new Group and an associated Context derived from ctx.
//
// The derived Context is canceled the first time a function passed to Go
// returns a non-nil error or the first time Wait returns, whichever occurs
// first.
func WithContext(ctx context.Context) (*Group, context.Context) {
	ctx, cancel := context.WithCancel(ctx)
	return &Group{cancel: cancel}, ctx
}

// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *Group) Wait() error {
	g.wg.Wait()
	if g.cancel != nil {
		g.cancel()
	}
	return g.err
}

// Go calls the given function in a new goroutine.
// It blocks until the new goroutine can be added without the number of
// active goroutines in the group exceeding the configured limit.
//
// The first call to return a non-nil error cancels the group's context, if the
// group was created by calling WithContext. The error will be returned by Wait.
func (g *Group) Go(f func() error) {
	if g.sem != nil {
		g.sem <- token{}
	}

	g.wg.Add(1)
	go func() {
		defer g.done()

		if err := f(); err != nil {
			g.errOnce.Do(func() {
				g.err = err
				if g.cancel != nil {
					g.cancel()
				}
			})
		}
	}()
}

// TryGo calls the given function in a new goroutine only if the number of
// active goroutines in the group is currently below the configured limit.
//
// The return value reports whether the goroutine was started.
func (g *Group) TryGo(f func() error) bool {
	if g.sem != nil {
		select {
		case g.sem <- token{}:
			// Note: this allows barging iff channels in general allow barging.
		default:
			return false
		}
	}

	g.wg.Add(1)
	go func() {
		defer g.done()

		if err := f(); err != nil {
			g.errOnce.Do(func() {
				g.err = err
				if g.cancel != nil {
					g.cancel()
				}
			})
		}
	}()
	return true
}

// SetLimit limits the number of active goroutines in this group to at most n.
// A negative value indicates no limit.
//
// Any subsequent call to the Go method will block until it can add an active
// goroutine without exceeding the configured limit.
//
// The limit must not be modified while any goroutines in the group are active.
func (g *Group) SetLimit(n int) {
	if n < 0 {
		g.sem = nil
		return
	}
	if len(g.sem) != 0 {
		panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))
	}
	g.sem = make(chan token, n)
}

  • Go(f func() error): 启动一个goroutine,如果设置了g limit,则往chan中追加,当达到数量后会被阻塞。否则正常执行g,g中如果出现错误,则在单例中保存错误信息,如果有cancel func(使用) 则调用取消信号。
  • Wait:等待所有启动的g都完成,如果有cancel func(使用) 则调用取消信号,返回第一次出现的错误,没有则是nil
  • WithContext(ctx context.Context) (*Group, context.Context):启动一个带有cancel信号的errgroup。返回errgoup.Group实例和ctx。
  • TryGo(f func() error) bool:尝试启动一个g,如果g启动了则返回true。在limit还未达到时,则可以启动g
  • SetLimit(n int):设置最大可启动的g数量,如果n为负数,则无数量限制。有启动的g,在重新设置n时则会panic
  • done:私有方法,如果设置了limit,则从channel减少一个数量,然后调用底层的wg.Done

三、深度理解

官方介绍中说的,错误传播,应该就是单例模式存储第一次出现的错我,在调用Wait后会返回。那么如果理解上下文取消。一个g出错会取消其他g嘛?用以下代码进行测试


func GetGoid() int64 {
	var (
		buf [64]byte
		n   = runtime.Stack(buf[:], false)
		stk = strings.TrimPrefix(string(buf[:n]), "goroutine")
	)

	idField := strings.Fields(stk)[0]
	id, err := strconv.Atoi(idField)
	if err != nil {
		panic(fmt.Errorf("can not get goroutine id: %v", err))
	}

	return int64(id)
}

var (
	datarange = []string{"a", "b", "c"}
	randIndex = rand.Int31n(10)
)

func calc(index int, val string) (string, error) {
	if randIndex == int32(index) {
		return "", errors.New("invalid index")
	}

	return val, nil
}

func TestErrGroupNoCtx(t *testing.T) {
	var wg errgroup.Group

	result := make(map[string]bool)
	var mu sync.Mutex

	for i, v := range datarange {
		index, val := i, v
		wg.Go(func() error {
			gid := GetGoid()

			data, err := calc(index, val)
			if err != nil {
				return fmt.Errorf("在g: %d报错, %s\n", gid, err)
			}

			fmt.Printf("[%s] 执行: %d\n", data, gid)
			mu.Lock()
			result[data] = true
			mu.Unlock()

			fmt.Printf("正常退出: %d\n", GetGoid())

			return nil
		})
	}

	if err := wg.Wait(); err != nil {
		fmt.Println(err)
	}

	fmt.Println("运行结束", result)

	// first nil err
	_, ok := result[datarange[randIndex]]
	assert.Equal(t, ok, false)
}

运行后输出

[a] 执行: 35
正常退出: 35
[c] 执行: 37
正常退出: 37
在g: 36报错, invalid index

运行结束 map[a:true c:true]
PASS

可以看到,其中b报错了,但是其他两个还是正常执行了,所以说,一个g报错,其他g并不会被杀死,Wait会等待所有g执行完在返回err。

在看WithContext,为什要用带cancel的ctx呢,看包内Go也只是在某个错误中调用了取消信号,怎么会取消其他的呢?这里返回到ctx有什么用?
为什么说可以取消其他g呢?其实就是要调用方来自己负责,每个各自的 goroutine 所有者必须处理取消信号。


var (
	datarange = []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}
)


func TestErrGroupWithCtx(t *testing.T) {
	wg, ctx := errgroup.WithContext(context.Background())
	result := make(map[string]bool)
	var mu sync.Mutex
	for i, v := range datarange {
		index, val := i, v
		wg.Go(func() error {
			gid := GetGoid()
			select {
			case <-ctx.Done():
				fmt.Printf("goroutine: %d 未执行,msg: %s\n", gid, ctx.Err())
				return nil
			default:

			}
			data, err := calc(index, val)
			if err != nil {
				return fmt.Errorf("在g: %d报错, %s\n", gid, err)
			}

			fmt.Printf("[%s] 执行: %d\n", data, gid)
			mu.Lock()
			result[data] = true
			mu.Unlock()

			fmt.Printf("正常退出: %d\n", gid)

			return nil
		})
	}

	if err := wg.Wait(); err != nil {
		fmt.Println(err)
	}

	fmt.Println("运行结束", result)

	// first nil err
	_, ok := result[datarange[randIndex]]
	assert.Equal(t, ok, false)
}

执行如下命令

go test -v errgroup_test.go --count=1 -run=TestErrGroupWithCtx

=== RUN   TestErrGroupWithCtx
[a] 执行: 7
[j] 执行: 16
正常退出: 7
正常退出: 16
[f] 执行: 12
正常退出: 12
[g] 执行: 13
正常退出: 13
[h] 执行: 14
[i] 执行: 15
正常退出: 15
正常退出: 14
[c] 执行: 9
正常退出: 9
goroutine: 10 未执行,msg: context canceled
goroutine: 11 未执行,msg: context canceled
在g: 8报错, invalid index

运行结束 map[a:true c:true f:true g:true h:true i:true j:true]
--- PASS: TestErrGroupWithCtx (0.00s)
PASS
ok      command-line-arguments  0.262s

可以看到,在出错前,启动了的g都能正常执行,在出现错误后,调用Go启动的g就无法正常执行了,关键还是看使用方如何处理cancel这个信号。

**针对同步,官方有个案例,可以看 ** example-Group-Parallel
可以理解为用逻辑实现的并发的获取同步数据

四、总结

  • Group: 启动多个 goroutine,Wait 会一直等到所有 g 执行完,然后抛出第一个报错 g 的 err 内容,报错并不会停止其他g
  • WithContext: 带有 cancel context的group,同样 Wait 会等待所有 g 执行完,然后抛出第一个报错 g 的 err 内容,报错并不会停止其他 g,文档中说到的The first call to return a non-nil error cancels the group 意思是 cancel group 持有的 ctx,而对于启动 g 所有者来说,要自己处理 ctx 的取消信号,并非errgoup会杀死其他 g。

其实我们使用errgroup主要就是因为, Group 代替 sync.WaitGroup 简化了 goroutine 的计数和错误处理。

常见的坑:

errgroup for循环里千万别忘了 i, x := i, x,以前用 sync.Waitgroup 的时候都是 go func 手动给闭包传参解决这个问题的,errgroup 的.Go没法这么干,需要重新声明变量,获取实际的值

额外知识点

多任务时,才需要考虑,同步、异步、并发、并行

  • 同步:多任务开始执行,任务A、B、C全部执行完成后才算结束
  • 异步:多任务开始执行,只需要主任务 A 执行完成就算结束,主任务执行的时候,可以同时执行异步任务 B、C,主任务 A 可以不需要等待异步任务 B、C 的结果。
  • 并发:多个任务在同一个时间段内同时执行,如果是单核心计算机,CPU 会不断地切换任务来完成并发操作。
  • 并行:多任务在同一个时刻同时执行,计算机需要有多核心,每个核心独立执行一个任务,多个任务同时执行,不需要切换。

并发、并行,是逻辑结构的设计模式。
同步、异步,是逻辑调用方式。串行是同步的一种实现,就是没有并发,所有任务一个一个执行完成。
并发、并行是异步的 2 种实现方式

五、参考

  1. how-to-exit-when-the-first-error-occurs-for-one-of-the-goroutines-within-a-wait

标签:err,errgroup,源码,func,error,go,执行,true
From: https://www.cnblogs.com/panlq/p/17372144.html

相关文章

  • JUC并发编程原理精讲(源码分析)
    1.JUC前言知识JUC即java.util.concurrent涉及三个包:java.util.concurrentjava.util.concurrent.atomicjava.util.concurrent.locks普通的线程代码:ThreadRunnable没有返回值、效率相比入Callable相对较低!Callable有返回值!【工作常用】1.1进程和线程进程:是......
  • 互联网医院系统源码:数据安全与隐私保护问题如何解决?
    当下,互联网医院系统源码已经走进了很多人的视野中,它的作用和好处小编就不用强调了,今天我们来聊另一个话题——隐私与数据安全。在智慧医疗行业,安全问题更是重中之重,这也自然而然成为了老生常谈的一个问题。本文小编将从互联网医院系统源码的数据安全与隐私保护的意义、当前面临的挑......
  • Django--orm介绍
    djangoORM简介"""ORM:对象关系映射"""orm目的就是为了能够让不懂SQL语句的人,通过python面向对象的知识点也能够轻松自如的操作数据库类》》》表对象 》》》 表里面的数据对象点属性》》》字段对应的值#缺点:sql封装死了,有......
  • Django如何更换默认的数据库?
    问题:Django默认的sqlite3不好用。如何更换呢?解决:第一步:配置文件在setting中--更改databaseDATABASES={#'default':{#'ENGINE':'django.db.backends.sqlite3',#'NAME':BASE_DIR/'db.sqlite3',#......
  • springboot与mongodb之事务管理(二)
    一、事务说明1、在4.0版本中,MongoDB支持副本集上的多文档事务,分片集群是不支持事务的,会报以下异常TransactionsarenotsupportedbytheMongoDBclustertowhichthisclientisconnected2、在版本4.2中,MongoDB引入了分布式事务,在副本集或分片集群上都是支持事务的。3......
  • springboot与mongodb之整合(一)
    一、添加maven依赖<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-mongodb</artifactId><version>2.6.7</version></dependency>二、配置properties文件1、mongodb无......
  • django-channel 配置 channel layer 添加redis的账号和密码
    最近公司要使用django-channel搭建socket, 文档:https://channels.readthedocs.io/en/stable/introduction.html文档里面并没有写如果redis有账号和密码的话,怎么配置。配置方法:https://github.com/django/channels/issues/164#issuecomment-220513297如下:CHANNEL_LAYERS......
  • [ubuntu]安装配置go
    系统环境ubuntu18.04安装配置安装Go语言sudoapt-getupdatesudoapt-get-yupgradewgethttps://storage.googleapis.com/golang/go1.7.linux-amd64.tar.gzsudotar-xvfgo1.7.linux-amd64.tar.gzsudomvgo/usr/local设置环境vim/etc/profileexportGORO......
  • golang基础知识
    一golang基础知识Go(又称Golang)是Google的RobertGriesemer,RobPike及KenThompson开发的一种计算机编程语言语言。设计初衷Go语言是谷歌推出的一种的编程语言,可以在不损失应用程序性能的情况下降低代码的复杂性。谷歌首席软件工程师罗布派克(RobPike)说:我们之所以开发......
  • 在线直播系统源码,默认倒计时,自定义输入时间倒计时
    在线直播系统源码,默认倒计时,自定义输入时间倒计时html部分代码 <divid="app">  <inputtype="num"v-model="time">  <inputtype="button" @click="click_input(time)"value="点击">  <div>{{get_cod......