首页 > 编程语言 >使用Java和Reactive Streams构建流式应用

使用Java和Reactive Streams构建流式应用

时间:2024-07-22 22:08:45浏览次数:12  
标签:Java Flux Mono void Reactive Streams 数据流 public

使用Java和Reactive Streams构建流式应用

大家好,我是微赚淘客系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿!今天,我们将深入探讨如何使用Java和Reactive Streams构建流式应用。流式应用能够高效处理异步数据流,尤其适合处理大量数据和实时数据的场景。Reactive Streams是一个标准的Java库,用于处理异步流数据。

一、Reactive Streams概述

Reactive Streams是Java 9引入的标准,旨在提供一种异步处理数据流的方式。它定义了四个核心接口:

  • Publisher:提供数据流。
  • Subscriber:消费数据流。
  • Subscription:连接Publisher和Subscriber。
  • Processor:同时作为Publisher和Subscriber。

这些接口帮助我们在Java中实现高效的异步数据处理。

二、Reactive Streams基础

  1. 创建Publisher

    在Reactive Streams中,Publisher是数据流的源。我们可以使用Publisher接口的实现类来创建一个简单的Publisher。例如:

    package cn.juwatech.streams;
    
    import org.reactivestreams.Publisher;
    import org.reactivestreams.Subscriber;
    import org.reactivestreams.Subscription;
    
    import java.util.Arrays;
    import java.util.List;
    
    public class SimplePublisher implements Publisher<String> {
        private final List<String> data;
    
        public SimplePublisher(String... data) {
            this.data = Arrays.asList(data);
        }
    
        @Override
        public void subscribe(Subscriber<? super String> subscriber) {
            subscriber.onSubscribe(new Subscription() {
                @Override
                public void request(long n) {
                    for (String item : data) {
                        subscriber.onNext(item);
                    }
                    subscriber.onComplete();
                }
    
                @Override
                public void cancel() {
                    // No-op
                }
            });
        }
    
        public static void main(String[] args) {
            SimplePublisher publisher = new SimplePublisher("Hello", "Reactive", "Streams");
            publisher.subscribe(new Subscriber<String>() {
                @Override
                public void onSubscribe(Subscription s) {
                    s.request(Long.MAX_VALUE);
                }
    
                @Override
                public void onNext(String s) {
                    System.out.println("Received: " + s);
                }
    
                @Override
                public void one rror(Throwable t) {
                    System.err.println("Error: " + t);
                }
    
                @Override
                public void onComplete() {
                    System.out.println("Completed");
                }
            });
        }
    }
    

    在这个例子中,SimplePublisher类实现了Publisher接口,模拟了一个简单的数据源。subscribe方法接受一个Subscriber对象,request方法用于请求数据。

  2. 创建Subscriber

    Subscriber是数据流的消费者。以下是一个简单的Subscriber实现:

    package cn.juwatech.streams;
    
    import org.reactivestreams.Subscriber;
    import org.reactivestreams.Subscription;
    
    public class SimpleSubscriber implements Subscriber<String> {
        @Override
        public void onSubscribe(Subscription s) {
            s.request(Long.MAX_VALUE); // 请求所有数据
        }
    
        @Override
        public void onNext(String s) {
            System.out.println("Received: " + s);
        }
    
        @Override
        public void one rror(Throwable t) {
            System.err.println("Error: " + t);
        }
    
        @Override
        public void onComplete() {
            System.out.println("Completed");
        }
    
        public static void main(String[] args) {
            SimplePublisher publisher = new SimplePublisher("Reactive", "Streams", "Example");
            SimpleSubscriber subscriber = new SimpleSubscriber();
            publisher.subscribe(subscriber);
        }
    }
    

    这个示例中,SimpleSubscriber实现了Subscriber接口,并对数据流中的每个元素进行处理。

三、使用Flux和Mono

