首页 > 编程语言 >Go每日一库之64:ants(源码赏析)

Go每日一库之64:ants(源码赏析)

时间:2023-09-28 14:38:25浏览次数:57  
标签:return goroutine worker ants 源码 wq goWorker Pool 赏析

简介

继上一篇Go 每日一库之 ants,这篇文章我们来一起看看ants的源码。

Pool

通过上篇文章,我们知道ants池有两种创建方式:

  • p, _ := ants.NewPool(cap):这种方式创建的池子对象需要调用p.Submit(task)提交任务,任务是一个无参数无返回值的函数;
  • p, _ := ants.NewPoolWithFunc(cap, func(interface{})):这种方式创建的池子对象需要指定池函数,并且使用p.Invoke(arg)调用池函数。arg就是传给池函数func(interface{})的参数。

ants中这两种池子使用不同的结构来表示:ants.Poolants.PoolWithFunc。我们先来介绍PoolPoolWithFunc结构也是类似的,介绍完Pool之后,我们再简单比较一下它们。

Pool结构定义在文件pool.go中:

// src/github.com/panjf2000/ants/pool.go
type Pool struct {
  capacity int32
  running int32
  workers workerArray
  state int32
  lock sync.Locker
  cond *sync.Cond
  workerCache sync.Pool
  blockingNum int
  options *Options
}

各个字段含义如下:

  • capacity:池容量,表示ants最多能创建的 goroutine 数量。如果为负数,表示容量无限制;
  • running:已经创建的 worker goroutine 的数量;
  • workers:存放一组 worker 对象,workerArray只是一个接口,表示一个 worker 容器,后面详述;
  • state:记录池子当前的状态,是否已关闭(CLOSED);
  • lock:锁。ants自己实现了一个自旋锁。用于同步并发操作;
  • cond:条件变量。处理任务等待和唤醒;
  • workerCache:使用sync.Pool对象池管理和创建worker对象,提升性能;
  • blockingNum:阻塞等待的任务数量;
  • options:选项。上一篇文章已经详细介绍过了。

这里明确一个概念,ants中为每个任务都是由 worker 对象来处理的,每个 worker 对象会对应创建一个 goroutine 来处理任务。ants中使用goWorker表示 worker:

// src/github.com/panjf2000/ants/worker.go
type goWorker struct {
  pool *Pool
  task chan func()
  recycleTime time.Time
}

后文详细介绍这一块内容,现在我们只需要知道Pool.workers字段就是存放goWorker对象的容器。

Pool创建

创建Pool对象需调用ants.NewPool(size, options)函数。省略了一些处理选项的代码,最终代码如下:

// src/github.com/panjf2000/ants/pool.go
func NewPool(size int, options ...Option) (*Pool, error) {
  // ...
  p := &Pool{
    capacity: int32(size),
    lock:     internal.NewSpinLock(),
    options:  opts,
  }
  p.workerCache.New = func() interface{} {
    return &goWorker{
      pool: p,
      task: make(chan func(), workerChanCap),
    }
  }
  if p.options.PreAlloc {
    if size == -1 {
      return nil, ErrInvalidPreAllocSize
    }
    p.workers = newWorkerArray(loopQueueType, size)
  } else {
    p.workers = newWorkerArray(stackType, 0)
  }

  p.cond = sync.NewCond(p.lock)

  go p.purgePeriodically()
  return p, nil
}

代码不难理解:

  • 创建Pool对象,设置容量,创建一个自旋锁来初始化lock字段,设置选项;
  • 设置workerCache这个sync.Pool对象的New方法,在调用sync.Pool对象的Get()方法时,如果它没有缓存的 worker 对象了,则调用这个方法创建一个;
  • 根据是否设置了预分配选项,创建不同类型的 workers;
  • 使用p.lock锁创建一个条件变量;
  • 最后启动一个 goroutine 用于定期清理过期的 worker。

Pool.workers字段为workerArray类型,这实际上是一个接口,表示一个 worker 容器:

