首页 > 编程语言 >【Java 并发】【队列应用】【二】Tomcat的NioEndPoint中ConcurrentLinkedQueue 的使用

【Java 并发】【队列应用】【二】Tomcat的NioEndPoint中ConcurrentLinkedQueue 的使用

时间:2024-02-11 16:00:11浏览次数:33  
标签:... Java Tomcat NioEndPoint 队列 线程 接字 Poller socket

1  前言

这一节我们讲解Tomcat的NioEndPoint中ConcurrentLinkedQueue 的使用。

2  Tomcat的容器结构

本节讲解apache-tomcat-7.0.32-src 源码中ConcurrentLinkedQueue 的使用。 首先介绍 Tomcat 的容器结构以及NioEndPoint的作用,以便后面能够更加平滑地切入话题,如 图11-4 所示是Tomcat的容器结构。

其中,Connector 是一个桥梁,它把Server和Engine连接了起来,Connector的作用 是接受客户端的请求,然后把请求委托给Engine容器处理。在Connector的内部具体使 用Endpoint 进行处理,根据处理方式的不同Endpoint可分为NioEndpoint、JIoEndpoint、 AprEndpoint。本节介绍NioEndpoint 中的并发组件队列的使用。为了让读者更好地理解, 有必要先说下NioEndpoint的作用。首先来看NioEndpoint中的三大组件的关系图(见图 11-5) 。

Acceptor 是套接字接受线程(Socket acceptor thread),用来接受用户的请求,并把 请求封装为事件任务放入Poller的队列,一个Connector里面只有一个Acceptor。

Poller 是套接字处理线程(Socket poller thread),每个 Poller 内部都有一个独有的队 列,Poller 线程则从自己的队列里面获取具体的事件任务,然后将其交给Worker进 行处理。Poller线程的个数与处理器的核数有关,代码如下。

protected int pollerThreadCount = Math.min(2,Runtime.getRuntime(). availableProcessors());

这里最多有2个Poller线程。

Worker 是实际处理请求的线程,Worker只是组件名字,真正做事情的是 SocketProcessor,它是 Poller 线程从自己的队列获取任务后的真正任务执行者。

可见,Tomcat使用队列把接受请求与处理请求操作进行解耦,实现异步处理。其实 Tomcat 中 NioEndPoint 中的每个Poller 里面都维护一个ConcurrentLinkedQueue,用来缓存 请求任务,其本身也是一个多生产者-单消费者模型。

3  生产者——Acceptor线程

Acceptor 线程的作用是接受客户端发来的连接请求并将其放入Poller的事件队列。首 先看下Acceptor处理请求的简明时序图(见图11-6)。

下面分析Acceptor的源码,看其如何把接受的套接字连接放入队列。

protected class Acceptor extends AbstractEndpoint.Acceptor {
    @Override
    public void run() {
        int errorDelay = 0;
        //(1)
        一直循环直到接收到shutdown命令
        while (running) {
            ...
            if (!running) {
                break;
            }
            state = AcceptorState.RUNNING;
            try {
                //(2)如果达到max connections个请求则等待
                countUpOrAwaitConnection();
                SocketChannel socket = null;
                try {
                    //(3)从TCP缓存获取一个完成三次握手的套接字,没有则阻塞
                    socket = serverSock.accept();
                } catch (IOException ioe) {
                        ...
                }
                errorDelay = 0;
                if (running && !paused) {
                    //(4)设置套接字参数并封装套接字为事件任务,然后放入Poller的队列
                    if (!setSocketOptions(socket)) {
                        countDownConnection();
                        closeSocket(socket);
                    }
                } else {
                    countDownConnection();
                    closeSocket(socket);
                }
                   ....
            } catch (SocketTimeoutException sx) {
                ....
            }
            state = AcceptorState.ENDED;
        }
    }
}

代码(1)中的无限循环用来一直等待客户端的连接,循环退出条件是调用了 shutdown 命令。

代码(2)用来控制客户端的请求连接数量,如果连接数量达到设置的阈值,则当前 请求会被挂起。

