简介
看到一个有意思的库:
SingleFlight是Go语言提供的一个扩展包。作用是当有多个goroutine同时调用同一个函数的时候,只允许一个goroutine去调用这个函数,等到这个调用的goroutine返回结果的时候,再把结果返回给这几个同时调用的goroutine,这样可以减少并发调用的数量。
Singleflight是以阻塞读的方式来控制向下游请求的并发量的,在第一个goroutine请求没有返回前,所有的请求都将被阻塞。极端情况下可能导致我们的程序hang住,由于连锁反应甚至导致我们整个系统挂掉。
可以传递ctx,制定超时时间,避免线程的长时间阻塞。
假设第一个线程超时了,后续只要有结果还是可以正确返回值的。
实现原理
Singleflight使用互斥锁Muext和Map来实现,Mutext用来保证并发时的读写安全,Map用来保存同一个key的正在处理(in flight)的请求。
type Group struct { mu sync.Mutex // protects m m map[string]*call // lazily initialized }
同时,Singleflight定义了一个call对象,call代表了正在执行的fn函数的请求或者是已经执行完成的请求。
// call is an in-flight or completed singleflight.Do call type call struct { wg sync.WaitGroup // These fields are written once before the WaitGroup is done // and are only read after the WaitGroup is done. val interface{} err error // These fields are read and written with the singleflight // mutex held before the WaitGroup is done, and are read but // not written after the WaitGroup is done. dups int chans []chan<- Result }
主要看下Do方法,DoChan方法和Do方法类似
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) { g.mu.Lock() if g.m == nil { g.m = make(map[string]*call) } if c, ok := g.m[key]; ok { // 存在相同的key c.dups++ g.mu.Unlock() c.wg.Wait() // 等待这个key的第一个调用完成 // ... return c.val, c.err, true // 复用第一个key的请求结果 } c := new(call) // 第一个调用创建一个call c.wg.Add(1) g.m[key] = c g.mu.Unlock() g.doCall(c, key, fn) // 调用方法 return c.val, c.err, c.dups > 0 }
// doCall方法会实际的执行函数fn func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) { defer func() { // ... g.mu.Lock() defer g.mu.Unlock() c.wg.Done() // 阻塞等待完成 if g.m[key] == c { delete(g.m, key) // 调用完成删除这个key } // ... }() func() { // ... c.val, c.err = fn() // 真正调用,结果赋值给call // ... }() }
应用场景
在缓存击穿的时候有用,假设大量的请求进来缓存miss,请求击穿到数据库
有的数据当时是热点,可能很快就变成了冷数据。针对类似的场景是不太合适设置不过期的,这个时候的Singleflight就派上用场了,可以避免大量的请求击穿到数据库,从而保护我们的服务稳定。
伪代码实现:
func getDataSingleFlight(key string) (interface{}, error) { v, err, _ := g.Do(key, func() (interface{}, error) { // 查缓存 data, err := getDataFromCache(key) if err == nil { return data, nil } if errors.Is(err, ErrNotFound) { // 查DB data, err := getDataFromDB(key) if err == nil { setCache(data) // 设置缓存 return data, nil } return nil, err } return nil, err // 缓存出错直接返回,防止穿透到DB }) if err != nil { return nil, err } return v, nil }
标签:return,请求,err,nil,合并,call,key,Singleflight,func From: https://www.cnblogs.com/twh233/p/18198397