首页 > 其他分享 >30 天精通 RxJS (22):Subject 基本观念

30 天精通 RxJS (22):Subject 基本观念

时间:2024-04-16 22:34:17浏览次数:34  
标签:console complete 22 RxJS 30 next error subject log

RxJS Logo

终于进到了 RxJS 的第二个重点 Subject,不知道读者们有没有发现? 我们在这篇文章之前的范例,每个 observable 都只订阅了一次,而实际上 observable 是可以多次订阅的

Multiple subscriptions

var source = Rx.Observable.interval(1000).take(3)
var observerA = {
	next: (value) => console.log('A next: ' + value),
	error: (error) => console.log('A error: ' + error),
	complete: () => console.log('A complete!'),
}
var observerB = {
	next: (value) => console.log('B next: ' + value),
	error: (error) => console.log('B error: ' + error),
	complete: () => console.log('B complete!'),
}
source.subscribe(observerA)
source.subscribe(observerB)
// "A next: 0"
// "B next: 0"
// "A next: 1"
// "B next: 1"
// "A next: 2"
// "A complete!"
// "B next: 2"
// "B complete!"

上面这段代码,分别用 observerA 跟 observerB 订阅了 source,从 log 可以看出来 observerA 跟 observerB 都各自收到了元素,但请记得这两个 observer 其实是分开执行的也就是说他们是完全独立的,我们把 observerB 延迟订阅来证明看看

var source = Rx.Observable.interval(1000).take(3)

var observerA = {
	next: (value) => console.log('A next: ' + value),
	error: (error) => console.log('A error: ' + error),
	complete: () => console.log('A complete!'),
}

var observerB = {
	next: (value) => console.log('B next: ' + value),
	error: (error) => console.log('B error: ' + error),
	complete: () => console.log('B complete!'),
}

source.subscribe(observerA)
setTimeout(() => {
	source.subscribe(observerB)
}, 1000)

// "A next: 0"
// "A next: 1"
// "B next: 0"
// "A next: 2"
// "A complete!"
// "B next: 1"
// "B next: 2"
// "B complete!"

这里我们延迟一秒再用 observerB 订阅,可以从 log 中看出 1 秒后 observerA 已经印到了 1,这时 observerB 开始印却是从 0 开始,而不是接着 observerA 的进度,代表这两次的订阅是完全分开来执行的,或者说是每次的订阅都建立了一个新的执行。

这样的行为在大部分的情境下适用,但有些案例下我们会希望第二次订阅 source 不会从头开始接收元素,而是从第一次订阅到当前处理的元素开始发送,我们把这种处理方式称为组播(multicast),那我们要如何做到组播呢?

手动建立 subject

或许已经有读者想到解法了,其实我们可以建立一个中间人来订阅 source 再由中间人转送资料出去,就可以达到我们想要的效果

var source = Rx.Observable.interval(1000).take(3)

var observerA = {
	next: (value) => console.log('A next: ' + value),
	error: (error) => console.log('A error: ' + error),
	complete: () => console.log('A complete!'),
}

var observerB = {
	next: (value) => console.log('B next: ' + value),
	error: (error) => console.log('B error: ' + error),
	complete: () => console.log('B complete!'),
}

var subject = {
	observers: [],
	addObserver: function (observer) {
		this.observers.push(observer)
	},
	next: function (value) {
		this.observers.forEach((o) => o.next(value))
	},
	error: function (error) {
		this.observers.forEach((o) => o.error(error))
	},
	complete: function () {
		this.observers.forEach((o) => o.complete())
	},
}

subject.addObserver(observerA)

source.subscribe(subject)

setTimeout(() => {
	subject.addObserver(observerB)
}, 1000)

// "A next: 0"
// "A next: 1"
// "B next: 1"
// "A next: 2"
// "B next: 2"
// "A complete!"
// "B complete!"

