首页 > 其他分享 >Reactor接口之五

Reactor接口之五

时间:2023-05-07 17:12:01浏览次数:40  
标签:Reactor void System 接口 Flux 之五 println public out

defaultIfEmpty

@Test
public void testDefaultIfEmpty() {
    Flux.range(1,10)
            .defaultIfEmpty(30)
            .subscribe(System.out::println);
}

@Test
public void testDefaultIfEmpty1() {
    Flux.empty()
            .defaultIfEmpty(30)
            .subscribe(System.out::println);
}

defaultIfEmpty为空Flux提供默认值。

last

@Test
public void testLast() {
    Flux.range(1,10)
            .last()
            .subscribe(System.out::println);
}

获取最后一个元素。输出10

skip

@Test
public void testSkip() {
    Flux.range(1,10)
            .skip(5)
            .subscribe(System.out::println);
}

skip跳过n个元素。输出6到10。

@Test
public void testSkip1() {
    Flux.range(1,10)
            .skip(5)
            .subscribe(new Subscriber<Integer>() {
                @Override
                public void onSubscribe(Subscription subscription) {
                    subscription.request(2);
                }

                @Override
                public void onNext(Integer integer) {
                    System.out.println(integer);
                }

                @Override
                public void one rror(Throwable throwable) {
                    if (throwable != null) {
                        throwable.printStackTrace();
                    }
                }

                @Override
                public void onComplete() {
                    System.out.println("complete");
                }
            });
}

调用subscription.request来实现limit功能。再加上skip可以实现分页功能。

log

@Test
public void testLog() {
    Flux.range(1,10)
            .log()
            .subscribe();
}

log类似日志打印。

doOn系列函数

@Test
public void testDoOn() {
    Flux.range(1,10)
            .doFirst(() -> System.out.println("first"))
            .doOnNext(System.out::println)
            .doOnRequest(t -> {
                System.out.println("request:" + t);
            })
            .doOnCancel(() -> System.out.println("cancel"))
            .doOnComplete(() -> System.out.println("complete"))
            .doOnTerminate(() -> System.out.println("terminate"))
            .subscribe();
}

输出:

first
request:9223372036854775807
1
2
3
4
5
6
7
8
9
10
complete
terminate

首先执行doFirst,下游向上游请求数据时执行doOnRequest且执行一次。有数据时执行doOnNext,发生了cancel执行doOnCancel。请求完成执行doOnComplete。最后执行doOnTerminate。doOnRequest默认是向上游请求 Long.MAX_VALUE个数据。

 

@Test
public void testDoOn1() {
    Flux.range(1,10)
            .doFirst(() -> System.out.println("first"))
            .doOnNext(System.out::println)
            .doOnRequest(t -> {
                System.out.println("request:" + t + ":"+(t==Long.MAX_VALUE));
            })
            .doOnCancel(() -> System.out.println("cancel"))
            .doOnComplete(() -> System.out.println("complete"))
            .doOnTerminate(() -> System.out.println("terminate"))
            .doOnSubscribe(t -> {
                t.request(9);
                t.cancel();
            })
            .subscribe();
}

输出:

first
request:9:false
1
2
3
4
5
6
7
8
9
cancel
request:9223372036854775807:true

doOnSubscribe执行了cancel后执行了doOnCancel。cancel执行后没有执行doOnComplete和doOnTerminate。

 

@Test
public void testDoOn3() {
    Flux.just(1,Flux.error(new RuntimeException()))
            .doOnError(e -> System.out.println(e.getClass()))
            .doOnNext(System.out::println)
            .subscribe();
}

当发生异常时执行doOnError。

 

@Test
public void testDoOn4() {
    Flux.just(1,Flux.error(new RuntimeException()))
            .doFirst(() -> System.out.println("first"))
            .doOnError(e -> System.out.println(e.getClass()))
            .doOnNext(System.out::println)
            .doOnRequest(t -> {
                System.out.println("request:" + t + ":"+(t==Long.MAX_VALUE));
            })
            .doOnCancel(() -> System.out.println("cancel"))
            .doOnComplete(() -> System.out.println("complete"))
            .doOnTerminate(() -> System.out.println("terminate"))
            .doOnSubscribe(t -> {
                t.request(9);
                t.cancel();
            })
            .subscribe();
}

输出:

first
request:9:false
1
FluxError
complete
terminate
cancel
cancel

执行出现异常时,先执行doOnError,在执行doOnComplete,最后执行doOnTerminate。

@Test
public void testDoOn5() {
    Flux.just(1,Flux.error(new RuntimeException()))
            .doOnEach(System.out::println)
            .subscribe();
}

