首页 > 编程语言 >Rxjava3文档级教程二: 操作符全解

Rxjava3文档级教程二: 操作符全解

时间:2022-12-11 20:35:54浏览次数:75  
标签:Observable lucas 04 ysalliance Rxjava3 操作符 myapplication 全解 com


商业转载请联系作者获得授权,非商业转载请注明出处。

​​Rxjava3文档级教程一: 介绍和基本使用​​

​​Rxjava3文档级教程二: 操作符全解​​

​​Rxjava3文档级教程三: 实战演练​​

目录

​​1 前言​​

​​1.1 用操作符组合Observable​​

​​1.2 操作符分类​​

​​1.2.1 创建操作​​

​​1.1.2 变换操作​​

​​1.1.3 过滤操作​​

​​1.1.4 组合操作​​

​​1.1.5 错误处理​​

​​1.1.6 辅助操作​​

​​1.1.7 条件和布尔操作​​

​​1.1.8 算术和聚合操作​​

​​1.1.9 连接操作​​

​​1.1.10 转换操作​​

​​1.1.11 操作符决策树​​

​​2 创建操作符​​

​​2.1 create()​​

​​2.2 from()​​

​​2.3 just()​​

​​2.4 defer()​​

​​2.5 range()​​

​​2.6 interval()​​

​​2.7 repeat()​​

​​2.8 timer()​​

​​3 过滤操作符​​

​​3.1 skip / skipLast​​

​​3.2 debounce(去抖动)​​

​​3.3 distinct(去重)​​

​​3.4 elementAt(获取指定位置元素)​​

​​3.5 filter(过滤)​​

​​3.6 first(第一个)​​

​​3.7 last(最后一个)​​

​​3.8 ignoreElements & ignoreElement(忽略元素)​​

​​3.9 ofType(过滤类型)​​

​​3.10 sample​​

​​3.11 throttleFirst & throttleLast & throttleWithTimeout & throttleLatest​​

​​3.12 take & takeLast​​

​​3.13 timeout(超时)​​

​​ 3.14 merge/concat​​

​​ 3.15 zip()​​

​​ 3.16 startWith()​​

​​ 3.17 join()​​

​​4 连接/组合操作符​​

​​4.1 startWith()​​

​​4.2 merge / mergeWith​​

​​4.3 zip()​​

​​4.4 combineLatest()​​

​​4.5 switchOnNext()​​

​​5 变换/转换操作符​​

​​5.1 map()​​

​​5.2 flatMap() / concatMap()​​

​​5.3 groupBy()​​

​​5.4 scan()​​

​​5.5 buffer()​​

​​5.6 window()​​

​​5.7 cast()​​

​​5.8 concatMapDelayError​​

​​5.9 concatMapCompletable()​​

​​5.10 concatMapCompletableDelayError()​​

​​5.11 flattenAsFlowable & flattenAsObservable​​

​​6 处理操作符​​

​​6.1 one rrorReturn()​​

​​6.2 one rrorReturnItem()​​

​​6.3 onExceptionResumeNext()​​

​​6.4 retry()​​

​​6.5 retryUntil()​​

​​6.6  retryWhen()​​

​​补充1:Rxjava3包结构的变动:​​

​​补充2:​​

​​参考文章:​​


1 前言

1.1 用操作符组合Observable

对于ReactiveX来说,Observable和Observer仅仅是个开始,它们本身不过是标准观察者模式的一些轻量级扩展,目的是为了更好的处理事件序列。

ReactiveX真正强大的地方在于它的操作符,操作符让你可以变换、组合、操纵和处理Observable发射的数据。

Rx的操作符让你可以用声明式的风格组合异步操作序列,它拥有回调的所有效率优势,同时又避免了典型的异步系统中嵌套回调的缺点。

下面是常用的操作符列表:

  1. ​创建操作​​ Create, Defer, Empty/Never/Throw, From, Interval, Just, Range, Repeat, Start, Timer
  2. ​变换操作​​ Buffer, FlatMap, GroupBy, Map, Scan和Window
  3. ​过滤操作​​ Debounce, Distinct, ElementAt, Filter, First, IgnoreElements, Last, Sample, Skip, SkipLast, Take, TakeLast
  4. ​组合操作​​ And/Then/When, CombineLatest, Join, Merge, StartWith, Switch, Zip
  5. ​错误处理​​ Catch和Retry
  6. ​辅助操作​​ Delay, Do, Materialize/Dematerialize, ObserveOn, Serialize, Subscribe, SubscribeOn, TimeInterval, Timeout, Timestamp, Using
  7. ​条件和布尔操作​​ All, Amb, Contains, DefaultIfEmpty, SequenceEqual, SkipUntil, SkipWhile, TakeUntil, TakeWhile
  8. ​算术和集合操作​​ Average, Concat, Count, Max, Min, Reduce, Sum
  9. ​转换操作​​ To
  10. ​连接操作​​ Connect, Publish, RefCount, Replay
  11. ​反压操作​​,用于增加特殊的流程控制策略的操作符

这些操作符并不全都是ReactiveX的核心组成部分,有一些是语言特定的实现或可选的模块。

1.2 操作符分类

ReactiveX的每种编程语言的实现都实现了一组操作符的集合。不同的实现之间有很多重叠的部分,也有一些操作符只存在特定的实现中。每种实现都倾向于用那种编程语言中他们熟悉的上下文中相似的方法给这些操作符命名。

本文首先会给出ReactiveX的核心操作符列表和对应的文档链接,后面还有一个决策树用于帮助你根据具体的场景选择合适的操作符。最后有一个语言特定实现的按字母排序的操作符列表。

如果你想实现你自己的操作符,可以参考这里:​​实现自定义操作符​

1.2.1 创建操作

​用于创建Observable的操作符​

1.1.2 变换操作

​这些操作符可用于对Observable发射的数据进行变换,详细解释可以看每个操作符的文档​

1.1.3 过滤操作

​这些操作符用于从Observable发射的数据中进行选择​

1.1.4 组合操作

​组合操作符用于将多个Observable组合成一个单一的Observable​

1.1.5 错误处理

​这些操作符用于从错误通知中恢复​

1.1.6 辅助操作

​一组用于处理Observable的操作符​

1.1.7 条件和布尔操作

​这些操作符可用于单个或多个数据项,也可用于Observable​

1.1.8 算术和聚合操作

​这些操作符可用于整个数据序列​

1.1.9 连接操作

​一些有精确可控的订阅行为的特殊Observable​

1.1.10 转换操作

1.1.11 操作符决策树

​几种主要的需求​

​这个页面展示了创建Observable的各种方法。​

对RxJava而言,操作符的相关内容Rxjava3 or 2 其实没什么改动,大部分Rxjava2的操作符都没变,即使有所变动,也只是包名或类名的改动。上面常见的操作符也可以直接从文档中查看用法,下面总结一些常用的操作符进行。


2 创建操作符

create()

创建最简单的事件流

from()

创建事件流,可发送不同类型的数据流

just()

创建事件流,可发送多个参数的数据流

defer()

创建事件流,可缓存可激活事件流

range()

创建事件流,可发送范围内的数据流

interval()

创建延时重复的事件流

repeat()

创建可重复次数的事件流

timer()

创建一次延时的事件流

注意:interval()、timer()、delay()的区别

  • interval():用于创建事件流,周期性重复发送
  • timer():用于创建事件流,延时发送一次
  • delay():用于事件流中,可以延时某次事件流的发送

2.1 create()

创建Observable最原始的方式,onNext/onComplete/onError方法可完全自由控制。在Rxjava3文档级教程:入门到掌握 (一 基本用法 )中,被观察者的创建基本都用的这种方式,不再重写赘述。

