merge
@Test
public void testMerge() {
Flux.merge(Flux.just(1,2,3),Flux.range(5,6))
.subscribe(System.out::println);
}
merge将多个Flux合并成一个Flux。
@Test
public void testMerge1() {
Flux.range(1,5)
.mergeWith(Flux.just(8,9,10))
.subscribe(System.out::println);
}
mergeWith和merge相同。
@Test
public void testMerge2() {
Flux.range(0,10)
.mergeComparingWith(Flux.just(11,-1,-2,-3,-4),Integer::compareTo)
.subscribe(System.out::println);
}
每个Flux之间排序后合并成一个Flux。输出如下:
mergeComparingWith以Flux的第一个元素比较并排序,是Flux之间的排序,不会改变单个Flux之间的元素顺序。改成:
@Test
public void testMerge2() {
Flux.range(0,10)
.mergeComparingWith(Flux.just(-1,-2,-3,-4),Integer::compareTo)
.subscribe(System.out::println);
}
输出:
@Test
public void testMerge3() {
Flux.merge(Flux.just(Flux.just(1,2,3),Flux.range(-10,5)))
.subscribe(System.out::println);
}
Flux的元素是Flux,也能合并成一个Flux。嵌套的Flux合并成一个Flux。
@Test
public void testMerge4() {
Flux.mergePriority(Flux.range(1,10),Flux.just(-8,-1,-9))
.subscribe(System.out::println);
}
mergePriority以自然顺序排序后合并成一个Flux。但是用int类型没有排序。
@Test
public void testMerge5() {
Flux.mergeSequential(Flux.range(1,10),Flux.just(-8,-1,-9))
.subscribe(System.out::println);
}
与merge不同,mergeSequential发出的值按订阅顺序合并到最终序列中。
concat
@Test
public void testConcat() {
Flux.concat(Flux.range(1,10),Flux.just("a","b","c"))
.subscribe(System.out::println);
}
将多个Flux连接成一个Flux。
@Test
public void testConcat1() {
Flux.concat(Flux.range(1,10),Flux.error(new RuntimeException()),Flux.just("a","b","c"))
.subscribe(System.out::println);
}
如果遇到异常则停止处理。只打印1到10。
@Test
public void testConcat3() {
Flux.range(1,10)
.concatWith(t -> {
t.onNext(20);
t.onNext(30);
})
.subscribe(System.out::println);
}
concatWith和concat相同。
@Test
public void testConcat4() {
Flux.range(1,10)
.concatWithValues(20,30)
.subscribe(System.out::println);
}
concatWithValues和concat相同。
@Test
public void testConcatDelayError() {
Flux.concatDelayError(Flux.range(1,10), Flux.error(new RuntimeException()), Flux.just(20,30))
.subscribe(System.out::println);
}
@Test
public void testConcatDelayError1() {
Flux.concatDelayError(Flux.range(1,10), Flux.just(20,30),Flux.error(new RuntimeException()))
.subscribe(System.out::println);
}
上面两个单测运行结果相同,都是1到10,20,30。concatDelayError延迟异常的抛出,如果处理遇到异常,要等处理完后才抛出异常。
标签:10,Reactor,void,接口,之二,Flux,Test,println,public From: https://www.cnblogs.com/shigongp/p/17377613.html