首页 > 其他分享 >go并发控制

go并发控制

时间:2022-12-06 13:25:04浏览次数:42  
标签:wg 控制 return nil ErrGroup 并发 func go

并发控制 Golang基础库中已经提供不少并发控制工具,比如Channel、WaitGroup、各种锁等等。   ErrGroup WaitGroup可以等待多个Goroutine执行结束,但很多时候并发执行多个任务,如果其中一个任务出错那么整体失败,需要直接返回,这种情况下我们可以使用ErrGroup   ErrGroup借助封装了WaitGroup、Once以及Context,调用Wait时如果一个任务失败取消Context直接返回,核心逻辑如下   type ErrGroup struct {  ctx context.Context  cancel func()    wg sync.WaitGroup    errOnce sync.Once  err error }   func (g *ErrGroup) Wait() error {  g.wg.Wait()    if g.cancel != nil {   g.cancel()  }    return g.err }   func (g *ErrGroup) Go(f func(ctx context.Context) error) {  g.wg.Add(1)    go func() {   defer g.wg.Done()   if err := f(g.ctx); err != nil {             // 执行失败则运行cancel    g.errOnce.Do(func() {     g.err = err     if g.cancel != nil {      g.cancel()     }    })   }  }() } 控制并发数 借助有缓冲的Channel,可以实现控制Goroutine并发数,逻辑如下:   func NewCtrlGroup(number int) *CtrlGroup {  return &CtrlGroup{   ch: make(chan struct{}, number),  } }   type CtrlGroup struct {  ch chan struct{}  wg sync.WaitGroup }   func (g *CtrlGroup) Enter() {  g.ch <- struct{}{} }   func (g *CtrlGroup) Leave() {  <-g.ch }   func (g *CtrlGroup) Go(f func()) {  g.Enter() // 接收到新任务,发送到Channel,如果Channel满需要等待  g.wg.Add(1)    go func() {   defer g.Leave() // 任务结束,取出一个元素   defer g.wg.Done()   f()  }() }   func (g *CtrlGroup) Wait() {  g.wg.Wait() } MapReduce 除了WaitGroup、ErrGroup处理一些简单的并发任务,有时候我们需要执行类似MapReduce的操作,通过Map对数据源并行处理,然后通过Reduce合并结果。在Java、Python中提供了类似功能。   比如实现一个实现一组数据的平方和,利用MapReduce在Golang中实现如下:    num := 1000000    res, err := mapreduce.New(mapreduce.WithWorkers(16)).   From(func(r mapreduce.Writer) error { // 产生数据源    for i := 1; i < num; i++ {     r.Write(i)    }    return nil   }).   Map(func(item any) (any, error) { // 处理数据    v, ok := item.(int)    if !ok {     return nil, fmt.Errorf("invaild type")    }      resp := v * v      return resp, nil   }).   Reduce(func(r mapreduce.Reader) (any, error) { // 合并结果    sum := 0    for {     item, ok := r.Read()     if !ok {      break     }       v, ok := item.(int)     if !ok {      return nil, fmt.Errorf("invaild type")     }       sum += v    }    return sum, nil   }).   Do() 主要逻辑是利用Channel(或者线程安全的队列)将源数据发送到Map的执行Worker中,处理完后再转发到Reduce Goroutine中,通过ErrGroup等待所有Worker执行完成。源码见mapreduce.go。   类似的也可以实现Kubernetes中Ctroller模式,通过队列或者Channel将生产者与消费者解耦,并行处理提高运行速度。   总结 本文总结了Golang的一些有趣的编程模式,例如链式调用、可选配置、并发控制等,通过这些技巧或者手段,可以提高编码的质量,所有代码见gocorex

标签:wg,控制,return,nil,ErrGroup,并发,func,go
From: https://www.cnblogs.com/cheyunhua/p/16954943.html

相关文章