defaultIfEmpty
@Test
public void testDefaultIfEmpty() {
Flux.range(1,10)
.defaultIfEmpty(30)
.subscribe(System.out::println);
}
@Test
public void testDefaultIfEmpty1() {
Flux.empty()
.defaultIfEmpty(30)
.subscribe(System.out::println);
}
defaultIfEmpty为空Flux提供默认值。
last
@Test
public void testLast() {
Flux.range(1,10)
.last()
.subscribe(System.out::println);
}
获取最后一个元素。输出10
skip
@Test
public void testSkip() {
Flux.range(1,10)
.skip(5)
.subscribe(System.out::println);
}
skip跳过n个元素。输出6到10。
@Test
public void testSkip1() {
Flux.range(1,10)
.skip(5)
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription subscription) {
subscription.request(2);
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
}
@Override
public void one rror(Throwable throwable) {
if (throwable != null) {
throwable.printStackTrace();
}
}
@Override
public void onComplete() {
System.out.println("complete");
}
});
}
调用subscription.request来实现limit功能。再加上skip可以实现分页功能。
log
@Test
public void testLog() {
Flux.range(1,10)
.log()
.subscribe();
}
log类似日志打印。
doOn系列函数
@Test
public void testDoOn() {
Flux.range(1,10)
.doFirst(() -> System.out.println("first"))
.doOnNext(System.out::println)
.doOnRequest(t -> {
System.out.println("request:" + t);
})
.doOnCancel(() -> System.out.println("cancel"))
.doOnComplete(() -> System.out.println("complete"))
.doOnTerminate(() -> System.out.println("terminate"))
.subscribe();
}
输出:
first
request:9223372036854775807
1
2
3
4
5
6
7
8
9
10
complete
terminate
首先执行doFirst,下游向上游请求数据时执行doOnRequest且执行一次。有数据时执行doOnNext,发生了cancel执行doOnCancel。请求完成执行doOnComplete。最后执行doOnTerminate。doOnRequest默认是向上游请求 Long.MAX_VALUE个数据。
@Test
public void testDoOn1() {
Flux.range(1,10)
.doFirst(() -> System.out.println("first"))
.doOnNext(System.out::println)
.doOnRequest(t -> {
System.out.println("request:" + t + ":"+(t==Long.MAX_VALUE));
})
.doOnCancel(() -> System.out.println("cancel"))
.doOnComplete(() -> System.out.println("complete"))
.doOnTerminate(() -> System.out.println("terminate"))
.doOnSubscribe(t -> {
t.request(9);
t.cancel();
})
.subscribe();
}
输出:
first
request:9:false
1
2
3
4
5
6
7
8
9
cancel
request:9223372036854775807:true
doOnSubscribe执行了cancel后执行了doOnCancel。cancel执行后没有执行doOnComplete和doOnTerminate。
@Test
public void testDoOn3() {
Flux.just(1,Flux.error(new RuntimeException()))
.doOnError(e -> System.out.println(e.getClass()))
.doOnNext(System.out::println)
.subscribe();
}
当发生异常时执行doOnError。
@Test
public void testDoOn4() {
Flux.just(1,Flux.error(new RuntimeException()))
.doFirst(() -> System.out.println("first"))
.doOnError(e -> System.out.println(e.getClass()))
.doOnNext(System.out::println)
.doOnRequest(t -> {
System.out.println("request:" + t + ":"+(t==Long.MAX_VALUE));
})
.doOnCancel(() -> System.out.println("cancel"))
.doOnComplete(() -> System.out.println("complete"))
.doOnTerminate(() -> System.out.println("terminate"))
.doOnSubscribe(t -> {
t.request(9);
t.cancel();
})
.subscribe();
}
输出:
first
request:9:false
1
FluxError
complete
terminate
cancel
cancel
执行出现异常时,先执行doOnError,在执行doOnComplete,最后执行doOnTerminate。
@Test
public void testDoOn5() {
Flux.just(1,Flux.error(new RuntimeException()))
.doOnEach(System.out::println)
.subscribe();
}
输出:
doOnEach_onNext(1)
doOnEach_onNext(FluxError)
onComplete()
有元素时执行doOnNext,出现异常时执行doOnError,最后执行doOnComplete。doOnEach相当于简化了doOnNext,doOnError,doOnComplete的调用。
onErrorResume
@Test
public void testOnErrorResume() {
Flux.range(1,10)
.map(k -> 4/(k-2))
.onErrorResume(e -> Flux.error(new RuntimeException(e.getMessage())))
.subscribe(System.out::println);
}
当发生异常时,封装成另一个异常。
@Test
public void testOnErrorResume1() {
Flux.range(1,10)
.map(k -> 4/(k-2))
.onErrorResume(e -> Flux.just(0))
.subscribe(System.out::println);
}
.
发生异常时给个默认值。输出-4,0