项目中使用到了 RxGo ,感觉现有的处理方式有一定的优势,当然也有一定的有劣势,遂记录下来,免得自己忘记。
本文介绍的只是 rxgo 的一种方式而已,如果你有不错的使用方式,请不吝赐教,谢谢。
对 rxgo 不清楚的同学可以看看我的另一篇文章,主要是介绍 rxgo 的基础使用。
1、需求简介
现在项目的需求是,项目中有多个生成数据的地方,数据的消费是不确定何时何处消费的,数据消费的地方,随着版本的迭代可能增加,也可能减少。
我们将上述需求简化抽象后知道,即要有一个地方,接受所有的数据,消费者想什么时候去消费数据,自己去取就好了。使用过消息队列的同学都知道,这个场景太适合使用消息队列组件了,像Kafka、RabbitMQ、nats 等等。
是的,通常来说,使用消息队列是常见的方式。但是我们现在的项目,生成的数据主要是各个抽象出来的服务使用,如果其他系统需要,也可以抽象出来一个服务,将生成的数据发送到 Kafka、RabbitMQ 中去。
因为我们是工业软件,软件是部署到单个机器上的,我们不希望使用这么重的组件,增加我们部署运维的复杂度,那应该如何处理呢?
想必了解消息队列实现方式的同学知道,消息队列的实现方式,和设计模式中的 观察者模式
类似,那我们可以自己实现一个 观察者模式
的库,不就可以了嘛。
自己去实现一个符合现在需求的库,的确是可行的方案,但是对一般的公司而言,还没有到这一步,恰好 Go 中有 RxGo 这个库,符合现在的业务场景,简单说下使用此库的好处:
- rxgo 作为一个响应式编程的库,提供了
Observable和Observer
两个核心概念,使得我们可以更容易地处理异步事件流和数据流。 - 发送数据和消费数据都很简单,比如我们只关心生成观察者之后的数据,使用此库轻松实现
.....
还有很多好处,这里就不一一列举了。
2、rxgo 在项目中的使用方式
2.1 单仓库情景下
package main
import (
"fmt"
"github.com/reactivex/rxgo/v2"
"time"
)
/*---------------------------------------
观察对象
---------------------------------------*/
var (
// 事件bus
eventCh chan rxgo.Item
eventObservable rxgo.Observable
)
func init() {
//指令消息
eventCh = make(chan rxgo.Item)
// rxgo.FromEventSource 创建的是 热观察者, 即对从流的一开始产生的所有数据不感兴趣,只对我们开始观察之后的数据感兴趣。
// 没有观察者的Observable发出的项目将会丢失。对上面表示不理解的同学可以看看 rxgo github 上面的介绍
// 因为 rxgo.FromEventSource 默认在开始观察数据后, 是阻塞等待的
// rxgo.WithBackPressureStrategy(rxgo.Drop)
// 使用Drop策略意味着如果从FromEventSource后面的流程没有准备好消耗一个项目,这个项目会被丢弃。
eventObservable = rxgo.FromEventSource(eventCh, rxgo.WithBackPressureStrategy(rxgo.Drop)) //observe()时指定buffer
}
func PublishEvent(e interface{}) {
//发布事件
eventCh <- rxgo.Of(e)
}
func GetObservable() rxgo.Observable {
return eventObservable
}
func main() {
for i := 0; i < 3; i++ {
go consumer(i)
}
producer()
time.Sleep(time.Second * 60)
}
func consumer(number int) {
o := GetObservable().Filter(func(i interface{}) bool {
fmt.Printf("coming。。。。。。。。。。。。。, number: %v, data: %+v\n", number, i)
return true
})
o.DoOnNext(func(i interface{}) {
})
}
func producer() {
for i := 0; i < 3; i++ {
fmt.Println(i)
PublishEvent(i)
}
}
特别注意,上述代码执行的结果是不确定的,因为 rxgo 采用的是异步事件流。
执行结果:
0
1
coming。。。。。。。。。。。。。, number: 1, data: 1
coming。。。。。。。。。。。。。, number: 2, data: 0
coming。。。。。。。。。。。。。, number: 0, data: 0
2
coming。。。。。。。。。。。。。, number: 1, data: 2
coming。。。。。。。。。。。。。, number: 0, data: 2
coming。。。。。。。。。。。。。, number: 2, data: 2
2.2 多仓库场景下
上面我们已经实现了一个生产者消费者模式的demo,在实际开发中,我们可能会将某些功能单独抽离出来作为一个仓库,那么这个时候应该怎么将单独仓库中产生的数据发送到全局的事件 Bus 中呢,接下来,一起看下下面的代码就知道如何处理了。
这里为了方便演示、测试,直接将代码写在了同一份文件中,理解到其中的含义后,我们可以自行拆解。
package main
import (
"fmt"
"github.com/reactivex/rxgo/v2"
"time"
)
/*---------------------------------------
观察对象
---------------------------------------*/
var (
// 事件bus
eventCh chan rxgo.Item
eventObservable rxgo.Observable
forwardEventObservable rxgo.Observable
)
func init() {
//指令消息
eventCh = make(chan rxgo.Item)
// rxgo.FromEventSource 创建的是 热观察者, 即对从流的一开始产生的所有数据不感兴趣,只对我们开始观察之后的数据感兴趣。
// 没有观察者的Observable发出的项目将会丢失。对上面表示不理解的同学可以看看 rxgo github 上面的介绍
// 因为 rxgo.FromEventSource 默认在开始观察数据后, 是阻塞等待的
// rxgo.WithBackPressureStrategy(rxgo.Drop)
// 使用Drop策略意味着如果从FromEventSource后面的流程没有准备好消耗一个项目,这个项目会被丢弃。
eventObservable = rxgo.FromEventSource(eventCh, rxgo.WithBackPressureStrategy(rxgo.Drop)) //observe()时指定buffer
initOther() // 因为同一份文件的 init 函数是从上到下执行的, 为了 forwardEvent 不报错, 这里手动调用
forwardEvent()
}
func PublishEvent(e interface{}) {
//发布事件
eventCh <- rxgo.Of(e)
}
func GetObservable() rxgo.Observable {
return eventObservable
}
func forwardEvent() {
forwardEventObservable = GetOtherObservable()
forwardEventObservable.DoOnNext(func(i interface{}) {
PublishOtherEvent(i)
}, rxgo.WithBufferedChannel(2))
}
func main() {
for i := 0; i < 1; i++ {
go consumer(i)
}
go producer()
// 这里类似于其他仓库中生成数据
go otherProducer()
time.Sleep(time.Second * 60)
}
func consumer(number int) {
o := GetObservable().Filter(func(i interface{}) bool {
fmt.Printf("coming。。。。。。。。。。。。。, number: %v, data: %+v\n", number, i)
return true
})
o.DoOnNext(func(i interface{}) {
})
}
func producer() {
for i := 0; i < 3; i++ {
fmt.Println(i)
PublishEvent(i)
}
}
func otherProducer() {
for i := 0; i < 3; i++ {
fmt.Println("other is sending")
PublishEvent(fmt.Sprintf("other: %d", i))
}
}
/*
以下代码是其他仓库中的 rxgo 配置
*/
var (
// 事件bus
otherEvent chan rxgo.Item
otherObservable rxgo.Observable
)
func initOther() {
//指令消息
otherEvent = make(chan rxgo.Item)
// rxgo.FromEventSource 创建的是 热观察者, 即对从流的一开始产生的所有数据不感兴趣,只对我们开始观察之后的数据感兴趣。
// 没有观察者的Observable发出的项目将会丢失。对上面表示不理解的同学可以看看 rxgo github 上面的介绍
// 因为 rxgo.FromEventSource 默认在开始观察数据后, 是阻塞等待的
// rxgo.WithBackPressureStrategy(rxgo.Drop)
// 使用Drop策略意味着如果从FromEventSource后面的流程没有准备好消耗一个项目,这个项目会被丢弃。
otherObservable = rxgo.FromEventSource(otherEvent, rxgo.WithBackPressureStrategy(rxgo.Drop)) //observe()时指定buffer
}
func PublishOtherEvent(e interface{}) {
//发布事件
otherEvent <- rxgo.Of(e)
}
func GetOtherObservable() rxgo.Observable {
return otherObservable
}
特别注意,上述代码执行的结果是不确定的。
执行结果:
0
1
other is sending
other is sending
coming。。。。。。。。。。。。。, number: 0, data: 0
2
coming。。。。。。。。。。。。。, number: 0, data: 2
other is sending
coming。。。。。。。。。。。。。, number: 0, data: other: 2
3、rxgo 配置简要说明
这里主要讲解下我们在实际项目中用到的几个参数配置,其他的等待你的自行发掘。
-
rxgo.WithBufferedChannel(3)
: 设置 channel 的缓存大小; -
rxgo.WithBackPressureStrategy(rxgo.*Drop*)
: 使用Drop策略意味着如果后面的流程没有准备好消耗一个数据,这个数据会被丢弃。
演示案例:
package main
import (
"fmt"
"github.com/reactivex/rxgo/v2"
"time"
)
/*---------------------------------------
观察对象
---------------------------------------*/
var (
// 事件bus
eventCh chan rxgo.Item
eventObservable rxgo.Observable
)
func init() {
//指令消息
eventCh = make(chan rxgo.Item)
// rxgo.FromEventSource 创建的是 热观察者, 即对从流的一开始产生的所有数据不感兴趣,只对我们开始观察之后的数据感兴趣。
// 没有观察者的Observable发出的项目将会丢失。对上面表示不理解的同学可以看看 rxgo github 上面的介绍
// 因为 rxgo.FromEventSource 默认在开始观察数据后, 是阻塞等待的
// rxgo.WithBackPressureStrategy(rxgo.Drop)
// 使用Drop策略意味着如果从FromEventSource后面的流程没有准备好消耗一个项目,这个项目会被丢弃。
eventObservable = rxgo.FromEventSource(eventCh, rxgo.WithBackPressureStrategy(rxgo.Drop)) //observe()时指定buffer
}
func PublishEvent(e interface{}) {
//发布事件
eventCh <- rxgo.Of(e)
}
func GetObservable() rxgo.Observable {
return eventObservable
}
func main() {
for i := 0; i < 1; i++ {
go consumer(i)
}
producer()
time.Sleep(time.Second * 60)
}
func consumer(number int) {
// 为什么是在 消费数据的地方设置 channel 的容量?
// 这样可以根据消费者自身情况灵活调整
o := GetObservable().Filter(func(i interface{}) bool {
fmt.Printf("coming。。。。。。。。。。。。。, number: %v, data: %+v\n", number, i)
return true
}, rxgo.WithBufferedChannel(3))
o.DoOnNext(func(i interface{}) {
})
}
func producer() {
for i := 0; i < 3; i++ {
fmt.Println(i)
PublishEvent(i)
}
}
这次测试结果就是固定的了,大家可以想想为什么。
执行结果:
0
1
2
coming。。。。。。。。。。。。。, number: 0, data: 0
coming。。。。。。。。。。。。。, number: 0, data: 1
coming。。。。。。。。。。。。。, number: 0, data: 2
特别注意:
标签:浅谈,项目,rxgo,number,eventCh,FromEventSource,coming,data From: https://www.cnblogs.com/huageyiyangdewo/p/17604153.html为什么是在 消费数据的地方设置 channel 的容量?
答:这样可以根据消费者自身情况灵活调整