type workerArray interface {
  len() int
  isEmpty() bool
  insert(worker *goWorker) error
  detach() *goWorker
  retrieveExpiry(duration time.Duration) []*goWorker
  reset()
}

每个方法从名字上很好理解含义:

  • len() int:worker 数量;
  • isEmpty() bool:worker 数量是否为 0;
  • insert(worker *goWorker) error:goroutine 任务执行结束后,将相应的 worker 放回workerArray中;
  • detach() *goWorker:从workerArray中取出一个 worker;
  • retrieveExpiry(duration time.Duration) []*goWorker:取出所有的过期 worker;
  • reset():重置容器。

workerArrayants中有两种实现,即workerStackloopQueue

workerStack

我们先来介绍一下workerStack,它位于文件worker_stack.go中:

// src/github.com/panjf2000/ants/worker_stack.go
type workerStack struct {
  items  []*goWorker
  expiry []*goWorker
  size   int
}

func newWorkerStack(size int) *workerStack {
  return &workerStack{
    items: make([]*goWorker, 0, size),
    size:  size,
  }
}
  • items:空闲的worker
  • expiry:过期的worker

goroutine 完成任务之后,Pool池会将相应的 worker 放回workerStack,调用workerStack.insert()直接appenditems中即可:

func (wq *workerStack) insert(worker *goWorker) error {
  wq.items = append(wq.items, worker)
  return nil
}

新任务到来时,会调用workerStack.detach()从容器中取出一个空闲的 worker:

func (wq *workerStack) detach() *goWorker {
  l := wq.len()
  if l == 0 {
    return nil
  }

  w := wq.items[l-1]
  wq.items[l-1] = nil // avoid memory leaks
  wq.items = wq.items[:l-1]

  return w
}

这里总是返回最后一个 worker,每次insert()也是append到最后,符合栈后进先出的特点,故称为workerStack

这里有一个细节,由于切片的底层结构是数组,只要有引用数组的指针,数组中的元素就不会释放。这里取出切片最后一个元素后,将对应数组元素的指针设置为nil,主动释放这个引用。

上面说过新建Pool对象时会创建一个 goroutine 定期检查和清理过期的 worker。通过调用workerArray.retrieveExpiry()获取过期的 worker 列表。workerStack实现如下:

func (wq *workerStack) retrieveExpiry(duration time.Duration) []*goWorker {
  n := wq.len()
  if n == 0 {
    return nil
  }

  expiryTime := time.Now().Add(-duration)
  index := wq.binarySearch(0, n-1, expiryTime)

  wq.expiry = wq.expiry[:0]
  if index != -1 {
    wq.expiry = append(wq.expiry, wq.items[:index+1]...)
    m := copy(wq.items, wq.items[index+1:])
    for i := m; i < n; i++ {
      wq.items[i] = nil
    }
    wq.items = wq.items[:m]
  }
  return wq.expiry
}

实现使用二分查找法找到已过期的最近一个 worker。由于过期时间是按照 goroutine 执行任务后的空闲时间计算的,而workerStack.insert()入队顺序决定了,它们的过期时间是从早到晚的。所以可以使用二分查找:

func (wq *workerStack) binarySearch(l, r int, expiryTime time.Time) int {
  var mid int
  for l <= r {
    mid = (l + r) / 2
    if expiryTime.Before(wq.items[mid].recycleTime) {
      r = mid - 1
    } else {
      l = mid + 1
    }
  }
  return r
}

二分查找的是最近过期的 worker,即将过期的 worker 的前一个。它和在它之前的 worker 已经全部过期了。

如果找到索引index,将items从开头到index(包括)的所有 worker 复制到expiry字段中。然后将index之后的所有未过期 worker 复制到切片头部,这里使用了copy函数。copy返回实际复制的数量,即未过期的 worker 数量m。然后将切片itemsm开始所有的元素置为nil,避免内存泄漏,因为它们已经被复制到头部了。最后裁剪items切片,返回过期 worker 切片。