从上面的代码可以看到,我们先建立了一个实例叫 subject,这个实例具备 observer 所有的方法(next, error, complete),并且还能 addObserver 把 observer 加到内部的清单中,每当有值送出就会遍历清单中的所有 observer 并把值再次送出,这样一来不管多久之后加进来的 observer,都会是从当前处理到的元素接续往下走,就像范例中所示,我们用 subject 订阅 source 并把 observerA 加到 subject 中,一秒后再把 observerB 加到 subject,这时就可以看到 observerB 是直接收 1 开始,这就是组播(multicast)的行为。

让我们把 subject 的 addObserver 改名成 subscribe 如下

var subject = {
	observers: [],
	subscribe: function (observer) {
		this.observers.push(observer)
	},
	next: function (value) {
		this.observers.forEach((o) => o.next(value))
	},
	error: function (error) {
		this.observers.forEach((o) => o.error(error))
	},
	complete: function () {
		this.observers.forEach((o) => o.complete())
	},
}

应该有眼尖的读者已经发现,subject 其实就是用了 Observer Pattern。但这边为了不要混淆 Observer Pattern 跟 RxJS 的 observer 就不再提及。这也是为什么我们在一开始讲 Observer Pattern 希望大家亲自实现的原因。

RxJS 中的 Subject 确实是类似这样运行的,可以在原始码中看到

虽然上面是我们自己手写的 subject,但运行方式跟 RxJS 的 Subject 实例是几乎一样的,我们把前面的代码改成 RxJS 提供的 Subject 试试

var source = Rx.Observable.interval(1000).take(3)

var observerA = {
	next: (value) => console.log('A next: ' + value),
	error: (error) => console.log('A error: ' + error),
	complete: () => console.log('A complete!'),
}

var observerB = {
	next: (value) => console.log('B next: ' + value),
	error: (error) => console.log('B error: ' + error),
	complete: () => console.log('B complete!'),
}

var subject = new Rx.Subject()

subject.subscribe(observerA)

source.subscribe(subject)

setTimeout(() => {
	subject.subscribe(observerB)
}, 1000)

// "A next: 0"
// "A next: 1"
// "B next: 1"
// "A next: 2"
// "B next: 2"
// "A complete!"
// "B complete!"

大家会发现使用方式跟前面是相同的,建立一个 subject 先拿去订阅 observable(source),再把我们真正的 observer 加到 subject 中,这样一来就能完成订阅,而每个加到 subject 中的 observer 都能整组的接收到相同的元素。

什么是 Subject?

虽然前面我们已经示范直接手写一个简单的 subject,但到底 RxJS 中的 Subject 的概念到底是什么呢?

首先 Subject 可以拿去订阅 Observable(source) 代表他是一个 Observer,同时 Subject 又可以被 Observer(observerA, observerB) 订阅,代表他是一个 Observable。

总结成两句话

  • Subject 同时是 Observable 又是 Observer
  • Subject 会对内部的 observers 清单进行组播(multicast)

补充: 没事不要看!其实 Subject 就是 Observer Pattern 的实例并且继承自 Observable。

今日小结

今天介绍了 RxJS 中的第二个重点 Subject,重点放在 Subject 主要的运行方式,以及概念上的所代表的意思,如果今天还不太能够吸收的读者不用紧张,后面我们会讲到 subject 的一些应用,到时候就会有更深的体会。

不知道今天读者么有没有收获呢? 如果有任何问题,欢迎在下方留言给我。

本系列仅作为学习记录所用,摘录自30 天精通 Rxjs!强烈推荐!膜拜大佬!

标签:console,complete,22,RxJS,30,next,error,subject,log
From: https://www.cnblogs.com/xiaojiuwow/p/18139385

