首页 > 其他分享 >Go每日一库之181:conc(并发库)

Go每日一库之181:conc(并发库)

时间:2023-09-29 21:24:35浏览次数:48  
标签:wg WaitGroup conc 一库 181 func error Go

来自公司 sourcegraph 的 conc** (https://github.com/sourcegraph/conc) 并发库,目标是 better structured concurrency for go, 简单的评价一下
每个公司都有类似的轮子,与以往的库比起来,多了泛型,代码写起来更优雅,不需要 interface, 不需要运行时 assert, 性能肯定更好
我们在写通用库和框架的时候,都有一个原则,
并发控制与业务逻辑分离**,背离这个原则肯定做不出通用库

整体介绍

1. WaitGroup 与 Panic

标准库自带 sync.WaitGroup 用于等待 goroutine 运行结束,缺点是我们要处理控制部分

代码里大量的 wg.Addwg.Done 函数,所以一般封装成右侧的库

type WaitGroup struct {
 wg sync.WaitGroup
 pc panics.Catcher
}

// Go spawns a new goroutine in the WaitGroup.
func (h *WaitGroup) Go(f func()) {
 h.wg.Add(1)
 go func() {
  defer h.wg.Done()
  h.pc.Try(f)
 }()
}

但是如何处理 panic 呢?简单的可以在闭包 doSomething 运行时增加一个 safeGo 函数,用于捕捉 recover

原生 Go 要生成大量无用代码,我司 repo 运动式的清理过一波,也遇到过 goroutine 忘写 recover 导致的事故。conc 同时提供 catcher 封装 recover 逻辑,conc.WaitGroup 可以选择 Wait 重新抛出 panic, 也可以 WaitAndRecover 返回捕获到的 panic 堆栈信息

type WaitGroup struct {
 wg sync.WaitGroup
 pc panics.Catcher
}

// Go spawns a new goroutine in the WaitGroup.
func (h *WaitGroup) Go(f func()) {
 h.wg.Add(1)
 go func() {
  defer h.wg.Done()
  h.pc.Try(f)
 }()
}

2. ForEach 与 Map

高级语言很多的基操,在 go 里面很奢侈,只能写很多繁琐代码。conc封装了泛型版本的 iterator 和 mapper

func process(values []int) {
    iter.ForEach(values, handle)
}

func concMap(input []int, f func(int) int) []int {
    return iter.Map(input, f)
}

上面是使用例子,用户只需要写业务函数 handle. 相比 go1.19 前的版本,泛型的引入,使得基础库的编写更游刃有余

// Iterator is also safe for reuse and concurrent use.
type Iterator[T any] struct {
 // MaxGoroutines controls the maximum number of goroutines
 // to use on this Iterator's methods.
 //
 // If unset, MaxGoroutines defaults to runtime.GOMAXPROCS(0).
 MaxGoroutines int
}

MaxGoroutines 默认 GOMAXPROCS 并发处理传参 slice, 也可以自定义,个人认为不合理,默认为 1 最妥

// ForEachIdx is the same as ForEach except it also provides the
// index of the element to the callback.
func ForEachIdx[T any](input []T, f func(int, *T)) { Iterator[T]{}.ForEachIdx(input, f) }

ForEachIdx 在创建 Iterator[T]{} 可以自定义并发度,最终调用 iter.ForEachIdx

// ForEachIdx is the same as ForEach except it also provides the
// index of the element to the callback.
func (iter Iterator[T]) ForEachIdx(input []T, f func(int, *T)) {
  ......
 var idx atomic.Int64
 // Create the task outside the loop to avoid extra closure allocations.
 task := func() {
  i := int(idx.Add(1) - 1)
  for ; i < numInput; i = int(idx.Add(1) - 1) {
   f(i, &input[i])
  }
 }

 var wg conc.WaitGroup
 for i := 0; i < iter.MaxGoroutines; i++ {
  wg.Go(task)
 }
 wg.Wait()
}

ForEachIdx 泛型函数写得非常好,略去部分代码。朴素的实现在 for 循环里创建闭包,传入 idx 参数,然后 wg.Go 去运行。但是这样会产生大量闭包,我司遇到过大量闭包,造成 heap 内存增长很快频繁触发 GC 的性能问题,所以在外层只创建一个闭包,通过 atomic 控制 idx

func Map[T, R any](input []T, f func(*T) R) []R {
 return Mapper[T, R]{}.Map(input, f)
}

func MapErr[T, R any](input []T, f func(*T) (R, error)) ([]R, error) {
 return Mapper[T, R]{}.MapErr(input, f)
}

MapMapErr 也只是对 ForEachIdx 的封装,区别是处理 error

3. 各种 Pool 与 Stream

Pool 用于并发处理,同时 Wait 等待任务结束。相比我司现有 concurrency 库

  • 增加了泛型实现
  • 增加了对 goroutine 的复用
  • 增加并发度设置(我司有,但 conc 实现方式更巧秒)
  • 支持的函数签名更多

先看一下支持的接口

Go(f func())
Go(f func() error) 
Go(f func(ctx context.Context) error)
Go(f func(context.Context) (T, error))
Go(f func() (T, error)) 
Go(f func() T)
Go(f func(context.Context) (T, error))

理论上这一个足够用了,传参 Context, 返回泛型类型与错误。

Wait() ([]T, error) 

这是对应的 Wait 回收函数,返回泛型结果 []T 与错误。具体 Pool 实现由多种组合而来:Pool, ErrorPool, ContextPool, ResultContextPool, ResultPool

func (p *Pool) Go(f func()) {
 p.init()

 if p.limiter == nil {
  // No limit on the number of goroutines.
  select {
  case p.tasks <- f:
   // A goroutine was available to handle the task.
  default:
   // No goroutine was available to handle the task.
   // Spawn a new one and send it the task.
   p.handle.Go(p.worker)
   p.tasks <- f
  }
 }
  ......
}

func (p *Pool) worker() {
 // The only time this matters is if the task panics.
 // This makes it possible to spin up new workers in that case.
 defer p.limiter.release()

 for f := range p.tasks {
  f()
 }
}

复用方式很巧妙,如果处理速度足够快,没必要过多创建 goroutine

Stream 用于并发处理 goroutine, 但是返回结果保持顺序

type Stream struct {
 pool             pool.Pool
 callbackerHandle conc.WaitGroup
 queue            chan callbackCh

 initOnce sync.Once
}

实现很简单,queue 是一个 channel, 类型 callbackCh 同样也是 channel, 在真正派生 goroutine 前按序顺生成 callbackCh 传递结果

Stream 命名很差,容易让人混淆,感觉叫 OrderedResultsPool 更理想,整体非常鸡肋

超时

超时永远是最难处理的问题,目前 conc 库 Wait 函数并没有提供 timeout 传参,这就要求闭包内部必须考滤超时,如果添加 timeout 传参,又涉及 conc 内部库并发问题题

Wait() ([]T, error)

比如这个返回值,内部 append 到 slice 时是有锁的,如果 Wait 提前结束了会发生什么?

[]T 拿到的部分结果只能丢弃,返回给上层 timeout error

Context 框架传递参数

通用库很容易做的臃肿,我司并发库会给闭包产生新的 context, 并继承所需框架层的 metadata, 两种实现无可厚非,这些细节总得要处理

小结

代码量不大,感兴趣的可以看看。没有造轮子的必要,够用就行,这种库写了也没价值

标签:wg,WaitGroup,conc,一库,181,func,error,Go
From: https://www.cnblogs.com/arena/p/17737371.html

相关文章

  • Go每日一库之180:fastcache(协程安全且支持大量数据存储的高性能缓存库)
    fastcache是一个线程安全并且支持大量数据存储的高性能缓存组件库。这是官方Github主页上的项目介绍,和fasthttp名字一样以fast打头,作者对项目代码的自信程度可见一斑。此外该库的核心代码非常轻量,笔者本着学习的目的分析下内部的代码实现。基准测试官方给出了fastca......
  • Go每日一库之179:env(将系统环境变量解析到结构体的库)
    该包的实现是基于标准库os/env包中的相关函数(比如Getenv)来获取系统的环境变量的。获取到环境变量值后,再通过结构体中的tag,将值映射到对应的结构体字段上。使用示例下面是将系统的一些环境变量映射到config结构体的示例。如下:我们可以像以下这样运行该代码:$PRODUCTION=trueHO......
  • Go每日一库之178:chromedp(一个基于Chrome DevTools协议的库,支持数据采集、截取网页长
    该库提供了一种简单、高效、可靠的方式来控制Chrome浏览器进行自动化测试和爬取数据。项目地址:https://github.com/chromedp/chromedp它可以模拟用户在浏览器中执行各种操作,如点击、输入文本、截取网页长图、将网页内容转换成pdf文档、下载图片等,从而获取到需要采集的数据。基......
  • Go每日一库之176:filetype(文件类型鉴别)
    filetype(https://github.com/h2non/filetype)是一个Go语言的第三方库,可以根据文件的魔数(magicnumbers)签名来推断文件的类型和MIME类型。它支持多种常见的文件类型,包括图片、视频、音频、文档、压缩包等。它还提供了一些便捷的函数和类型匹配器,可以方便地对文件进行分类和筛选......
  • Go每日一库之174:delve (Go 调试工具)
    简介Delve 用来调试 Go 语言开发的程序,该工具的目标是为 Go 语言提供一个简单、功能齐全的调试工具。为什么不推荐gdb• gdb对Go的调试支持是通过一个python脚本文件 src/runtime/runtime-gdb.py 扩展的,功能有限• gdb只能做到最基本的变量打印,却理解不了go......
  • Go每日一库之173:Pie (高性能、类型安全的slice操作库)
    在Go语言中,对slice和map是我们最常用的数据结构。比如,计算两个切片的交集、差集;判断切片中的元素是否都满足某个条件的等。我推荐大家使用这个包:[elliotchance/pie](https://github.com/elliotchance/pie)。该包封装了对切片和map的常用操作,能满足工作中的大部分需求。比如计算......
  • Go每日一库之172:go-prompt
    简介受python提示工具包的启发,在Go中构建强大的交互式提示一、代码示例packagemainimport( "fmt" "github.com/c-bata/go-prompt")funccompleter(dprompt.Document)[]prompt.Suggest{ s:=[]prompt.Suggest{ {Text:"users",Description:"Store......
  • Go每日一库之133:lo(基于泛型的 Golang lodash 库)
    近日,Go核心开发团队终于宣布了Go1.18正式版本的发布!这是一个大家期待很久的版本!Go1.18包含大量新功能:模糊测试、性能改进、工作区等,以及Go语言开源以来最大的一次语法特性变更——支持泛型!支持泛型后,我们便不再需要写如下冗余的代码:现在只需要简单的一行即可:funcMi......
  • Go每日一库之132:wasm与tinygo
    WASM的概念,这几年还是挺火的,新的语言,比如Rust、Go、Swift等,都对WASM提供支持。相比之下,Go语言的简单性,使得对WASM的支持,使用起来也较简单。本文是目前公开资料中为数不多较完整的教程,希望能对你有帮助。WASM是什么标题说:“Golang中的Wasm太棒了。”,但请用几句话来说......
  • Go每日一库之131:caddy(轻量web服务器)
    一直以来,我都是使用Nginx作为Web服务器,但是配置可以说是非常麻烦了。每次我要新开一个域名,都要先使用acme.sh签发SSL证书,然后再写配置,大概要花上5分钟的时间。曾经想过写个脚本自动完成这些工作,但是苦于对Linux的了解不多,也就作罢了。最近看到了Caddy,一个用Go写的......