loopQueue

loopQueue实现基于循环队列,结构定义在文件worker_loop_queue中:

type loopQueue struct {
  items  []*goWorker
  expiry []*goWorker
  head   int
  tail   int
  size   int
  isFull bool
}

func newWorkerLoopQueue(size int) *loopQueue {
  return &loopQueue{
    items: make([]*goWorker, size),
    size:  size,
  }
}

由于是循环队列,这里先创建好了一个长度为size的切片。循环队列有一个队列头指针head,指向第一个有元素的位置,一个队列尾指针tail,指向下一个可以存放元素的位置。所以一开始状态如下:

tail处添加元素,添加后tail指针后移。在head处取出元素,取出后head指针也后移。进行一段时间操作后,队列状态如下:

headtail指针到队列尾了,需要回绕。所以可能出现这种情况:

tail指针赶上head指针了,说明队列就满了:

head指针赶上tail指针了,队列再次为空:

根据示意图,我们再来看loopQueue的操作方法就很简单了。

由于headtail相等的情况有可能是队列空,也有可能是队列满,所以loopQueue中增加一个isFull字段以示区分。goroutine 完成任务之后,会将对应的 worker 对象放回loopQueue,执行的是insert()方法:

func (wq *loopQueue) insert(worker *goWorker) error {
  if wq.size == 0 {
    return errQueueIsReleased
  }

  if wq.isFull {
    return errQueueIsFull
  }
  wq.items[wq.tail] = worker
  wq.tail++

  if wq.tail == wq.size {
    wq.tail = 0
  }
  if wq.tail == wq.head {
    wq.isFull = true
  }

  return nil
}

这个方法执行的就是循环队列的入队流程,注意如果插入后tail==head了,说明队列满了,设置isFull字段。

新任务到来调用loopQueeue.detach()方法获取一个空闲的 worker 结构:

func (wq *loopQueue) detach() *goWorker {
  if wq.isEmpty() {
    return nil
  }

  w := wq.items[wq.head]
  wq.items[wq.head] = nil
  wq.head++
  if wq.head == wq.size {
    wq.head = 0
  }
  wq.isFull = false

  return w
}

这个方法对应的是循环队列的出队流程,注意每次出队后,队列肯定不满了,isFull要重置为false

workerStack结构一样,先入的 worker 对象过期时间早,后入的晚,获取过期 worker 的方法与workerStack中类似,只是没有使用二分查找了。这里就不赘述了。

再看Pool创建

介绍完两种workerArray的实现之后,再来看Pool的创建函数中workers字段的设置:

if p.options.PreAlloc {
  if size == -1 {
    return nil, ErrInvalidPreAllocSize
  }
  p.workers = newWorkerArray(loopQueueType, size)
} else {
  p.workers = newWorkerArray(stackType, 0)
}

newWorkerArray()定义在文件worker_array.go中:

type arrayType int

const (
  stackType arrayType = 1 << iota
  loopQueueType
)

func newWorkerArray(aType arrayType, size int) workerArray {
  switch aType {
  case stackType:
    return newWorkerStack(size)
  case loopQueueType:
    return newWorkerLoopQueue(size)
  default:
    return newWorkerStack(size)
  }
}

即如果设置了预分配选项,就采用loopQueue结构。否则就采用stack的结构。

worker 结构

介绍完Pool的创建和结构,我们来看看 worker 的结构。在ants中 worker 用结构体goWorker表示,定义在文件worker.go中。它的结构非常简单:

// src/github.com/panjf2000/ants/worker.go
type goWorker struct {
  pool *Pool
  task chan func()
  recycleTime time.Time
}

具体字段含义很明显:

  • pool:持有 goroutine 池的引用;
  • task:任务通道,通过这个通道将类型为func ()的函数作为任务发送给goWorker
  • recyleTime:这个字段记录goWorker什么时候被放回池中(即什么时候开始空闲)。其完成任务后,在将其放回 goroutine 池的时候设置。

