首页 > 其他分享 > Reactor 模式与Tomcat中的Reactor

Reactor 模式与Tomcat中的Reactor

时间:2023-06-23 17:11:47浏览次数:45  
标签:Reactor Tomcat 处理 模式 Handler 线程 事件 IO

系列文章目录和关于我

参考:[nio.pdf (oswego.edu)](https://gee.cs.oswego.edu/dl/cpjslides/nio.pdf)

一丶什么是Reactor

The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers.


Reactor模式是一种用于处理高并发的设计模式,也被称为事件驱动模式。在这种模式中,应用程序会将输入事件交给一个事件处理器,称为Reactor,Reactor会监听所有输入事件,并将它们分发给相应的处理程序进行处理。这种模式可以大大提高应用程序的性能和可扩展性,因为它使用了非阻塞I/O和异步处理技术,使得一个进程可以同时处理多个事件,而不会因为某个事件的处理时间过长而影响其他事件的处理。Reactor模式被广泛应用于网络编程和操作系统级别的事件驱动程序。

二丶为什么需要Reactor

1.传统BIO

在传统BIO模式中有多少个客户端请求,就需要多少个对于的线程进行一对一的处理。

这种模型有如下缺点:

  • 同步阻塞IO,读写阻塞,大量线程挂起
  • 指定线程数的时候,只能依据系统的cpu核心数,无法根据并发请求数来指定。
  • 大量线程导致上下文切换开销大,线程占用内存大。

2.NIO

Java NIO 带来非阻塞IO,和IO多路复用。

得益于非阻塞IO和IO多路复用,让服务可以处理更多的并发请,不再受限于一个客户端一个线程来处理,而是一个线程可以维护多个客户端。

image-20230611135115721

可以看到java 中NIO有点reactor的意思:

Selector多路复用器监听IO事件进行分发,针对连接事件,读写事件进行不同的处理。


Reactor核心是Reactor加上对应的处理器Handler,Reactor在一个单独的线程中运行,负责监听和分发事件,将接收到的事件交给不同的Handler来处理,Handler是处理程序执行I/O事件的实际操作。

  • 高并发:Reactor模式可以在同一时间内处理大量的客户端请求,提高了系统的并发处理能力。得益于Java NIO 非阻塞IO 于 IO多路复用
  • 可扩展性:Reactor模式可以很容易地扩展到更多的处理器,以满足更高的并发量。
  • 编码简单:Reactor模式可以使编码更加简单明了,因为它将不同的事件分离开来处理,降低了代码的复杂度。例如Netty就使用了Reactor模式,程序员只需要写如何处理事件
  • 效率高:Reactor模式采用非阻塞I/O和异步处理技术,可以使得一个进程可以同时处理多个事件,而不会因为某个事件的处理时间过长而影响其他事件的处理,从而提高了系统的效率。
  • 可移植性好:Reactor模式可以很方便地移植到不同的平台上,因为它遵循了标准的Java NIO接口,可以在不同的操作系统上实现。

三丶Reactor模型于简单代码实现

1.单Reactor单线程模型

image-20230623141616010

这个模型诠释了Reactor模式的组成部分:

  • Reactor 负责分离套接字,对于触发connect的io事件交给Acceptor处理,对于IO读写事件交给Handler处理
  • Acceptor负责创建Handler,将Handler和socketChannel进行绑定,当socketChannel读事件触发后,Reactor进行分发给对应Handler处理。
public class Reactor implements Runnable {
    
    //多路复用器
    final Selector selector;
	//服务端Channel
    final ServerSocketChannel serverSocket;

    Reactor(int port) throws IOException {
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(
                new InetSocketAddress(port));
        serverSocket.configureBlocking(false);
        // 注册io多路复用器连接事件
        SelectionKey sk =
                serverSocket.register(selector,
                        SelectionKey.OP_ACCEPT);
        // 将服务端Channel 关联一个Acceptor
        sk.attach(new Acceptor());
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                selector.select();
                Set<SelectionKey> selected = selector.selectedKeys();
                Iterator<SelectionKey> it = selected.iterator();
                while (it.hasNext())
                    //分发
                    dispatch(it.next());
                selected.clear();
            }
        } catch (IOException ex) { /* ... */ }
    }

    void dispatch(SelectionKey k) {
        // 拿到关联的acceptor 或者handler
        Runnable r = (Runnable) (k.attachment());
        if (r != null)
            r.run();
    }
	
    //内部类 负责处理连接事件
    class Acceptor implements Runnable {
        public void run() {
            try {
                // 拿到Channel
                SocketChannel c = serverSocket.accept();
                if (c != null)
                    // 创建handler
                    new Handler(selector, c);
            } catch (IOException ex) { /* ... */ }
        }
    }

    final class Handler implements Runnable {
        final SocketChannel socket;
        final SelectionKey sk;
        ByteBuffer input = ByteBuffer.allocate(1024);
        ByteBuffer output = ByteBuffer.allocate(1024);
        static final int READING = 0, SENDING = 1;
        int state = READING;

        //设置非阻塞
        //监听可读事件
        Handler(Selector sel, SocketChannel c)
                throws IOException {
            socket = c;
            c.configureBlocking(false);
            sk = socket.register(sel, 0);
            sk.attach(this);
            sk.interestOps(SelectionKey.OP_READ);
            sel.wakeup();
        }

        boolean inputIsComplete() {
            return false;
        }

        boolean outputIsComplete() {
            return false;
        }

        void process() {
        }

        public void run() {
            try {
                //如果可读
                if (state == READING) read();
				//如果可写
                else if (state == SENDING) send();
            } catch (IOException ex) { /* ... */ }
        }

        void read() throws IOException {
            socket.read(input);
            if (inputIsComplete()) {
                process();
                state = SENDING;
                sk.interestOps(SelectionKey.OP_WRITE);
            }
        }

        void send() throws IOException {
            socket.write(output);
            if (outputIsComplete()) sk.cancel();
        }
    }
}