Rxjava3文档级教程二: 操作符全解_java

2.2 from()

Rxjava3文档级教程二: 操作符全解_android_02

String[] stringArray = {"a", "b", "c"};
Observable.fromArray(stringArray);
Observable.fromArray("a", "b", "c");
Observable.fromArray(1, 2, 3, 4);

fromIterable方法参数为实现Iterable接口的类,如List/Map/Set等集合类。

String[] strings = {"a", "b", "c"};
List<String> listString = Arrays.asList(strings);
Observable.fromIterable(listString);

2.3 just()

Rxjava3文档级教程二: 操作符全解_操作符_03

just重载了多个参数数量不同的方法,最大可带10个参数,just实际上同样是调用的fromArray方法;

Observable.just(1, 2, 3, 4)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.i("lucas", interger + "");
}
});

2.4 defer()

Rxjava3文档级教程二: 操作符全解_操作符_04

defer确保了​​Observable​​代码在被订阅后才执行(而不是创建后立即执行)。

2.5 range()

Rxjava3文档级教程二: 操作符全解_android_05

//发送从10开始的整数,发送4个(发到13)
Observable.range(10, 4)
.subscribe(integer -> Log.i("lucas", ""+integer));
//发送从10开始的长整型数,发送6个(发到15)
Observable.rangeLong(10, 6)
.subscribe(integer -> Log.i("lucas", ""+integer));

2.6 interval()

Rxjava3文档级教程二: 操作符全解_java_06

interval用于定时发送

//每3秒发个自增整数
Observable.interval(3, TimeUnit.SECONDS);
//初始延时1秒,每3秒发一个自增整数
Observable.interval(1, 3, TimeUnit.SECONDS);
//初始延时2秒,后每1秒发一个从10开始的整数,发5个(发到14)停止
Observable.intervalRange(10, 5, 2, 1, TimeUnit.SECONDS);

2.7 repeat()

Rxjava3文档级教程二: 操作符全解_android_07

repeat操作符可以重复发送指定次数的某个事件流,repeat操作符默认在trampoline调度器上执行,repeat默认重复次数为Long.MAX_VALUE,可使用重载方法指定次数以及使用repeatUntil指定条件。

//一直重复
Observable.fromArray(1, 2, 3, 4).repeat();
//重复发送5次
Observable.fromArray(1, 2, 3, 4).repeat(5);
//重复发送直到符合条件时停止重复
Observable.fromArray(1, 2, 3, 4).repeatUntil(new BooleanSupplier() {
@Override
public boolean getAsBoolean() throws Exception {
//自定判断条件,为true即可停止,默认为false
return false;
}
});

2.8 timer()

Rxjava3文档级教程二: 操作符全解_RxJava_08

timer用于延时发送。

//延时3秒后,发送一个整数0
Observable.timer(3, TimeUnit.SECONDS);

3 过滤操作符

过滤操作符主要是指对数据源进行选择和过滤的常用操作符。

bounce()

事件流只发射规定范围时间内的数据项

distinct()

事件流只发射不重复的数据项

elementAt()

事件流只发射第N个数据项

filter()

事件流只发射符合规定函数的数据项

first()

事件流只发射第一个数据项

last()

事件流只发射最后一项数据项

ignoreElements()

忽略事件流的发射,只发射事件流的终止事件

sample()

事件流对指定的时间间隔进行数据项的采样

skip()

事件流忽略前N个数据项

skipLast()

事件流忽略后N个数据项

take()

事件流只发射前N个数据项

takeLast()

事件流只发射后N个数据项

3.1 skip / skipLast

Rxjava3文档级教程二: 操作符全解_RxJava_09

可以作用于Flowable,Observable,表示源发射数据前,跳过多少个。skipLast(n)操作表示从流的尾部跳过n个元素。

Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
source.skipLast(4)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Throwable {
Log.w("TAG","onNext--->"+ integer);
}
});
// 结果:1 2 3 4 5 6

//Lambda写法
Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
source.skip(4)
.subscribe(i->{
Log.w("TAG","onNext--->"+ i);
});
// 结果:5 6 7 8 9 10

3.2 debounce(去抖动)

Rxjava3文档级教程二: 操作符全解_android_10

可作用于Flowable,Observable。在Android开发,通常为了防止用户重复点击而设置标记位,而通过RxJava的debounce操作符可以有效达到该效果。在规定时间内,用户重复点击只有最后一次有效,

Observable<String> source = Observable.create(emitter -> {
emitter.onNext("A");

Thread.sleep(1_500);
emitter.onNext("B");

Thread.sleep(500);
emitter.onNext("C");

Thread.sleep(250);
emitter.onNext("D");

Thread.sleep(2000);
emitter.onNext("E");
emitter.onComplete();
});

source.subscribeOn(Schedulers.io())
.debounce(1, TimeUnit.SECONDS)
.blockingSubscribe(
item -> Log.w("TAG","onNext--->"+ item),
Throwable::printStackTrace,
() -> Log.w("TAG","onNext--->"+ "onComplete" ));

//结果
2020-04-03 16:08:47.520 30559-30559/com.ysalliance.getfan.myapplication W/TAG: onNext--->A
2020-04-03 16:08:49.777 30559-30559/com.ysalliance.getfan.myapplication W/TAG: onNext--->D
2020-04-03 16:08:50.776 30559-30559/com.ysalliance.getfan.myapplication W/TAG: onNext--->E
2020-04-03 16:08:50.776 30559-30559/com.ysalliance.getfan.myapplication W/TAG: onNext--->onComplete

上文代码中,数据源以一定的时间间隔发送A,B,C,D,E。操作符debounce的时间设为1秒,发送A后1.5秒并没有发射其他数据,所以A能成功发射。发射B后,在1秒之内,又发射了C和D,在D之后的2秒才发射E,所有B、C都失效,只有D有效;而E之后已经没有其他数据流了,所有E有效。

Rxjava3文档级教程二: 操作符全解_java_11

3.3 distinct(去重)

Rxjava3文档级教程二: 操作符全解_java_12

可作用于Flowable,Observable,去掉数据源重复的数据。

distinctUntilChanged()去掉相邻重复数据。

Observable.just(2, 3, 4, 4, 2, 1)
.distinct()
.subscribe(System.out::print);

// 打印:2 3 4 1
Observable.just(1, 1, 2, 1, 2, 3, 3, 4)
.distinctUntilChanged()
.subscribe(System.out::print);
//打印:1 2 1 2 3 4

3.4 elementAt(获取指定位置元素)

Rxjava3文档级教程二: 操作符全解_android_13

可作用于Flowable,Observable,从数据源获取指定位置的元素,从0开始。

elementAtOrError:指定元素的位置超过数据长度,则发射异常。

Observable.just(2,4,3,1,5,8)
.elementAt(0)
.subscribe(integer ->
Log.d("TAG","elmentAt->"+integer));
打印:2

Observable<String> source = Observable.just("Kirk", "Spock", "Chekov", "Sulu");
Single<String> element = source.elementAtOrError(4);

element.subscribe(
name -> System.out.println("onSuccess will not be printed!"),
error -> System.out.println("onError: " + error));
打印:onSuccess will not be printed!

3.5 filter(过滤)

Rxjava3文档级教程二: 操作符全解_android_14

可作用于 Flowable,Observable,Maybe,Single。在filter中返回表示发射该元素,返回false表示过滤该数据。

Observable.just(1, 2, 3, 4, 5, 6)
.filter(x -> x % 2 == 0)
.subscribe(System.out::print);
打印:2 4 6

