前言
rx教程一抓一大把,很容易陷入其中无法自拔,甚至看完所有之后还是不知道到底应该怎么用,本篇文章只是举例一些api,一次为上游,上游变换或过滤等 下游 三个分支进行划分 列出操作符或者回调方法
先学会套路然后由浅入深,先学以致用然后慢慢深入才是最能实现学习并快乐着。
基本创建套路
普通
Observable.create()
让两个上游按前后先后顺序通知给下游Observable.concat(Observable.just(1,2,3), Observable.just(4,5,6))
延迟开启
Flowable.interval(1, TimeUnit.SECONDS)
定时任务
Observable.timer(2, TimeUnit.SECONDS)
只接受一个
Single.just(new Random().nextInt())
//下游的监听用
.subscribe(new SingleObserver<Integer>() {
defer
每次订阅创建新的,没订阅不创建新的Observable.defer(new Callable<ObservableSource<Integer>>() {
便捷方式Observable.just(1, 1, 1, 2, 2, 3)
合并Observable.merge
感觉和另外一个差不多
跟随的操作符或回调
doOnNext
只有参数不处理返回值 ,表示即将调用onNext之前的通知
map
变换 也就是对象的转换 一对一的转换flatMap
一对多多对多 不保证顺序 还可以用new Function<xObservableSource>
参考演示
concatmap(function)
保证顺序 和flatmap的唯一区别
zip(function)
合并,如两个上游交换发布给下游。debounce
去掉发射频率过快的
distinct()
去重,上游发射了重复的到下游之后就没有重复了
filter(function)
过滤 回调有返回值 返回true表示不过滤
reduce(function<t1,t2,53>()
多个接受? 上游juet3个,那么这里参数也是3个, 多合一的逻辑下游只受到一个处理结果.
scan((arg1,arg2)->return arg1+arg2 )
执行开始第一次直接不回调scan参数而是直接告诉下游,而第二次进行回调参数1则是前一次结果,参数2当前index的值
那么 scan为(arg1+arg2) Observable.just(1, 5, 9,12,15)
输出为
1
(index->1:1+5)6
(index:2:6+9)15
以此类推。
skip(count)
代表刚开始跳过几个 ,假设count=2也就是12345那么下游接受的时候只能收到从3开始的了。
take(count)
代表下游只能接受多3个,假设上游发布了3个,而设定的只有2个,那么下游只能接受2个了。
buffer(count,skip)
控制流量,下游用(new Consumer<List<T>>
方可体现,比如有10个数从1开始到10结束, count=3,skip=2那么list第一次有3个, 是123,第二次 345 567 789 10
如map(new Function<原始参数,变换结果>->变换结果返回 )
last(t)
t表示上游的类型值,满足这个值才告诉下游,否则不会受到。
.window(3, TimeUnit.SECONDS)
较为复杂,划分窗口,下游又进行订阅的情况使用 。
subscribeOn(function)
被观察者上游的执行线程设置 一般在安卓汇总设置为后台进程
observeOn(function)
观察者 下游的执行线程设置,或者叫回调的监听
上游的操作
上游的subscrire(e)中的常用操作
e.onNext(1);//传递结果给下游
e.onError()//传递错误给下游
e.onComplete();//传递完成给下游
指定线程的语法糖
用于subscribeOn
和observeOn
中。
Schedulers.io()
代表io操作的线程, 通常用于网络,读写文件等io密集型的操作;
Schedulers.computation()
代表CPU计算密集型的操作, 例如需要大量计算的操作;
Schedulers.newThread()
代表一个常规的新线程;
AndroidSchedulers.mainThread()
代表Android的主线程
使用场景
使用场景1
1)通过 Observable.create() 方法,调用 OkHttp 网络请求;
2)通过 map 操作符集合 gson,将 Response 转换为 bean 类;
3)通过 doOnNext() 方法,解析 bean 中的数据,并进行数据库存储等操作;
4)调度线程,在子线程中进行耗时操作任务,在主线程中更新 UI ;
5)通过 subscribe(),根据请求成功或者失败来更新 UI 。
参考 https://www.jianshu.com/p/81fac37430dd
activity结束
if (mDisposable != null){
mDisposable.dispose();
}
在安卓主线程每隔几秒执行一个方法,而且每个方法都是在主线程执行。
Observable.just(0L)
.observeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.w(TAG, "1111:" + aLong + "," + "," + Thread.currentThread().getName());
}
}).delay(3, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.w(TAG, "2222-" + aLong + "," + Thread.currentThread().getName());
}
})
.delay(3, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.w(TAG, "3333-" + aLong + "," + Thread.currentThread().getName());
}
})
.delay(3, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.w(TAG, "44444----" + aLong + "," + Thread.currentThread().getName());
}
});