可以看到Reactor模式将Channel和Acceptor,Handler进行绑定依赖于SelectionKey#attach方法,通过此方法在不同的事件发生时调用SelectionKey#attachment方法,获取到对应的处理程序进行处理。

Reactor由单线程运行,通过IO多路复用Selector监听多个事件是否就绪,得益于Channel提供的非阻塞IO能力,当IO没有就绪的时候,单线程不会阻塞而是继续处理下一个。

由于其单线程的原因,无法利用计算机多核心资源,并且如果读取请求内容处理的过程存在耗时操作(比如数据库,rpc等)那么回导致下一个事件得不到快速的响应。

2.单Reactor多线程模型

引入多线程解决单线程Reactor的不足

image-20230623154401204

可以看到多线程模型引入了线程池,对于就绪的可读,可写IO事件交给线程池进行处理。

主要是对单线程模型中的Handler进行改造,将处理逻辑提交到线程池中。

image-20230623155059811

多线程模型涉及到共享资源的使用,不如读写Channel依赖的Buffer如何分配。

可以看到多线程模型的缺点:线程通信和同步逻辑复杂,需要处理多线程安全问题。

3.多Reactor多线程模型

image-20230623154855490

在这种模型中,mainReactor负责处理连接建立事件,只需要一个线程即可。subReactor负责和建立连接的socket进行数据交互并处理业务逻辑,并且每一个subReactor可持有一个独立的Selector进行IO多路复用事件监听。

// SubReactor 池子,负责负载均衡的选择SubReactor
public class SubReactorPool {
    final static SubReactor[] subReactors;
    static final AtomicInteger count = new AtomicInteger();
    static {
        int availableProcessors = Runtime.getRuntime().availableProcessors();
        subReactors = new SubReactor[availableProcessors];
        for (int i = 0; i < subReactors.length; i++) {
            subReactors[i] = new SubReactor();
        }
    }

    static class SubReactor implements Runnable{
        // 业务处理线程池
        final static Executor poolExecutor = Executors.newCachedThreadPool();
		// io多路复用
        Selector selector;
		
        SubReactor()  {
            try {
                selector = Selector.open();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        public void registry(SocketChannel socketChannel) throws ClosedChannelException {
            socketChannel.register(selector,SelectionKey.OP_READ);
        }
        @Override
        public void run() {
            while (true){
                try {
                    selector.select();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()){
                    SelectionKey sk = iterator.next();
                    if (sk.isReadable()) {
                        poolExecutor.execute(()->new Handler(sk));
                    }
                    // 可写,。。。。
                    iterator.remove();
                }

            }
        }

    }
	
    
    //选择合适的SubReactor
    static SubReactor loadBalanceChoose(SocketChannel socketChannel){
        int countInt = count.getAndAdd(1);
        return subReactors[countInt % subReactors.length];
    }
}

多Reactor解决了单个Selector注册连接,读写事件,导致内核轮询的时候需要判断太多fd而效率缓慢的问题。

四丶Tomcat中Reactor

Tomcat请求处理流程与源码浅析 - Cuzzz - 博客园 (cnblogs.com)中,说到Tomcat Connector的设计

image-20230521211759461

其中