3.6 first(第一个)

Rxjava3文档级教程二: 操作符全解_RxJava3_15

作用于 Flowable,Observable。发射数据源第一个数据,如果没有则发送默认值。

Observable<String> source = Observable.just("A", "B", "C");
Single<String> firstOrDefault = source.first("D");
firstOrDefault.subscribe(System.out::println);
打印:A

Observable<String> emptySource = Observable.empty();
Single<String> firstOrError = emptySource.firstOrError();
firstOrError.subscribe(
element -> System.out.println("onSuccess will not be printed!"),
error -> System.out.println("onError: " + error));
打印:onError: java.util.NoSuchElementException

和firstElement的区别是first返回的是Single,而firstElement返回Maybe。firstOrError在没有数据会返回异常。

3.7 last(最后一个)

Rxjava3文档级教程二: 操作符全解_操作符_16

last、lastElement、lastOrError与fist、firstElement、firstOrError相对应。

Observable<String> source = Observable.just("A", "B", "C");
Single<String> lastOrDefault = source.last("D");
lastOrDefault.subscribe(System.out::println);
//打印:C

Observable<String> source = Observable.just("A", "B", "C");
Maybe<String> last = source.lastElement();
last.subscribe(System.out::println);
//打印:C

Observable<String> emptySource = Observable.empty();
Single<String> lastOrError = emptySource.lastOrError();
lastOrError.subscribe(
element -> System.out.println("onSuccess will not be printed!"),
error -> System.out.println("onError: " + error));
// 打印:onError: java.util.NoSuchElementException

3.8 ignoreElements & ignoreElement(忽略元素)

Rxjava3文档级教程二: 操作符全解_android_17

ignoreElements 作用于Flowable、Observable。ignoreElement作用于Maybe、Single。两者都是忽略掉数据,不发射任何数据,返回完成或者错误时间。

这里关注下intervalRange的用法,以下面这个例子说明:从1开始输出5个数据,延迟1秒执行,每隔1秒执行一次:

Single<Long> source = Single.timer(1, TimeUnit.SECONDS);
Completable completable = source.ignoreElement();
completable.doOnComplete(() -> System.out.println("Done!"))
.blockingAwait();
// 1秒后打印:Donde!

Observable<Long> source = Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS);
Completable completable = source.ignoreElements();
completable.doOnComplete(() -> System.out.println("Done!"))
.blockingAwait();
// 六秒后打印:Done!

3.9 ofType(过滤类型)

作用于Flowable、Observable、Maybe,过滤选择类型。

Observable<Number> numbers = Observable.just(1, 4.0, 3, 2.71, 2f, 7);
Observable<Integer> integers = numbers.ofType(Integer.class);
integers.subscribe((Integer x) -> System.out.print(x+" "));
//打印:1 3 7

3.10 sample

Rxjava3文档级教程二: 操作符全解_java_18

作用于Flowable、Observable,在一个周期内发射最新的数据。sample操作符会在指定的事件内从数据项中采集所需要的数据。

Observable<String> source = Observable.create(emitter -> {
emitter.onNext("A");

Thread.sleep(500);
emitter.onNext("B");

Thread.sleep(200);
emitter.onNext("C");

Thread.sleep(800);
emitter.onNext("D");

Thread.sleep(600);
emitter.onNext("E");
emitter.onComplete();
});

source.subscribeOn(Schedulers.io())
.sample(1, TimeUnit.SECONDS)
.blockingSubscribe(
item -> System.out.print(item+" "),
Throwable::printStackTrace,
() -> System.out.print("onComplete"));

// 打印: C D onComplete

与debounce的区别是,sample是以时间为周期的发射,一秒又一秒内的最新数据。而debounce是最后一个有效数据开始。

Rxjava3文档级教程二: 操作符全解_java_19

3.11 throttleFirst & throttleLast & throttleWithTimeout & throttleLatest

作用于Flowable、Observable。throttleFirst是指定周期内第一个数据,throttleLast与smaple一致。throttleWithTimeout与debounce一致。

Observable<String> source = Observable.create(emitter -> {
emitter.onNext("A");

Thread.sleep(500);
emitter.onNext("B");

Thread.sleep(200);
emitter.onNext("C");

Thread.sleep(800);
emitter.onNext("D");

Thread.sleep(600);
emitter.onNext("E");
emitter.onComplete();
});

source.subscribeOn(Schedulers.io())
.throttleFirst(1, TimeUnit.SECONDS)
.blockingSubscribe(
item -> System.out.print(item+" "),
Throwable::printStackTrace,
() -> System.out.print(" onComplete"));
//打印:A D onComplete

source.subscribeOn(Schedulers.io())
.throttleLast(1, TimeUnit.SECONDS)
.blockingSubscribe(
item -> System.out.print(item+" "),
Throwable::printStackTrace,
() -> System.out.print(" onComplete"));

// 打印:C D onComplete

throttleLatest:如果源的第一个数据总会被发射,然后开始周期计时,此时的效果就会跟throttleLast一致。

Observable<String> source = Observable.create(emitter -> {
emitter.onNext("A");

Thread.sleep(500);
emitter.onNext("B");

Thread.sleep(200);
emitter.onNext("C");

Thread.sleep(200);
emitter.onNext("D");

Thread.sleep(400);
emitter.onNext("E");

Thread.sleep(400);
emitter.onNext("F");

Thread.sleep(400);
emitter.onNext("G");

Thread.sleep(2000);
emitter.onComplete();
});
source.subscribeOn(Schedulers.io())
.throttleLatest(1, TimeUnit.SECONDS)
.blockingSubscribe(
item -> Log.e("RxJava",item),
Throwable::printStackTrace,
() -> Log.e("RxJava","finished"));

//打印 A D F G RxJava","finished

3.12 take & takeLast

Rxjava3文档级教程二: 操作符全解_java_20

作用于Flowable、Observable。take发射前n个元素。takeLast发射后n个元素。

Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

source.take(4)
.subscribe(System.out::print);
//打印:1 2 3 4

source.takeLast(4)
.subscribe(System.out::println);
//打印:7 8 9 10

3.13 timeout(超时)

作用于Flowable、Observable、Maybe、Single、Completabl。后一个数据发射未在前一个元素发射后规定时间内发射则返回超时异常。

Observable<String> source = Observable.create(emitter -> {
emitter.onNext("A");

Thread.sleep(800);
emitter.onNext("B");

Thread.sleep(400);
emitter.onNext("C");

Thread.sleep(1200);
emitter.onNext("D");
emitter.onComplete();
});

source.timeout(1, TimeUnit.SECONDS)
.subscribe(
item -> System.out.println("onNext: " + item),
error -> System.out.println("onError: " + error),
() -> System.out.println("onComplete will not be printed!"));
// 打印:
// onNext: A
// onNext: B
// onNext: C
// one rror: java.util.concurrent.TimeoutException:
The source did not signal an event for 1 seconds
and has been terminated.

 3.14 merge/concat

Rxjava3文档级教程二: 操作符全解_操作符_21

merge操作符可以合并两个事件流,如果在merge操作符上增加延时发送的操作,那么就会导致其发射的数据项是无序的,会跟着发射的时间点进行合并。虽然是将两个事件流合并成一个事件流进行发射,但在最终的一个事件流中,发射出来的却是两次数据流。

merge和concat的区别:merge():合并后发射的数据项是无序的,concat():合并后发射的数据项是有序的。

Observable<String> just1 = Observable.just("A", "B", "C");
Observable<String> just2 = Observable.just("1", "2", "3");