Spring WebFlux提供了更高级的Reactive Streams实现,包括FluxMono,它们分别代表零到多个和零到一个异步数据项。

  1. 使用Flux

    Flux表示一个异步数据流。以下是一个使用Flux的示例:

    package cn.juwatech.streams;
    
    import reactor.core.publisher.Flux;
    
    public class FluxExample {
        public static void main(String[] args) {
            Flux<String> flux = Flux.just("Hello", "Reactive", "World")
                                     .doOnNext(System.out::println)
                                     .doOnComplete(() -> System.out.println("Flux completed"));
    
            flux.subscribe();
        }
    }
    

    在这个例子中,Flux.just创建了一个包含三个元素的FluxdoOnNext用于处理每个数据项,doOnComplete用于处理流完成时的操作。

  2. 使用Mono

    Mono表示一个异步数据流中的单个元素或没有元素。以下是一个使用Mono的示例:

    package cn.juwatech.streams;
    
    import reactor.core.publisher.Mono;
    
    public class MonoExample {
        public static void main(String[] args) {
            Mono<String> mono = Mono.just("Hello Mono")
                                    .doOnNext(System.out::println)
                                    .doOnTerminate(() -> System.out.println("Mono terminated"));
    
            mono.subscribe();
        }
    }
    

    这个示例中,Mono.just创建了一个包含单个元素的MonodoOnNext用于处理元素,doOnTerminate用于处理流终止时的操作。

四、组合和变换数据流

Reactive Streams允许我们组合和变换数据流。以下是一些常用操作:

  1. 变换操作

    使用mapflatMap来变换数据流中的元素:

    package cn.juwatech.streams;
    
    import reactor.core.publisher.Flux;
    
    public class TransformExample {
        public static void main(String[] args) {
            Flux<String> flux = Flux.just("hello", "world")
                                     .map(String::toUpperCase)
                                     .doOnNext(System.out::println);
    
            flux.subscribe();
        }
    }
    

    在这个例子中,map操作将每个字符串转换为大写。

  2. 过滤操作

    使用filter来过滤数据流中的元素:

    package cn.juwatech.streams;
    
    import reactor.core.publisher.Flux;
    
    public class FilterExample {
        public static void main(String[] args) {
            Flux<Integer> flux = Flux.range(1, 10)
                                     .filter(i -> i % 2 == 0)
                                     .doOnNext(System.out::println);
    
            flux.subscribe();
        }
    }
    

    在这个示例中,filter操作仅保留偶数元素。

  3. 合并和连接操作

    使用concatmerge操作来合并数据流:

    package cn.juwatech.streams;
    
    import reactor.core.publisher.Flux;
    
    public class MergeExample {
        public static void main(String[] args) {
            Flux<String> flux1 = Flux.just("A", "B", "C");
            Flux<String> flux2 = Flux.just("D", "E", "F");
    
            Flux<String> mergedFlux = Flux.concat(flux1, flux2)
                                          .doOnNext(System.out::println);
    
            mergedFlux.subscribe();
        }
    }
    

    在这个例子中,concat操作将两个Flux合并为一个流。

五、错误处理

处理流中的错误非常重要。可以使用onErrorResumeonErrorReturn来处理错误:

  1. 使用onErrorResume

    package cn.juwatech.streams;
    
    import reactor.core.publisher.Flux;
    
    public class ErrorHandlingExample {
        public static void main(String[] args) {
            Flux<String> flux = Flux.just("A", "B", "C")
                                     .concatWith(Flux.error(new RuntimeException("Error occurred")))
                                     .onErrorResume(e -> Flux.just("Error handled"))
                                     .doOnNext(System.out::println);
    
            flux.subscribe();
        }
    }
    

    在这个示例中,onErrorResume用于处理错误并提供备用数据流。

  2. 使用onErrorReturn

    package cn.juwatech.streams;
    
    import reactor.core.publisher.Mono;
    
    public class ErrorHandlingMonoExample {
        public static void main(String[] args) {
            Mono<String> mono = Mono.just("Hello")
                                    .flatMap(value -> Mono.error(new RuntimeException("Error occurred")))
                                    .onErrorReturn("Default value");
    
            mono.subscribe(System.out::println);
        }
    }
    

    在这个例子中,onErrorReturn用于在发生错误时返回一个默认值。

六、总结

使用Java和Reactive Streams构建流式应用可以大大提升数据处理的灵活性和效率。Reactive Streams提供了一套标准的接口,用于处理异步数据流。通过使用PublisherSubscriberFluxMono,我们可以创建、变换、过滤和合并数据流,并处理流中的错误。这些技术在构建高性能、可伸缩的应用程序中发挥着重要作用。

