首页 > 编程语言 >Tomcat长轮询原理与源码解析

Tomcat长轮询原理与源码解析

时间:2023-04-16 19:24:44浏览次数:58  
标签:Tomcat tomcat 轮询 配置 源码 线程 import 请求

系列文章目录和关于我

零丶长轮询的引入

最近在看工作使用到的diamond配置中心原理,发现大多数配置中心在推和拉模型上做的选择出奇的一致选择了基于长轮询的拉模型

  • 基于拉模型的客户端轮询的方案
    客户端通过轮询方式发现服务端的配置变更事件。轮询的频率决定了动态配置获取的实时性。

    • 优点:简单、可靠。
    • 缺点:应用增多时,较高的轮询频率给整个配置中心服务带来巨大的压力。

    另外,从配置中心的应用场景上来看,是一种写少读多的系统,客户端大多数轮询请求都是没有意义的,因此这种方案不够高效。

  • 基于推模型的客户端长轮询的方案

    基于Http长轮询模型,实现了让客户端在没有发生动态配置变更的时候减少轮询。这样减少了无意义的轮询请求量,提高了轮询的效率;也降低了系统负载,提升了整个系统的资源利用率。

一丶何为长轮询

长轮询 本质上是原始轮询技术的一种更有效的形式。

它的出现是为了解决:向服务器发送重复请求会浪费资源,因为必须为每个新传入的请求建立连接,必须解析请求的 HTTP 头部,必须执行对新数据的查询,并且必须生成和交付响应(通常不提供新数据)然后必须关闭连接并清除所有资源。

  • 从tomcat服务器的角度就是客户端不停请求,每次都得解析报文封装成Request,Response对象,并且占用线程池中的一个线程。
  • 并且每次轮询都要进行tcp握手,挥手,网卡发起中断,操作系统处理中断从内核空间拷贝数据到用户空间,一通忙活服务端返回 配置未修改(配置中心没有修改配置,客户端缓存的配置和配置中心一致,所以是白忙活)

长轮询是一种服务器选择尽可能长的时间保持和客户端连接打开的技术仅在数据变得可用或达到超时阙值后才提供响应而不是在给到客户端的新数据可用之前,让每个客户端多次发起重复的请求

image-20230416133047669

简而言之,就是服务端并不是立马写回响应,而是hold住一段时间,如果这段时间有数据需要写回(例如配置的修改,新配置需要写回)再写回,然后浏览器再发送一个新请求,从而实现及时性,节省网络开销的作用。

image-20230416145221280

二丶使用等待唤醒机制写一个简单的“长轮询”(脱裤子放屁)

package com.cuzzz.springbootlearn.longpull;

import org.springframework.beans.factory.InitializingBean;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

@RestController
@RequestMapping("long-pull")
public class MyController implements InitializingBean {

    /**
     * 处理任务的线程
     */
    private ThreadPoolExecutor processExecutor;
    /**
     * 等待唤醒的锁
     */
    private static final ReentrantLock lock = new ReentrantLock();
    /**
     * 当请求获取配置的时候,在此condition上等待一定时间
     * 当修改配置的时候通过这个condition 通知其他获取配置的线程
     */
    private static final Condition condition = lock.newCondition();

    @GetMapping
    public void get(HttpServletRequest request, HttpServletResponse response) throws ExecutionException, InterruptedException {
        //组转成任务
        Task<String> task = new Task<String>(request, response,
                () -> "拿配置" + System.currentTimeMillis());
        //提交到线程池
        Future<?> submit = processExecutor.submit(task);
        //tomcat线程阻塞于此
        submit.get();
    }

    /**
     * 模拟修改配置
     *
     * 唤醒其他获取配置的线程
     */
    @PostMapping
    public String post(HttpServletRequest request, HttpServletResponse response) {
        lock.lock();
        try {
            condition.signalAll();
        }finally {
            lock.unlock();
        }
        return "OK";
    }


    static class Task<T> implements Runnable {
        private HttpServletResponse response;
        /**
         * 等待时长
         */
        private final long timeout;
        private Callable<T> task;

        public Task(HttpServletRequest request, HttpServletResponse response, Callable<T> task) {
            this.response = response;

            String time = request.getHeader("time-out");
            if (time == null){
                //默认等待10秒
                this.timeout = 10;
            }else {
                this.timeout = Long.parseLong(time);
            }
            this.task = task;
        }


