首页 > 其他分享 >Reactor模式

Reactor模式

时间:2024-09-28 20:34:52浏览次数:1  
标签:Reactor 模式 selector socketChannel byteBuffer selectionKey serverSocketChannel pu

Reactor模式

许多高性能的服务器软件离不开Reactor模式.像高性能缓存Redis,高性能web服务器Nginx,高性能的网络组件Netty,高性能的消息中间件Kafka,RocketMQ等.

那什么是Reactor模式呢?借用Doug Lea大师的话来说,就是:

Reactor模式由Reactor线程,Handles处理器两大角色组成,它们的职责分别是:

1.Reactor线程负责响应IO事件,并且将IO事件分发到Handles处理器
2.Handles线程:IO的读取,业务逻辑处理,写入

那为什么会产生Reactor模式呢?这就不得不说起OIO了.OIO又叫BIO,阻塞IO,像ServerScoket,Socket的read和write都会阻塞线程,直到完成IO操作. 系统的吞吐量特别低,每个IO操作都会阻塞其他的操作.为了解决这个问题,后面引入了每个连接一个线程.由每个线程处理一个连接的IO读,业务处理,IO写.这样每个连接在IO操作阻塞时不会影响其他的线程.

在系统的连接数少时没有问题,当连接数越来越多时,线程的数量就越来越多.对系统的资源消耗太高.

为了解决以上的问题,需要控制线程的数量.假设一个线程处理大量的连接,就可以控制系统资源的使用,同时提高系统的吞吐量.

单线程的Reactor模式

如果Reactor线程和Handles线程是同一个线程,就是最简单的Reactor模式了.

来看个例子,实现个Echo服务,简单返回客户端发送的数据.

服务端:

public interface Handler {
    void handle() throws IOException;
}


public class AcceptorHandler implements Handler{

    ServerSocketChannel serverSocketChannel;

    Selector selector;

    public AcceptorHandler(ServerSocketChannel serverSocketChannel,
                           Selector selector) {
        this.serverSocketChannel = serverSocketChannel;
        this.selector = selector;
    }

    @Override
    public void handle() {
        try {
            SocketChannel channel = serverSocketChannel.accept();

            channel.configureBlocking(false);

            SelectionKey selectionKey = channel.register(selector, 0);
            selectionKey.attach(new IOHandler(channel, selector,selectionKey));
            selectionKey.interestOps(SelectionKey.OP_READ);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

    }
}

public class IOHandler implements Handler {

    Selector selector;

    SocketChannel socketChannel;

    SelectionKey selectionKey;
    ByteBuffer byteBuffer = ByteBuffer.allocate(2048);

    public IOHandler(SocketChannel socketChannel,Selector selector,
                     SelectionKey selectionKey) {
        this.selector = selector;
        this.socketChannel = socketChannel;
        this.selectionKey = selectionKey;
    }

    @Override
    public void handle() {
        if (selectionKey.isReadable()) {
            try {
                while (socketChannel.read(byteBuffer) > 0) {
                    byteBuffer.flip();
                    System.out.println("读的内容是"+ new String(byteBuffer.array(),byteBuffer.position(),byteBuffer.limit()));
                }

                SelectionKey selectionKey1 = selectionKey.interestOps(SelectionKey.OP_WRITE);
                selectionKey1.attach(this);
            } catch (IOException e) {
                try {
                    socketChannel.close();
                } catch (IOException ex) {

                }
            }
        }else if (selectionKey.isWritable()) {
            try {
                while (socketChannel.write(byteBuffer) > 0) {

                }

                byteBuffer.clear();

                SelectionKey selectionKey1 = selectionKey.interestOps(SelectionKey.OP_READ);
                selectionKey1.attach(this);
            } catch (Exception e) {
                try {
                    socketChannel.close();
                } catch (IOException ex) {

                }
            }
        }
     }
}

public class EchoServer {

    public static void main(String[] args) {
        try {
            startServer();
        }catch (Exception e) {
            e.printStackTrace();
        }
    }