goWorker创建时会调用run()方法,run()方法中启动一个新 goroutine 处理任务。run()主体流程非常简单:

func (w *goWorker) run() {
  go func() {
    for f := range w.task {
      if f == nil {
        return
      }
      f()
      if ok := w.pool.revertWorker(w); !ok {
        return
      }
    }
  }()
}

这个方法启动一个新的 goroutine,然后不停地从task通道中接收任务,然后执行任务,任务执行完成之后调用池对象的revertWorker()方法将该goWorker对象放回池中,以便下次取出处理新的任务。revertWorker()方法后面会详细分析。

这里注意,实际上for f := range w.task这个循环直到通道task关闭或取出为nil的任务才会终止。所以这个 goroutine 一直在运行,这正是ants高性能的关键所在。每个goWorker只会启动一次 goroutine, 后续重复利用这个 goroutine。goroutine 每次只执行一个任务就会被放回池中。

还有一个细节,如果放回操作失败,则会调用return,这会让 goroutine 运行结束,防止 goroutine 泄漏

这里f == nil为 true 时return,也是一个细节点,我们后面讲池关闭的时候会详细介绍。

下面我们看看run()方法的异常处理:

defer func() {
  w.pool.workerCache.Put(w)
  if p := recover(); p != nil {
    if ph := w.pool.options.PanicHandler; ph != nil {
      ph(p)
    } else {
      w.pool.options.Logger.Printf("worker exits from a panic: %v\n", p)
      var buf [4096]byte
      n := runtime.Stack(buf[:], false)
      w.pool.options.Logger.Printf("worker exits from panic: %s\n", string(buf[:n]))
    }
  }
  w.pool.cond.Signal()
}()

简单来说,就是在defer中通过recover()函数捕获任务执行过程中抛出的panic。这时任务执行失败,goroutine 也结束了。但是goWorker对象还是可以重复利用,所以defer函数一开始调用w.pool.workerCache.Put(w)goWorker对象放回sync.Pool池中。

接着就是处理panic,如果选项中指定了panic处理器,直接调用这个处理器。否则,ants调用选项中设置的Logger记录一些日志,如堆栈,panic信息等。

最后需要调用w.pool.cond.Signal()通知现在有空闲的goWorker了。因为我们实际运行的goWorker数量由于panic少了一个,而池中可能有其他任务在等待处理。

提交任务

接下来,通过提交任务就可以串起整个流程。由上一篇文章我们知道,可以调用池对象的Submit()方法提交任务:

func (p *Pool) Submit(task func()) error {
  if p.IsClosed() {
    return ErrPoolClosed
  }
  var w *goWorker
  if w = p.retrieveWorker(); w == nil {
    return ErrPoolOverload
  }
  w.task <- task
  return nil
}

首先判断池是否已关闭,然后调用retrieveWorker()方法获取一个空闲的 worker,然后将任务task发送到 worker 的任务通道。下面是retrieveWorker()实现:

func (p *Pool) retrieveWorker() (w *goWorker) {
  p.lock.Lock()

  w = p.workers.detach()
  if w != nil {
    p.lock.Unlock()
  } else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
    p.lock.Unlock()
    spawnWorker()
  } else {
    if p.options.Nonblocking {
      p.lock.Unlock()
      return
    }
  Reentry:
    if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks {
      p.lock.Unlock()
      return
    }
    p.blockingNum++
    p.cond.Wait()
    p.blockingNum--
    var nw int
    if nw = p.Running(); nw == 0 {
      p.lock.Unlock()
      if !p.IsClosed() {
        spawnWorker()
      }
      return
    }
    if w = p.workers.detach(); w == nil {
      if nw < capacity {
        p.lock.Unlock()
        spawnWorker()
        return
      }
      goto Reentry
    }

    p.lock.Unlock()
  }
  return
}

这个方法稍微有点复杂,我们一点点来看。首先调用p.workers.detach()获取goWorker对象。p.workersloopQueue或者workerStack对象,它们都实现了detach()方法,前面已经介绍过了。