        @Override
        public void run() {
            lock.lock();
            try {

                //超市等待
                boolean await = condition.await(timeout, TimeUnit.SECONDS);
                //超时
                if (!await) {
                    throw new TimeoutException();
                }
                //获取配置
                T call = task.call();
                //写回
                ServletOutputStream outputStream = response.getOutputStream();
                outputStream.write(("没超时拿当前配置:" + call).getBytes(StandardCharsets.UTF_8));
            } catch (TimeoutException | InterruptedException exception) {
                //超时或者线程被中断
                try {
                    ServletOutputStream outputStream = response.getOutputStream();
                    T call = task.call();
                    outputStream.write(("超时or中断拿配置:" + call).getBytes(StandardCharsets.UTF_8));
                } catch (Exception ex) {
                    throw new RuntimeException(ex);
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
        }
    }


    @Override
    public void afterPropertiesSet() {

        int cpuNums = Runtime.getRuntime().availableProcessors();

        processExecutor
                = new ThreadPoolExecutor(cpuNums, cpuNums * 2, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy());
    }
}

使用get方法反问的请求回被提交到线程池进行await等待,使用post方法的请求回唤醒这些线程。

但是这个写法有点脱裤子放屁

image-20230416143924564

为什么会出现这种情况,直接提交到线程池异步执行不可以么,加入我们删除上面submit.get方法会发现其实什么结果都不会,这是因为异步提交到线程池后,tomcat已经结束了这次请求,并没有维护这个连接,所以没有办法写回结果。

如果不删除这一行,tomcat线程阻塞住我们可以写回结果,但是其实没有达到配置使用长轮询的初衷——"解放tomcat线程,让配置中心服务端可以处理更多请求"。

image-20230416145058166

所以我们现在陷入一个尴尬的境地,怎么解决昵?看下去

三丶Tomcat Servlet 3.0长轮询原理

1.AsyncContext实现长轮询

package com.cuzzz.springbootlearn.longpull;

import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

@RestController
@RequestMapping("long-pull3")
public class MyController2 {

    private static final ScheduledExecutorService procesExecutor
            = Executors.newSingleThreadScheduledExecutor();
    /**
     * 记录配置改变的map
     */
    private static final ConcurrentHashMap<String, String> configCache
            = new ConcurrentHashMap<>();
    /**
     * 记录长轮询的任务
     */
    private static final ConcurrentLinkedDeque<AsyncTask> interestQueue
            = new ConcurrentLinkedDeque<>();

    static {
        //每2秒看一下释放配置变更,或者任务超时
        procesExecutor.scheduleWithFixedDelay(() -> {
            List<AsyncTask>needRemove  = new ArrayList<>();
            for (AsyncTask asyncTask : interestQueue) {
                if (asyncTask.timeout()) {
                    asyncTask.run();
                    needRemove.add(asyncTask);
                    continue;
                }
                if (configCache.containsKey(asyncTask.configId)) {
                    needRemove.add(asyncTask);
                    asyncTask.run();
                }
            }
            interestQueue.removeAll(needRemove);
        }, 1, 2, TimeUnit.SECONDS);
    }


    static class AsyncTask implements Runnable {
        private final AsyncContext asyncContext;
        private final long timeout;
        private static long startTime;
        private String configId;

        AsyncTask(AsyncContext asyncContext) {
            this.asyncContext = asyncContext;
            HttpServletRequest request = (HttpServletRequest) asyncContext.getRequest();
            String timeStr = request.getHeader("time-out");
            if (timeStr == null) {
                timeout = 10;
            } else {
                timeout = Long.parseLong(timeStr);
            }
        	//关注的配置key,应该getParameter的,无所谓
            this.configId = request.getHeader("config-id");
            if (this.configId == null) {
                this.configId = "default";
            }
            
            //开始时间
            startTime = System.currentTimeMillis();
        }
		
        //是否超时
        public boolean timeout() {
            return (System.currentTimeMillis() - startTime) / 1000 > timeout;
        }

        @Override
        public void run() {
		
            String result = "开始于" + System.currentTimeMillis() + "--";
            try {
                if (timeout()) {
                    result = "超时: " + result;
                } else {
                    result += configCache.get(this.configId);
                }

                result += "--结束于:" + System.currentTimeMillis();
                ServletResponse response = asyncContext.getResponse();
                response.getOutputStream().write(result.getBytes(StandardCharsets.UTF_8));
                
                //后续将交给tomcat线程池处理,将给客户端响应
                asyncContext.complete();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }

        }

    }


