首页 > 其他分享 >浅谈 rxgo 在项目中的使用方式

浅谈 rxgo 在项目中的使用方式

时间:2023-08-03 18:33:59浏览次数:44  
标签:浅谈 项目 rxgo number eventCh FromEventSource coming data

项目中使用到了 RxGo ,感觉现有的处理方式有一定的优势,当然也有一定的有劣势,遂记录下来,免得自己忘记。

本文介绍的只是 rxgo 的一种方式而已,如果你有不错的使用方式,请不吝赐教,谢谢。

对 rxgo 不清楚的同学可以看看我的另一篇文章,主要是介绍 rxgo 的基础使用。

Go中响应式编程库RxGo详细介绍

1、需求简介

现在项目的需求是,项目中有多个生成数据的地方,数据的消费是不确定何时何处消费的,数据消费的地方,随着版本的迭代可能增加,也可能减少。

我们将上述需求简化抽象后知道,即要有一个地方,接受所有的数据,消费者想什么时候去消费数据,自己去取就好了。使用过消息队列的同学都知道,这个场景太适合使用消息队列组件了,像Kafka、RabbitMQ、nats 等等。

是的,通常来说,使用消息队列是常见的方式。但是我们现在的项目,生成的数据主要是各个抽象出来的服务使用,如果其他系统需要,也可以抽象出来一个服务,将生成的数据发送到 Kafka、RabbitMQ 中去。

因为我们是工业软件,软件是部署到单个机器上的,我们不希望使用这么重的组件,增加我们部署运维的复杂度,那应该如何处理呢?

想必了解消息队列实现方式的同学知道,消息队列的实现方式,和设计模式中的 观察者模式 类似,那我们可以自己实现一个 观察者模式 的库,不就可以了嘛。

自己去实现一个符合现在需求的库,的确是可行的方案,但是对一般的公司而言,还没有到这一步,恰好 Go 中有 RxGo 这个库,符合现在的业务场景,简单说下使用此库的好处:

  • rxgo 作为一个响应式编程的库,提供了Observable和Observer两个核心概念,使得我们可以更容易地处理异步事件流和数据流。
  • 发送数据和消费数据都很简单,比如我们只关心生成观察者之后的数据,使用此库轻松实现

.....

还有很多好处,这里就不一一列举了。

2、rxgo 在项目中的使用方式

2.1 单仓库情景下

package main

import (
	"fmt"
	"github.com/reactivex/rxgo/v2"
	"time"
)

/*---------------------------------------

	观察对象

---------------------------------------*/
var (
	// 事件bus
	eventCh         chan rxgo.Item
	eventObservable rxgo.Observable
)

func init() {
	//指令消息
	eventCh = make(chan rxgo.Item)
	// rxgo.FromEventSource 创建的是 热观察者, 即对从流的一开始产生的所有数据不感兴趣,只对我们开始观察之后的数据感兴趣。
	// 没有观察者的Observable发出的项目将会丢失。对上面表示不理解的同学可以看看 rxgo github 上面的介绍
	// 因为 rxgo.FromEventSource 默认在开始观察数据后, 是阻塞等待的
	// rxgo.WithBackPressureStrategy(rxgo.Drop)
	// 使用Drop策略意味着如果从FromEventSource后面的流程没有准备好消耗一个项目,这个项目会被丢弃。
	eventObservable = rxgo.FromEventSource(eventCh, rxgo.WithBackPressureStrategy(rxgo.Drop)) //observe()时指定buffer

}

func PublishEvent(e interface{}) {
	//发布事件
	eventCh <- rxgo.Of(e)
}

func GetObservable() rxgo.Observable {
	return eventObservable
}

func main() {
	for i := 0; i < 3; i++ {
		go consumer(i)
	}
	producer()

	time.Sleep(time.Second * 60)
}

func consumer(number int) {
	o := GetObservable().Filter(func(i interface{}) bool {
		fmt.Printf("coming。。。。。。。。。。。。。, number: %v, data: %+v\n", number, i)
		return true
	})
	o.DoOnNext(func(i interface{}) {

	})
}