如果返回了一个goWorker对象,说明有空闲 goroutine,直接返回。

否则,池容量还没用完(即容量大于正在工作的goWorker数量),则调用spawnWorker()新建一个goWorker,执行其run()方法:

spawnWorker := func() {
  w = p.workerCache.Get().(*goWorker)
  w.run()
}

否则,池容量已用完。如果设置了非阻塞选项,则直接返回。否则,如果设置了最大阻塞队列长度上限,且当前阻塞等待的任务数量已经达到这个上限,直接返回。否则,阻塞等待数量 +1,调用p.cond.Wait()等待。

然后goWorker.run()完成一个任务后,调用池的revertWorker()方法放回goWorker

func (p *Pool) revertWorker(worker *goWorker) bool {
  if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() {
    return false
  }
  worker.recycleTime = time.Now()
  p.lock.Lock()

  if p.IsClosed() {
    p.lock.Unlock()
    return false
  }

  err := p.workers.insert(worker)
  if err != nil {
    p.lock.Unlock()
    return false
  }

  p.cond.Signal()
  p.lock.Unlock()
  return true
}

这里设置了goWorkerrecycleTime字段,用于判定过期。然后将goWorker放回池。workersinsert()方法前面也已经分析过了。

接着调用p.cond.Signal()唤醒之前retrieveWorker()方法中的等待。retrieveWorker()方法继续执行,阻塞等待数量 -1,这里判断当前goWorker的数量(也即 goroutine 数量)。如果数量等于 0,很有可能池子刚刚执行了Release()关闭,这时需要判断池是否处于关闭状态,如果是则直接返回。否则,调用spawnWorker()创建一个新的goWorker并执行其run()方法。

如果当前goWorker数量不为 0,则调用p.workers.detach()取出一个空闲的goWorker返回。这个操作有可能失败,因为可能同时有多个 goroutine 在等待,唤醒的时候只有部分 goroutine 能获取到goWorker。如果失败了,其容量还未用完,直接创建新的goWorker,反之重新执行阻塞等待逻辑。

这里有很多加锁和解锁的逻辑,再加上和信号量混在一起很难看明白。其实只需要知道一点就很简单了,那就是p.cond.Wait()内部会将当前 goroutine 挂起,然后解开它持有的锁,即会调用p.lock.Unlock()。这也是为什么revertWorker()p.lock.Lock()加锁能成功的原因。然后p.cond.Signal()p.cond.Broadcast()会唤醒因为p.cond.Wait()而挂起的 goroutine,但是需要Signal()/Broadcast()所在 goroutine 调用解锁方法。而调用p.cond.Wait()的 goroutine 被唤醒之后,内部会重新执行加锁操作(即调用p.lock.Lock()),所以p.cond.Wait()之后的逻辑还是在有锁的状态下执行的。

最后,放上整体流程图:

清理过期goWorker

NewPool()函数中会启动一个 goroutine 定期清理过期的goWorker

func (p *Pool) purgePeriodically() {
  heartbeat := time.NewTicker(p.options.ExpiryDuration)
  defer heartbeat.Stop()

  for range heartbeat.C {
    if p.IsClosed() {
      break
    }

    p.lock.Lock()
    expiredWorkers := p.workers.retrieveExpiry(p.options.ExpiryDuration)
    p.lock.Unlock()

    for i := range expiredWorkers {
      expiredWorkers[i].task <- nil
      expiredWorkers[i] = nil
    }

    if p.Running() == 0 {
      p.cond.Broadcast()
    }
  }
}

如果池子已关闭,直接退出 goroutine。由选项ExpiryDuration来设置清理的间隔,如果没有设置该选项,采用默认值 1s:

// src/github.com/panjf2000/ants/pool.go
func NewPool(size int, options ...Option) (*Pool, error) {
  if expiry := opts.ExpiryDuration; expiry < 0 {
    return nil, ErrInvalidPoolExpiry
  } else if expiry == 0 {
    opts.ExpiryDuration = DefaultCleanIntervalTime
  }
}

