首页 > 编程语言 >JAVA 发布订阅模式

JAVA 发布订阅模式

时间:2022-08-15 10:04:28浏览次数:55  
标签:订阅 JAVA lock Flow 模式 线程 发布者 subscription

JAVA 发布订阅模式

一、发布订阅模式

  在软件架构中,发布订阅是一种消息范式,消息的发送者(称为发布者)不会将消息直接发送给特定的接收者(称为订阅者)。而是将发布的消息分为不同的类别,无需了解哪些订阅者(如果有的话)可能存在。同样的,订阅者可以表达对一个或多个类别的兴趣,只接收感兴趣的消息,无需了解哪些发布者(如果有的话)存在。   Java9开始新增了一个发布-订阅框架,框架是基于异步响应流。发布,订阅框架可以非常方便地处理异步线程之间的流数据交换( 比如两个线程之间需要交换数据) 而且这个发布、订阅框架不需要使用数据中心来缓冲数据,同时具有非常高效的性能。

 

二、发布订阅模式的4个角色

1、Flow.Publisher: 代表数据发布者,生产者

2、Flow.Subscriber: 表数据订阅者、消费者

3、Flow.Subscription: 表发布者和订阅者之间的链接纽带。订阅者既可通过调用该对象的request()方法来获取数据项,也可通过调用对象的cancel()方法来取消订阅。

4、Flow.Processor: 数据处理器,它可同时作为发布者和订阅者使用

测试用例:发布者每秒钟发布一条消息,订阅者每秒钟订阅一条消息。

注意:订阅者处理消息,依赖当前线程的存活状态,如果发布消息后当前程序代码运行完毕会立即退出,订阅者来不及执行任何程序。

例1:用锁保持当前线程存活

import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class PublisherFlowSubscriber {
    /**
     * 定义用来保持线程不退出的锁
     */
    private static Lock lock = new ReentrantLock(true);
    private static Condition condition = lock.newCondition();
 
    public static void main(String[] args) throws InterruptedException {
        /**
         * 定义一个发布者,需要设定要发送消息的泛型数据类型
         */
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        /**
         * 定义一个订阅者
         */
        MySubscirber<String> subscirber = new MySubscirber<>("订阅者1");
        MySubscirber<String> subscirber2 = new MySubscirber<>("订阅者2");
        /**
         * 通过发布者配置订阅者 会触发订阅者的onSubscribe方法,他们之间的链接纽带会通过参数传递给onSubscribe方法,如果注册失败会触发onError方法
         */
        publisher.subscribe(subscirber);publisher.subscribe(subscirber2);
 
        /**
         * 测试发布消息
         */
        List<String> list =  List.of("张三", "李四", "王五", "赵六");
        list.forEach(string -> publisher.submit(string)); //向订阅者发布数据,需要保持前台的线程存活,否则当前线程执行结束,发布者和订阅者都被销毁了。
        /**
         * 关闭消息发布
         */
        publisher.close(); //关闭后,如果当前线程未退出,待订阅者所有消息都处理完毕才会运行订阅者的onComplete方法
        lock.lock();
        //抛出锁
        condition.await();
        lock.unlock();
 
    }
 
    /**
     * 定义订阅者类,需要注意实现接口Flow.Subscriber 实现其泛型传递
     */
    private static class MySubscirber<T> implements Flow.Subscriber<T>{
        /**
         * 订阅者自定义的属性,名字,关联的订阅平台
         */
        private String name;
        private Flow.Subscription subscription;
 
        public MySubscirber(String name) {
            this.name = name;
        }
 
        /**
         * 订阅的时候触发的方法
         * @param subscription 订阅者被关联的订阅平台
         */
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            System.out.println(name + "开启订阅" + subscription);
            /**
             * 从订阅平台获取一条消息
             */
            subscription.request(1);
            /**
             * 将平台实例保存,便于复用
             */
            this.subscription = subscription;
        }
 
        /**
         * 获取一条数据后触发的方法
         * @param
         */
        @Override
        public void onNext(T t) {
            System.out.println(name + "获取到了一条数据:" +t);
            //再次获取一条数据...自循环触发自己循环调用,一直将所有数据获取完毕
            subscription.request(1);
            /**
             * 模拟处理耗时
             */
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
 
        /**
         * 订阅出错时运行的方法
         * @param throwable 错误对象
         */
        @Override
        public void one rror(Throwable throwable) {
            throwable.printStackTrace();
        }
 
        /**
         * 发布者停止发布,且订阅者处理完接收数据后,触发该方法
         */
        @Override
        public void onComplete() {
            System.out.println(name + "发布者关闭了发布");
            lock.lock();
            condition.signalAll();
            lock.unlock();
        }
    }
}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

标签:订阅,JAVA,lock,Flow,模式,线程,发布者,subscription
From: https://www.cnblogs.com/lizm166/p/16587139.html

相关文章

  • 【java面试题】面向对象的特征
    【java面试题】面向对象的特征 面向对象编程是利用类和对象编程的一种思想,万物可归类,类是对于世界事物的高度抽象,万物皆对象,对象是具体的世界事物。面向对象的三大特征......
  • lvs模式配置
    lvs模式配置lvs简介LVS(LinuxVirtualServer)即Linux虚拟服务器,是由章文嵩博士主导的开源负载均衡项目,目前LVS已经被集成到Linux内核模块中。该项目在Linux内核中实现了......
  • 长篇图解java反射机制及其应用场景
    一、什么是java反射?在java的面向对象编程过程中,通常我们需要先知道一个Class类,然后new类名()方式来获取该类的对象。也就是说我们需要在写代码的时候(编译期或者编译期之......
  • Java开发学习(二十五)----使用PostMan完成不同类型参数传递
    一、请求参数请求路径设置好后,只要确保页面发送请求地址和后台Controller类中配置的路径一致,就可以接收到前端的请求,接收到请求后,如何接收页面传递的参数?关于请求参数的......
  • CAD设置经典模式
    1、打开桌面CAD2020软件,点击开始绘制。2、点击最上面的倒三角,下拉,点击【显示菜单栏】; 3、点击菜单栏的【工具】,点击【选项板】,点击【功能区】,关闭功能区;  4、......
  • 【Java】List排序方法(包括对象、Map等内部排序实现)
    前言日常开发中经常会对List集合做排序操作,JDK为我们提供了强大的排序方法,可以针对对象、Map、基本类型等进行正/倒排序操作。参考博客:JAVA列表排序方法sort和reversed......
  • javaweb Filter详解
    Filter详解1.快速入门packagecom.cj.filter;importjavax.servlet.*;importjavax.servlet.annotation.WebFilter;importjavax.servlet.annotation.WebServlet;i......
  • Day2(复习java基础知识)
    Java基础java是一门强类型语言要求变量的使用要严格符合规定,所有变量都必须先定义后才能使用 Java的特性简单性面向对象可移植性高性能分布式动态......
  • 【Linux】Java获取Linux本机ip为127.0.0.1的解决方法
    前言参考博客:Java获取Linux本机ip为127.0.0.1的解决方法最近新部署了一台服务器,环境搭建好后,运行相应的项目代码时,发现项目获取的服务器IP地址不是我想要的Java代码中......
  • Java面向对象
    面向对象一、对象类=方法+属性面向过程:步骤清晰简单,第一步做什么,第二部做什么,适合处理一些较为简单的问题面向对象:分类的思维,思考问题需要哪些分类,对这些分类进行单独......