Observable.merge(just1, just2).subscribe(new Consumer<Serializable>() {
@Override
public void accept(Serializable serializable) throws Exception {
Log.i("lucas", "" + serializable.toString() );
}
});
//打印结果
2020-04-04 17:52:20.734 4372-4372/com.ysalliance.getfan.myapplication I/lucas: A
2020-04-04 17:52:20.734 4372-4372/com.ysalliance.getfan.myapplication I/lucas: B
2020-04-04 17:52:20.734 4372-4372/com.ysalliance.getfan.myapplication I/lucas: C
2020-04-04 17:52:20.734 4372-4372/com.ysalliance.getfan.myapplication I/lucas: 1
2020-04-04 17:52:20.734 4372-4372/com.ysalliance.getfan.myapplication I/lucas: 2
2020-04-04 17:52:20.734 4372-4372/com.ysalliance.getfan.myapplication I/lucas: 3

 3.15 zip()

Rxjava3文档级教程二: 操作符全解_操作符_22

zip操作符是将两个数据流进行指定的函数规则合并。

Observable<String> just1 = Observable.just("A", "B", "C");
Observable<String> just2 = Observable.just("1", "2", "3");

Observable.zip(just1, just2, new BiFunction<String, String, String>() {
@Override
public String apply(String s, String s2) throws Exception {
return s + s2;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i("lucas", "" + s );
}
});
//打印结果
2020-04-04 17:55:08.905 4744-4744/com.ysalliance.getfan.myapplication I/lucas: A1
2020-04-04 17:55:08.905 4744-4744/com.ysalliance.getfan.myapplication I/lucas: B2
2020-04-04 17:55:08.905 4744-4744/com.ysalliance.getfan.myapplication I/lucas: C3

 3.16 startWith()

Rxjava3文档级教程二: 操作符全解_java_23

startWith操作符是将另一个数据流合并到原数据流的开头。

Observable<String> just1 = Observable.just("A", "B", "C");
Observable<String> just2 = Observable.just("1", "2", "3");

just1.startWith(just2).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i("lucas", "" + s );
}
});
//打印结果
2020-04-04 17:57:22.155 4917-4917/com.ysalliance.getfan.myapplication I/lucas: 1
2020-04-04 17:57:22.155 4917-4917/com.ysalliance.getfan.myapplication I/lucas: 2
2020-04-04 17:57:22.156 4917-4917/com.ysalliance.getfan.myapplication I/lucas: 3
2020-04-04 17:57:22.156 4917-4917/com.ysalliance.getfan.myapplication I/lucas: A
2020-04-04 17:57:22.156 4917-4917/com.ysalliance.getfan.myapplication I/lucas: B
2020-04-04 17:57:22.156 4917-4917/com.ysalliance.getfan.myapplication I/lucas: C

 3.17 join()

Rxjava3文档级教程二: 操作符全解_java_24

join操作符是有时间期限的合并操作符。

Observable<String> just1 = Observable.just("A", "B", "C");
Observable<Long> just2 = Observable.interval(1, TimeUnit.SECONDS);

just1.join(just2, new Function<String, ObservableSource<Long>>() {
@Override
public ObservableSource<Long> apply(String s) throws Exception {
return Observable.timer(3, TimeUnit.SECONDS);
}
}, new Function<Long, ObservableSource<Long>>() {
@Override
public ObservableSource<Long> apply(Long l) throws Exception {
return Observable.timer(8, TimeUnit.SECONDS);
}
}, new BiFunction<String, Long, String>() {
@Override
public String apply(String s, Long l) throws Exception {
return s + l;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i("lucas", "" + s );
}
});

//打印结果
2020-04-04 18:04:43.751 6042-6109/com.ysalliance.getfan.myapplication I/lucas: A0
2020-04-04 18:04:43.752 6042-6109/com.ysalliance.getfan.myapplication I/lucas: B0
2020-04-04 18:04:43.752 6042-6109/com.ysalliance.getfan.myapplication I/lucas: C0
2020-04-04 18:04:44.750 6042-6109/com.ysalliance.getfan.myapplication I/lucas: A1
2020-04-04 18:04:44.751 6042-6109/com.ysalliance.getfan.myapplication I/lucas: B1
2020-04-04 18:04:44.751 6042-6109/com.ysalliance.getfan.myapplication I/lucas: C1

join操作符有三个函数需要设置

第一个函数:规定just2的过期期限
第二个函数:规定just1的过期期限
第三个函数:规定just1和just2的合并规则
由于just2的期限只有3秒的时间,而just2延时1秒发送一次,所以just2只发射了2次,其输出的结果就只能和just2输出的两次进行合并,其输出格式有点类似我们的排列组合。


4 连接/组合操作符

通过连接操作符,可以将多个被观察数据(数据源)连接在一起。

4.1 startWith()

可作用于Flowable、Observable。将指定数据源合并在另外数据源的开头。

Observable<String> name = Observable.just("My", "name");
Observable<String> name2 = Observable.just("is", "Lucas","!");
name2.startWith(name).subscribe(item -> Log.d("Lucas",item));

//打印:
2020-04-03 16:35:49.493 31609-31609/com.ysalliance.getfan.myapplication D/Lucas: My
2020-04-03 16:35:49.493 31609-31609/com.ysalliance.getfan.myapplication D/Lucas: name
2020-04-03 16:35:49.493 31609-31609/com.ysalliance.getfan.myapplication D/Lucas: is
2020-04-03 16:35:49.493 31609-31609/com.ysalliance.getfan.myapplication D/Lucas: Lucas
2020-04-03 16:35:49.493 31609-31609/com.ysalliance.getfan.myapplication D/Lucas: !

4.2 merge / mergeWith

可作用所有数据源类型,用于合并多个数据源到一个数据源。

Observable<String> name = Observable.just("My", "name");
Observable<String> name2 = Observable.just("is", "Lucas","!");

Observable.merge(name,name2).subscribe(v -> Log.d("lucas", v));
name.mergeWith(name2).subscribe(v -> Log.d("lucas",v));
//打印:
2020-04-03 16:39:41.934 32212-32212/com.ysalliance.getfan.myapplication D/lucas: My
2020-04-03 16:39:41.934 32212-32212/com.ysalliance.getfan.myapplication D/lucas: name
2020-04-03 16:39:41.934 32212-32212/com.ysalliance.getfan.myapplication D/lucas: is
2020-04-03 16:39:41.935 32212-32212/com.ysalliance.getfan.myapplication D/lucas: Lucas
2020-04-03 16:39:41.935 32212-32212/com.ysalliance.getfan.myapplication D/lucas: !
2020-04-03 16:39:41.937 32212-32212/com.ysalliance.getfan.myapplication D/lucas: My
2020-04-03 16:39:41.937 32212-32212/com.ysalliance.getfan.myapplication D/lucas: name
2020-04-03 16:39:41.937 32212-32212/com.ysalliance.getfan.myapplication D/lucas: is
2020-04-03 16:39:41.937 32212-32212/com.ysalliance.getfan.myapplication D/lucas: Lucas
2020-04-03 16:39:41.937 32212-32212/com.ysalliance.getfan.myapplication D/lucas: !

merge在合并数据源时,如果一个合并发生异常后会立即调用观察者的onError方法,并停止合并。可通过mergeDelayError操作符,将发生的异常留到最后处理。

Observable<String> name = Observable.just("My", "name");
Observable<String> name2 = Observable.just("is", "Lucas","!");
Observable<String> error = Observable.error(new NullPointerException("Error!"));

Observable.mergeDelayError(name,error,name2).subscribe(
v -> Log.d("lucas",v), e->Log.d("lucas",e.getMessage()));