// src/github.com/panjf2000/ants/pool.go
const (
  DefaultCleanIntervalTime = time.Second
)

然后就是每个清理周期,调用p.workers.retrieveExpiry()方法,取出过期的goWorker因为由这些goWorker启动的 goroutine 还阻塞在通道task上,所以要向该通道发送一个nil值,而goWorker.run()方法中接收到一个值为nil的任务会return,结束 goroutine,避免了 goroutine 泄漏

如果所有goWorker都被清理掉了,可能这时还有 goroutine 阻塞在retrieveWorker()方法中的p.cond.Wait()上,所以这里需要调用p.cond.Broadcast()唤醒这些 goroutine。

容量动态修改

在运行过程中,可以动态修改池的容量。调用p.Tune(size int)方法:

func (p *Pool) Tune(size int) {
  if capacity := p.Cap(); capacity == -1 || size <= 0 || size == capacity || p.options.PreAlloc {
    return
  }
  atomic.StoreInt32(&p.capacity, int32(size))
}

这里只是简单设置了一下新的容量,不影响当前正在执行的goWorker,而且如果设置了预分配选项,容量不能再次设置。

下次执行revertWorker()的时候就会以新的容量判断是否能放回,下次执行retrieveWorker()的时候也会以新容量判断是否能创建新goWorker

关闭和重新启动Pool

使用完成之后,需要关闭Pool,避免 goroutine 泄漏。调用池对象的Release()方法关闭:

func (p *Pool) Release() {
  atomic.StoreInt32(&p.state, CLOSED)
  p.lock.Lock()
  p.workers.reset()
  p.lock.Unlock()
  p.cond.Broadcast()
}

调用p.workers.reset()结束loopQueuewokerStack中的 goroutine,做一些清理工作,同时为了防止有 goroutine 阻塞在p.cond.Wait()上,执行一次p.cond.Broadcast()

workerStackloopQueuereset()基本相同,即发送niltask通道从而结束 goroutine,然后重置各个字段:

// loopQueue 版本
func (wq *loopQueue) reset() {
  if wq.isEmpty() {
    return
  }

Releasing:
  if w := wq.detach(); w != nil {
    w.task <- nil
    goto Releasing
  }
  wq.items = wq.items[:0]
  wq.size = 0
  wq.head = 0
  wq.tail = 0
}

// stack 版本
func (wq *workerStack) reset() {
  for i := 0; i < wq.len(); i++ {
    wq.items[i].task <- nil
    wq.items[i] = nil
  }
  wq.items = wq.items[:0]
}

池关闭后还可以调用Reboot()重启:

func (p *Pool) Reboot() {
  if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
    go p.purgePeriodically()
  }
}

由于p.purgePeriodically()p.Release()之后检测到池关闭就直接退出了,这里需要重新开启一个 goroutine 定期清理。

PoolWithFuncWorkWithFunc

上一篇文章中我们还介绍了另一种方式创建Pool,即NewPoolWithFunc(),指定一个函数。后面提交任务时调用p.Invoke()提供参数就可以执行该函数了。这种方式创建的 Pool 和 Woker 结构如下:

type PoolWithFunc struct {
  workers []*goWorkerWithFunc
  poolFunc func(interface{})
}

type goWorkerWithFunc struct {
  pool *PoolWithFunc
  args chan interface{}
  recycleTime time.Time
}

与前面介绍的PoolgoWorker大体相似,只是PoolWithFunc保存了传入的函数对象,使用数组保存 worker。goWorkerWithFuncinterface{}args通道的数据类型,其实也好理解,因为已经有函数了,只需要传入数据作为参数就可以运行了:

func (w *goWorkerWithFunc) run() {
  go func() {
    for args := range w.args {
      if args == nil {
        return
      }
      w.pool.poolFunc(args)
      if ok := w.pool.revertWorker(w); !ok {
        return
      }
    }
  }()
}

