前言
虽然 RxJS 提供了非常多的 Operators. 但依然会有不够用的时候. 这时就可以自定义 Operator 了.
Operator Is Just a Function Observable => Observable
Operator 只是一个函数.
timer(1000).pipe(obs => obs).subscribe();
它接收一个 Observable 返回一个 Observable.
Operator 都是放在管道 (pipe) 内的, 所以从源头 Observable 一直传递下去.
每一个 Operator 接收 upstream(上游) 的 Observable 并且返回一个 Observable (通常是新的一个) 给 downstream(下游).
每一个 Operator 就是对发布值的加工处理.
Standard Operator
source 源头
首先我们有个源头
const source = timer(1000);
每一秒发布一次
Upstream / Downstream Observable
接着搞一个 pipe 和 operator.
const source = timer(1000); source .pipe(upstreamObs => { console.log(upstreamObs === source); // true const downstreamObs = new Observable(); return downstreamObs; }) .subscribe(v => console.log(v));
可以看到 operator 函数接收的就是 upstream 的 Observable, 然后返回一个新的 Observable 到 downstream.
Connect Upstream and Downstream
downstream to upstream
当 downstream 被 subscribe, 我们 subscribe upstream
当 downstream 被 unsubscribe, 我们 unsubscribe upstream
const source = timer(1000); source .pipe(upstreamObs => { const downstreamObs = new Observable(subscriber => { // 当 downstream Observable 被订阅以后, 我们才订阅 upstream Observable const upstreamSub = upstreamObs.subscribe(); return () => { // 当 downstream 被退订以后, 我们也要退订 upstream Observable upstreamSub.unsubscribe(); }; }); return downstreamObs; }) .subscribe(v => console.log(v));
upstream to downstream
当接收到 upstream 发布时, 我们也发布到 downstream
const downstreamObs = new Observable(subscriber => { const upstreamSub = upstreamObs.subscribe(upstreamValue => { const downStreamValue = upstreamValue + 'downstream value'; // decorate value for downstream subscriber.next(downStreamValue); // 发布到 downstream }); return () => { upstreamSub.unsubscribe(); }; });
小结
从上面几招可以看出来, 主要就是 upstream 和 downstream 中间的关系.
如何订阅, 退订, 发布 value 等等. 上面只是给一个简单直观的例子. 真实场景中, upstream 和 downstream 发布时机, 往往是不一致的 (比如 upstream 发布 1 次后, downstream 可能发布多次)
Operator with Config
只要把 Operator 包装成工厂函数就可以支持 Config 了
function myOperator(config: string): OperatorFunction<number, string> { console.log(config); return upstreamObs => { const downstreamObs = new Observable<string>(subscriber => { const upstreamSub = upstreamObs.subscribe(upstreamValue => { const downStreamValue = upstreamValue + 'downstream value'; // decorate value for downstream subscriber.next(downStreamValue); // 发布到 downstream }); return () => { upstreamSub.unsubscribe(); }; }); return downstreamObs; }; } const source = timer(1000); source.pipe(myOperator('some config here...')).subscribe(v => console.log(v));
一个 Factory 函数, 通过配置生产 Operator.
OperatorFunction 定义了 Operator 接口, 它是加入了泛型的 Observable => Observable.
如果 upstream 和 downstream 的类型是一样的话可以用它的简化版本 MonoTypeOperatorFunction<T>
标签:Observable,const,downstream,RxJS,Custom,source,upstream,Operator From: https://www.cnblogs.com/keatkeat/p/16851630.html