func producer() {

	for i := 0; i < 3; i++ {
		fmt.Println(i)
		PublishEvent(i)
	}
}

特别注意,上述代码执行的结果是不确定的,因为 rxgo 采用的是异步事件流。

执行结果:

0
1
coming。。。。。。。。。。。。。, number: 1, data: 1
coming。。。。。。。。。。。。。, number: 2, data: 0
coming。。。。。。。。。。。。。, number: 0, data: 0
2
coming。。。。。。。。。。。。。, number: 1, data: 2
coming。。。。。。。。。。。。。, number: 0, data: 2
coming。。。。。。。。。。。。。, number: 2, data: 2

2.2 多仓库场景下

上面我们已经实现了一个生产者消费者模式的demo,在实际开发中,我们可能会将某些功能单独抽离出来作为一个仓库,那么这个时候应该怎么将单独仓库中产生的数据发送到全局的事件 Bus 中呢,接下来,一起看下下面的代码就知道如何处理了。

这里为了方便演示、测试,直接将代码写在了同一份文件中,理解到其中的含义后,我们可以自行拆解。

package main

import (
	"fmt"
	"github.com/reactivex/rxgo/v2"
	"time"
)

/*---------------------------------------

	观察对象

---------------------------------------*/
var (
	// 事件bus
	eventCh                chan rxgo.Item
	eventObservable        rxgo.Observable
	forwardEventObservable rxgo.Observable
)

func init() {
	//指令消息
	eventCh = make(chan rxgo.Item)
	// rxgo.FromEventSource 创建的是 热观察者, 即对从流的一开始产生的所有数据不感兴趣,只对我们开始观察之后的数据感兴趣。
	// 没有观察者的Observable发出的项目将会丢失。对上面表示不理解的同学可以看看 rxgo github 上面的介绍
	// 因为 rxgo.FromEventSource 默认在开始观察数据后, 是阻塞等待的
	// rxgo.WithBackPressureStrategy(rxgo.Drop)
	// 使用Drop策略意味着如果从FromEventSource后面的流程没有准备好消耗一个项目,这个项目会被丢弃。
	eventObservable = rxgo.FromEventSource(eventCh, rxgo.WithBackPressureStrategy(rxgo.Drop)) //observe()时指定buffer

	initOther() // 因为同一份文件的 init 函数是从上到下执行的, 为了 forwardEvent 不报错, 这里手动调用
	forwardEvent()
}

func PublishEvent(e interface{}) {
	//发布事件
	eventCh <- rxgo.Of(e)
}

func GetObservable() rxgo.Observable {
	return eventObservable
}

func forwardEvent() {
	forwardEventObservable = GetOtherObservable()
	forwardEventObservable.DoOnNext(func(i interface{}) {
		PublishOtherEvent(i)
	}, rxgo.WithBufferedChannel(2))
}

func main() {
	for i := 0; i < 1; i++ {
		go consumer(i)
	}
	go producer()

	// 这里类似于其他仓库中生成数据
	go otherProducer()

	time.Sleep(time.Second * 60)
}

func consumer(number int) {
	o := GetObservable().Filter(func(i interface{}) bool {
		fmt.Printf("coming。。。。。。。。。。。。。, number: %v, data: %+v\n", number, i)
		return true
	})
	o.DoOnNext(func(i interface{}) {

	})
}

func producer() {

	for i := 0; i < 3; i++ {
		fmt.Println(i)
		PublishEvent(i)
	}
}

func otherProducer() {

	for i := 0; i < 3; i++ {
		fmt.Println("other is sending")
		PublishEvent(fmt.Sprintf("other: %d", i))
	}
}

/*
	以下代码是其他仓库中的 rxgo 配置
*/

var (
	// 事件bus
	otherEvent      chan rxgo.Item
	otherObservable rxgo.Observable
)