//打印:
2020-04-03 16:42:07.030 32391-32391/com.ysalliance.getfan.myapplication D/lucas: My
2020-04-03 16:42:07.030 32391-32391/com.ysalliance.getfan.myapplication D/lucas: name
2020-04-03 16:42:07.033 32391-32391/com.ysalliance.getfan.myapplication D/lucas: is
2020-04-03 16:42:07.034 32391-32391/com.ysalliance.getfan.myapplication D/lucas: Lucas
2020-04-03 16:42:07.034 32391-32391/com.ysalliance.getfan.myapplication D/lucas: !
2020-04-03 16:42:07.034 32391-32391/com.ysalliance.getfan.myapplication D/lucas: Error!

4.3 zip()

可作用于Flowable、Observable、Maybe、Single。将多个数据源的数据一个一个的合并在一起哇。当其中一个数据源发射完事件之后,若其他数据源还有数据未发射完毕,也会停止。

Observable<String> name = Observable.just("My", "name");
Observable<String> name2 = Observable.just("is", "Lucas", "!", "haha!");
name.zipWith(name2, (first, last) -> first + "-" + last)
.subscribe(item -> Log.d("lucas", item));

//打印:
2020-04-03 16:44:59.127 32616-32616/com.ysalliance.getfan.myapplication D/lucas: My-is
2020-04-03 16:44:59.128 32616-32616/com.ysalliance.getfan.myapplication D/lucas: name-Lucas

4.4 combineLatest()

Rxjava3文档级教程二: 操作符全解_操作符_25

public static String[] str = {"A", "B", "C", "D", "E"};

public void combineLatest() {
Observable<String> just1 = Observable.interval(1, TimeUnit.SECONDS).map(new Function<Long, String>() {
@Override
public String apply(Long aLong) throws Exception {
return str[(int) (aLong % 5)];
}
});
Observable<Long> just2 = Observable.interval(1, TimeUnit.SECONDS);

Observable.combineLatest(just1, just2, new BiFunction<String, Long, String>() {
@Override
public String apply(String s, Long l) throws Exception {
return s + l;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("onNext=" + s);
}
});
}

//输出
onNext=A0
onNext=B0
onNext=B1
onNext=C1
onNext=C2
onNext=D2
onNext=D3
onNext=E3
onNext=E4
onNext=A4
onNext=A5

可作用于Flowable, Observable。在结合不同数据源时,发射速度快的数据源最新item与较慢的相结合。 如下时间线,Observable-1发射速率快,发射了65,Observable-2才发射了C, 那么两者结合就是C5。

Rxjava3文档级教程二: 操作符全解_java_26

4.5 switchOnNext()

一个发射多个小数据源的数据源,这些小数据源发射数据的时间发生重复时,取最新的数据源。

Rxjava3文档级教程二: 操作符全解_操作符_27


5 变换/转换操作符

变换操作符用于变化数据源的数据,并转化为新的数据源。

map()

对数据流的类型进行转换

flatMap()

对数据流的类型进行包装成另一个数据流

scan()

对上一轮处理过后的数据流进行函数处理

groupBy()

对所有的数据流进行分组

buffer()

缓存发射的数据流到一定数量,随后发射出数据流集合

window()

缓存发射的数据流到一定数量,随后发射出新的事件流

5.1 map()

Rxjava3文档级教程二: 操作符全解_android_28

map利用Function进行类型转换的例子:

Observable.just("1", "2", "3").map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return Integer.valueOf(s) * 100;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.i("lucas", ""+integer);
}
});

在实际编码中,我们还是常用Lambda表达式进行代码的精简优化,但是掌握Lambda的前提是,你还得会写非Lambda的表达,否则长时间下来可能会忘记原来的类写法:

Observable.just("1", "2", "3")
.map( (String s) -> Integer.valueOf(s) * 100)
.subscribe(integer -> Log.i("lucas", ""+integer));

可以看到,使用Lambda表达式后只有三行,简洁!

5.2 flatMap() / concatMap()

Rxjava3文档级教程二: 操作符全解_操作符_29

flatMap操作符将数据流进行类型转换,然后将新的数据流传递给新的事件流进行分发,这里通过模拟请求登录的延时操作进行说明

public static class UserParams {

public UserParams(String username, String password) {
this.username = username;
this.password = password;
}

public String username;
public String password;
}



Observable.just(new UserParams("lucas", "123123")).flatMap(new Function<UserParams, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(UserParams userParams) throws Exception {
return Observable.just(userParams.username + "登录成功").delay(2, TimeUnit.SECONDS);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i("lucas", "" + s );
}
});

Lambda表达式优化:

Observable.just(new UserParams("lucas", "123123"))
.flatMap((UserParams userParams)->Observable.just(userParams.username + "登录成功")
.delay(2, TimeUnit.SECONDS))
.subscribe(s -> Log.i("lucas", "" + s ));

concatMap与flatMap的区别:    concatMap是有序的,flatMap是无序的。

再举个组合的例子:

Observable.just("A", "B", "C")
.flatMap(a -> {
return Observable.intervalRange(1, 3, 0, 1, TimeUnit.SECONDS)
.map(b -> '(' + a + ", " + b + ')');
})
.blockingSubscribe( v -> {
Log.d("lucas", v+" " );
});

//打印结果
2020-04-03 22:05:58.363 12414-12414/com.ysalliance.getfan.myapplication D/lucas: (A, 1)
2020-04-03 22:05:58.365 12414-12414/com.ysalliance.getfan.myapplication D/lucas: (B, 1)
2020-04-03 22:05:58.367 12414-12414/com.ysalliance.getfan.myapplication D/lucas: (C, 1)
2020-04-03 22:05:59.355 12414-12414/com.ysalliance.getfan.myapplication D/lucas: (A, 2)
2020-04-03 22:05:59.362 12414-12414/com.ysalliance.getfan.myapplication D/lucas: (B, 2)
2020-04-03 22:05:59.362 12414-12414/com.ysalliance.getfan.myapplication D/lucas: (C, 2)
2020-04-03 22:06:00.357 12414-12414/com.ysalliance.getfan.myapplication D/lucas: (A, 3)
2020-04-03 22:06:00.361 12414-12414/com.ysalliance.getfan.myapplication D/lucas: (B, 3)
2020-04-03 22:06:00.363 12414-12414/com.ysalliance.getfan.myapplication D/lucas: (C, 3)

5.3 groupBy()

Rxjava3文档级教程二: 操作符全解_操作符_30

groupBy操作符可以将发射出来的数据项进行分组,并将分组后的数据项保存在具有key-value映射的事件流中。groupBy具体的分组规则由groupBy操作符传递进来的函数参数Function所决定的,它可以将key和value按照Function的返回值进行分组,返回一个具有分组规则的事件流GroupedObservable,注意这里分组出来的事件流是按照原始事件流的顺序输出的,我们可以通过sorted()对数据项进行排序,然后输出有序的数据流。

Observable.just("java", "c", "c++", "python", "javaScript", "android")
.groupBy(new Function<String, Character>() {
@Override
public Character apply(String s) throws Exception {
return s.charAt(0);//按首字母分组
}
})
.subscribe(new Consumer<GroupedObservable<Character, String>>() {
@Override
public void accept(final GroupedObservable<Character, String> characterStringGroupedObservable) throws Exception {
//排序后,直接订阅输出key和value
characterStringGroupedObservable.sorted().subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i("lucas", "onNext= key:" + characterStringGroupedObservable.getKey() + " value:" + s);
}
});
}
});