从通道接收函数参数,执行池中保存的函数对象。

其他细节

task缓冲通道

还记得创建p.workerCache这个sync.Pool对象的代码么:

p.workerCache.New = func() interface{} {
  return &goWorker{
    pool: p,
    task: make(chan func(), workerChanCap),
  }
}

sync.Pool中没有goWorker对象时,调用New()方法创建一个,注意到这里创建的task通道使用workerChanCap作为容量。这个变量定义在ants.go文件中:

var (
  // workerChanCap determines whether the channel of a worker should be a buffered channel
  // to get the best performance. Inspired by fasthttp at
  // https://github.com/valyala/fasthttp/blob/master/workerpool.go#L139
  workerChanCap = func() int {
    // Use blocking channel if GOMAXPROCS=1.
    // This switches context from sender to receiver immediately,
    // which results in higher performance (under go1.5 at least).
    if runtime.GOMAXPROCS(0) == 1 {
      return 0
    }

    // Use non-blocking workerChan if GOMAXPROCS>1,
    // since otherwise the sender might be dragged down if the receiver is CPU-bound.
    return 1
  }()
)

为了方便对照,我把注释也放上来了。ants参考了著名的 Web 框架fasthttp的实现。当GOMAXPROCS为 1 时(即操作系统线程数为 1),向通道task发送会挂起发送 goroutine,将执行流程转向接收 goroutine,这能提升接收处理性能。如果GOMAXPROCS大于 1,ants使用带缓冲的通道,为了防止接收 goroutine 是 CPU 密集的,导致发送 goroutine 被阻塞。下面是fasthttp中的相关代码:

// src/github.com/valyala/fasthttp/workerpool.go
var workerChanCap = func() int {
  // Use blocking workerChan if GOMAXPROCS=1.
  // This immediately switches Serve to WorkerFunc, which results
  // in higher performance (under go1.5 at least).
  if runtime.GOMAXPROCS(0) == 1 {
    return 0
  }

  // Use non-blocking workerChan if GOMAXPROCS>1,
  // since otherwise the Serve caller (Acceptor) may lag accepting
  // new connections if WorkerFunc is CPU-bound.
  return 1
}()

自旋锁

ants利用atomic.CompareAndSwapUint32()这个原子操作实现了一个自旋锁。与其他类型的锁不同,自旋锁在加锁失败之后不会立刻进入等待,而是会继续尝试。这对于很快就能获得锁的应用来说能极大提升性能,因为能避免加锁和解锁导致的线程切换:

type spinLock uint32

func (sl *spinLock) Lock() {
  backoff := 1
  for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
    for i := 0; i < backoff; i++ {
      runtime.Gosched()
    }
    backoff <<= 1
  }
}

func (sl *spinLock) Unlock() {
  atomic.StoreUint32((*uint32)(sl), 0)
}

// NewSpinLock instantiates a spin-lock.
func NewSpinLock() sync.Locker {
  return new(spinLock)
}

另外这里使用了指数退避,先等 1 个循环周期,通过runtime.Gosched()告诉运行时切换其他 goroutine 运行。如果还是获取不到锁,就再等 2 个周期。如果还是不行,再等 4,8,16...以此类推。这可以防止短时间内获取不到锁,导致 CPU 时间的浪费。

总结

ants源码短小精悍,没有引用其他任何第三方库。各种细节处理,各种性能优化的点都是值得我们细细品味的。强烈建议大家读一读源码。阅读优秀的源码,能极大地提高自身的编码素养。

大家如果发现好玩、好用的 Go 语言库,欢迎到 Go 每日一库 GitHub 上提交 issue

参考

  1. ants GitHub:github.com/panjf2000/ants
  2. Go 每日一库 GitHub:https://github.com/go-quiz/go-daily-lib

标签:return,goroutine,worker,ants,源码,wq,goWorker,Pool,赏析
From: https://www.cnblogs.com/arena/p/17735725.html