相关文章

  • 30 天精通 RxJS (21):深入 Observable
    我们已经把绝大部分的operators都介绍完了,但一直没有机会好好的解释Observable的operators运行方式。在系列文章的一开头是以数组(Array)的operators(map,filter,concatAll)作为切入点,让读者们在学习observable时会更容易接受跟理解,但实际上observable的oper......
  • 30.swagger springboot中简单整理
    类似postman接口在线测试API接口哈哈我也不太懂接口还没前后端整合过呢也就是注释你的各种接口吧可有可无先说依赖问题这个是swagger2的依赖当然出现问题了已经更新到了swagger3了参考:https://cloud.tencent.com/developer/article/1936862https://mvnrepos......
  • P8290 [省选联考 2022] 填树
    MyBlogsP8290[省选联考2022]填树很有意思的拉插优化DP。首先可以枚举\(L\)来限制选的数的值域在\(L,L+k\)中。然后进行树上DP:设\(v_i\)表示当前点\(i\)能填多少种数,\(w_i\)表示当前点\(i\)能填的数的和。\(f_i\)表示当前\(i\)子树内的所有合法根链数量,\(g......
  • 【题解】P4307 [JSOI2009] 球队收益 / 球队预算
    P4307[JSOI2009]球队收益/球队预算题解题目传送门题意简述一共有\(n\)个球队比赛,输了赢了都会有相应的支出,现在让你安排\(m\)场比赛的输赢,是总支出最少。思路首先看到最小支出,状态不好定义,直接费用流,启动!。后文如果没有特殊说明,边的费用均为\(0\)。考虑建图,其......
  • masscan下载编译安装,Visual Studio 2022
    Windowswin11编译masscan.exe,2024解决错误:LNK2019无法解析的外部符号e_next_bytee_next_int32第一步:克隆仓库https://github.com/robertdavidgraham/masscan.git第二部:VisualStudio打开vs10/masscan.sln第三步:项目-重定目标解决方案第四步:生成-重新生成解决方案......
  • [题解] [CCPC陕西省赛2022 D题] Hash
    [CCPC陕西省赛2022D题]Hash题目描述给定一个字符串\(S\),按照如下方法获取\(S\)的哈希值://LanguageC++14longlongmod=5999993;longlonggethas(strings){longlongret=0;for(charc:s)ret=(ret*29+(c-'a'+1))%mod;returnret;}找到一个......
  • 如何在Ubuntu 22.04上用Docker安装Sentry
    Sentry是一个免费和开源的错误跟踪平台,可以实时监控和修复崩溃。它使软件开发人员能够看到重要的东西,更快地解决问题,并不断了解他们的应用程序。这个平台提供了对生产部署的实时洞察力,并提供了重现和修复崩溃的信息。Sentry支持所有主要的语言和框架,并与你喜欢的应用程序和服务集......
  • [转帖]MiSans字库GB18030标准验证情况
    https://zhuanlan.zhihu.com/p/663626158  提供的信息,手机厂商终于见到跨入GB18030-2022实现级别3的进步。 MiSans网站提供两个汉字库下载和使用,MiSansFAQ也告知只要升级去XiaomiHyperOS后,已可完整支持。MiSansL3字体有以下说明:MiSans新增60340字符符合GB180......
  • [题解][2021-2022年度国际大学生程序设计竞赛第10届陕西省程序设计竞赛] Type The Str
    题目描述给定n个字符串,有以下几种操作:打出一个字符,花费1。删除一个字符,花费1。复制并打出一个之前打出过的字符串,花费k。求打出所有n个字符串的最小花费。(注意,打出顺序和字符串输入的顺序不必相同)题解显然,操作3需要算字符串的最长公共子序列来处理。这个问题可以转换为......
  • 30 天精通 RxJS (20):Observable Operators - window, windowToggle, groupBy
    前几天我们讲完了能把HigherOrderObservable转成一般的Observable的operators,今天我们要讲能够把一般的Observable转成HigherOrderObservable的operators。其实前端不太有机会用到这类型的Operators,都是在比较特殊的需求下才会看到,但还是会有遇到的时候。Op......