//打印结果
2020-04-04 16:36:13.766 32460-32460/com.ysalliance.getfan.myapplication I/lucas: onNext= key:p value:python
2020-04-04 16:36:13.766 32460-32460/com.ysalliance.getfan.myapplication I/lucas: onNext= key:a value:android
2020-04-04 16:36:13.766 32460-32460/com.ysalliance.getfan.myapplication I/lucas: onNext= key:c value:c
2020-04-04 16:36:13.766 32460-32460/com.ysalliance.getfan.myapplication I/lucas: onNext= key:c value:c++
2020-04-04 16:36:13.766 32460-32460/com.ysalliance.getfan.myapplication I/lucas: onNext= key:j value:java
2020-04-04 16:36:13.766 32460-32460/com.ysalliance.getfan.myapplication I/lucas: onNext= key:j value:javaScript
Observable<String> animals = Observable.just(
"Tiger", "Elephant", "Cat", "Chameleon", "Frog", "Fish", "Turtle", "Flamingo");

animals.groupBy(animal -> animal.charAt(0), String::toUpperCase)
.concatMapSingle(Observable::toList)
.subscribe(System.out::println);

// prints:
// [TIGER, TURTLE]
// [ELEPHANT]
// [CAT, CHAMELEON]
// [FROG, FISH, FLAMINGO]

5.4 scan()

Rxjava3文档级教程二: 操作符全解_java_31

scan操作符会对发射的数据和上一轮发射的数据进行函数处理,并返回的数据供下一轮使用,持续这个过程来产生剩余的数据流。其应用场景有简单的累加计算,判断所有数据的最小值等。

Observable.just(2, 4, 1, 9).scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
return integer < integer2 ? integer : integer2;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer item) throws Exception {
Log.i("lucas", "" + item );
}
});

//打印结果
2020-04-04 16:14:10.931 30301-30301/com.ysalliance.getfan.myapplication I/lucas: 2
2020-04-04 16:14:10.931 30301-30301/com.ysalliance.getfan.myapplication I/lucas: 2
2020-04-04 16:14:10.931 30301-30301/com.ysalliance.getfan.myapplication I/lucas: 1
2020-04-04 16:14:10.931 30301-30301/com.ysalliance.getfan.myapplication I/lucas: 1

Lambda表达式写法:

Observable.just(2, 4, 1, 9)
.scan((Integer integer, Integer integer2) -> integer < integer2 ? integer : integer2)
.subscribe(item -> Log.i("lucas", "" + item ));

带初始值的聚合叠加:

Observable.just(1, 2, 3)
.scan(10, (x, y) -> x + y)
.subscribe(item -> Log.i("lucas", "" + item ));

// prints:
2020-04-04 16:23:06.590 31451-31451/com.ysalliance.getfan.myapplication I/lucas: 10
2020-04-04 16:23:06.590 31451-31451/com.ysalliance.getfan.myapplication I/lucas: 11
2020-04-04 16:23:06.590 31451-31451/com.ysalliance.getfan.myapplication I/lucas: 13
2020-04-04 16:23:06.590 31451-31451/com.ysalliance.getfan.myapplication I/lucas: 16

5.5 buffer()

Rxjava3文档级教程二: 操作符全解_操作符_32

buffer操作符可以将发射出来的数据流,在给定的缓存池中进行缓存,当缓存池中的数据项溢满时,则将缓存池的数据项进行输出,重复上述过程,直到将发射出来的数据全部发射出去。如果发射出来的数据不够缓存池的大小,则按照当前发射出来的数量进行输出。如果对buffer操作符设置了skip参数,则buffer每次缓存池溢满时,会跳过指定的skip数据项,然后再进行缓存和输出。

Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
.buffer(3).subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Exception {
Log.i("lucas", "" + integers.toString() );
}
});
//打印结果
2020-04-04 16:40:41.744 32722-32722/com.ysalliance.getfan.myapplication I/lucas: [1, 2, 3]
2020-04-04 16:40:41.745 32722-32722/com.ysalliance.getfan.myapplication I/lucas: [4, 5, 6]
2020-04-04 16:40:41.745 32722-32722/com.ysalliance.getfan.myapplication I/lucas: [7, 8]

5.6 window()

Rxjava3文档级教程二: 操作符全解_操作符_33

window操作符和buffer操作符在功能上实现的效果是一样的,但window操作符最大区别在于同样是缓存一定数量的数据项,window操作符最终发射出来的是新的事件流integerObservable,而buffer操作符发射出来的是新的数据流,也就是说,window操作符发射出来新的事件流中的数据项,还可以经过Rxjava其他操作符进行处理。

Observable.just(1, 2, 3, 4)
.window(2, 1).subscribe(new Consumer<Observable<Integer>>() {
@Override
public void accept(Observable<Integer> integerObservable) throws Exception {
integerObservable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.i("lucas", "" + integer );
}
});
}
});

//打印结果
2020-04-04 16:49:54.681 1475-1475/com.ysalliance.getfan.myapplication I/lucas: 1
2020-04-04 16:49:54.681 1475-1475/com.ysalliance.getfan.myapplication I/lucas: 2
2020-04-04 16:49:54.682 1475-1475/com.ysalliance.getfan.myapplication I/lucas: 2
2020-04-04 16:49:54.682 1475-1475/com.ysalliance.getfan.myapplication I/lucas: 3
2020-04-04 16:49:54.682 1475-1475/com.ysalliance.getfan.myapplication I/lucas: 3
2020-04-04 16:49:54.682 1475-1475/com.ysalliance.getfan.myapplication I/lucas: 4
2020-04-04 16:49:54.682 1475-1475/com.ysalliance.getfan.myapplication I/lucas: 4
Observable.range(1, 4)
// Create windows containing at most 2 items, and skip 3 items before starting a new window.
.window(2)
.flatMapSingle(window -> {
return window.map(String::valueOf)
.reduce(new StringJoiner(", ", "[", "]"), StringJoiner::add);
})
.subscribe(System.out::println);

// prints:
// [1, 2]
// [3, 4]

5.7 cast()

作用于Flowable、Observable、Maybe、Single。将数据元素转型成其他类型,转型失败会抛出异常。

Observable<Number> numbers = Observable.just(1, 4.0, 3f, 7, 12, 4.6, 5);

numbers.filter((Number x) -> Integer.class.isInstance(x))
.cast(Integer.class)
.subscribe((Integer x) -> System.out.println(x));

// 打印:
// 1
// 7
// 12
// 5

5.8 concatMapDelayError

与concatMap作用相同,只是将过程发送的所有错误延迟到最后处理。

Observable.intervalRange(1, 3, 0, 1, TimeUnit.SECONDS)
.concatMapDelayError(x -> {
if (x.equals(1L)) return Observable.error(new IOException("Something went wrong!"));
else return Observable.just(x, x * x);
})
.blockingSubscribe(
x -> System.out.println("onNext: " + x),
error -> System.out.println("onError: " + error.getMessage()));

// prints:
// onNext: 2
// onNext: 4
// onNext: 3
// onNext: 9
// one rror: Something went wrong!

5.9 concatMapCompletable()

作用于Flowable、Observable。与contactMap类似,不过应用于函数后,返回的是CompletableSource。订阅一次并在所有CompletableSource对象完成时返回一个Completable对象。

Observable<Integer> source = Observable.just(2, 1, 3);
Completable completable = source.concatMapCompletable(x -> {
return Completable.timer(x, TimeUnit.SECONDS)
.doOnComplete(() -> System.out.println("Info: Processing of item \"" + x + "\" completed"));
});

completable.doOnComplete(() -> System.out.println("Info: Processing of all items completed"))
.blockingAwait();