    @GetMapping
    public void get(HttpServletRequest request, HttpServletResponse response) {
        //打印处理的tomcate线程id
        System.out.println("线程id" + Thread.currentThread().getId());
        //添加一个获取配置的异步任务
        interestQueue.add(new AsyncTask(asyncContext));
        //开启异步
        AsyncContext asyncContext = request.startAsync();
        asyncContext.setTimeout(0);
        //监听器打印最后回调的tomcat线程id
        asyncContext.addListener(new AsyncListener() {
            @Override
            public void onComplete(AsyncEvent event) throws IOException {
                System.out.println("线程id" + Thread.currentThread().getId());
            }
            //...剩余其他方法
        });
        
        //立马就会释放tomcat线程池资源
        System.out.println("tomcat主线程释放");
    }

    @PostMapping
    public void post(HttpServletRequest request) {
        String c = String.valueOf(request.getParameter("config-id"));
        if (c.equals("null")){
            c = "default";
        }
        String v = String.valueOf(request.getParameter("value"));
        configCache.put(c, v);
    }
}

image-20230416163659265

image-20230416164009702

上面演示利用AsyncContext tomcat是如何实现长轮询

这种方式的优势在于:解放了tomcat线程,其实tomcat的线程只是运行了get方法中的代码,然后立马可以去其他请求,真正获取配置更改的是我们的单线程定时2秒去轮询。

image-20230416170528139

2.实现原理

2.1 tomcat处理一个请求的流程

  • Connector是客户端连接到Tomcat容器的服务点,它提供协议服务来将引擎与客户端各种协议隔离开来

    在Connector组件中创建了Http11NioProtocol组件,Http11NioProtocol默认持有NioEndpoin,NioEndpoint中持有Acceptor和Poller,并且启动的时候会启动一个线程运行Acceptor

  • Acceptor服务器端监听客户端的连接,会启动线程一直执行

    image-20230416173102328

    每接收一个客户端连接就轮询一个Poller组件,添加到Poller组件的事件队列中。,每接收一个客户端连接就轮询一个Poller组件,添加到Poller组件的事件队列中。

  • Poller组件持有多路复用器selector,poller组件不停从自身的事件队列中将事件取出注册到自身的多路复用器上,同时多路复用器会不停的轮询检查是否有通道准备就绪,准备就绪的通道就可以扔给tomcat线程池处理了。

    image-20230416173513349

    image-20230416173724384

  • tomcat线程池处理请求

    • 这里会根据协议创建不同的Processor处理,这里创建的是Http11Processor,Http11Processor会使用CoyoteAdapter去解析报文随后交给Container去处理请求

    • CoyoteAdapter解析报文随后交给Container去处理请求

      image-20230416175001392

    • Container会将Filter和Servlet组装成FilterChain依次调用

      image-20230416180033573

    • FilterChain会依次调用Filter#doFilter,然后调用Servlet#service方法

      至此会调用到Servlete#service方法,SpringMVC中的Dispatcher会反射调用我们controller的方法

2.2 AsyncContext 如何实现异步

2.2.1 request.startAsync() 修改异步状态机状态为Starting

AsycContext内部持有一个AsyncStateMachine来管理异步请求的状态(有点状态模式的意思)

状态机的初始状态是AsyncState.DISPATCHED,通过setStarted将状态机的状态更新成STARTING

image-20230416181604623

2.2.2 AbstractProtocol启动定时任务处理超时异步请求

Connector启动的时候触发ProtocolHandler的start方法,如下

image-20230416182047418

其中startAsyncTimeout方法会遍历waitingProcessors中每一个Processor的timeoutAsync方法,这里的Processor就是Http11Processor

image-20230416182324701

那么waitProcessors中的Http11Processor是谁塞进去的昵?

tomcat线程在执行完我们的Servlet代码后,Http11NioProtocol会判断请求状态,如果为Long那么会塞到waitProcessors集合中。

如果发现请求超时,那么会调用Http11Processor#doTimeoutAsycn然后由封装的socket通道socketWrapper以TIMEOUT的事件类型重新提交到tomcat线程池中。

image-20230416183626153

2.2.3 AsyncContext#complete触发OPEN_READ事件

image-20230416184726098

可以看到其实和超时一样,只不过超时是由定时任务线程轮询来判断,而AsyncContext#complete则是我们业务线程触发processSocketEvent将后续处理提交到tomcat线程池中。

四丶长轮询的优点和缺点

本文学习了长轮询和tomcat长轮询的原理,可以看到这种方式的优点

  • 浏览器长轮询的过程中,请求并没有理解响应,而是等到超时或者有需要返回的数据(比如配置中心在这个超时事件内发送配置的变更)才返回,解决了短轮询频繁进行请求网络开销的问题,减少了读多写少业务情景下无意义请求。
  • 真是通过这种方式,减少了无意义的请求,而且释放了tomcat线程池中的线程,使得我们服务端可以支持更多的客户端(因为业务逻辑是放在其他的线程池执行的,而且对于配置中心来说,可以让多个客户端的长轮询请求由一个线程去处理,原本是一个请求一个tomcat线程处理,从而可以支持更多的请求)

当然这种方式也是有缺点的

