首页 > 其他分享 >golang开发:go并发的建议(完)

golang开发:go并发的建议(完)

时间:2022-10-10 07:11:15浏览次数:51  
标签:异步 ch goroutine golang 并发 fanout func go options

上次说了一下Go语言布道师 Dave Cheney对Go并发的建议,个人觉得最重要的一条,这次主要想说一下这个。
8.3. Never start a goroutine without knowning when it will stop(永远不要在不知道何时停止的情况下启动 goroutine)

我们的需求

我这边当时有个需求是这样的,我们有个考试系统的,每次学员答完试卷去检查一下这次交卷是否是这次考试的最后一份试卷,如果是最后一份试卷的话,需要计算这次考试的总成绩,生成考试的学习报告,当然了,如果不是最后一份试卷的话啥也不干。
生成试卷和报告是必须要生成的,不能出现考完试了没有总成绩和总报告。
接到这个需求的时候,我首先想到的是使用golang的goroutine去异步算出成绩生成报告。然后写代码就是这样的。

go createReport()

这不刚好是8.3 永远不要这样写的建议么?
然后觉得应该写一个管理goroutine异步执行任务的类库,创建执行销毁都由这个管理工具去执行。准备写的时候发现B站的代码里有一个这样的类库,异步执行的类库。

B站的类库

B站代码里面异步任务是这个文件
openbilibili-go-common-master/library/sync/pipeline/fanout/fanout.go

var (
	// ErrFull chan full.
	ErrFull   = errors.New("fanout: chan full")
	stats     = prom.BusinessInfoCount
	traceTags = []trace.Tag{
		trace.Tag{Key: trace.TagSpanKind, Value: "background"},
		trace.Tag{Key: trace.TagComponent, Value: "sync/pipeline/fanout"},
	}
)

type options struct {
	worker int
	buffer int
}

// Option fanout option
type Option func(*options)

// Worker specifies the worker of fanout
func Worker(n int) Option {
	if n <= 0 {
		panic("fanout: worker should > 0")
	}
	return func(o *options) {
		o.worker = n
	}
}

// Buffer specifies the buffer of fanout
func Buffer(n int) Option {
	if n <= 0 {
		panic("fanout: buffer should > 0")
	}
	return func(o *options) {
		o.buffer = n
	}
}

type item struct {
	f   func(c context.Context)
	ctx context.Context
}

// Fanout async consume data from chan.
type Fanout struct {
	name    string
	ch      chan item
	options *options
	waiter  sync.WaitGroup

	ctx    context.Context
	cancel func()
}

// New new a fanout struct.
func New(name string, opts ...Option) *Fanout {
	if name == "" {
		name = "fanout"
	}
	o := &options{
		worker: 1,
		buffer: 1024,
	}
	for _, op := range opts {
		op(o)
	}
	c := &Fanout{
		ch:      make(chan item, o.buffer),
		name:    name,
		options: o,
	}
	c.ctx, c.cancel = context.WithCancel(context.Background())
	c.waiter.Add(o.worker)
	for i := 0; i < o.worker; i++ {
		go c.proc()
	}
	return c
}

func (c *Fanout) proc() {
	defer c.waiter.Done()
	for {
		select {
		case t := <-c.ch:
			wrapFunc(t.f)(t.ctx)
			stats.State(c.name+"_channel", int64(len(c.ch)))
		case <-c.ctx.Done():
			return
		}
	}
}

func wrapFunc(f func(c context.Context)) (res func(context.Context)) {
	res = func(ctx context.Context) {
		defer func() {
			if r := recover(); r != nil {
				buf := make([]byte, 64*1024)
				buf = buf[:runtime.Stack(buf, false)]
				log.Error("panic in fanout proc, err: %s, stack: %s", r, buf)
			}
		}()
		f(ctx)
		if tr, ok := trace.FromContext(ctx); ok {
			tr.Finish(nil)
		}
	}
	return
}

// Do save a callback func.
func (c *Fanout) Do(ctx context.Context, f func(ctx context.Context)) (err error) {
	if f == nil || c.ctx.Err() != nil {
		return c.ctx.Err()
	}
	nakeCtx := metadata.WithContext(ctx)
	if tr, ok := trace.FromContext(ctx); ok {
		tr = tr.Fork("", "Fanout:Do").SetTag(traceTags...)
		nakeCtx = trace.NewContext(nakeCtx, tr)
	}
	select {
	case c.ch <- item{f: f, ctx: nakeCtx}:
	default:
		err = ErrFull
	}
	stats.State(c.name+"_channel", int64(len(c.ch)))
	return
}