输出:

doOnEach_onNext(1)
doOnEach_onNext(FluxError)
onComplete()

有元素时执行doOnNext,出现异常时执行doOnError,最后执行doOnComplete。doOnEach相当于简化了doOnNext,doOnError,doOnComplete的调用。

onErrorResume

@Test
public void testOnErrorResume() {
   Flux.range(1,10)
           .map(k -> 4/(k-2))
            .onErrorResume(e -> Flux.error(new RuntimeException(e.getMessage())))
            .subscribe(System.out::println);
}

当发生异常时,封装成另一个异常。

 

@Test
public void testOnErrorResume1() {
    Flux.range(1,10)
            .map(k -> 4/(k-2))
            .onErrorResume(e -> Flux.just(0))
            .subscribe(System.out::println);
}

.
发生异常时给个默认值。输出-4,0

标签:Reactor,void,System,接口,Flux,之五,println,public,out
From: https://www.cnblogs.com/shigongp/p/17379451.html

相关文章

  • Reactor接口之四
    interval@TestpublicvoidtestInterval(){CountDownLatchcountDownLatch=newCountDownLatch(1);Flux.range(1,10).zipWith(Flux.interval(Duration.ofSeconds(1))).subscribe(System.out::println,null,countDownLatch::countD......
  • Reactor接口之三
    defer@TestpublicvoidtestDefer(){Flux.defer(()->{returnFlux.range(0,10);}).subscribe(System.out::println);}输出0到9。defer每次对结果Flux进行订阅时,懒惰地提供发布服务。因此实际的源实例化会推迟到每次订阅时。collect@Te......
  • SpringCloud gateway内置过滤器之五
    1、SetRequestHeaderGatewayFilterSetRequestHeaderGatewayFilter设置请求头,会覆盖原来已有的请求头。spring:cloud:gateway:enabled:trueroutes:-id:Goods-Server#路由id,唯一标识uri:lb://producerpredicates......
  • 接口自动化 测试数据驱动 DDD模块使用
    一、DDT简单介绍名称:Data-DrivenTests,数据驱动测试作用:由外部数据集合来驱动测试用例的执行核心的思想:数据和测试代码分离应用场景:一组外部数据来执行相同的操作优点:当测试数据发生大量变化的情况下,测试代码可以保持不变实际项目:excel存储测试数据,ddt读取测试数据到单元......
  • vue配置请求本地接口
    proxy:{'/nrms':{target:'http://localhost:8921/',changeOrigin:true,ws:true,pathRewrite:{'^/nrms':'/'}},'/':{targ......
  • Reactor接口之二
    merge@TestpublicvoidtestMerge(){Flux.merge(Flux.just(1,2,3),Flux.range(5,6)).subscribe(System.out::println);}merge将多个Flux合并成一个Flux。 @TestpublicvoidtestMerge1(){Flux.range(1,5).mergeWith(Flux.just(8,9......
  • 【接口自动化测试】月薪12k必会技术,从0到1学习接口自动化测试,6个操作安排的明明白白
        ​导读:在所有的开发测试中,接口测试是必不可少的一项。有效且覆盖完整的接口测试,不仅能保障新功能的开发质量,还能让开发在修改功能逻辑的时候有回归的能力,同时也是能优雅地进行重构的前提。编写接口测试要遵守哪些原则?测试代码的结构应该是什么样的?接口测试......
  • 聊聊关于,SpringBoot写后端接口
    前言:一个后端接口大致分为四个部分组成:接口地址(url)、接口请求方式(get、post等)、请求数据(request)、响应数据(response)。如何构建这几个部分每个公司要求都不同,没有什么“一定是最好的”标准,但一个优秀的后端接口和一个糟糕的后端接口对比起来差异还是蛮大的,其中最重要的关键点就是......
  • Java获取pdd详情api接口、商品详情、商品描述、宝贝链接获取展示示例
    ​拼多多商品详情就是对拼多多商城中的宝贝的描述了,消费者们在进入到宝贝的详情页面后,可以通过这些描述去了解该款宝贝。其实这样也可以大大的增加商品的转化率。那么它的作用有什么呢?1.突出商品卖点:把商品的特色和突出点写表现出来,很更好的吸引到顾客,让顾客有点击和购买的欲......
  • 与chatGPT谈TyptScript接口问题
    与chatGPT谈TyptScript接口问题问1:能给我说说c#中的inteface与typescript中的inteface的不同与相同吗?答1:C#中的Interface和TypeScript中的Interface有一些相似之处,但也有一些不同之处。相同点:Interface都是用来定义对象的结构和行为的。都支持继承和多态的概念。......