ch := make(chan interface{}, 1024)
func produce(item interface{}) {
ch <- item
}
func consume() {
for item := range ch {
// 异步消费
go func() {
_ = item // processing item
}()
}
}
仓库地址:https://github.com/bytedance/gopkg
var aborted int32
var ch = channel.New(
channel.WithNonBlock(),
channel.WithTimeout(time.Second),
channel.WithTimeoutCallback(func(i interface{}) {
atomic.AddInt32(&aborted, 1) // monitor timeout task
}),
)
// 生产速度:每 10ms 生产一条消息
func produce() {
for i := 1; i <= 20; i++ {
ch.Input(i)
time.Sleep(time.Millisecond * 10)
}
}
// 消费速度:每 100ms 消费一条消息
var consumed int
func consume() {
for item := range ch.Output() {
consumed++
time.Sleep(time.Millisecond * 100) // 例如 RPC 调用
}
}
var cpuChecker = func() func(c channel.Channel) bool {
var overloaded int32
go func() {
for {
cpuPercent := cpu.Percent(time.Second) // cost 1 second
if cp >= 0.5 {
atomic.CompareAndSwapInt32(&overloaded, 0, 1)
} else {
atomic.CompareAndSwapInt32(&overloaded, 1, 0)
}
}
}()
return func(c channel.Channel) bool {
return atomic.LoadInt32(&overloaded) > 0
}
}
var channel = channel.New(
channel.WithThrottle(cpuChecker()),
)
五、FAQ
- 是不是用了这个库一定不会有故障?
这个库的目的是尽可能避免传统 channel 会遇到的坑,并帮助你实现更多生产级别的控制能力,至于需要什么样的控制完全取决于使用姿势。
此外,如果系统本身就处于处理能力跟不上的状态,该库更多提供的是快速恢复和丢弃任务的功能,而非也没有能力做到完全无损。
不过它的接口设计也一定程度避免了一些常见的 Go 原生 channel 的坑,比如:允许重复关闭 channel,允许对closed channel 写入。尽可能不 panic。
- 是不是要把所有 Go 原生 chan 都换成这个库?
没有必要,在性能上也会有损。不是每一个 channel 都在系统中是核心通信交互的作用。有些可能仅仅只是作为一个信号事件来用。
只有真正有大量生产消费关系的 channel ,需要用该库替换。
- 是否需要主动关闭 channel ?
与 Go 原生 chan 结构一样,关闭 channel 并不是必须的。如果你不主动调用 ch.Close() ,该库会在你不持有 ch 引用时,自动关闭 ch。
- 为什么 Input 的时候不能使用原生 <- 的语法?
Go chan 的「写入」语法并不能帮助我们实现很多功能性需求,且暴露可写 chan 类型给用户容易产生误用,尤其是容易出现往 closed chan 中写入数据的常见报错。所以目前使用 Input() 函数方式。可扩展性和安全性都要比原始 chan <- value
这种用法要好得多。
- 性能比原生 channel 对比如何?
由于引入了中间缓冲层,所以性能上必然是劣于原生 chan 的。但是该库的使用场景更多是功能导向的需求而非性能。
目前压测下,性能比原生要差 3-5 倍。单次操作在 500ns 级别。
- 使用上有哪些注意事项?
为了与原生 channel 在语义上尽可能接近,默认依然是有 size 以及阻塞的。如需实现非阻塞 channel,需要手动指定 WithNonBlock
option。