// Close close fanout
func (c *Fanout) Close() error {
	if err := c.ctx.Err(); err != nil {
		return err
	}
	c.cancel()
	c.waiter.Wait()
	return nil
}

使用方法
	ca := New("cache", Worker(100), Buffer(1024))
	var run bool
	ca.Do(context.Background(), func(c context.Context) {
		run = true
	})

主要分析一下这个类库,以后自己写或者使用的时候就能得心应手了,而且这个类库也算是创建goroutine,通过channel通信的经典写法吧
在这里插入图片描述
1.New方法调用的时候,会创建buffer个ch channel,worker个goroutine.由于ch是空的,worker个goroutine会阻塞住,一直等待有程序往ch里面写入数据
2.Do函数一但被调用,会传入异步任务的func,func就会写入到ch里面了,goroutine就可以从ch里面读取到数据,并且执行这个数据里面的func
践行了这个原则
不要通过共享内存来通信,要通过通信来共享内存

有个需要注意的点,就Do函数在执行代码是这样的
在这里插入图片描述
代码里面可以看到在c.ch 写入数据的时候,如果超过c.ch的长度(测试代码里面是1024)就报错返回了,这样就不能保证每个异步任务都能稳定执行了,这样的结果就是,如果程序处理慢或者异步任务数量比较多的话(超过1024),异步任务就无法完成。当然了,我们也可以修改代码改成等待ch的里面数据被goroutine处理的小于1024了,也会执行,这样就变成一个不可控的程序了,如果有3000个异步任务没人知道执行完成需要多长时间,然后我们程序如果重启的话,是等待它完成重启还是强制重启,等待完成不知道需要等待多长时间,强制重启就无法保证任务能够全部完成。

最终方案

为了一定能够在任何异常情况算出分数和生成报告,最后使用消息队列做了这件事,发送完成答卷的消息,接收到完成答卷的消息之后算出分数生成报告。做完之后虽然保证了可靠性,但是觉得自己发消息自己收消息确实也很别扭。
不知道其他童鞋有没有更好的更合理的方案。

标签:异步,ch,goroutine,golang,并发,fanout,func,go,options
From: https://www.cnblogs.com/feixiangmanon/p/16774317.html

相关文章

  • 1、冒泡排序——Go语言版
    前情提示Go语言学习者,文章若有不妥之处,感谢指正。本文参考:https://www.runoob.com/w3cnote/ten-sorting-algorithm.html关于排序算法相关的Golang代码,为了便于下载和......
  • GoLand 和 Pycharm的 快捷键设置与常用插件
    GoLand插件Gopher美化进度条,让等待更优雅。CodeGlancepro旁边浏览框。快捷键设置删除行:ctrl+L重新格式化代码ctrl+K开始新行ctrl+enter终端:......
  • Go语言编程 pdf
    高清扫描版下载链接:https://pan.baidu.com/s/14cOX1SocrtvZI_PbqK-nhw点击这里获取提取码 ......
  • Go语言核心编程 pdf
    高清扫描版下载链接:https://pan.baidu.com/s/1EV1O5Ff1yctrx4cIonZYcQ点击这里获取提取码 ......
  • Django-debug-toolbar的使用
    Django-debug-toolbar,该工具为我们提供了更加丰富的调试信息,如sql语句,信号,缓存等数据查询。install: pipinstalldjango-debug-toolbar usage:基本配置:1、先......
  • @MVCC多版本并发控制
    文章目录​​innodb多版本并发控制原理​​​​一、MVCC简介​​​​二、实现原理​​​​1)SELECT​​​​2)INSERT​​​​3)DELETE​​​​4)UPDATE​​​​三、M......
  • Go的接口
    理解Go的接口在Go中其实更好理解接口的定义,接口就是对一类结构体的规范,属于这个接口的结构体都实现了一些方法但是在Go中其实是一种隐式的方法,并没有像Java一样用impleme......
  • Go Mutex 源码详解
    前言在上一篇文章中,我们一起学习了如何使用​​Go​​​中的互斥锁​​Mutex​​​,那么本篇文章,我们就一起来探究下​​Mutex​​底层是如何实现的,知其然,更要知其所以......
  • [Algorithm] DP - Min Number of Jumps
    You'regivenanon-emptyarrayofpositiveintegerswhereeachintegerrepresentsthemaximumnumberofstepsyoucantakeforwardinthearray.Forexa......
  • 分享Go书籍-《Go Web编程》
    大家好,我是沙漠尽头的狼。最近几天在看一本Go的书籍,看了100来页,感觉不错,分享给大家。书籍基本信息书籍信息:书名:GoWeb编程作者:(新加坡)郑兆雄(SauSheongChang)著;黄健......