代码(3)从TCP缓存获取一个完成三次握手的套接字,如果当前没有,则当前线程 会被阻塞挂起。

当代码(3)获取到一个连接套接字后,代码(4)会调用setSocketOptions设置该套接字。

protected boolean setSocketOptions(SocketChannel socket) {
    // 
    处理链接
    try { 
       ...
        //封装链接套接字为channel并注册到Poller队列 
        getPoller0().register(channel);
    } catch (Throwable t) { 
       ...
        return false;
    }
    return true;
}

代码(5)将连接套接字封装为一个channel对象,并将其注册到poller对象的队列。

//具体注册到事件队列 
public void register(final NioChannel socket) { 
    ...
    PollerEvent r = eventCache.poll();
    ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into. 
    if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
    else r.reset(socket,ka,OP_REGISTER);
    addEvent(r);
}
public void addEvent(Runnable event) {
    events.offer(event); 
    ...
}

其中,events 的定义如下:

protected ConcurrentLinkedQueue<Runnable> events = new ConcurrentLinkedQueue<Runnable>();

由此可见,events是一个无界队列ConcurrentLinkedQueue,根据前文讲的,使用队列 作为同步转异步的方式要注意设置队列大小,否则可能造成OOM。当然Tomcat肯定不会 忽略这个问题,从代码(2)可以看出,Tomcat让用户配置了一个最大连接数,超过这个 数则会等待。

4  消费者——Poller线程

Poller线程的作用是从事件队列里面获取事件并进行处理。首先我们从时序图来全局 了解下Poller线程的处理逻辑(见图11-7) 。

同理,我们看一下Poller线程的run方法代码逻辑。

public void run() {
    while (true) {
        try {
        ...
            if (close) {
           ...
            } else {
                //(6)从事件队列获取事件
                hasEvents = events();
            }
            try { 
            ...
            } catch ( NullPointerException x ) {...
            }
            Iterator<SelectionKey> iterator =
                    keyCount > 0 ? selector.selectedKeys().iterator() : null;
            //(7)遍历所有注册的channel并对感兴趣的事件进行处理 
            while (iterator != null && iterator.hasNext()) {
                SelectionKey sk = iterator.next();
                KeyAttachment attachment = (KeyAttachment)sk.attachment();
                if (attachment == null) {
                    iterator.remove();
                } else {
                    attachment.access();
                    iterator.remove();
                    //(8)具体调用SocketProcessor进行处理 
                    processKey(sk, attachment);
                }
            }//while 
       ...
        } catch (OutOfMemoryError oom) { 
        ...
        }
    }//while 
    ...
}

其中,代码(6)从poller的事件队列获取一个事件,events()的代码如下。

public boolean events() {
    boolean result = false;
    //从队列获取任务并执行
    Runnable r = null;
    while ( (r = events.poll()) != null ) {
        result = true;
        try {
            r.run();
            ...
        } catch ( Throwable x ) {
            ...
        }
    }
    return result;
}

这里是使用循环来实现的,目的是为了避免虚假唤醒。

其中代码(7)和代码(8)则遍历所有注册的channel,并对感兴趣的事件进行处理。

public boolean processSocket(NioChannel socket, SocketStatus status, boolean
        dispatch) {
    try { 
         ...
        SocketProcessor sc = processorCache.poll();
        if ( sc == null ) sc = new SocketProcessor(socket,status);
        else sc.reset(socket,status);
        if ( dispatch && getExecutor()!=null ) getExecutor().execute(sc);
        else sc.run();
    } catch (RejectedExecutionException rx) { 
       ...
    } catch (Throwable t) { 
       ...
       return false;
    }
    return true;
}

5  小结

本节通过分析Tomcat中NioEndPoint的实现源码介绍了并发组件ConcurrentLinkedQueue 的使用。NioEndPoint 的思想也是使用队列将同步转为异步,并且由于 ConcurrentLinkedQueue 是无界队列,所以需要让用户提供一个设置队列大小的接口以防 止队列元素过多导致OOM。

标签:...,Java,Tomcat,NioEndPoint,队列,线程,接字,Poller,socket
From: https://www.cnblogs.com/kukuxjx/p/18013373

