本文分享自华为云社区《Go并发范式 流水线和优雅退出 Pipeline 与 Cancellation》,作者:张俭。


Go 的并发原语可以轻松构建流数据管道,从而高效利用 I/O 和多个 CPU。 本文展示了此类pipelines的示例,强调了操作失败时出现的细微之处,并介绍了干净地处理失败的技术。



  • 通过inbound channel从上游获取数据
  • 在data上进行运算,通常会产生新的值
  • 通过outbound channel向下游发送数据

每个Stage都有数个inbound channel和outbound channel,除了第一个和最后一个Stage,分别只有outbound和inbound channel。第一个Stage通常叫做SourceProducer。最后一个Stage通常叫做SinkConsumer

我们将从一个简单的示例pipeline开始来解释这些想法和技术。 稍后,我们将提供一个更实际的例子。

Squaring numbers 平方数


第一阶段,gen,是个将整数列表转换为一个发射列表中整数的channel的函数。gen函数启动一个go routine,用来发送channel中的整数,然后当所有的整数都被发出后,将channel关闭:

func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
    return out

第二阶段,sq从上面的channel中接收数据,返回一个发射对应整数平方数的channel。当inbound channel关闭后,并且这一阶段将所有的value发送到下游后,再将这个outbound channel关闭

func sq(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
    return out


func main() {
    // Set up the pipeline
    c := gen(2, 3)
    out := sq(c)

    // Consume the output
    // 4
    // 9

既然sq的inbound channel和outbound channel类型相同,我们可以将其进行任意数量的组合。我们还可以将main函数重写为循环,就像在其他Stage中做的那样一样。

func main() {
    // Set up the pipeline and consume the output.
    for n := range sq(sq(gen(2, 3))) {
        fmt.Println(n) // 16 then 81


许多函数可以从一个channel中获取数据直到channel被关闭,这被叫做扇出。这提供了一种在worker之间分配工作以并行化 CPU 使用和 I/O 的方法。

一个函数可以通过将多个input channel多路复用到同一个channel,当所有的channel关闭时,该多路复用channel才关闭。从而达到从多个input获取数据并处理,直到所有input channel都关闭才停止的效果。这叫做扇入。


func main() {
    in := gen(2, 3)

    // Distribute the sq work across two goroutines that both read from in.
    c1 := sq(in)
    c2 := sq(in)

    // Consume the merged output from c1 and c2.
    for n := range merge(c1, c2) {
        fmt.Println(n) // 4 then 9, or 9 then 4

merge函数通过对每个channel开启一个协程,把数据拷贝到另一个out channel中,实现将channel列表转换为一个channel的效果。当所有send操作完成后,再将out channel关闭。


func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs. output
    // copies values from c to out until c is closed, then calls wg.Done
    output := func(c <-chan int) {
        for n := range c {
            out <- n
    for _, c := range cs {
        go output(c)
    // Start a goroutine to close out once all the output goroutines are
    // done.  This must start after the wg.Add call.
    go func() {
    return out



  • 当发送任务结束后,关闭发送output channel
  • 直到input channel关闭前,一直从input channel中接收消息


但是在实际的pipeline中,阶段并不总是接收所有来自inbound channel的数据。通常,如果inbound的值出现了错误,pipeline会提前退出。 在任何一种情况下,接收者都不必等待剩余值到达,并且我们希望fast fail(较早阶段的Stage尽早停止后期Stage不需要的值)。

在我们的示例pipeline中,如果一个Stage未能消费所有inbound值,则尝试计算后并发送这些值的 goroutine 将无限期阻塞:

    // Consume the first value from the output.
    out := merge(c1, c2)
    fmt.Println(<-out) // 4 or 9
    // Since we didn't receive the second value from out,
    // one of the output goroutines is hung attempting to send it.


即使下游的Stage无法接收所有inbound value,我们也需要把上游的协程退出。如果把上游的协程改为有buffer的,可以解决上面的问题。如果Buffer中还有空间,则发送操作可以立刻完成

c := make(chan int, 2) // buffer size 2
c <- 1  // succeeds immediately
c <- 2  // succeeds immediately
c <- 3  // blocks until another goroutine does <-c and receives 1

当要发送的数目可以在channel创建时知道时,buffer可以简化代码。举个例子,让我们来使用buffer channel,不开辟新的协程来重写gen方法:

func gen(nums ...int) <-chan int {
    out := make(chan int, len(nums))
    for _, n := range nums {
        out <- n
    return out


func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int, 1) // enough space for the unread inputs
    // ... 其余的没有变更 ...



Explicity cancellation 显示取消

main函数决定不从out处接收所有数据,而是退出时,它必须知会上游阶段的协程放弃接下来的发送。它通过向一个名叫done的channel发送数据来完成这个动作。因为发送方有两个,所以 向done发送两次数据。

func main() {
    in := gen(2, 3)

    // Distribute the sq work across two goroutines that both read from in.
    c1 := sq(in)
    c2 := sq(in)

    // Consume the first value from output.
    done := make(chan struct{}, 2)
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9

    // Tell the remaining senders we're leaving.
    done <- struct{}{}
    done <- struct{}{}

发送到out channel的发送者把原来的逻辑替换成一个select操作,select或者发送一个数据,抑或从done处接收到数据。因为done中数据值的类型根本不重要,主要是接收到值这个事件本身很重要,所以done channel的类型时struct {}output循环继续在inbound channel上执行,所以上游的阶段并没有被阻塞。(我们稍后会讨论如何让循环迅速返回。)

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs.  output
    // copies values from c to out until c is closed or it receives a value
    // from done, then output calls wg.Done.
    output := func(c <-chan int) {
        for n := range c {
            select {
            case out <- n:
            case <-done:
    // ... the rest is unchanged ...



这说明main函数可以简单地通过关闭done channel来让所有的发送者不阻塞。关闭操作是一个高效的广播。我们把pipeline中的每个函数都接受done作为参数,并把done在defer语句中关闭, 这样,如果在main函数中返回,都会通知pipeline中的阶段退出。

func main() {
    // Set up a done channel that's shared by the whole pipeline,
    // and close that channel when this pipeline exits, as a signal
    // for all the goroutines we started to exit.
    done := make(chan struct{})
    defer close(done)

    in := gen(done, 2, 3)

    // Distribute the sq work across two goroutines that both read from in.
    c1 := sq(done, in)
    c2 := sq(done, in)

    // Consume the first value from output.
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9

    // done will be closed by the deferred call.

现在当donechannel关闭后,接收到close信息的阶段,都可以直接退出了。merge函数中的outout协程可以不从inbound channel中取数据直接退出,因为它知道,上游的发送sq,接收到close信息,也会直接退出。output通过defer语句来保证wg.Done()一定被调用。(译者注:来关闭out channel)

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs.  output
    // copies values from c to out until c or done is closed, then calls
    // wg.Done.
    output := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            select {
            case out <- n:
            case <-done:
    // ... the rest is unchanged ...



  • 当所有的发送操作完成后,关闭outbound channel
  • 如果发送发不阻塞,或是channel没有关闭,接收者会一直从channel中接收数据


  • 确保channel的buffer足够大
  • 显示知会发送者,接收者已经放弃接收

Digesting a tree 对树进行摘要


MD5 是一种消息摘要算法,可用作文件校验和。 命令行实用程序 md5sum 打印文件列表的摘要值。

% md5sum *.go
d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.go
ee869afd31f83cbb2d10ee81b2b831dc  parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96  serial.go

我们的示例程序类似于 md5sum,但将单个目录作为参数并打印该目录下每个常规文件的摘要值,按路径名排序。

% go run serial.go .
d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.go
ee869afd31f83cbb2d10ee81b2b831dc  parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96  serial.go


func main() {
    // Calculate the MD5 sum of all files under the specified directory,
    // then print the results sorted by path name.
    m, err := MD5All(os.Args[1])
    if err != nil {
    var paths []string
    for path := range m {
        paths = append(paths, path)
    for _, path := range paths {
        fmt.Printf("%x  %s\n", m[path], path)


// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents.  If the directory walk
// fails or any read operation fails, MD5All returns an error.
func MD5All(root string) (map[string][md5.Size]byte, error) {
    m := make(map[string][md5.Size]byte)
    err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        if !info.Mode().IsRegular() {
            return nil
        data, err := ioutil.ReadFile(path)
        if err != nil {
            return err
        m[path] = md5.Sum(data)
        return nil
    if err != nil {
        return nil, err
    return m, nil



type result struct {
    path string
    sum  [md5.Size]byte
    err  error

sumFiles返回两个channel:一个是result channel,另一个是filepath.Walk中产生的错误。walk函数针对每个文件启动一个新的协程来处理,然后检查donechannel。如果done已经被关闭,walk函数会立刻停止:

func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
    // For each regular file, start a goroutine that sums the file and
    // sends the result on c.
    // Send the result of the walk on errc.
    c := make(chan result)
    errc := make(chan error, 1)
    go func() {
        var wg sync.WaitGroup
        // If any error occurred, walk method will return
        err := filepath.Walk(root, func(path string, info fs.FileInfo, err error) error {
            if err != nil {
                return err
            if !info.Mode().IsRegular() {
                return nil
            go func() {
                data, err := ioutil.ReadFile(path)
                select {
                case c <- result{
                    path: path,
                    sum:  md5.Sum(data),
                    err:  err,
                case <-done:
            // Abort the walk if done is closed.
            select {
            case <-done:
                return errors.New("walk canceled")
                return nil
        // Walk has returned, so all calls to wg.Add are done.
        // Start a goroutine to close c once all the sends are done.
        // No select needed here, since errc is buffered.
        errc <- err
    return c, errc

MD5Allc中接收到摘要数据。当发生错误时,MD5All会迅速返回,通过defer语句来关闭done channel

func MD5All(root string) (map[string][md5.Size]byte, error) {
    // MD5All closes the done channel when it returns; it may do so before
    // receiving all the values from c and errc.
    done := make(chan struct{})
    defer close(done)

    c, errc := sumFiles(done, root)

    m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        m[r.path] = r.sum
    if err := <-errc; err != nil {
        return nil, err
    return m, nil


parallel.go 中的 MD5All 实现为每个文件启动一个新的 goroutine。 在包含许多大文件的目录中,这可能会分配比机器上可用的内存更多的内存。

我们可以通过限制并行读取的文件数量来限制这些分配。 在新的解决方式中,我们通过创建固定数量的 goroutine 来读取文件来做到这一点。 我们的pipeline现在分为三个阶段:遍历树、读取并计算文件摘要以及收集摘要。

第一阶段 walkFiles 发射出文件树中常规文件的路径:

func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
    paths := make(chan string)
    errc := make(chan error, 1)
    go func() {
        // Close the paths channel after Walk returns.
        defer close(paths)
        // No select needed for this send, since errc is buffered.
        errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            if !info.Mode().IsRegular() {
                return nil
            select {
            case paths <- path:
            case <-done:
                return errors.New("walk canceled")
            return nil
    return paths, errc

第二阶段启动固定数量的协程来计算文件摘要,然后发送到c channel中

func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
    for path := range paths {
        data, err := ioutil.ReadFile(path)
        select {
        case c <- result{path, md5.Sum(data), err}:
        case <-done:

和之前的示例不同,因为多个协程都在共享channel上发送数据,digester函数并没有关闭output channel。作为替代,当所有的digesters跑完之后,MD5All会关闭channel

    // Start a fixed number of goroutines to read and digest files.
    c := make(chan result)
    var wg sync.WaitGroup
    const numDigesters = 20
    for i := 0; i < numDigesters; i++ {
        go func() {
            digester(done, paths, c)
    go func() {


最终阶段从c中取得所有结果,并且检查errc中的错误。此检查不能更早发生,因为在此之前,walkFiles 可能会阻塞:


   m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        m[r.path] = r.sum
    // Check whether the Walk failed.
    if err := <-errc; err != nil {
        return nil, err
    return m, nil


本文介绍了在 Go 中构建流数据pipeline的技术。 处理此类pipeline中的故障很棘手,因为pipeline中的每个阶段可能会阻止尝试向下游发送值,并且下游阶段可能不再关心传入的数据。 我们展示了关闭通道如何向管道启动的所有 goroutine 广播“done”信号,并定义了正确构建管道的指南。