    public static void startServer() throws Exception {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

        serverSocketChannel.configureBlocking(false);

        serverSocketChannel.bind(new InetSocketAddress("localhost",10700));

        Selector selector = Selector.open();

        SelectionKey selectionKey = serverSocketChannel.register(selector, 0);
        selectionKey.attach(new AcceptorHandler(serverSocketChannel,selector));
        selectionKey.interestOps(SelectionKey.OP_ACCEPT);

        while (!Thread.interrupted()) {
            int select = selector.select();
            if (select > 0) {
                Set<SelectionKey> selectionKeys = selector.selectedKeys();

                Iterator<SelectionKey> iterator = selectionKeys.iterator();

                while (iterator.hasNext()) {
                    SelectionKey selectionKey1 = iterator.next();

                    Handler handler = (Handler) selectionKey1.attachment();
                    handler.handle();
                }
                selectionKeys.clear();
            }
        }

        serverSocketChannel.close();
        selector.close();
    }
}

客户端:

public class EchoClient {

    private static final ByteBuffer byteBuffer = ByteBuffer.allocate(2048);

    public static void main(String[] args) {
        try {
            startClient();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void startClient() throws Exception {
        SocketChannel socketChannel = SocketChannel.open();

        socketChannel.configureBlocking(false);
        socketChannel.connect(new InetSocketAddress("localhost",10700));

        while (!socketChannel.finishConnect()) {

        }

        System.out.println("成功与服务器建立连接");

        Scanner scanner = new Scanner(System.in);
        System.out.print("请输入一行:");
        while (scanner.hasNext()) {
            String s = scanner.nextLine();

            byteBuffer.clear();
            byteBuffer.put(s.getBytes(StandardCharsets.UTF_8));

            byteBuffer.flip();

            while(socketChannel.write(byteBuffer)>0) {

            }


            byteBuffer.clear();
            while (socketChannel.read(byteBuffer) == 0) {

            }

            byteBuffer.flip();
            System.out.println("从服务器接收数据:"+new String(byteBuffer.array(), byteBuffer.position(), byteBuffer.limit()));

            System.out.print("请输入一行:");
        }

        socketChannel.close();
    }
}

可以看到一个线程管理了很多连接,解决了每个连接一个线程的系统资源消耗的问题.可以看出,缺点就是进行IO操作时还会阻塞其他的线程.

多线程Reactor模式

在单线程的Reactor模式加上多线程来改进阻塞问题:
1.Handle加多线程,考虑使用线程池
2.Reactor加多线程,引入多个Selector

现在看下多线程版本的Reactor实现的Echo服务:

服务器

public class MultiAcceptorHandler implements Handler {

    ServerSocketChannel serverSocketChannel;

    Selector selector;

    ExecutorService executorService;

    public MultiAcceptorHandler(ServerSocketChannel serverSocketChannel,
                                Selector selector,ExecutorService executorService) {
        this.serverSocketChannel = serverSocketChannel;
        this.selector = selector;
        this.executorService = executorService;
    }

    @Override
    public void handle() {
        try {
            SocketChannel channel = serverSocketChannel.accept();

            channel.configureBlocking(false);

            new MultiIOHandler(channel, selector,executorService);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

    }
}

public class MultiIOHandler implements Handler {

    final Selector selector;

    final SocketChannel socketChannel;

    final SelectionKey selectionKey;

    final ExecutorService executorService;
    final ByteBuffer byteBuffer = ByteBuffer.allocate(2048);

    int read = 0;
    public MultiIOHandler(SocketChannel socketChannel, Selector selector,
                           ExecutorService executorService) {
        this.selector = selector;
        this.socketChannel = socketChannel;
        this.executorService = executorService;

        try {
            this.selectionKey = socketChannel.register(selector, 0);
            this.socketChannel.configureBlocking(false);
            this.selectionKey.attach(this);
            this.selectionKey.interestOps(SelectionKey.OP_READ);
            selector.wakeup();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void handle() {
        executorService.submit(this::syncRun);
     }

     public synchronized void syncRun() {
         if (read==0) {
             try {
                 while (socketChannel.read(byteBuffer) > 0) {
                     System.out.println("读的内容是"+ new String(byteBuffer.array(),0,byteBuffer.position()));
                 }
                 byteBuffer.flip();
                 System.out.println("读取"+byteBuffer.position()+","+byteBuffer.limit());
                 selectionKey.interestOps(SelectionKey.OP_WRITE);

                 read=1;
                 selector.wakeup();
             } catch (IOException e) {
                 try {
                     socketChannel.close();
                 } catch (IOException ex) {

                 }
             }
         }else {
             try {
                 while (socketChannel.write(byteBuffer) > 0) {

                 }

                 byteBuffer.clear();

                 selectionKey.interestOps(SelectionKey.OP_READ);
                 read=0;
                 selector.wakeup();
             } catch (Exception e) {
                 try {
                     socketChannel.close();
                 } catch (IOException ex) {

                 }
             }
         }
     }
}

public class SelectorThread implements Runnable {

    Selector selector;

    int index;

    public SelectorThread(Selector selector,int index) {
        this.selector = selector;
        this.index = index;
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
//                System.out.println(index+"正在运行");
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();

                Iterator<SelectionKey> iterator = selectionKeys.iterator();

                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();

                    Handler handler = (Handler) selectionKey.attachment();

                    if (handler != null) {
                        handler.handle();
                    }
                }

                selectionKeys.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
}


public class MultiEchoServer {

    public static void main(String[] args) {
        try {
            startServer();
        }catch (Exception e) {
            e.printStackTrace();
        }
    }


    public static void startServer() throws Exception {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

        serverSocketChannel.configureBlocking(false);

        serverSocketChannel.bind(new InetSocketAddress("localhost",10700));

        Selector[] selectors = new Selector[2];

        SelectorThread[] selectorThreads = new SelectorThread[2];

        for (int i = 0; i < 2; i++) {
            selectors[i] = Selector.open();
        }

        ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

        SelectionKey selectionKey = serverSocketChannel.register(selectors[0], 0);
        selectionKey.attach(new MultiAcceptorHandler(serverSocketChannel,selectors[1],executorService));
        selectionKey.interestOps(SelectionKey.OP_ACCEPT);

        for (int i = 0; i < 2; i++) {
            selectorThreads[i] = new SelectorThread(selectors[i],i);
            new Thread(selectorThreads[i]).start();
        }

		//        Thread closeThread = new Thread(() -> {
		//            try {
		//                executorService.awaitTermination(5, TimeUnit.SECONDS);
		//
		//                executorService.shutdown();
		//                serverSocketChannel.close();
		//
		//                for (int i= 0; i < 2; i++) {
		//                    selectors[i].close();
		//                }
		//            } catch (Exception e) {
		//                e.printStackTrace();
		//            }
		//        });
		//
		//        Runtime.getRuntime().addShutdownHook(closeThread);
			}
		}

Selector[] selectors = new Selector[2]使用了两个Selector,第一个用于接收客户端的连接,注册的Handle是MultiAcceptorHandler;接收连接后将处理IO注册到第二个Selector, 第二个用于处理IO读取和写入,注册的Handle是MultiIOHandler.每个连接的IO处理也是用线程池处理的.

注意:在MultiIOHandler不能用selectionKey.isReadable()来判断是否是可读,增加了read标志来判断.

优缺点

优点

1.响应快,虽然Reactor线程是同步的,但是不会被IO操作所阻塞
2.编程简单
3.可扩展,可以加多线程充分利用CPU资源

缺点

1.有一定复杂性
2.依赖操作系统支持
3.同一个Handle中出现长时间读写,会造成Reactor线程的其他通道的IO处理

标签:Reactor,模式,selector,socketChannel,byteBuffer,selectionKey,serverSocketChannel,pu
From: https://www.cnblogs.com/shigongp/p/18438134

相关文章

  • 设计模式之模板方法模式
    模板方法模式模板方法模式是一种行为型设计模式,它定义了一个操作中的算法的框架,并将一些步骤的执行延迟到子类中。通过这种方式,模板方法使得子类可以在不改变算法的结构的情况下,重定义算法中的某些特定步骤。核心组成:抽象类(AbstractClass):这个抽象类包含模板方法本身,同时也可......
  • 软件设计模式概述
    概述软件设计内容软件体系结构—宏观设计,模块软件设计模式—中间级别,类,接口,模块数据结构与算法—微观设计,方法是什么设计经验的总结七个常用原则单一职责原则就一个类而言,应该仅有一个引起他变化的原因为什么当一个类职责过多,一个职责的变化可能影响......
  • C++设计模式
    C++设计模式提供了一些常见的问题和解决方案,帮助我们写出更清晰、更高效、更容易维护的代码。1.单例模式:就像整形诊所里有一个特别的规定,全世界只能有一个人拥有某款独一无二的鼻子,其他人都不能复制。(确保一个类只有一个实例,并提供一个全局访问点。)classSingleton{priva......
  • 结构型设计模式详解与对比:优化你的系统设计
    结构型设计模式(StructuralDesignPatterns)主要关注类和对象的组合,以形成更大的结构。它们通过识别实体之间的关系,帮助设计师确保系统的各个部分能够协同工作。以下是几种常见的结构型设计模式及其详细介绍和对比:一,代理模式(ProxyPattern)用途:为另一个对象提供一个替身......
  • 商场促销——策略模式
    文章目录商场促销——策略模式商场收银软件增加打折简单工厂实现策略模式策略模式实现策略与简单工厂结合策略模式解析商场促销——策略模式商场收银软件时间:2月27日22点地点:大鸟房间人物:小菜、大鸟“小菜,给你出个作业,做一个商场收银软件,营业员根据客户所......
  • 如果再回到从前——备忘录模式
    文章目录如果再回到从前——备忘录模式如果再给我一次机会……游戏存进度备忘录模式备忘录模式基本代码游戏进度备忘如果再回到从前——备忘录模式如果再给我一次机会……时间:5月6日18点地点:小菜、大鸟住所的客厅人物:小菜、大鸟"小菜,今天上午看NBA了吗?"大......
  • 开源链动2+1模式AI智能名片小程序源码:放大特色,构建独特竞争力
    摘要:本文探讨在当今社会背景下,开源链动2+1模式AI智能名片小程序源码如何通过坚持与众不同来构建独特竞争力。在信息传播便捷但个体易同质化的时代,拥有特色是脱颖而出的关键,而这种模式下的小程序源码具有独特的发展潜力。一、引言当今社会,一方面互联网使信息传播便捷高效,另一......
  • 多线程下单例模式延迟初始化的实现
    多线程下单例模式延迟初始化的实现前言synchronized修饰方法双重检查锁定双重检查锁定的问题基于volatile的双重锁定方案类初始化总结前言在程序开发中,存在一些开销较高的资源,例如数据库连接等,我们使用单例模式保证其唯一并且进行延迟初始化,只有当使用的时候才进行......
  • keepalived+nginx实现高可用的案例详解(主主模式)
    文章目录前言keepalived主备模式和主主模式有什么区别1.主备模式(Master-BackupMode)2.主主模式(Active-ActiveMode或DualMasterMode)主备模式vs主主模式的区别总结:环境案例实现具体步骤ngx1ngx2验证前言keepalived主备模式和主主模式有什么区别Keepali......
  • RabbitMQ五种工作模式的代码具体实现(代码简单易懂)(一)
    一、简单队列模式:直接把消息发送给队列,且队列只有一个消费者。使用的是Rabbitmq默认的交换机。生产者部分:<!--amqp依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</art......