相关文章

  • 【Java 并发】【队列应用】【一】ArrayBlockingQueue 的使用-Logback异步日志打印
    1 前言看了那么多Java提供的队列工具,那么我们这节开始看看哪些地方用到了这些队列哈。这一节我们讲解logback异步日志打印中ArrayBlockingQueue的使用。2  异步日志打印模型概述在高并发、高流量并且响应时间要求比较小的系统中同步打印日志已经满足不了需求了,这是因为......
  • 【Java 并发】【十】【JUC数据结构】【十】PriorityBlockingQueue 原理
    1 前言这节我们继续看看另一个队列 PriorityBlockingQueue,优先级的哈。2 PriorityBlockingQueue介绍PriorityBlockingQueue是带优先级的无界阻塞队列,每次出队都返回优先级最高或者最低的元素。其内部是使用平衡二叉树堆实现的,所以直接遍历队列元素不保证有序。默认使......
  • 【Azure Function】Azure Function中使用 Java 8 的安全性问题
    问题描述使用AzureFunction,环境是Linux的Java8。目前OracleJavaJDK8,11,17和OpenJDK8/11/17都在存在漏洞受影响版本的范围内。OpenJDK                 CVEnumbers:    CVE‑2023‑21954CVE‑2023‑21938CVE‑2023‑21937CVE......
  • 【Java 并发】【十】【JUC数据结构】【九】ConcurrentLinkedQueue 原理
    1 前言JDK中提供了一系列场景的并发安全队列。总的来说,按照实现方式的不同可分为阻塞队列和非阻塞队列,前者使用锁实现,而后者则使用CAS非阻塞算法实现。这节我们来看看 ConcurrentLinkedQueue。2 ConcurrentLinkedQueue介绍ConcurrentLinkedQueue是线程安全的无界非阻......
  • Java之泛型系列--继承父类与实现多个接口(有示例)
    原文网址:​​Java之泛型系列--继承父类与实现多个接口(有示例)_IT利刃出鞘的博客-CSDN博客​​简介本文介绍java如何用泛型表示继承父类并实现多个接口。用泛型表示某个类是某个类的子类或者实现了接口的方法为:<TextendsA&B&C> 用法1:全都是接口。对于本例来说:A、B......
  • JAVA中this和super
    thisthis表示使用的对象本身,可以调用对象的属性和对象的方法以及对象的构造方法(this.x和this.(),其中this.()只能在构造方法的第一行调用且不能和super.()共同使用)使用原因避免属性和方法变量名相同时出现就近原则的冲突使用细节supersuper代表父类的引用,用于访问父类......
  • JAVA构造方法
    构造方法介绍语法使用细节关于在继承中新增的构造方法使用细节1子类必须要调用父类的构造器,完成父类的初始化2父类构造器的调用不限于直接父类!将一直往上追溯直到Object类(顶级父类)3当创建子类对象时,不管使用的是子类的哪个构造器,默认情况下总会调用父类......
  • JAVA的4种访问修饰符
    1、基本介绍补充:1属性和方法可以用四种访问修饰符修饰;类只能用public和默认修饰符修饰,且一个.java文件中只能有一个public修饰的类作为主类,其他类用默认修饰符修饰。2访问权限起作用的情况:①在一个类中定义另一个类的对象,当访问该对象的属性或方法时,修饰符根据同类、同......
  • java中使用opencl操作GPU
    需要管理GPU资源,使用java编写,选用opencl框架,并且选择org.jocl包(<dependency><groupId>org.jocl</groupId><artifactId>jocl</artifactId><version>2.0.5</version></dependency>)。具体opencl原理此处不涉及,仅记录使用java该如何做基本操作。最少要以下几步,详细可以参看:ht......
  • JAVA零钱通面向过程和oop
    编程思想1.每一个小功能对应一个代码块,高内聚低耦合。2.建议先排除不正确情况,而不是对每一个正确情况做一些操作。编程效果源码实现面向过程点击查看代码packagelingqiantong.chuantong;importjava.text.SimpleDateFormat;importjava.util.Date;importjava.uti......