func initOther() {
	//指令消息
	otherEvent = make(chan rxgo.Item)
	// rxgo.FromEventSource 创建的是 热观察者, 即对从流的一开始产生的所有数据不感兴趣,只对我们开始观察之后的数据感兴趣。
	// 没有观察者的Observable发出的项目将会丢失。对上面表示不理解的同学可以看看 rxgo github 上面的介绍
	// 因为 rxgo.FromEventSource 默认在开始观察数据后, 是阻塞等待的
	// rxgo.WithBackPressureStrategy(rxgo.Drop)
	// 使用Drop策略意味着如果从FromEventSource后面的流程没有准备好消耗一个项目,这个项目会被丢弃。
	otherObservable = rxgo.FromEventSource(otherEvent, rxgo.WithBackPressureStrategy(rxgo.Drop)) //observe()时指定buffer

}

func PublishOtherEvent(e interface{}) {
	//发布事件
	otherEvent <- rxgo.Of(e)
}

func GetOtherObservable() rxgo.Observable {
	return otherObservable
}

特别注意,上述代码执行的结果是不确定的。

执行结果:

0
1
other is sending
other is sending
coming。。。。。。。。。。。。。, number: 0, data: 0
2
coming。。。。。。。。。。。。。, number: 0, data: 2
other is sending
coming。。。。。。。。。。。。。, number: 0, data: other: 2

3、rxgo 配置简要说明

这里主要讲解下我们在实际项目中用到的几个参数配置,其他的等待你的自行发掘。

  • rxgo.WithBufferedChannel(3): 设置 channel 的缓存大小;

  • rxgo.WithBackPressureStrategy(rxgo.*Drop*): 使用Drop策略意味着如果后面的流程没有准备好消耗一个数据,这个数据会被丢弃。

演示案例:

package main

import (
	"fmt"
	"github.com/reactivex/rxgo/v2"
	"time"
)

/*---------------------------------------

	观察对象

---------------------------------------*/
var (
	// 事件bus
	eventCh         chan rxgo.Item
	eventObservable rxgo.Observable
)

func init() {
	//指令消息
	eventCh = make(chan rxgo.Item)
	// rxgo.FromEventSource 创建的是 热观察者, 即对从流的一开始产生的所有数据不感兴趣,只对我们开始观察之后的数据感兴趣。
	// 没有观察者的Observable发出的项目将会丢失。对上面表示不理解的同学可以看看 rxgo github 上面的介绍
	// 因为 rxgo.FromEventSource 默认在开始观察数据后, 是阻塞等待的
	// rxgo.WithBackPressureStrategy(rxgo.Drop)
	// 使用Drop策略意味着如果从FromEventSource后面的流程没有准备好消耗一个项目,这个项目会被丢弃。
	eventObservable = rxgo.FromEventSource(eventCh, rxgo.WithBackPressureStrategy(rxgo.Drop)) //observe()时指定buffer

}

func PublishEvent(e interface{}) {
	//发布事件
	eventCh <- rxgo.Of(e)
}

func GetObservable() rxgo.Observable {
	return eventObservable
}

func main() {
	for i := 0; i < 1; i++ {
		go consumer(i)
	}
	producer()

	time.Sleep(time.Second * 60)
}

func consumer(number int) {
	// 为什么是在 消费数据的地方设置 channel 的容量?
	// 这样可以根据消费者自身情况灵活调整
	o := GetObservable().Filter(func(i interface{}) bool {
		fmt.Printf("coming。。。。。。。。。。。。。, number: %v, data: %+v\n", number, i)
		return true
	}, rxgo.WithBufferedChannel(3))
	o.DoOnNext(func(i interface{}) {

	})
}

func producer() {

	for i := 0; i < 3; i++ {
		fmt.Println(i)
		PublishEvent(i)
	}
}

这次测试结果就是固定的了,大家可以想想为什么。

执行结果:

0
1
2
coming。。。。。。。。。。。。。, number: 0, data: 0
coming。。。。。。。。。。。。。, number: 0, data: 1
coming。。。。。。。。。。。。。, number: 0, data: 2

特别注意:

为什么是在 消费数据的地方设置 channel 的容量?
答:这样可以根据消费者自身情况灵活调整

标签:浅谈,项目,rxgo,number,eventCh,FromEventSource,coming,data
From: https://www.cnblogs.com/huageyiyangdewo/p/17604153.html

相关文章

  • gn/ninja: 谷歌的新一代项目构建系统简介
    gn/ninja背景gn是谷歌开源的一个元构建系统(meta-buildsystem)。这个”元构建“的意思是,gn并不直接帮你构建项目,而是帮你产生构建项目的ninja文件,然后你再用ninja去构建项目。或者你可以这么理解,gn相当于帮你生成Makefile,然后你再用make去编译构建你的项目。这么做的原因是,ninj......
  • Java(从零到企业级电商项目实战)学习笔记
    资料网站:http://learning.happymmall.com/env.html一、mybatis三剑客:generator,plugin,pagehelperpagehelper->https://github.com/pagehelper/Mybatis-PageHelper二、spring例子:https://github.com/spring-projects/spring-mvc-showcasehttps://github.com/spring-proj......
  • 前端性能优化的利器 ——— 浅谈JavaScript中的防抖和节流
    防抖和节流函数是工作中两种常用的前端性能优化函数,今天我就来总结一下什么是防抖和节流,并详细说明一下如何在工作中应用防抖和节流函数什么是防抖和节流?在JavaScript中,防抖(debounce)和节流(throttle)是用来限制函数执行频率的两种常见技术。防抖(debounce)是指在某个时间段内......
  • Java后端03(浅谈注解)
    注解功能一:提示信息功能二:存储信息​ 注解需要定义注解类,类对象需要有落实的实体,注解可以出现在类Class上,方法Method上,成员变量Field上以及构造方法Constructor上,注解对象需要被添加注解的实体所对应的反射对象进行获取,人话:要获得注解信息,首先要获得修饰的东西的反射......
  • 前端项目时因chunk-vendors过大导致首屏加载太慢,Vue Build时chunk-vendors的优化方案
    1、compression-webpack-plugin插件打包.gz文件安装插件也可以指定版本 我这里下载的是1.1.2版本的,试过更高的版本会有ES6语法的报错,因为我node使用的是v12,如果node版本更高可以尝试更高版本npminstall--save-devcompression-webpack-pluginnpminstall--save-devc......
  • 安卓UI相关开源项目库汇总
    awesome-github-android-ui是由OpenDigg整理并维护的安卓UI相关开源项目库集合。我们会定期同步OpenDigg上的项目到这里,也欢迎各位提交项目给我们。如果收录的项目有错误,可以通过issue反馈给我们。这里的项目Star数不是实时更新的,一般是一周更新一次。内容抽屉菜单ListViewWebVie......
  • 项目准备
    1. fastjson介绍   2java处理json的库 : fastjson , jackson , gsonfastjson是阿里巴巴的开源JSON解析库,它可以解析JSON格式的字符串,支持将Java Bean序列化为JSON字符串,也可以从JSON字符串反序列化到JavaBean。 fastjson是json的序列化和反序列化fastjson已经被广泛使......
  • 牛客网项目开发学习
    牛客网项目SpringSpringIocInversionofControl控制反转,是一种面向对象编程的设计思想。DependencyInjection依赖注入,是IOC思想的实现方式。IocContainerIoc容器,是实现依赖注入的关键,本质上是一个工厂。SpringMVC三层架构:表现层,业务层,数据访问层。MVC:Model模型......
  • 浅谈地产行业税务热点问题及解决方案
    地产行业作为我国国民经济的重要组成部分和国民经济支柱产业,对经济、金融和民生有着巨大的贡献。总体来说,房地产在我国经济社会发展中占据着关键性的地位,在经济增长、财政金融和民生等方面发挥着重要作用。中国约30%的GDP来自于房地产及其供应链的各类活动,包括使用钢铁、水泥、玻璃......
  • Eclipse 创建OSGI项目并调试
    File->new->Plug-inProject......