// prints:
// Info: Processing of item "2" completed
// Info: Processing of item "1" completed
// Info: Processing of item "3" completed
// Info: Processing of all items completed

5.10 concatMapCompletableDelayError()

与concatMapCompletable作用相同,只是将过程发送的所有错误延迟到最后处理。

Observable<Integer> source = Observable.just(2, 1, 3);
Completable completable = source.concatMapCompletableDelayError(x -> {
if (x.equals(2)) {
return Completable.error(new IOException("Processing of item \"" + x + "\" failed!"));
} else {
return Completable.timer(1, TimeUnit.SECONDS)
.doOnComplete(() -> System.out.println("Info: Processing of item \"" + x + "\" completed"));
}
});

completable.doOnError(error -> System.out.println("Error: " + error.getMessage()))
.onErrorComplete()
.blockingAwait();

// prints:
// Info: Processing of item "1" completed
// Info: Processing of item "3" completed
// Error: Processing of item "2" failed!

5.11 flattenAsFlowable & flattenAsObservable

作用于Maybe、Single,将其转化为Flowable,或Observable。

Single<Double> source = Single.just(2.0);
Flowable<Double> flowable = source.flattenAsFlowable(x -> {
return List.of(x, Math.pow(x, 2), Math.pow(x, 3));
});

flowable.subscribe(x -> System.out.println("onNext: " + x));

// prints:
// onNext: 2.0
// onNext: 4.0
// onNext: 8.0

6 处理操作符

onErrorReturn()

当错误发生时,它会忽略onError的回调且会发射一个新的数据项并回调onCompleted()

onErrorResumeNext()

当错误发生时,它会忽略onError的回调且会发射一个新的事件流并回调onCompleted()

onExceptionResumeNext()

当错误发生时,如果onError收到的Throwable不是一个Exception,它会回调onError方法,且不会回调备用的事件流,如果onError收到的Throwable是一个Exception,它会回调备用的事件流进行数据的发射

retry()

当错误发生时,发射器会重新发射

retryWhen()

当错误发生时,根据Tharowble类型决定发射器是否重新发射

6.1 one rrorReturn()

Rxjava3文档级教程二: 操作符全解_android_34

作用于Flowable、Observable、Maybe、Single。但调用数据源的onError函数后会回到该函数,可对错误进行处理,然后返回值,会调用观察者onNext()继续执行,执行完调用onComplete()函数结束所有事件的发射。

Single.just("2A")
.map(v -> Integer.parseInt(v, 10))
.onErrorReturn(error -> {
if (error instanceof NumberFormatException) return 0;
else throw new IllegalArgumentException();
})
.subscribe(
v->Log.d("lucas", v+" " ),
error -> System.err.println("onError should not be printed!"));

//
2020-04-03 22:13:56.573 13238-13238/com.ysalliance.getfan.myapplication D/lucas: 0

6.2 one rrorReturnItem()

与onErrorReturn类似,onErrorReturnItem不对错误进行处理,直接返回一个值。

Single.just("2A")
.map(v -> Integer.parseInt(v, 10))
.onErrorReturnItem(0)
.subscribe(
v->Log.d("lucas", v+" " ),
error -> System.err.println("onError should not be printed!"));

//
2020-04-03 22:12:53.757 13100-13100/com.ysalliance.getfan.myapplication D/lucas: 0

6.3 onExceptionResumeNext()

Rxjava3文档级教程二: 操作符全解_java_35

可作用于Flowable、Observable、Maybe。

onErrorReturn发生异常时,回调onComplete()函数后不再往下执行,而onExceptionResumeNext则是要在处理异常的时候返回一个数据源,然后继续执行,如果返回null,则调用观察者的onError()函数。

Observable.create((ObservableOnSubscribe<Integer>) e -> {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onError(new NullPointerException());
e.onNext(4);
})
.onErrorResumeNext(throwable -> {
Log.d("lucas", "onErrorResumeNext ");
return Observable.just(4);
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d("lucas", "onSubscribe ");
}

@Override
public void onNext(Integer integer) {
Log.d("lucas", "onNext " + integer);
}

@Override
public void one rror(Throwable e) {
Log.d("lucas", "onError ");
}

@Override
public void onComplete() {
Log.d("lucas", "onComplete ");
}
});


//运行结果
2020-04-03 22:15:58.668 13456-13456/com.ysalliance.getfan.myapplication D/lucas: onSubscribe
2020-04-03 22:15:58.672 13456-13456/com.ysalliance.getfan.myapplication D/lucas: onNext 1
2020-04-03 22:15:58.672 13456-13456/com.ysalliance.getfan.myapplication D/lucas: onNext 2
2020-04-03 22:15:58.673 13456-13456/com.ysalliance.getfan.myapplication D/lucas: onNext 3
2020-04-03 22:15:58.673 13456-13456/com.ysalliance.getfan.myapplication D/lucas: one rrorResumeNext
2020-04-03 22:15:58.674 13456-13456/com.ysalliance.getfan.myapplication D/lucas: onNext 4
2020-04-03 22:15:58.674 13456-13456/com.ysalliance.getfan.myapplication D/lucas: onComplete

6.4 retry()

Rxjava3文档级教程二: 操作符全解_android_36

可作用于所有的数据源,当发生错误时,数据源重复发射item,直到没有异常或者达到所指定的次数。

Observable.create((ObservableOnSubscribe<Integer>) e -> {
e.onNext(1);
e.onNext(2);

if (first){
first=false;
e.onError(new NullPointerException());

}

})
.retry(9)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d("lucas", "onSubscribe ");
}

@Override
public void onNext(Integer integer) {
Log.d("lucas", "onNext " + integer);

}

@Override
public void one rror(Throwable e) {
Log.d("lucas", "onError ");
}

@Override
public void onComplete() {
Log.d("lucas", "onComplete ");
}
});

//打印结果
2020-04-03 22:18:28.605 13647-13647/com.ysalliance.getfan.myapplication D/lucas: onSubscribe
2020-04-03 22:18:28.607 13647-13647/com.ysalliance.getfan.myapplication D/lucas: onNext 1
2020-04-03 22:18:28.607 13647-13647/com.ysalliance.getfan.myapplication D/lucas: onNext 2
2020-04-03 22:18:28.607 13647-13647/com.ysalliance.getfan.myapplication D/lucas: onNext 1
2020-04-03 22:18:28.607 13647-13647/com.ysalliance.getfan.myapplication D/lucas: onNext 2
  • retry():表示重试无限次
  • retry(long times):表示重试指定次数
  • retry(Func predicate):可以根据函数参数中的Throwable类型和重试次数决定本次需不需要重试

6.5 retryUntil()

作用于Flowable、Observable、Maybe。与retry类似,但发生异常时,返回值是false表示继续执行(重复发射数据),true不再执行,但会调用onError方法。

Observable.create((ObservableOnSubscribe<Integer>) e -> {
e.onNext(1);
e.onNext(2);
e.onError(new NullPointerException());
e.onNext(3);
e.onComplete();
})
.retryUntil(() -> true)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d("lucas", "onSubscribe ");
}

@Override
public void onNext(Integer integer) {
Log.d("lucas", "onNext " + integer);

}

@Override
public void one rror(Throwable e) {
Log.d("lucas", "onError ");
}

@Override
public void onComplete() {
Log.d("lucas", "onComplete ");
}
});