  • hold住请求也是会消耗资源的,如果1w个请求同时到来,我们都需要hold住(封装成任务塞到队列)这写任务也是会占用内存的,而短轮询则会立马返回,从而时间资源的释放

  • 请求先后顺序无法保证,比如轮询第五个客户端的请求的时候,出现了配置的变更,这时候第五个请求会被提交到tomcat线程池中,从而早于前面四个请求得到响应,这对于需要严格有序的业务场景是有影响的

  • 多台实例监听配置中心实例,出现不一致的情况

    比如配置中心四台实例监听配置变更,前三台可能响应了得到V1的配置,但是轮询到第四台实例的请求的时候又发生了变更可能就得到了v2的配置,这时候这四台配置不一致了。需要保证这种一致性需要我们采取其他的策略,比如配置中心服务端主动udp推,或者加上版本号保证这四台配置一致。

标签:Tomcat,tomcat,轮询,配置,源码,线程,import,请求
From: https://www.cnblogs.com/cuzzz/p/17323848.html

相关文章

  • vue2源码-六、根据render函数生成vnode
    根据render函数生成vnode上文介绍上面已经将模板编译成了render函数,下面就要使用render函数,从而完成渲染的操作:首先,根据render函数生成虚拟节点;然后根据虚拟节点+真实数据生成真实节点。实现mountComponent方法,完成渲染虚拟节点生成封装vm._render方法。Vue.proto......
  • DAPLink源码生成Keil工程并编译成功——笔记(实践篇)
    本文介绍使用DAP源码生产Keil工程的步骤。一、前期准备工作以下1~4为步骤:1.安装Python3(https://www.python.org/downloads/),并添加至路径PATH,此处忘截图了,总之看见pip、alluser、addtoPATH之类的就勾选。(网上也有些帖子说暂时不支持Python3要用Python2.7的,本人实测Pyt......
  • eureka源码简单剖析-服务端(服务续约接口)
           ......
  • eureka源码简单剖析-服务端(服务注册接口-作用是客户端的注册服务)
    本部分讲的是客户端的一些服务注册要注册中心,就是服务的提供者将服务注册到注册中心,方便消费者拿到需要的服务  peer是集群的模式 然后看下这个super.register(info,leaseDuration,isReplication);   日常学习使用的一般是eureka单机模式,企业使用都是eureka......
  • r0tracer 源码分析
    使用方法修改r0tracer.js文件最底部处的代码,开启某一个Hook模式。functionmain(){Java.perform(function(){console.Purple("r0tracerbegin...!")//0.增加精简模式,就是以彩虹色只显示进出函数。默认是关闭的,注释此行打开精简模式。//is......
  • eureka源码简单剖析-服务端(服务接口暴露策略)
    下面来看下服务接口暴露的策略。其中服务端使用了Jersey框架,而Jersey框架是一个发布restful风格接口的框架,类似我们使用的springmvc, 然后下面看下jersey部分    以上就是服务接口暴露的相关策略部分......
  • Visual Studio Code开发常用的工具栏选项,查看源码技巧以及【vscode常用的快捷键】
    一、开发常用的工具栏选项1、当前打开的文件快速在左侧资源树中定位:其实打开了当前的文件已经有在左侧资源树木定位了,只是颜色比较浅2、打开太多文件的时候,可以关闭3、设置查看当前类或文件的结构OUTLINE相当于idea查看当前类或接口的结构Structure二、查看......
  • OPCUA实践之winnt服务源码分享
    前言孔乙己显出极高兴的样子,将两个指头的长指甲敲着柜台,点头说:“对呀,对呀!......OPCUA,你用过么?”大家好,我是44岁的大龄程序员码农阿峰。离开上一个项目近半年了,这时当时在项目做的3个winnt服务,算是OPCUA的初次使用,代码并没有什么出彩的地方,却是能正常运行,而且工作了近1年的时......
  • 【CVE-2017-12615】Tomcat 远程代码执行漏洞复现
    0x00环境搭建用vulhub的环境查看配置文件conf/web.xml中readonly的设置0x01漏洞复现访问主页,抓包后修改数据包可通过PUT方式创建一个JSP文件。虽然Tomcat对文件后缀有一定检测(不能直接写jsp),但我们使用一些文件系统的特性(如Linux下可用/)来绕过了限制。改完包的时候......
  • IDEA进入一个类的源码
    进入一个类的源码按住Ctrl后单击类名即可进入类的源码单击Structure即可看到类里的方法......