当谈到并发时,许多编程语言都采用共享内存/状态模型。然而,Go 通过实现 Communicating Sequential Processes(CSP)而与众不同。在 CSP 中,程序由不共享状态的并行处理器组成;相反,他们使用 Channel 来沟通和同步他们的行动。因此,对于有兴趣采用 Go 的开发人员来说,理解 Channel 的工作原理变得至关重要。在本文中,我将使用地鼠经营他们想象中的咖啡馆的令人愉快的类比来说明 Channel ,因为我坚信人类是更好的视觉学习者。
场景
Partier, Candier, Stringer 三人正在经营一家咖啡馆。鉴于制作咖啡比接受订单需要更多时间,Partier 将协助接受顾客的订单,然后将这些订单传递到厨房,由 Candier 和 Stringer 准备咖啡。
无缓冲 Channels
最初,咖啡馆以最简单的方式运营:每当收到新订单时,Partier 都会将订单放入 Channel 中,并等到 Candier 或 Stringer 接受后才接受任何新订单。 Partier 和厨房之间的通信是通过使用 ch := make(chan Order)
创建的无缓冲 Channel 来实现的。当 Channel 中没有挂单时,即使 Stringer 和 Candier 都准备好接受新订单,它们也会保持空闲状态并等待新订单到达。
当收到新订单时,Partier 将其放入 Channel 中,使 Candier 或 Stringer 可以接受该订单。但是,在继续接受新订单之前,Partier 必须等待后厨的两位工作者(Candier, Stringer)的其中一个从 Channel 中检索并获取订单。
由于 Candier 和 Stringer 都可以接受新订单,因此他们中的任何一个都会立即接受新订单。但是,无法保证或预测收到订单的具体收件人。 Stringer 和 Candier 之间的选择是不确定的,它取决于调度和 Go 运行时的内部机制等因素。假设 Candier 收到了第一个订单。
Candier 处理完第一个订单后,又回到等待状态。如果没有新订单到达,Candier 和 Stringer 这两个工作人员将保持空闲状态,直到 Partier 将另一个订单放入通道中供他们处理。
当新订单到达时,Stringer 和 Candier 都可以处理它。即使 Candier 刚刚处理了上一个订单,接收新订单的具体工人仍然是不确定的。在这种情况下,假设 Candier 再次被分配了第二个订单。
新订单 order3 到达,Candier 当前正忙于处理 order2,她没有在队列中等待 order := <-ch
,Stringer 成为唯一可以接收 order3 的工作人员。因此,他会得到它。
在 order3 发送到 Stringer 后,order4 立即到达。此时,Stringer 和 Candier 都已忙于处理各自的订单,没有人可以接收 order4。由于 Channel 没有缓冲,因此将 order4 放入其中会阻塞 Partier,直到 Stringer 或 Candier 可以接受 order4。这种情况值得特别注意,因为我经常看到人们对无缓冲通道(使用 make(chan order)
或 make(chan order, 0)
创建)和缓冲区大小为 1 的通道(使用 make(chan order, 1)
创建)感到困惑)。因此,他们错误地期望 ch <- order4
立即完成并接收 order5。如果您也是这么想的,我在 Go Playground 上创建了一个片段来帮助您纠正您的误解 https://go.dev/play/p/shRNiDDJYB4。
带缓冲的 Channel
无缓冲通道可以工作,但是它限制了整体吞吐量。如果可以接受更多订单并在后端(厨房)按顺序处理它们,那就更好了。这可以通过缓冲通道来实现。现在,即使 Stringer 和 Candier 忙于处理订单,Partier 仍然可以在通道中留下新订单,并在通道未满的情况下继续接受其他订单,例如最多 3 个挂单。
通过引入缓冲 Channel,咖啡馆增强了处理更多订单的能力。然而,仔细选择适当的缓冲区大小以维持客户合理的等待时间至关重要。毕竟,没有顾客愿意忍受过长的等待。有时,拒绝新订单可能比接受新订单但无法及时履行订单更容易接受。此外,在临时容器化 (Docker) 应用程序中使用缓冲 Channel 时务必谨慎,因为容器会随机重启,在这种情况下从 Channel 恢复消息可能是一项具有挑战性的任务,甚至近乎不可能。
Channels vs Blocking Queues
尽管本质上不同,Java 中的 Blocking Queue 用于线程之间的通信,而 Go 中的 Channel 用于 Goroutine 的通信,BlockingQueue 和 Channel 的行为有些相似。如果你熟悉BlockingQueue,理解 Channel 肯定会很容易。
常见使用场景
Channel 是 Go 应用程序中一项基本且广泛使用的功能,可用于多种用途。Channel 的一些常见用例包括:
- Goroutine 通信:Channel 支持不同 Goroutine 之间的消息交换,允许它们进行协作,而无需直接共享状态。
- 工作池:如上例所示,Channel 通常用于管理工作池,其中多个相同的工作者处理来自共享Channel 的传入任务。
- 扇出、扇入:Channel 用于扇出、扇入模式,其中多个 Goroutine(扇出)执行工作并将结果发送到单个 Channel,而另一个 Goroutine(扇入)消耗这些结果。
- 超时和截止日期:Channel与 select 语句结合可用于处理超时和截止日期,确保程序可以优雅地处理延迟并避免无限期的等待。
我将在其他文章中更详细地探讨 Channel 的不同用法。然而,现在,让我们通过实现上述咖啡馆场景并见证渠道如何运作来结束这篇介绍性博客。我们将探讨 Partier、Candier 和 Stringer 之间的交互,观察 Channel 如何促进它们之间的顺畅沟通和协调,从而实现咖啡馆内高效的订单处理和同步。
Show me your code!
package main
import (
"fmt"
"log"
"math/rand"
"sync"
"time"
)
func main() {
ch := make(chan order, 3)
wg := &sync.WaitGroup{} // More on WaitGroup another day
wg.Add(2)
go func() {
defer wg.Done()
worker("Candier", ch)
}()
go func() {
defer wg.Done()
worker("Stringer", ch)
}()
for i := 0; i < 10; i++ {
waitForOrders()
o := order(i)
log.Printf("Partier: I %v, I will pass it to the channel\n", o)
ch <- o
}
log.Println("No more orders, closing the channel to signify workers to stop")
close(ch)
log.Println("Wait for workers to gracefully stop")
wg.Wait()
log.Println("All done")
}
func waitForOrders() {
processingTime := time.Duration(rand.Intn(2)) * time.Second
time.Sleep(processingTime)
}
func worker(name string, ch <-chan order) {
for o := range ch {
log.Printf("%s: I got %v, I will process it\n", name, o)
processOrder(o)
log.Printf("%s: I completed %v, I'm ready to take a new order\n", name, o)
}
log.Printf("%s: I'm done\n", name)
}
func processOrder(_ order) {
processingTime := time.Duration(2+rand.Intn(2)) * time.Second
time.Sleep(processingTime)
}
type order int
func (o order) String() string {
return fmt.Sprintf("order-%02d", o)
}
您可以复制此代码,对其进行调整并在 IDE 上运行它,以更好地了解通道的工作原理。
本文译自:https://medium.com/stackademic/go-concurrency-visually-explained-channel-c6f88070aafa
欢迎加我好友,交流可观测性相关话题或了解我们的商业产品,我的微信号:picobyte
,加好友请备注您的公司和姓名