//打印结果
2020-04-03 22:22:01.625 13905-13905/com.ysalliance.getfan.myapplication D/lucas: onSubscribe
2020-04-03 22:22:01.627 13905-13905/com.ysalliance.getfan.myapplication D/lucas: onNext 1
2020-04-03 22:22:01.627 13905-13905/com.ysalliance.getfan.myapplication D/lucas: onNext 2
2020-04-03 22:22:01.627 13905-13905/com.ysalliance.getfan.myapplication D/lucas: one rror

6.6  retryWhen()

Rxjava3文档级教程二: 操作符全解_android_37

retryWhen操作符和retry操作符相似,区别在于retryWhen将错误Throwable传递给了函数进行处理并产生新的事件流进行处理。

private static int retryCount = 0;
private static int maxRetries = 2;

public void retryWhen(){
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
for (int i = 1; i < 5; i++) {
if (i == 4) {
e.onError(new Exception("onError crash"));
}
e.onNext(i);
}
}
})
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Throwable throwable) throws Exception {
if (++retryCount <= maxRetries) {
// When this Observable calls onNext, the original Observable will be retried (i.e. re-subscribed).
System.out.println("get error, it will try after " + 1 + " seconds, retry count " + retryCount);
return Observable.timer(1, TimeUnit.SECONDS);
}
return Observable.error(throwable);
}
});
}
})
.onErrorReturn(new Function<Throwable, Integer>() {
@Override
public Integer apply(Throwable throwable) throws Exception {
return -1;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
System.out.println("onError");
}
}, new Action() {

@Override
public void run() throws Exception {
System.out.println("onComplete");
}
});
}


//结果
onNext=1
onNext=2
onNext=3
get error, it will try after 1 seconds, retry count 1
onNext=1
onNext=2
onNext=3
get error, it will try after 1 seconds, retry count 2
onNext=1
onNext=2
onNext=3
onNext=-1
onComplete

补充1:Rxjava3包结构的变动:

Rxjava3组件位于 io.reactivex.rxjava3包(rxjava1有 rx,rxjava2是 io.reactivex)。 这使得Rxjava3可以和早期版本一起使用。 此外,RxJava 的核心类型(Flowable、 Observer 等)已经被移动到 io.reactivex.rxjava3.core。

Component

RxJava 2

RxJava 3

Core

​io.reactivex​

​io.reactivex.rxjava3.core​

Annotations

​io.reactivex.annotations​

​io.reactivex.rxjava3.annotations​

Disposables

​io.reactivex.disposables​

​io.reactivex.rxjava3.disposables​

Exceptions

​io.reactivex.exceptions​

​io.reactivex.rxjava3.exceptions​

Functions

​io.reactivex.functions​

​io.reactivex.rxjava3.functions​

Flowables

​io.reactivex.flowables​

​io.reactivex.rxjava3.flowables​

Observables

​io.reactivex.observables​

​io.reactivex.rxjava3.observables​

Subjects

​io.reactivex.subjects​

​io.reactivex.rxjava3.subjects​

Processors

​io.reactivex.processors​

​io.reactivex.rxjava3.processors​

Observers

​io.reactivex.observers​

​io.reactivex.rxjava3.observers​

Subscribers

​io.reactivex.subscribers​

​io.reactivex.rxjava3.subscribers​

Parallel

​io.reactivex.parallel​

​io.reactivex.rxjava3.parallel​

Internal

​io.reactivex.internal​

​io.reactivex.rxjava3.internal​

补充2:

在RxJava1.0中,有的人会使用CompositeSubscription来收集Subscription,来统一取消订阅,现在在RxJava2.0中,由于subscribe()方法现在返回void,那怎么办呢?

其实在RxJava2.0中,Flowable提供了subscribeWith这个方法可以返回当前订阅的观察者,并且通过ResourceSubscriber DisposableSubscriber等观察者来提供 Disposable的接口

所以,如果想要达成RxJava1.0的效果,现在应该是这样做:

CompositeDisposable composite = new CompositeDisposable();

composite.add(Flowable.range(1, 8).subscribeWith(subscriber));

这个subscriber 应该是 ResourceSubscriber 或者 DisposableSubscriber 的实例。

参考文章:

因为写RxJava系列的文章时进行了很多阅读和参考,因此不分一二三等,将全系列的参考引用统一如下:

RxJava3 Wiki:​​https://github.com/ReactiveX/RxJava/wiki​

RxJava3官方github:​​What's different in 3.0 · ReactiveX/RxJava Wiki · GitHub​

ReactiveX文档中文翻译:​​创建操作 · ReactiveX文档中文翻译​

single:​​ReactiveX - Single​

操作符系列讲的很好的文章:​​Android响应式编程——RxJava3框架的使用(二)_e电动小马达e的博客-CSDN博客​​

基础介绍:​​Android响应式编程——RxJava3框架的使用(一)_e电动小马达e的博客-CSDN博客_android rxjava3​​

RxLifecycle:​​https://github.com/trello/RxLifecycle​



标签:Observable,lucas,04,ysalliance,Rxjava3,操作符,myapplication,全解,com
From: https://blog.51cto.com/u_12853553/5928614

相关文章

  • c语言表达式求值和操作符属性
    一、表达式求值表达式求值顺序一部分是由操作符的优先级和结合性决定。同样,有些表达式的操作数在求值的过程中可能需要转化为其他类型1.隐式类型转换表达式中的字符和短整型......
  • C语言操作符
    C语言的操作符分为:算术操作符、移位操作符、位操作符、赋值操作符、单目操作符、关系操作符、逻辑操作符、条件操作符、逗号操作符、下标引用,函数调用和结构成员。 ​一、......
  • post表单数据格式完全解析multipart/form-data(C#实现)
    post表单数据格式完全解析multipart/form-data参数说明内容boundaryboundary一个字符串,用以分隔不同的参数;stringboundary=Guid.NewGuid().ToString()......
  • Service完全解析
    下面我们就围绕Service对其进行全面讲解:(请发邮件到 ​​[email protected]​​  获得最新翻强软件。)1.Service生命周期Service生命周期可以从两种启动Service......
  • JavaScript:操作符: 逗号运算符
    逗号运算符,是极少见的运算符,我们看一下代码理解一下逗号运算符的功能:先说结论,逗号运算符的优先级非常低,比赋值运算符=还要低;同时,逗号隔开的几个表达式,都会各自进行计算,......
  • JavaScript:操作符: 空值合并运算符(??)
    这是一个新增的运算符,它的功能是:对于表达式1??表达式2,如果表达式1的结果是null或者undefined时,返回表达式b的结果;否则返回表达式a的结果;它与赋值运算符结合使用,即??=,即......
  • JavaScript:操作符:操作符的特点
    在JS中,所有的操作符,都同时在做两件事,第一件事是进行计算,第二件事是返回计算的结果,这个结果需要有变量去接收,否则就成为无人认领的数据而被垃圾回收;在JS中,有很多不常用的......
  • JavaScript:操作符:正负号和自增自减及其隐式转换
    正负号正号即加号,负号即减号,运算结果同数学意义一样;对非数字类型进行正负号运算,会隐式转换为数字,再进行运算;一些特殊的非数字,转换情况同算术运算符;自增自减自增即为++......
  • JavaScript:操作符:比较运算符及其隐式转换
    不等关系即大于>;大于等于>=;小于<;小于等于<=当比较的两个变量,有非数字时,会隐式转换为数字再比较,转换情况同算术运算符;当两个变量均为字符串时,不会进行转换,而是逐位比较......
  • JavaScript:操作符:逻辑运算符及其隐式转换
    逻辑非!用来对布尔值进行取反,即!true=false;当取反的变量不是布尔值,会进行隐式转换为布尔值:非0的数字,都转换为true非空字符串,转换为true非空对象,转换为trueInf......