商业转载请联系作者获得授权,非商业转载请注明出处。
Rxjava3文档级教程一: 介绍和基本使用
Rxjava3文档级教程二: 操作符全解
Rxjava3文档级教程三: 实战演练
目录
1 前言
1.1 用操作符组合Observable
1.2 操作符分类
1.2.1 创建操作
1.1.2 变换操作
1.1.3 过滤操作
1.1.4 组合操作
1.1.5 错误处理
1.1.6 辅助操作
1.1.7 条件和布尔操作
1.1.8 算术和聚合操作
1.1.9 连接操作
1.1.10 转换操作
1.1.11 操作符决策树
2 创建操作符
2.1 create()
2.2 from()
2.3 just()
2.4 defer()
2.5 range()
2.6 interval()
2.7 repeat()
2.8 timer()
3 过滤操作符
3.1 skip / skipLast
3.2 debounce(去抖动)
3.3 distinct(去重)
3.4 elementAt(获取指定位置元素)
3.5 filter(过滤)
3.6 first(第一个)
3.7 last(最后一个)
3.8 ignoreElements & ignoreElement(忽略元素)
3.9 ofType(过滤类型)
3.10 sample
3.11 throttleFirst & throttleLast & throttleWithTimeout & throttleLatest
3.12 take & takeLast
3.13 timeout(超时)
3.14 merge/concat
3.15 zip()
3.16 startWith()
3.17 join()
4 连接/组合操作符
4.1 startWith()
4.2 merge / mergeWith
4.3 zip()
4.4 combineLatest()
4.5 switchOnNext()
5 变换/转换操作符
5.1 map()
5.2 flatMap() / concatMap()
5.3 groupBy()
5.4 scan()
5.5 buffer()
5.6 window()
5.7 cast()
5.8 concatMapDelayError
5.9 concatMapCompletable()
5.10 concatMapCompletableDelayError()
5.11 flattenAsFlowable & flattenAsObservable
6 处理操作符
6.1 one rrorReturn()
6.2 one rrorReturnItem()
6.3 onExceptionResumeNext()
6.4 retry()
6.5 retryUntil()
6.6 retryWhen()
补充1:Rxjava3包结构的变动:
补充2:
参考文章:
1 前言
1.1 用操作符组合Observable
对于ReactiveX来说,Observable和Observer仅仅是个开始,它们本身不过是标准观察者模式的一些轻量级扩展,目的是为了更好的处理事件序列。
ReactiveX真正强大的地方在于它的操作符,操作符让你可以变换、组合、操纵和处理Observable发射的数据。
Rx的操作符让你可以用声明式的风格组合异步操作序列,它拥有回调的所有效率优势,同时又避免了典型的异步系统中嵌套回调的缺点。
下面是常用的操作符列表:
- 创建操作 Create, Defer, Empty/Never/Throw, From, Interval, Just, Range, Repeat, Start, Timer
- 变换操作 Buffer, FlatMap, GroupBy, Map, Scan和Window
- 过滤操作 Debounce, Distinct, ElementAt, Filter, First, IgnoreElements, Last, Sample, Skip, SkipLast, Take, TakeLast
- 组合操作 And/Then/When, CombineLatest, Join, Merge, StartWith, Switch, Zip
- 错误处理 Catch和Retry
- 辅助操作 Delay, Do, Materialize/Dematerialize, ObserveOn, Serialize, Subscribe, SubscribeOn, TimeInterval, Timeout, Timestamp, Using
- 条件和布尔操作 All, Amb, Contains, DefaultIfEmpty, SequenceEqual, SkipUntil, SkipWhile, TakeUntil, TakeWhile
- 算术和集合操作 Average, Concat, Count, Max, Min, Reduce, Sum
- 转换操作 To
- 连接操作 Connect, Publish, RefCount, Replay
- 反压操作,用于增加特殊的流程控制策略的操作符
这些操作符并不全都是ReactiveX的核心组成部分,有一些是语言特定的实现或可选的模块。
1.2 操作符分类
ReactiveX的每种编程语言的实现都实现了一组操作符的集合。不同的实现之间有很多重叠的部分,也有一些操作符只存在特定的实现中。每种实现都倾向于用那种编程语言中他们熟悉的上下文中相似的方法给这些操作符命名。
本文首先会给出ReactiveX的核心操作符列表和对应的文档链接,后面还有一个决策树用于帮助你根据具体的场景选择合适的操作符。最后有一个语言特定实现的按字母排序的操作符列表。
如果你想实现你自己的操作符,可以参考这里:实现自定义操作符
1.2.1 创建操作
- Create — 通过调用观察者的方法从头创建一个Observable
- Defer — 在观察者订阅之前不创建这个Observable,为每一个观察者创建一个新的Observable
- Empty/Never/Throw — 创建行为受限的特殊Observable
- From — 将其它的对象或数据结构转换为Observable
- Interval — 创建一个定时发射整数序列的Observable
- Just — 将对象或者对象集合转换为一个会发射这些对象的Observable
- Range — 创建发射指定范围的整数序列的Observable
- Repeat — 创建重复发射特定的数据或数据序列的Observable
- Start — 创建发射一个函数的返回值的Observable
- Timer — 创建在一个指定的延迟之后发射单个数据的Observable
1.1.2 变换操作
这些操作符可用于对Observable发射的数据进行变换,详细解释可以看每个操作符的文档
- Buffer — 缓存,可以简单的理解为缓存,它定期从Observable收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射一个
- FlatMap — 扁平映射,将Observable发射的数据变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable,可以认为是一个将嵌套的数据结构展开的过程。
- GroupBy — 分组,将原来的Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每一个Observable发射一组不同的数据
- Map — 映射,通过对序列的每一项都应用一个函数变换Observable发射的数据,实质是对序列中的每一项执行一个函数,函数的参数就是这个数据项
- Scan — 扫描,对Observable发射的每一项数据应用一个函数,然后按顺序依次发射这些值
- Window — 窗口,定期将来自Observable的数据分拆成一些Observable窗口,然后发射这些窗口,而不是每次发射一项。类似于Buffer,但Buffer发射的是数据,Window发射的是Observable,每一个Observable发射原始Observable的数据的一个子集
1.1.3 过滤操作
这些操作符用于从Observable发射的数据中进行选择
- Debounce — 只有在空闲了一段时间后才发射数据,通俗的说,就是如果一段时间没有操作,就执行一次操作
- Distinct — 去重,过滤掉重复数据项
- ElementAt — 取值,取特定位置的数据项
- Filter — 过滤,过滤掉没有通过谓词测试的数据项,只发射通过测试的
- First — 首项,只发射满足条件的第一条数据
- IgnoreElements — 忽略所有的数据,只保留终止通知(onError或onCompleted)
- Last — 末项,只发射最后一条数据
- Sample — 取样,定期发射最新的数据,等于是数据抽样,有的实现里叫ThrottleFirst
- Skip — 跳过前面的若干项数据
- SkipLast — 跳过后面的若干项数据
- Take — 只保留前面的若干项数据
- TakeLast — 只保留后面的若干项数据
1.1.4 组合操作
组合操作符用于将多个Observable组合成一个单一的Observable
- And/Then/When — 通过模式(And条件)和计划(Then次序)组合两个或多个Observable发射的数据集
- CombineLatest — 当两个Observables中的任何一个发射了一个数据时,通过一个指定的函数组合每个Observable发射的最新数据(一共两个数据),然后发射这个函数的结果
- Join — 无论何时,如果一个Observable发射了一个数据项,只要在另一个Observable发射的数据项定义的时间窗口内,就将两个Observable发射的数据合并发射
- Merge — 将两个Observable发射的数据组合并成一个
- StartWith — 在发射原来的Observable的数据序列之前,先发射一个指定的数据序列或数据项
- Switch — 将一个发射Observable序列的Observable转换为这样一个Observable:它逐个发射那些Observable最近发射的数据
- Zip — 打包,使用一个指定的函数将多个Observable发射的数据组合在一起,然后将这个函数的结果作为单项数据发射
1.1.5 错误处理
1.1.6 辅助操作
- Delay — 延迟一段时间发射结果数据
- Do — 注册一个动作占用一些Observable的生命周期事件,相当于Mock某个操作
- Materialize/Dematerialize — 将发射的数据和通知都当做数据发射,或者反过来
- ObserveOn — 指定观察者观察Observable的调度程序(工作线程)
- Serialize — 强制Observable按次序发射数据并且功能是有效的
- Subscribe — 收到Observable发射的数据和通知后执行的操作
- SubscribeOn — 指定Observable应该在哪个调度程序上执行
- TimeInterval — 将一个Observable转换为发射两个数据之间所耗费时间的Observable
- Timeout — 添加超时机制,如果过了指定的一段时间没有发射数据,就发射一个错误通知
- Timestamp — 给Observable发射的每个数据项添加一个时间戳
- Using — 创建一个只在Observable的生命周期内存在的一次性资源
1.1.7 条件和布尔操作
这些操作符可用于单个或多个数据项,也可用于Observable
- All — 判断Observable发射的所有的数据项是否都满足某个条件
- Amb — 给定多个Observable,只让第一个发射数据的Observable发射全部数据
- Contains — 判断Observable是否会发射一个指定的数据项
- DefaultIfEmpty — 发射来自原始Observable的数据,如果原始Observable没有发射数据,就发射一个默认数据
- SequenceEqual — 判断两个Observable是否按相同的数据序列
- SkipUntil — 丢弃原始Observable发射的数据,直到第二个Observable发射了一个数据,然后发射原始Observable的剩余数据
- SkipWhile — 丢弃原始Observable发射的数据,直到一个特定的条件为假,然后发射原始Observable剩余的数据
- TakeUntil — 发射来自原始Observable的数据,直到第二个Observable发射了一个数据或一个通知
- TakeWhile — 发射原始Observable的数据,直到一个特定的条件为真,然后跳过剩余的数据
1.1.8 算术和聚合操作
- Average — 计算Observable发射的数据序列的平均值,然后发射这个结果
- Concat — 不交错的连接多个Observable的数据
- Count — 计算Observable发射的数据个数,然后发射这个结果
- Max — 计算并发射数据序列的最大值
- Min — 计算并发射数据序列的最小值
- Reduce — 按顺序对数据序列的每一个应用某个函数,然后返回这个值
- Sum — 计算并发射数据序列的和
1.1.9 连接操作
- Connect — 指示一个可连接的Observable开始发射数据给订阅者
- Publish — 将一个普通的Observable转换为可连接的
- RefCount — 使一个可连接的Observable表现得像一个普通的Observable
- Replay — 确保所有的观察者收到同样的数据序列,即使他们在Observable开始发射数据之后才订阅
1.1.10 转换操作
1.1.11 操作符决策树
- 直接创建一个Observable(创建操作)
- 组合多个Observable(组合操作)
- 对Observable发射的数据执行变换操作(变换操作)
- 从Observable发射的数据中取特定的值(过滤操作)
- 转发Observable的部分值(条件/布尔/过滤操作)
- 对Observable发射的数据序列求值(算术/聚合操作)
- just( ) — 将一个或多个对象转换成发射这个或这些对象的一个Observable
- from( ) — 将一个Iterable, 一个Future, 或者一个数组转换成一个Observable
- repeat( ) — 创建一个重复发射指定数据或数据序列的Observable
- repeatWhen( ) — 创建一个重复发射指定数据或数据序列的Observable,它依赖于另一个Observable发射的数据
- create( ) — 使用一个函数从头创建一个Observable
- defer( ) — 只有当订阅者订阅才创建Observable;为每个订阅创建一个新的Observable
- range( ) — 创建一个发射指定范围的整数序列的Observable
- interval( ) — 创建一个按照给定的时间间隔发射整数序列的Observable
- timer( ) — 创建一个在给定的延时之后发射单个数据的Observable
- empty( ) — 创建一个什么都不做直接通知完成的Observable
- error( ) — 创建一个什么都不做直接通知错误的Observable
- never( ) — 创建一个不发射任何数据的Observable
对RxJava而言,操作符的相关内容Rxjava3 or 2 其实没什么改动,大部分Rxjava2的操作符都没变,即使有所变动,也只是包名或类名的改动。上面常见的操作符也可以直接从文档中查看用法,下面总结一些常用的操作符进行。
2 创建操作符
create() | 创建最简单的事件流 |
from() | 创建事件流,可发送不同类型的数据流 |
just() | 创建事件流,可发送多个参数的数据流 |
defer() | 创建事件流,可缓存可激活事件流 |
range() | 创建事件流,可发送范围内的数据流 |
interval() | 创建延时重复的事件流 |
repeat() | 创建可重复次数的事件流 |
timer() | 创建一次延时的事件流 |
注意:interval()、timer()、delay()的区别
- interval():用于创建事件流,周期性重复发送
- timer():用于创建事件流,延时发送一次
- delay():用于事件流中,可以延时某次事件流的发送
2.1 create()
创建Observable最原始的方式,onNext/onComplete/onError方法可完全自由控制。在Rxjava3文档级教程:入门到掌握 (一 基本用法 )中,被观察者的创建基本都用的这种方式,不再重写赘述。
2.2 from()
String[] stringArray = {"a", "b", "c"};
Observable.fromArray(stringArray);
Observable.fromArray("a", "b", "c");
Observable.fromArray(1, 2, 3, 4);
fromIterable方法参数为实现Iterable接口的类,如List/Map/Set等集合类。
String[] strings = {"a", "b", "c"};
List<String> listString = Arrays.asList(strings);
Observable.fromIterable(listString);
2.3 just()
just重载了多个参数数量不同的方法,最大可带10个参数,just实际上同样是调用的fromArray方法;
Observable.just(1, 2, 3, 4)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.i("lucas", interger + "");
}
});
2.4 defer()
defer确保了Observable
代码在被订阅后才执行(而不是创建后立即执行)。
2.5 range()
//发送从10开始的整数,发送4个(发到13)
Observable.range(10, 4)
.subscribe(integer -> Log.i("lucas", ""+integer));
//发送从10开始的长整型数,发送6个(发到15)
Observable.rangeLong(10, 6)
.subscribe(integer -> Log.i("lucas", ""+integer));
2.6 interval()
interval用于定时发送
//每3秒发个自增整数
Observable.interval(3, TimeUnit.SECONDS);
//初始延时1秒,每3秒发一个自增整数
Observable.interval(1, 3, TimeUnit.SECONDS);
//初始延时2秒,后每1秒发一个从10开始的整数,发5个(发到14)停止
Observable.intervalRange(10, 5, 2, 1, TimeUnit.SECONDS);
2.7 repeat()
repeat操作符可以重复发送指定次数的某个事件流,repeat操作符默认在trampoline调度器上执行,repeat默认重复次数为Long.MAX_VALUE,可使用重载方法指定次数以及使用repeatUntil指定条件。
//一直重复
Observable.fromArray(1, 2, 3, 4).repeat();
//重复发送5次
Observable.fromArray(1, 2, 3, 4).repeat(5);
//重复发送直到符合条件时停止重复
Observable.fromArray(1, 2, 3, 4).repeatUntil(new BooleanSupplier() {
@Override
public boolean getAsBoolean() throws Exception {
//自定判断条件,为true即可停止,默认为false
return false;
}
});
2.8 timer()
timer用于延时发送。
//延时3秒后,发送一个整数0
Observable.timer(3, TimeUnit.SECONDS);
3 过滤操作符
过滤操作符主要是指对数据源进行选择和过滤的常用操作符。
bounce() | 事件流只发射规定范围时间内的数据项 |
distinct() | 事件流只发射不重复的数据项 |
elementAt() | 事件流只发射第N个数据项 |
filter() | 事件流只发射符合规定函数的数据项 |
first() | 事件流只发射第一个数据项 |
last() | 事件流只发射最后一项数据项 |
ignoreElements() | 忽略事件流的发射,只发射事件流的终止事件 |
sample() | 事件流对指定的时间间隔进行数据项的采样 |
skip() | 事件流忽略前N个数据项 |
skipLast() | 事件流忽略后N个数据项 |
take() | 事件流只发射前N个数据项 |
takeLast() | 事件流只发射后N个数据项 |
3.1 skip / skipLast
可以作用于Flowable,Observable,表示源发射数据前,跳过多少个。skipLast(n)操作表示从流的尾部跳过n个元素。
Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
source.skipLast(4)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Throwable {
Log.w("TAG","onNext--->"+ integer);
}
});
// 结果:1 2 3 4 5 6
//Lambda写法
Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
source.skip(4)
.subscribe(i->{
Log.w("TAG","onNext--->"+ i);
});
// 结果:5 6 7 8 9 10
3.2 debounce(去抖动)
可作用于Flowable,Observable。在Android开发,通常为了防止用户重复点击而设置标记位,而通过RxJava的debounce操作符可以有效达到该效果。在规定时间内,用户重复点击只有最后一次有效,
Observable<String> source = Observable.create(emitter -> {
emitter.onNext("A");
Thread.sleep(1_500);
emitter.onNext("B");
Thread.sleep(500);
emitter.onNext("C");
Thread.sleep(250);
emitter.onNext("D");
Thread.sleep(2000);
emitter.onNext("E");
emitter.onComplete();
});
source.subscribeOn(Schedulers.io())
.debounce(1, TimeUnit.SECONDS)
.blockingSubscribe(
item -> Log.w("TAG","onNext--->"+ item),
Throwable::printStackTrace,
() -> Log.w("TAG","onNext--->"+ "onComplete" ));
//结果
2020-04-03 16:08:47.520 30559-30559/com.ysalliance.getfan.myapplication W/TAG: onNext--->A
2020-04-03 16:08:49.777 30559-30559/com.ysalliance.getfan.myapplication W/TAG: onNext--->D
2020-04-03 16:08:50.776 30559-30559/com.ysalliance.getfan.myapplication W/TAG: onNext--->E
2020-04-03 16:08:50.776 30559-30559/com.ysalliance.getfan.myapplication W/TAG: onNext--->onComplete
上文代码中,数据源以一定的时间间隔发送A,B,C,D,E。操作符debounce的时间设为1秒,发送A后1.5秒并没有发射其他数据,所以A能成功发射。发射B后,在1秒之内,又发射了C和D,在D之后的2秒才发射E,所有B、C都失效,只有D有效;而E之后已经没有其他数据流了,所有E有效。
3.3 distinct(去重)
可作用于Flowable,Observable,去掉数据源重复的数据。
distinctUntilChanged()去掉相邻重复数据。
Observable.just(2, 3, 4, 4, 2, 1)
.distinct()
.subscribe(System.out::print);
// 打印:2 3 4 1
Observable.just(1, 1, 2, 1, 2, 3, 3, 4)
.distinctUntilChanged()
.subscribe(System.out::print);
//打印:1 2 1 2 3 4
3.4 elementAt(获取指定位置元素)
可作用于Flowable,Observable,从数据源获取指定位置的元素,从0开始。
elementAtOrError:指定元素的位置超过数据长度,则发射异常。
Observable.just(2,4,3,1,5,8)
.elementAt(0)
.subscribe(integer ->
Log.d("TAG","elmentAt->"+integer));
打印:2
Observable<String> source = Observable.just("Kirk", "Spock", "Chekov", "Sulu");
Single<String> element = source.elementAtOrError(4);
element.subscribe(
name -> System.out.println("onSuccess will not be printed!"),
error -> System.out.println("onError: " + error));
打印:onSuccess will not be printed!
3.5 filter(过滤)
可作用于 Flowable,Observable,Maybe,Single。在filter中返回表示发射该元素,返回false表示过滤该数据。
Observable.just(1, 2, 3, 4, 5, 6)
.filter(x -> x % 2 == 0)
.subscribe(System.out::print);
打印:2 4 6
3.6 first(第一个)
作用于 Flowable,Observable。发射数据源第一个数据,如果没有则发送默认值。
Observable<String> source = Observable.just("A", "B", "C");
Single<String> firstOrDefault = source.first("D");
firstOrDefault.subscribe(System.out::println);
打印:A
Observable<String> emptySource = Observable.empty();
Single<String> firstOrError = emptySource.firstOrError();
firstOrError.subscribe(
element -> System.out.println("onSuccess will not be printed!"),
error -> System.out.println("onError: " + error));
打印:onError: java.util.NoSuchElementException
和firstElement的区别是first返回的是Single,而firstElement返回Maybe。firstOrError在没有数据会返回异常。
3.7 last(最后一个)
last、lastElement、lastOrError与fist、firstElement、firstOrError相对应。
Observable<String> source = Observable.just("A", "B", "C");
Single<String> lastOrDefault = source.last("D");
lastOrDefault.subscribe(System.out::println);
//打印:C
Observable<String> source = Observable.just("A", "B", "C");
Maybe<String> last = source.lastElement();
last.subscribe(System.out::println);
//打印:C
Observable<String> emptySource = Observable.empty();
Single<String> lastOrError = emptySource.lastOrError();
lastOrError.subscribe(
element -> System.out.println("onSuccess will not be printed!"),
error -> System.out.println("onError: " + error));
// 打印:onError: java.util.NoSuchElementException
3.8 ignoreElements & ignoreElement(忽略元素)
ignoreElements 作用于Flowable、Observable。ignoreElement作用于Maybe、Single。两者都是忽略掉数据,不发射任何数据,返回完成或者错误时间。
这里关注下intervalRange的用法,以下面这个例子说明:从1开始输出5个数据,延迟1秒执行,每隔1秒执行一次:
Single<Long> source = Single.timer(1, TimeUnit.SECONDS);
Completable completable = source.ignoreElement();
completable.doOnComplete(() -> System.out.println("Done!"))
.blockingAwait();
// 1秒后打印:Donde!
Observable<Long> source = Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS);
Completable completable = source.ignoreElements();
completable.doOnComplete(() -> System.out.println("Done!"))
.blockingAwait();
// 六秒后打印:Done!
3.9 ofType(过滤类型)
作用于Flowable、Observable、Maybe,过滤选择类型。
Observable<Number> numbers = Observable.just(1, 4.0, 3, 2.71, 2f, 7);
Observable<Integer> integers = numbers.ofType(Integer.class);
integers.subscribe((Integer x) -> System.out.print(x+" "));
//打印:1 3 7
3.10 sample
作用于Flowable、Observable,在一个周期内发射最新的数据。sample操作符会在指定的事件内从数据项中采集所需要的数据。
Observable<String> source = Observable.create(emitter -> {
emitter.onNext("A");
Thread.sleep(500);
emitter.onNext("B");
Thread.sleep(200);
emitter.onNext("C");
Thread.sleep(800);
emitter.onNext("D");
Thread.sleep(600);
emitter.onNext("E");
emitter.onComplete();
});
source.subscribeOn(Schedulers.io())
.sample(1, TimeUnit.SECONDS)
.blockingSubscribe(
item -> System.out.print(item+" "),
Throwable::printStackTrace,
() -> System.out.print("onComplete"));
// 打印: C D onComplete
与debounce的区别是,sample是以时间为周期的发射,一秒又一秒内的最新数据。而debounce是最后一个有效数据开始。
3.11 throttleFirst & throttleLast & throttleWithTimeout & throttleLatest
作用于Flowable、Observable。throttleFirst是指定周期内第一个数据,throttleLast与smaple一致。throttleWithTimeout与debounce一致。
Observable<String> source = Observable.create(emitter -> {
emitter.onNext("A");
Thread.sleep(500);
emitter.onNext("B");
Thread.sleep(200);
emitter.onNext("C");
Thread.sleep(800);
emitter.onNext("D");
Thread.sleep(600);
emitter.onNext("E");
emitter.onComplete();
});
source.subscribeOn(Schedulers.io())
.throttleFirst(1, TimeUnit.SECONDS)
.blockingSubscribe(
item -> System.out.print(item+" "),
Throwable::printStackTrace,
() -> System.out.print(" onComplete"));
//打印:A D onComplete
source.subscribeOn(Schedulers.io())
.throttleLast(1, TimeUnit.SECONDS)
.blockingSubscribe(
item -> System.out.print(item+" "),
Throwable::printStackTrace,
() -> System.out.print(" onComplete"));
// 打印:C D onComplete
throttleLatest:如果源的第一个数据总会被发射,然后开始周期计时,此时的效果就会跟throttleLast一致。
Observable<String> source = Observable.create(emitter -> {
emitter.onNext("A");
Thread.sleep(500);
emitter.onNext("B");
Thread.sleep(200);
emitter.onNext("C");
Thread.sleep(200);
emitter.onNext("D");
Thread.sleep(400);
emitter.onNext("E");
Thread.sleep(400);
emitter.onNext("F");
Thread.sleep(400);
emitter.onNext("G");
Thread.sleep(2000);
emitter.onComplete();
});
source.subscribeOn(Schedulers.io())
.throttleLatest(1, TimeUnit.SECONDS)
.blockingSubscribe(
item -> Log.e("RxJava",item),
Throwable::printStackTrace,
() -> Log.e("RxJava","finished"));
//打印 A D F G RxJava","finished
3.12 take & takeLast
作用于Flowable、Observable。take发射前n个元素。takeLast发射后n个元素。
Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
source.take(4)
.subscribe(System.out::print);
//打印:1 2 3 4
source.takeLast(4)
.subscribe(System.out::println);
//打印:7 8 9 10
3.13 timeout(超时)
作用于Flowable、Observable、Maybe、Single、Completabl。后一个数据发射未在前一个元素发射后规定时间内发射则返回超时异常。
Observable<String> source = Observable.create(emitter -> {
emitter.onNext("A");
Thread.sleep(800);
emitter.onNext("B");
Thread.sleep(400);
emitter.onNext("C");
Thread.sleep(1200);
emitter.onNext("D");
emitter.onComplete();
});
source.timeout(1, TimeUnit.SECONDS)
.subscribe(
item -> System.out.println("onNext: " + item),
error -> System.out.println("onError: " + error),
() -> System.out.println("onComplete will not be printed!"));
// 打印:
// onNext: A
// onNext: B
// onNext: C
// one rror: java.util.concurrent.TimeoutException:
The source did not signal an event for 1 seconds
and has been terminated.
3.14 merge/concat
merge操作符可以合并两个事件流,如果在merge操作符上增加延时发送的操作,那么就会导致其发射的数据项是无序的,会跟着发射的时间点进行合并。虽然是将两个事件流合并成一个事件流进行发射,但在最终的一个事件流中,发射出来的却是两次数据流。
merge和concat的区别:merge():合并后发射的数据项是无序的,concat():合并后发射的数据项是有序的。
Observable<String> just1 = Observable.just("A", "B", "C");
Observable<String> just2 = Observable.just("1", "2", "3");
Observable.merge(just1, just2).subscribe(new Consumer<Serializable>() {
@Override
public void accept(Serializable serializable) throws Exception {
Log.i("lucas", "" + serializable.toString() );
}
});
//打印结果
2020-04-04 17:52:20.734 4372-4372/com.ysalliance.getfan.myapplication I/lucas: A
2020-04-04 17:52:20.734 4372-4372/com.ysalliance.getfan.myapplication I/lucas: B
2020-04-04 17:52:20.734 4372-4372/com.ysalliance.getfan.myapplication I/lucas: C
2020-04-04 17:52:20.734 4372-4372/com.ysalliance.getfan.myapplication I/lucas: 1
2020-04-04 17:52:20.734 4372-4372/com.ysalliance.getfan.myapplication I/lucas: 2
2020-04-04 17:52:20.734 4372-4372/com.ysalliance.getfan.myapplication I/lucas: 3
3.15 zip()
zip操作符是将两个数据流进行指定的函数规则合并。
Observable<String> just1 = Observable.just("A", "B", "C");
Observable<String> just2 = Observable.just("1", "2", "3");
Observable.zip(just1, just2, new BiFunction<String, String, String>() {
@Override
public String apply(String s, String s2) throws Exception {
return s + s2;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i("lucas", "" + s );
}
});
//打印结果
2020-04-04 17:55:08.905 4744-4744/com.ysalliance.getfan.myapplication I/lucas: A1
2020-04-04 17:55:08.905 4744-4744/com.ysalliance.getfan.myapplication I/lucas: B2
2020-04-04 17:55:08.905 4744-4744/com.ysalliance.getfan.myapplication I/lucas: C3
3.16 startWith()
startWith操作符是将另一个数据流合并到原数据流的开头。
Observable<String> just1 = Observable.just("A", "B", "C");
Observable<String> just2 = Observable.just("1", "2", "3");
just1.startWith(just2).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i("lucas", "" + s );
}
});
//打印结果
2020-04-04 17:57:22.155 4917-4917/com.ysalliance.getfan.myapplication I/lucas: 1
2020-04-04 17:57:22.155 4917-4917/com.ysalliance.getfan.myapplication I/lucas: 2
2020-04-04 17:57:22.156 4917-4917/com.ysalliance.getfan.myapplication I/lucas: 3
2020-04-04 17:57:22.156 4917-4917/com.ysalliance.getfan.myapplication I/lucas: A
2020-04-04 17:57:22.156 4917-4917/com.ysalliance.getfan.myapplication I/lucas: B
2020-04-04 17:57:22.156 4917-4917/com.ysalliance.getfan.myapplication I/lucas: C
3.17 join()
join操作符是有时间期限的合并操作符。
Observable<String> just1 = Observable.just("A", "B", "C");
Observable<Long> just2 = Observable.interval(1, TimeUnit.SECONDS);
just1.join(just2, new Function<String, ObservableSource<Long>>() {
@Override
public ObservableSource<Long> apply(String s) throws Exception {
return Observable.timer(3, TimeUnit.SECONDS);
}
}, new Function<Long, ObservableSource<Long>>() {
@Override
public ObservableSource<Long> apply(Long l) throws Exception {
return Observable.timer(8, TimeUnit.SECONDS);
}
}, new BiFunction<String, Long, String>() {
@Override
public String apply(String s, Long l) throws Exception {
return s + l;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i("lucas", "" + s );
}
});
//打印结果
2020-04-04 18:04:43.751 6042-6109/com.ysalliance.getfan.myapplication I/lucas: A0
2020-04-04 18:04:43.752 6042-6109/com.ysalliance.getfan.myapplication I/lucas: B0
2020-04-04 18:04:43.752 6042-6109/com.ysalliance.getfan.myapplication I/lucas: C0
2020-04-04 18:04:44.750 6042-6109/com.ysalliance.getfan.myapplication I/lucas: A1
2020-04-04 18:04:44.751 6042-6109/com.ysalliance.getfan.myapplication I/lucas: B1
2020-04-04 18:04:44.751 6042-6109/com.ysalliance.getfan.myapplication I/lucas: C1
join操作符有三个函数需要设置
第一个函数:规定just2的过期期限
第二个函数:规定just1的过期期限
第三个函数:规定just1和just2的合并规则
由于just2的期限只有3秒的时间,而just2延时1秒发送一次,所以just2只发射了2次,其输出的结果就只能和just2输出的两次进行合并,其输出格式有点类似我们的排列组合。
4 连接/组合操作符
通过连接操作符,可以将多个被观察数据(数据源)连接在一起。
4.1 startWith()
可作用于Flowable、Observable。将指定数据源合并在另外数据源的开头。
Observable<String> name = Observable.just("My", "name");
Observable<String> name2 = Observable.just("is", "Lucas","!");
name2.startWith(name).subscribe(item -> Log.d("Lucas",item));
//打印:
2020-04-03 16:35:49.493 31609-31609/com.ysalliance.getfan.myapplication D/Lucas: My
2020-04-03 16:35:49.493 31609-31609/com.ysalliance.getfan.myapplication D/Lucas: name
2020-04-03 16:35:49.493 31609-31609/com.ysalliance.getfan.myapplication D/Lucas: is
2020-04-03 16:35:49.493 31609-31609/com.ysalliance.getfan.myapplication D/Lucas: Lucas
2020-04-03 16:35:49.493 31609-31609/com.ysalliance.getfan.myapplication D/Lucas: !
4.2 merge / mergeWith
可作用所有数据源类型,用于合并多个数据源到一个数据源。
Observable<String> name = Observable.just("My", "name");
Observable<String> name2 = Observable.just("is", "Lucas","!");
Observable.merge(name,name2).subscribe(v -> Log.d("lucas", v));
name.mergeWith(name2).subscribe(v -> Log.d("lucas",v));
//打印:
2020-04-03 16:39:41.934 32212-32212/com.ysalliance.getfan.myapplication D/lucas: My
2020-04-03 16:39:41.934 32212-32212/com.ysalliance.getfan.myapplication D/lucas: name
2020-04-03 16:39:41.934 32212-32212/com.ysalliance.getfan.myapplication D/lucas: is
2020-04-03 16:39:41.935 32212-32212/com.ysalliance.getfan.myapplication D/lucas: Lucas
2020-04-03 16:39:41.935 32212-32212/com.ysalliance.getfan.myapplication D/lucas: !
2020-04-03 16:39:41.937 32212-32212/com.ysalliance.getfan.myapplication D/lucas: My
2020-04-03 16:39:41.937 32212-32212/com.ysalliance.getfan.myapplication D/lucas: name
2020-04-03 16:39:41.937 32212-32212/com.ysalliance.getfan.myapplication D/lucas: is
2020-04-03 16:39:41.937 32212-32212/com.ysalliance.getfan.myapplication D/lucas: Lucas
2020-04-03 16:39:41.937 32212-32212/com.ysalliance.getfan.myapplication D/lucas: !
merge在合并数据源时,如果一个合并发生异常后会立即调用观察者的onError方法,并停止合并。可通过mergeDelayError操作符,将发生的异常留到最后处理。
Observable<String> name = Observable.just("My", "name");
Observable<String> name2 = Observable.just("is", "Lucas","!");
Observable<String> error = Observable.error(new NullPointerException("Error!"));
Observable.mergeDelayError(name,error,name2).subscribe(
v -> Log.d("lucas",v), e->Log.d("lucas",e.getMessage()));
//打印:
2020-04-03 16:42:07.030 32391-32391/com.ysalliance.getfan.myapplication D/lucas: My
2020-04-03 16:42:07.030 32391-32391/com.ysalliance.getfan.myapplication D/lucas: name
2020-04-03 16:42:07.033 32391-32391/com.ysalliance.getfan.myapplication D/lucas: is
2020-04-03 16:42:07.034 32391-32391/com.ysalliance.getfan.myapplication D/lucas: Lucas
2020-04-03 16:42:07.034 32391-32391/com.ysalliance.getfan.myapplication D/lucas: !
2020-04-03 16:42:07.034 32391-32391/com.ysalliance.getfan.myapplication D/lucas: Error!
4.3 zip()
可作用于Flowable、Observable、Maybe、Single。将多个数据源的数据一个一个的合并在一起哇。当其中一个数据源发射完事件之后,若其他数据源还有数据未发射完毕,也会停止。
Observable<String> name = Observable.just("My", "name");
Observable<String> name2 = Observable.just("is", "Lucas", "!", "haha!");
name.zipWith(name2, (first, last) -> first + "-" + last)
.subscribe(item -> Log.d("lucas", item));
//打印:
2020-04-03 16:44:59.127 32616-32616/com.ysalliance.getfan.myapplication D/lucas: My-is
2020-04-03 16:44:59.128 32616-32616/com.ysalliance.getfan.myapplication D/lucas: name-Lucas
4.4 combineLatest()
public static String[] str = {"A", "B", "C", "D", "E"};
public void combineLatest() {
Observable<String> just1 = Observable.interval(1, TimeUnit.SECONDS).map(new Function<Long, String>() {
@Override
public String apply(Long aLong) throws Exception {
return str[(int) (aLong % 5)];
}
});
Observable<Long> just2 = Observable.interval(1, TimeUnit.SECONDS);
Observable.combineLatest(just1, just2, new BiFunction<String, Long, String>() {
@Override
public String apply(String s, Long l) throws Exception {
return s + l;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("onNext=" + s);
}
});
}
//输出
onNext=A0
onNext=B0
onNext=B1
onNext=C1
onNext=C2
onNext=D2
onNext=D3
onNext=E3
onNext=E4
onNext=A4
onNext=A5
可作用于Flowable, Observable。在结合不同数据源时,发射速度快的数据源最新item与较慢的相结合。 如下时间线,Observable-1发射速率快,发射了65,Observable-2才发射了C, 那么两者结合就是C5。
4.5 switchOnNext()
一个发射多个小数据源的数据源,这些小数据源发射数据的时间发生重复时,取最新的数据源。
5 变换/转换操作符
变换操作符用于变化数据源的数据,并转化为新的数据源。
map() | 对数据流的类型进行转换 |
flatMap() | 对数据流的类型进行包装成另一个数据流 |
scan() | 对上一轮处理过后的数据流进行函数处理 |
groupBy() | 对所有的数据流进行分组 |
buffer() | 缓存发射的数据流到一定数量,随后发射出数据流集合 |
window() | 缓存发射的数据流到一定数量,随后发射出新的事件流 |
5.1 map()
map利用Function进行类型转换的例子:
Observable.just("1", "2", "3").map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return Integer.valueOf(s) * 100;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.i("lucas", ""+integer);
}
});
在实际编码中,我们还是常用Lambda表达式进行代码的精简优化,但是掌握Lambda的前提是,你还得会写非Lambda的表达,否则长时间下来可能会忘记原来的类写法:
Observable.just("1", "2", "3")
.map( (String s) -> Integer.valueOf(s) * 100)
.subscribe(integer -> Log.i("lucas", ""+integer));
可以看到,使用Lambda表达式后只有三行,简洁!
5.2 flatMap() / concatMap()
flatMap操作符将数据流进行类型转换,然后将新的数据流传递给新的事件流进行分发,这里通过模拟请求登录的延时操作进行说明
public static class UserParams {
public UserParams(String username, String password) {
this.username = username;
this.password = password;
}
public String username;
public String password;
}
Observable.just(new UserParams("lucas", "123123")).flatMap(new Function<UserParams, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(UserParams userParams) throws Exception {
return Observable.just(userParams.username + "登录成功").delay(2, TimeUnit.SECONDS);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i("lucas", "" + s );
}
});
Lambda表达式优化:
Observable.just(new UserParams("lucas", "123123"))
.flatMap((UserParams userParams)->Observable.just(userParams.username + "登录成功")
.delay(2, TimeUnit.SECONDS))
.subscribe(s -> Log.i("lucas", "" + s ));
concatMap与flatMap的区别: concatMap是有序的,flatMap是无序的。
再举个组合的例子:
Observable.just("A", "B", "C")
.flatMap(a -> {
return Observable.intervalRange(1, 3, 0, 1, TimeUnit.SECONDS)
.map(b -> '(' + a + ", " + b + ')');
})
.blockingSubscribe( v -> {
Log.d("lucas", v+" " );
});
//打印结果
2020-04-03 22:05:58.363 12414-12414/com.ysalliance.getfan.myapplication D/lucas: (A, 1)
2020-04-03 22:05:58.365 12414-12414/com.ysalliance.getfan.myapplication D/lucas: (B, 1)
2020-04-03 22:05:58.367 12414-12414/com.ysalliance.getfan.myapplication D/lucas: (C, 1)
2020-04-03 22:05:59.355 12414-12414/com.ysalliance.getfan.myapplication D/lucas: (A, 2)
2020-04-03 22:05:59.362 12414-12414/com.ysalliance.getfan.myapplication D/lucas: (B, 2)
2020-04-03 22:05:59.362 12414-12414/com.ysalliance.getfan.myapplication D/lucas: (C, 2)
2020-04-03 22:06:00.357 12414-12414/com.ysalliance.getfan.myapplication D/lucas: (A, 3)
2020-04-03 22:06:00.361 12414-12414/com.ysalliance.getfan.myapplication D/lucas: (B, 3)
2020-04-03 22:06:00.363 12414-12414/com.ysalliance.getfan.myapplication D/lucas: (C, 3)
5.3 groupBy()
groupBy操作符可以将发射出来的数据项进行分组,并将分组后的数据项保存在具有key-value映射的事件流中。groupBy具体的分组规则由groupBy操作符传递进来的函数参数Function所决定的,它可以将key和value按照Function的返回值进行分组,返回一个具有分组规则的事件流GroupedObservable,注意这里分组出来的事件流是按照原始事件流的顺序输出的,我们可以通过sorted()对数据项进行排序,然后输出有序的数据流。
Observable.just("java", "c", "c++", "python", "javaScript", "android")
.groupBy(new Function<String, Character>() {
@Override
public Character apply(String s) throws Exception {
return s.charAt(0);//按首字母分组
}
})
.subscribe(new Consumer<GroupedObservable<Character, String>>() {
@Override
public void accept(final GroupedObservable<Character, String> characterStringGroupedObservable) throws Exception {
//排序后,直接订阅输出key和value
characterStringGroupedObservable.sorted().subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i("lucas", "onNext= key:" + characterStringGroupedObservable.getKey() + " value:" + s);
}
});
}
});
//打印结果
2020-04-04 16:36:13.766 32460-32460/com.ysalliance.getfan.myapplication I/lucas: onNext= key:p value:python
2020-04-04 16:36:13.766 32460-32460/com.ysalliance.getfan.myapplication I/lucas: onNext= key:a value:android
2020-04-04 16:36:13.766 32460-32460/com.ysalliance.getfan.myapplication I/lucas: onNext= key:c value:c
2020-04-04 16:36:13.766 32460-32460/com.ysalliance.getfan.myapplication I/lucas: onNext= key:c value:c++
2020-04-04 16:36:13.766 32460-32460/com.ysalliance.getfan.myapplication I/lucas: onNext= key:j value:java
2020-04-04 16:36:13.766 32460-32460/com.ysalliance.getfan.myapplication I/lucas: onNext= key:j value:javaScript
Observable<String> animals = Observable.just(
"Tiger", "Elephant", "Cat", "Chameleon", "Frog", "Fish", "Turtle", "Flamingo");
animals.groupBy(animal -> animal.charAt(0), String::toUpperCase)
.concatMapSingle(Observable::toList)
.subscribe(System.out::println);
// prints:
// [TIGER, TURTLE]
// [ELEPHANT]
// [CAT, CHAMELEON]
// [FROG, FISH, FLAMINGO]
5.4 scan()
scan操作符会对发射的数据和上一轮发射的数据进行函数处理,并返回的数据供下一轮使用,持续这个过程来产生剩余的数据流。其应用场景有简单的累加计算,判断所有数据的最小值等。
Observable.just(2, 4, 1, 9).scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
return integer < integer2 ? integer : integer2;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer item) throws Exception {
Log.i("lucas", "" + item );
}
});
//打印结果
2020-04-04 16:14:10.931 30301-30301/com.ysalliance.getfan.myapplication I/lucas: 2
2020-04-04 16:14:10.931 30301-30301/com.ysalliance.getfan.myapplication I/lucas: 2
2020-04-04 16:14:10.931 30301-30301/com.ysalliance.getfan.myapplication I/lucas: 1
2020-04-04 16:14:10.931 30301-30301/com.ysalliance.getfan.myapplication I/lucas: 1
Lambda表达式写法:
Observable.just(2, 4, 1, 9)
.scan((Integer integer, Integer integer2) -> integer < integer2 ? integer : integer2)
.subscribe(item -> Log.i("lucas", "" + item ));
带初始值的聚合叠加:
Observable.just(1, 2, 3)
.scan(10, (x, y) -> x + y)
.subscribe(item -> Log.i("lucas", "" + item ));
// prints:
2020-04-04 16:23:06.590 31451-31451/com.ysalliance.getfan.myapplication I/lucas: 10
2020-04-04 16:23:06.590 31451-31451/com.ysalliance.getfan.myapplication I/lucas: 11
2020-04-04 16:23:06.590 31451-31451/com.ysalliance.getfan.myapplication I/lucas: 13
2020-04-04 16:23:06.590 31451-31451/com.ysalliance.getfan.myapplication I/lucas: 16
5.5 buffer()
buffer操作符可以将发射出来的数据流,在给定的缓存池中进行缓存,当缓存池中的数据项溢满时,则将缓存池的数据项进行输出,重复上述过程,直到将发射出来的数据全部发射出去。如果发射出来的数据不够缓存池的大小,则按照当前发射出来的数量进行输出。如果对buffer操作符设置了skip参数,则buffer每次缓存池溢满时,会跳过指定的skip数据项,然后再进行缓存和输出。
Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
.buffer(3).subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Exception {
Log.i("lucas", "" + integers.toString() );
}
});
//打印结果
2020-04-04 16:40:41.744 32722-32722/com.ysalliance.getfan.myapplication I/lucas: [1, 2, 3]
2020-04-04 16:40:41.745 32722-32722/com.ysalliance.getfan.myapplication I/lucas: [4, 5, 6]
2020-04-04 16:40:41.745 32722-32722/com.ysalliance.getfan.myapplication I/lucas: [7, 8]
5.6 window()
window操作符和buffer操作符在功能上实现的效果是一样的,但window操作符最大区别在于同样是缓存一定数量的数据项,window操作符最终发射出来的是新的事件流integerObservable,而buffer操作符发射出来的是新的数据流,也就是说,window操作符发射出来新的事件流中的数据项,还可以经过Rxjava其他操作符进行处理。
Observable.just(1, 2, 3, 4)
.window(2, 1).subscribe(new Consumer<Observable<Integer>>() {
@Override
public void accept(Observable<Integer> integerObservable) throws Exception {
integerObservable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.i("lucas", "" + integer );
}
});
}
});
//打印结果
2020-04-04 16:49:54.681 1475-1475/com.ysalliance.getfan.myapplication I/lucas: 1
2020-04-04 16:49:54.681 1475-1475/com.ysalliance.getfan.myapplication I/lucas: 2
2020-04-04 16:49:54.682 1475-1475/com.ysalliance.getfan.myapplication I/lucas: 2
2020-04-04 16:49:54.682 1475-1475/com.ysalliance.getfan.myapplication I/lucas: 3
2020-04-04 16:49:54.682 1475-1475/com.ysalliance.getfan.myapplication I/lucas: 3
2020-04-04 16:49:54.682 1475-1475/com.ysalliance.getfan.myapplication I/lucas: 4
2020-04-04 16:49:54.682 1475-1475/com.ysalliance.getfan.myapplication I/lucas: 4
Observable.range(1, 4)
// Create windows containing at most 2 items, and skip 3 items before starting a new window.
.window(2)
.flatMapSingle(window -> {
return window.map(String::valueOf)
.reduce(new StringJoiner(", ", "[", "]"), StringJoiner::add);
})
.subscribe(System.out::println);
// prints:
// [1, 2]
// [3, 4]
5.7 cast()
作用于Flowable、Observable、Maybe、Single。将数据元素转型成其他类型,转型失败会抛出异常。
Observable<Number> numbers = Observable.just(1, 4.0, 3f, 7, 12, 4.6, 5);
numbers.filter((Number x) -> Integer.class.isInstance(x))
.cast(Integer.class)
.subscribe((Integer x) -> System.out.println(x));
// 打印:
// 1
// 7
// 12
// 5
5.8 concatMapDelayError
与concatMap作用相同,只是将过程发送的所有错误延迟到最后处理。
Observable.intervalRange(1, 3, 0, 1, TimeUnit.SECONDS)
.concatMapDelayError(x -> {
if (x.equals(1L)) return Observable.error(new IOException("Something went wrong!"));
else return Observable.just(x, x * x);
})
.blockingSubscribe(
x -> System.out.println("onNext: " + x),
error -> System.out.println("onError: " + error.getMessage()));
// prints:
// onNext: 2
// onNext: 4
// onNext: 3
// onNext: 9
// one rror: Something went wrong!
5.9 concatMapCompletable()
作用于Flowable、Observable。与contactMap类似,不过应用于函数后,返回的是CompletableSource。订阅一次并在所有CompletableSource对象完成时返回一个Completable对象。
Observable<Integer> source = Observable.just(2, 1, 3);
Completable completable = source.concatMapCompletable(x -> {
return Completable.timer(x, TimeUnit.SECONDS)
.doOnComplete(() -> System.out.println("Info: Processing of item \"" + x + "\" completed"));
});
completable.doOnComplete(() -> System.out.println("Info: Processing of all items completed"))
.blockingAwait();
// prints:
// Info: Processing of item "2" completed
// Info: Processing of item "1" completed
// Info: Processing of item "3" completed
// Info: Processing of all items completed
5.10 concatMapCompletableDelayError()
与concatMapCompletable作用相同,只是将过程发送的所有错误延迟到最后处理。
Observable<Integer> source = Observable.just(2, 1, 3);
Completable completable = source.concatMapCompletableDelayError(x -> {
if (x.equals(2)) {
return Completable.error(new IOException("Processing of item \"" + x + "\" failed!"));
} else {
return Completable.timer(1, TimeUnit.SECONDS)
.doOnComplete(() -> System.out.println("Info: Processing of item \"" + x + "\" completed"));
}
});
completable.doOnError(error -> System.out.println("Error: " + error.getMessage()))
.onErrorComplete()
.blockingAwait();
// prints:
// Info: Processing of item "1" completed
// Info: Processing of item "3" completed
// Error: Processing of item "2" failed!
5.11 flattenAsFlowable & flattenAsObservable
作用于Maybe、Single,将其转化为Flowable,或Observable。
Single<Double> source = Single.just(2.0);
Flowable<Double> flowable = source.flattenAsFlowable(x -> {
return List.of(x, Math.pow(x, 2), Math.pow(x, 3));
});
flowable.subscribe(x -> System.out.println("onNext: " + x));
// prints:
// onNext: 2.0
// onNext: 4.0
// onNext: 8.0
6 处理操作符
onErrorReturn() | 当错误发生时,它会忽略onError的回调且会发射一个新的数据项并回调onCompleted() |
onErrorResumeNext() | 当错误发生时,它会忽略onError的回调且会发射一个新的事件流并回调onCompleted() |
onExceptionResumeNext() | 当错误发生时,如果onError收到的Throwable不是一个Exception,它会回调onError方法,且不会回调备用的事件流,如果onError收到的Throwable是一个Exception,它会回调备用的事件流进行数据的发射 |
retry() | 当错误发生时,发射器会重新发射 |
retryWhen() | 当错误发生时,根据Tharowble类型决定发射器是否重新发射 |
6.1 one rrorReturn()
作用于Flowable、Observable、Maybe、Single。但调用数据源的onError函数后会回到该函数,可对错误进行处理,然后返回值,会调用观察者onNext()继续执行,执行完调用onComplete()函数结束所有事件的发射。
Single.just("2A")
.map(v -> Integer.parseInt(v, 10))
.onErrorReturn(error -> {
if (error instanceof NumberFormatException) return 0;
else throw new IllegalArgumentException();
})
.subscribe(
v->Log.d("lucas", v+" " ),
error -> System.err.println("onError should not be printed!"));
//
2020-04-03 22:13:56.573 13238-13238/com.ysalliance.getfan.myapplication D/lucas: 0
6.2 one rrorReturnItem()
与onErrorReturn类似,onErrorReturnItem不对错误进行处理,直接返回一个值。
Single.just("2A")
.map(v -> Integer.parseInt(v, 10))
.onErrorReturnItem(0)
.subscribe(
v->Log.d("lucas", v+" " ),
error -> System.err.println("onError should not be printed!"));
//
2020-04-03 22:12:53.757 13100-13100/com.ysalliance.getfan.myapplication D/lucas: 0
6.3 onExceptionResumeNext()
可作用于Flowable、Observable、Maybe。
onErrorReturn发生异常时,回调onComplete()函数后不再往下执行,而onExceptionResumeNext则是要在处理异常的时候返回一个数据源,然后继续执行,如果返回null,则调用观察者的onError()函数。
Observable.create((ObservableOnSubscribe<Integer>) e -> {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onError(new NullPointerException());
e.onNext(4);
})
.onErrorResumeNext(throwable -> {
Log.d("lucas", "onErrorResumeNext ");
return Observable.just(4);
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d("lucas", "onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d("lucas", "onNext " + integer);
}
@Override
public void one rror(Throwable e) {
Log.d("lucas", "onError ");
}
@Override
public void onComplete() {
Log.d("lucas", "onComplete ");
}
});
//运行结果
2020-04-03 22:15:58.668 13456-13456/com.ysalliance.getfan.myapplication D/lucas: onSubscribe
2020-04-03 22:15:58.672 13456-13456/com.ysalliance.getfan.myapplication D/lucas: onNext 1
2020-04-03 22:15:58.672 13456-13456/com.ysalliance.getfan.myapplication D/lucas: onNext 2
2020-04-03 22:15:58.673 13456-13456/com.ysalliance.getfan.myapplication D/lucas: onNext 3
2020-04-03 22:15:58.673 13456-13456/com.ysalliance.getfan.myapplication D/lucas: one rrorResumeNext
2020-04-03 22:15:58.674 13456-13456/com.ysalliance.getfan.myapplication D/lucas: onNext 4
2020-04-03 22:15:58.674 13456-13456/com.ysalliance.getfan.myapplication D/lucas: onComplete
6.4 retry()
可作用于所有的数据源,当发生错误时,数据源重复发射item,直到没有异常或者达到所指定的次数。
Observable.create((ObservableOnSubscribe<Integer>) e -> {
e.onNext(1);
e.onNext(2);
if (first){
first=false;
e.onError(new NullPointerException());
}
})
.retry(9)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d("lucas", "onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d("lucas", "onNext " + integer);
}
@Override
public void one rror(Throwable e) {
Log.d("lucas", "onError ");
}
@Override
public void onComplete() {
Log.d("lucas", "onComplete ");
}
});
//打印结果
2020-04-03 22:18:28.605 13647-13647/com.ysalliance.getfan.myapplication D/lucas: onSubscribe
2020-04-03 22:18:28.607 13647-13647/com.ysalliance.getfan.myapplication D/lucas: onNext 1
2020-04-03 22:18:28.607 13647-13647/com.ysalliance.getfan.myapplication D/lucas: onNext 2
2020-04-03 22:18:28.607 13647-13647/com.ysalliance.getfan.myapplication D/lucas: onNext 1
2020-04-03 22:18:28.607 13647-13647/com.ysalliance.getfan.myapplication D/lucas: onNext 2
- retry():表示重试无限次
- retry(long times):表示重试指定次数
- retry(Func predicate):可以根据函数参数中的Throwable类型和重试次数决定本次需不需要重试
6.5 retryUntil()
作用于Flowable、Observable、Maybe。与retry类似,但发生异常时,返回值是false表示继续执行(重复发射数据),true不再执行,但会调用onError方法。
Observable.create((ObservableOnSubscribe<Integer>) e -> {
e.onNext(1);
e.onNext(2);
e.onError(new NullPointerException());
e.onNext(3);
e.onComplete();
})
.retryUntil(() -> true)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d("lucas", "onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d("lucas", "onNext " + integer);
}
@Override
public void one rror(Throwable e) {
Log.d("lucas", "onError ");
}
@Override
public void onComplete() {
Log.d("lucas", "onComplete ");
}
});
//打印结果
2020-04-03 22:22:01.625 13905-13905/com.ysalliance.getfan.myapplication D/lucas: onSubscribe
2020-04-03 22:22:01.627 13905-13905/com.ysalliance.getfan.myapplication D/lucas: onNext 1
2020-04-03 22:22:01.627 13905-13905/com.ysalliance.getfan.myapplication D/lucas: onNext 2
2020-04-03 22:22:01.627 13905-13905/com.ysalliance.getfan.myapplication D/lucas: one rror
6.6 retryWhen()
retryWhen操作符和retry操作符相似,区别在于retryWhen将错误Throwable传递给了函数进行处理并产生新的事件流进行处理。
private static int retryCount = 0;
private static int maxRetries = 2;
public void retryWhen(){
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
for (int i = 1; i < 5; i++) {
if (i == 4) {
e.onError(new Exception("onError crash"));
}
e.onNext(i);
}
}
})
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Throwable throwable) throws Exception {
if (++retryCount <= maxRetries) {
// When this Observable calls onNext, the original Observable will be retried (i.e. re-subscribed).
System.out.println("get error, it will try after " + 1 + " seconds, retry count " + retryCount);
return Observable.timer(1, TimeUnit.SECONDS);
}
return Observable.error(throwable);
}
});
}
})
.onErrorReturn(new Function<Throwable, Integer>() {
@Override
public Integer apply(Throwable throwable) throws Exception {
return -1;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
System.out.println("onError");
}
}, new Action() {
@Override
public void run() throws Exception {
System.out.println("onComplete");
}
});
}
//结果
onNext=1
onNext=2
onNext=3
get error, it will try after 1 seconds, retry count 1
onNext=1
onNext=2
onNext=3
get error, it will try after 1 seconds, retry count 2
onNext=1
onNext=2
onNext=3
onNext=-1
onComplete
补充1:Rxjava3包结构的变动:
Rxjava3组件位于 io.reactivex.rxjava3包(rxjava1有 rx,rxjava2是 io.reactivex)。 这使得Rxjava3可以和早期版本一起使用。 此外,RxJava 的核心类型(Flowable、 Observer 等)已经被移动到 io.reactivex.rxjava3.core。
Component | RxJava 2 | RxJava 3 |
Core | | |
Annotations | | |
Disposables | | |
Exceptions | | |
Functions | | |
Flowables | | |
Observables | | |
Subjects | | |
Processors | | |
Observers | | |
Subscribers | | |
Parallel | | |
Internal | | |
补充2:
在RxJava1.0中,有的人会使用CompositeSubscription来收集Subscription,来统一取消订阅,现在在RxJava2.0中,由于subscribe()方法现在返回void,那怎么办呢?
其实在RxJava2.0中,Flowable提供了subscribeWith这个方法可以返回当前订阅的观察者,并且通过ResourceSubscriber DisposableSubscriber等观察者来提供 Disposable的接口。
所以,如果想要达成RxJava1.0的效果,现在应该是这样做:
CompositeDisposable composite = new CompositeDisposable();
composite.add(Flowable.range(1, 8).subscribeWith(subscriber));
这个subscriber 应该是 ResourceSubscriber 或者 DisposableSubscriber 的实例。
参考文章:
因为写RxJava系列的文章时进行了很多阅读和参考,因此不分一二三等,将全系列的参考引用统一如下:
RxJava3 Wiki:https://github.com/ReactiveX/RxJava/wiki
RxJava3官方github:What's different in 3.0 · ReactiveX/RxJava Wiki · GitHub
ReactiveX文档中文翻译:创建操作 · ReactiveX文档中文翻译
single:ReactiveX - Single
操作符系列讲的很好的文章:Android响应式编程——RxJava3框架的使用(二)_e电动小马达e的博客-CSDN博客
基础介绍:Android响应式编程——RxJava3框架的使用(一)_e电动小马达e的博客-CSDN博客_android rxjava3
RxLifecycle:https://github.com/trello/RxLifecycle