  • Endpoint:tomcat中没有这个接口,只有AbstractEndpoint,它负责启动线程来监听服务器端口,并且在接受到数据后交给Processor处理
  • Processor:Processor读取到客户端请求后按照请求地址映射到具体的容器进行处理,这个过程请求映射,Processor实现请求映射依赖于Mapper对象,在容器发生注册和注销的时候,MapperListener会监听到对应的事件,从而来变更Mapper中维护的请求映射信息。
  • ProtocolHandler:协议处理器,针对不同的IO方式(NIO,BIO等)和不同的协议(Http,AJP)具备不同的实现,ProtocolHandler包含一个Endpoint来开启端口监听,并且包含一个Processor用于按照协议读取数据并将请求交给容器处理。
  • Acceptor:Acceptor实现了Runnable接口,可以作为一个线程启动,使用Socket API监听指定端口,用于接收用户请求。
  • Poller:主要用于监测注册在原始 scoket 上的事件是否发生,Acceptor接受到请求后,会注册到Poller的队列中。

下图展示了Acceptor 和 Poller的协作

tomcat

1.Acceptor 等待客户端连接

这一步借助ServerSocketChannel#accept方法,进行等待客户端连接,Acceptor单线程进行监听。

image-20230623163759981

2.Acceptor选择Poller进行注册

这一步设置非阻塞,并且使用计数取模的方式实现多个Poller的负载均衡

image-20230623164254734

然后将事件保证为PollerEvent 提交到Poller的阻塞队列中

image-20230623164512378

3.Poller 轮询阻塞队列中的PollerEvent并注册到Selector上

image-20230623164919962

轮询阻塞队列中的PollerEvent,并且调用run方法,run方法会把事件注册到Poller的Selector上,注意下面的注册将NioSocketWrapper作为attachment进行了绑定

image-20230623164900104

4.Poller中Selector IO多路复用处理事件,并处理事件

image-20230623165432055

tomcat处理事件的时候,会创建出SocketProcessor进行处理,SocketProcessor是一个Runnable,最后会提交到线程池。

image-20230623165618379

标签:Reactor,Tomcat,处理,模式,Handler,线程,事件,IO
From: https://www.cnblogs.com/cuzzz/p/17499364.html

相关文章

  • 2023 跟我一起学设计模式:命令模式
    命令模式亦称:动作、事务、Action、Transaction、Command命令模式是一种行为设计模式,它可将请求转换为一个包含与请求相关的所有信息的独立对象。该转换让你能根据不同的请求将方法参数化、延迟请求执行或将其放入队列中,且能实现可撤销操作。问题假如你正在开发一款新的文字编......
  • 头一次见单例模式讲的如此透彻
    简介单例模式是一种常用的软件设计模式,用于创建类型。通过单例模式的方法创建的类在当前进程中只有一个实例。单例模式的类只能允许一个实例存在。单例模式的作用是保证在整个应用程序的生命周期中,任何一个时刻,单例类的实例都只存在一个。组成部分:私有化构造方法。私有化内部......
  • Tomcat项目不放在webapps下放在别的盘符目录下的配置
    一、项目不放在tomcat下的webapps下,而是放在别的盘符路径下,在service.xml中加以配置即可,配置如下:<HostappBase="webapps"autoDeploy="true"name="localhost"unpackWARs="true"xmlNamespaceAware="false"xmlValidation="false">......
  • 装饰模式-11
    概述装饰模式(DecoratorPattern)又称包装器,与适配器模式别名一样,但使用的目的不同。它动态地给一个对象添加职责,相比于通过继承添加职责更加灵活。也称“油漆工”模式(视翻译而定)。优点:扩展对象功能的同时提高了灵活性,符合“开闭原则”。缺点:增加系统的复杂性。interfaceCo......
  • Android四种Activity的加载模式
    建议首先阅读下面两篇文章,这样才可以更好的理解Activity的加载模式:Android的进程,线程模型其中对“Android的单线程模型”的描述,明白Activity的一些注意事项。AndroidApplicationTaskActivities的关系尤其要明白Task是啥。 一个Activty的生命周期Activty的生命周期的也......
  • RAW域算法之固定模式噪声消除FPN
    固定模式噪声消除(FixedPatternNoiseRemove)由于Sensor工艺的原因导致了Sensor会在固定的位置产生相对固定的随时间变化较小的噪声,称之为固定模式噪声。固定模式噪声一般出现于CMOSSensor,并且Sensor的模拟增益或者列增益开的越大,固定模式噪声越明显。图12增益增大时......
  • Tomcat作用
    转载自tomcat到底是干嘛的-Cherishforchen-博客园(cnblogs.com)暂时不是很理解,先记下,之后再补充。这个软件用来装载你的javaweb程序,可以称它为web容器,你的jsp/servlet程序需要运行在Web容器上,Web容器有很多种,JBoss、WebLogic等等,Tomcat是其中一种。tomcat是一种web服务器,也......
  • hiredis的同步模式和异步模式
    1.什么是hiredisHiredis是一个C语言编写的Redis客户端库,用于与Redis数据库进行交互。它提供了一个简洁而高效的接口,使开发人员可以方便地在自己的C/C++项目中使用Redis。Hiredis是一个开源项目,可从其官方GitHub仓库获取源代码,并在符合BSD许可证的条件下使用和分......
  • 智能化制造:一种新的生产模式
    目录1.引言2.技术原理及概念3.实现步骤与流程4.应用示例与代码实现讲解5.优化与改进6.结论与展望智能化制造:一种新的生产模式随着人工智能技术的不断发展,智能化制造成为了现代制造业的一个重要趋势。智能化制造能够实现生产过程中的自动化、智能化和数据化,提高生产效率和......
  • 20230430 28. 访问者模式 - 男女对比
    介绍访问者模式(Visitor),表示一个作用于某对象结构中的各元素的操作。它使你可以在不改变各元素的类的前提下定义作用于这些元素的新操作访问者模式适用于数据结构相对稳定的系统访问者模式把数据结构和作用于结构上的操作之间的耦合解脱开,使得操作集合可以相对自由地演化。......