本文著作权

归聚娃科技微赚淘客系统开发者团队,转载请注明出处!

标签:Java,Flux,Mono,void,Reactive,Streams,数据流,public
From: https://www.cnblogs.com/szk123456/p/18317069

相关文章

  • 使用Java和Spring WebFlux构建响应式微服务
    使用Java和SpringWebFlux构建响应式微服务大家好,我是微赚淘客系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿!今天我们将探讨如何使用Java和SpringWebFlux构建响应式微服务。SpringWebFlux是Spring框架的一部分,专为创建响应式应用程序而设计。在这篇文章中,我们将介绍如何......
  • 超热门!身份证实名认证接口Java调用示例
    一、什么是身份证实名认证?输入姓名、身份证号,校验此两项是否匹配,同时返回生日、性别、籍贯等信息。二、身份证实名认证接口适用哪些场景呢?金融领域、电商与支付、社交与通讯、交通与出行、在线教育与培训等。三、如何用Java快速调用该接口呢?以下以阿里云为例:接口地址:身份......
  • java做算法题可以用到的方法(都是很常用的)
    java做算法题可以用到的方法(都是很常用的)数组排序(从小到大)将字符串大写字母转为小写替换字符串中符合某种规则的字符去除字符串两端的空白字符分割字符串将数组转换为列表两数比较取较大/较小的数字int类型转换为String类型赋予int类型一个最大数(算法题中一般用于初始化一......
  • 2024年Java高级开发工程师面试准备
    20240722前三步因为是在20年找工作的时候已经充分学习过,所以现在基本只需要读一遍即可第一步:Java基础(CYC2018[2.1-2.4]+JavaGuide[第二章])Java基础+JVM+多线程+Java集合第二步:计算机基础(算法和设计模式靠积累,计算机网络和操作系统读一遍:CYC2018[3.1-3.2]+JavaGuide[......
  • Java基础-学习笔记06
    **06访问修饰符封装继承多态**访问修饰符public公开级别,对外公开protected受保护级别,对子类和同一个包中的类公开default默认级别,无修饰符,向同一个包的类公开private私有级别,只有类本身可以访问,不对外公开修饰符可以用来修饰类中的属性,成员方法以及类只有默认......
  • Javase-11.多态
    1.什么是多态?多态的概念:通俗来说就是多种形态.具体点就是去完成某个行为时,不同的对象去完成会产生不同的状态.比如:同样是打印,彩色打印机打印出来的纸是彩色的,而黑白打印机打印出来的是黑白色的.多态体现:在代码运行时,当传递不同类对象时,会调用对应类中的方法。2.......
  • Java 经典排序算法代码 + 注释分析(冒泡、选择、插入、希尔、快排、计数、堆排、归并)
    Java经典排序算法代码+注释分析(冒泡、选择、插入、希尔、快排、计数、堆排、归并)以下是八种经典排序算法的代码,Java8亲测可用,可以直接运行importjava.util.Arrays;publicclassSort{privatestaticfinalint[]nums={3,44,38,5,47,15,36,26,27......
  • java毕业设计-基于springboot+vue的校园二手交易系统,基于java的校园二手交易系统,基于j
    文章目录前言演示视频项目背景项目架构和内容获取(文末获取)具体实现截图前台功能管理后台技术栈具体功能模块设计系统需求分析可行性分析系统测试为什么我?关于我我自己的网站前言博主介绍:✌️码农一枚,专注于大学生项目实战开发、讲解和毕业......
  • Multithreading in Java
    Whatismultithread?multithread(多线程)可以让程序/系统同时做多件事情。用于提升效率。这里要着重介绍四个概念。process(进程),进程具有自包含的独立运行环境(self-containedexcesiveenvironment),并且有着自己的内存空间(ownmemoryspace)。thread(线程),线程和进程都......
  • JavaScript笔记总结(Xmind格式):第一天
    Xmind鸟瞰图:简单文字总结:js使用方法:        1.行内样式(需要特定条件才可以使用)        2.内部样式(script尽量写在body最下面)        3.外部样式(在script标签中通过src引入外部的js文件)window对象的方法(window可以省略):        1.alert......