相关文章

  • Go每日一库之63:ants
    简介处理大量并发是Go语言的一大优势。语言内置了方便的并发语法,可以非常方便的创建很多个轻量级的goroutine并发处理任务。相比于创建多个线程,goroutine更轻量、资源占用更少、切换速度更快、无线程上下文切换开销更少。但是受限于资源总量,系统中能够创建的goroutine数量......
  • 源码编译nginx安装脚本
    #!/bin/bashprofile(){echo"installingrelyon...."yum-yinstallgccgcc-c++pcrewgetopensslopenssl-devellibtoolgdgd-develecho"dowloadnginx_1.18......"cd/usr/local/src&&wgethttps://nginx.org/download/nginx-1.......
  • 易游平台的设计与实现-计算机毕业设计源码+LW文档
    1选题的意义和研究现状1.1选题的意义目前各行各业都在运用网络信息管理程序,不同的人群也都接触到信息管理,特别是在各大企业行业广泛的应运起来。通过对当前网络环境发展的分析与总结,开发易游平台的设计可以改变以往的易游平台的设计方式,改变传统线下易游平台的设计发展到无纸化的......
  • 少数民族民歌网络图书馆-计算机毕业设计源码+LW文档
    一、选题依据随着计算机架构、网络技术和数据库技术的飞速发展,现代计算机系统已经从以计算为中心向以信息化处理为中心的方向发展。图书馆作为一个为社会存储各种知识的载体,不仅承担着为现代社会的公民提供进行日常学习休闲的一个重要场地,还同时承担着对文化的保存和传播。而图书......
  • Dubbo源码浅析(一)—RPC框架与Dubbo
    一、什么是RPC1.1RPC概念RPC,RemoteProcedureCall即远程过程调用,与之相对的是本地服务调用,即LPC(LocalProcedureCall)。本地服务调用比较常用,像我们应用内部程序(注意此处是程序而不是方法,程序包含方法)互相调用即为本地过程调用,而远程过程调用是指在本地调取远程过程进行......
  • Qemu源码分析(8)—Apple的学习笔记
    一,前言本节主要看stm32f4_discovery_board_init_callback函数,里面大概看明白了,主要是2个部分,一个是SDL的初始化,另外一个是mcu中各个模块中寄存器对象的属性添加及设置属性值。二,分析Object*object_new(constchar*typename)才会调用class_init和ti->instance_init看到了set"hse......
  • KBEngine服务端源码-分析
    目录后续会做更详细的更新classTask子类图示classTimerHandler子类图示Entity实体类--继承自(publicscript::ScriptObject)声明部分宏BASE_SCRIPT_HREADER宏SCRIPT_HREADER_BASE宏 ENTITY_HEADER实现部分宏ENTITY_METHOD_DECLARE_BEGIN宏ENTITY_CPP_IMPL宏SCRIPT_METHOD_DE......
  • Python桌面可视化+自动登录学校教务系统(含源码!!!)
    前言:通过Python爬虫与tkinter模块实现桌面快捷自动化登录教务系统效果展示:整体思路:创建主界面,在界面中手动输入用户名和密码,点击登录后自动打开浏览器,截取整个页面,裁剪出登录页面中的图片验证码并保存到本地,对验证码图片进行处理,识别出验证码。将用户名,密码,验证码一同自动输入到对......
  • YOLOV5源码解读-general.py、detect.py
    YOLOV5.4,可能与之前版本不一样,但大同小异general.py1#YOLOv5generalutils23importglob4importlogging5importmath6importos7importplatform8importrandom9importre10importsubprocess11importtime12frompathlibimpo......
  • WEBRTC回声消除-AECM算法源码解析之参数解析
    一概述 webrtc针对回声问题一共开源了3种回声消除算法,分别为aec,aecm,以及aec3,其中aec是最早期的版本,在后续的更新中aec3的出现代替了aec在webrtc中的地位,而aecm主要是针对计算能力较弱的移动端或是嵌入式设备而开发的,但同时也带来了它自己的劣势;本文主要介绍AECM算法的计......