首页 > 编程语言 >Tomcat源码分析使用NIO接收HTTP请求(七)----使用SocketProcessor接收请求

Tomcat源码分析使用NIO接收HTTP请求(七)----使用SocketProcessor接收请求

时间:2022-12-10 14:33:42浏览次数:62  
标签:null return 请求 void socketWrapper public 源码 接收 event

在第一章笔者提到在NioEndpoint类中会使用五个内部类来处理请求。在之前的文章中为了学习的方便只使用了四个类,在本章中就让我们使用 SocketProcessor处理请求。

本章程序目录结构

 

 

 第一步:新建SocketProcessorBase类

/**
 * Base class for runnable tasks that process one event on one socket wrapper.
 *
 * <p>Subclasses supply the actual work in {@link #doRun()}; this class only
 * stores the wrapper/event pair and serialises execution per wrapper.
 *
 * @param <S> the type of socket held by the wrapper
 */
public abstract class SocketProcessorBase<S> implements Runnable {
    /** Wrapper of the socket this task currently works on. */
    protected SocketWrapperBase<S> socketWrapper;
    /** Event this task currently has to process. */
    protected SocketEvent event;

    /**
     * Creates a task bound to the given wrapper and event.
     *
     * @param socketWrapper the wrapper to process
     * @param event         the event to process; must not be {@code null}
     */
    public SocketProcessorBase(SocketWrapperBase<S> socketWrapper, SocketEvent event) {
        reset(socketWrapper, event);
    }


    /**
     * Re-binds this task to a new wrapper and event so the instance can be reused.
     *
     * @param socketWrapper the wrapper to process
     * @param event         the event to process; must not be {@code null}
     * @throws NullPointerException if {@code event} is {@code null}
     */
    public void reset(SocketWrapperBase<S> socketWrapper, SocketEvent event) {
        // The null check runs first, so a rejected event leaves both fields untouched.
        this.event = Objects.requireNonNull(event);
        this.socketWrapper = socketWrapper;
    }


    /**
     * Runs {@link #doRun()} while holding the wrapper's monitor, unless the
     * wrapper reports that it is already closed.
     */
    @Override
    public final void run() {
        // Read and write processing may be triggered at the same time. Locking
        // on the wrapper keeps the two from running in parallel, and the closed
        // check skips later events once an earlier one has closed the socket.
        synchronized (socketWrapper) {
            if (!socketWrapper.isClosed()) {
                doRun();
            }
        }
    }


    /** Performs the actual processing; invoked with the wrapper's monitor held. */
    protected abstract void doRun();
}
第二步:在NioEndpoint类中新建SocketProcessor内部类并继承SocketProcessorBase类,在SocketProcessor类中实现doRun方法。
/**
 * Task that hands one socket event to the endpoint's {@code Handler}.
 *
 * <p>After the event has been processed the instance clears its references and
 * returns itself to the endpoint's {@code processorCache}, so that
 * {@code processSocket} can reuse it instead of allocating a new one.
 */
protected class SocketProcessor extends SocketProcessorBase<NioChannel> {

    /**
     * @param socketWrapper the wrapper to process
     * @param event         the event to process
     */
    public SocketProcessor(SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
        super(socketWrapper, event);
    }

    /**
     * Looks up the channel's selection key and, if the channel is registered
     * with its poller's selector, delegates the event to the handler.
     */
    @Override
    protected void doRun() {
        NioChannel socket = socketWrapper.getSocket();
        // keyFor returns null when the channel is not registered with the selector.
        SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());

        try {
            if (key != null) {
                // A missing event is treated as a read event.
                SocketEvent effectiveEvent = (event == null) ? SocketEvent.OPEN_READ : event;
                Handler.SocketState state = getHandler().process(socketWrapper, effectiveEvent);
                if (state == Handler.SocketState.CLOSED) {
                    // Closing the socket is not implemented in this chapter:
                    // close(socket, key);
                }
            }
        } catch (Exception ignored) {
            // NOTE(review): processing failures are dropped silently here;
            // they should be logged once a logger is available.
        } finally {
            socketWrapper = null;
            event = null;
            // Return this instance to the cache; without this the cache stays
            // empty and every event allocates a new processor.
            if (processorCache != null) {
                processorCache.push(this);
            }
        }
    }
}
第三步:新建AbstractEndpoint类,让NioEndpoint类继承AbstractEndpoint类。AbstractEndpoint中定义了一个Handler接口,该接口的process方法是我们当下关注的重点。AbstractEndpoint还定义了一个栈类型的属性processorCache,用来缓存SocketProcessorBase对象。SocketProcessorBase负责处理请求,处理完请求后它不会被销毁,而是保存到processorCache中以便复用,从而减少GC。
/**
 * Base class of an endpoint: owns the {@link Handler}, the executor and the
 * cache of reusable socket processors, and dispatches socket events to them.
 *
 * @param <S> the type of socket handled by this endpoint
 */
public abstract class AbstractEndpoint<S> {

    /**
     * Callback interface the endpoint uses to have socket events processed.
     *
     * @param <S> the type of socket handled
     */
    public interface Handler<S> {

        /** States a socket can be reported in after an event has been processed. */
        enum SocketState {
            OPEN, CLOSED, LONG, ASYNC_END, SENDFILE, UPGRADING, UPGRADED, SUSPENDED
        }

        /**
         * Processes the given event for the given socket.
         *
         * @param socket the wrapper of the socket to process
         * @param status the event to process
         * @return the state reported by the implementation
         */
        SocketState process(SocketWrapperBase<S> socket,
                            SocketEvent status);

        Object getGlobal();

        Set<S> getOpenSockets();

        void release(SocketWrapperBase<S> socketWrapper);

        void pause();

        void recycle();
    }


    /** Handler that events are delegated to; {@code null} until a subclass sets it. */
    protected Handler<S> handler = null;

    public Handler<S> getHandler() { return handler; }

    /** Cache of reusable processors; {@code null} until a subclass initialises it. */
    protected SynchronizedStack<SocketProcessorBase<S>> processorCache;

    /**
     * Creates a new processor for the given wrapper and event.
     *
     * @param socketWrapper the wrapper to process
     * @param event         the event to process
     * @return a new processor instance
     */
    protected abstract SocketProcessorBase<S> createSocketProcessor(
            SocketWrapperBase<S> socketWrapper, SocketEvent event);

    /** Fixed pool of five worker threads used when an event is dispatched. */
    private final Executor executor = Executors.newFixedThreadPool(5);

    public Executor getExecutor() {
        return executor;
    }

    /**
     * Processes the given event for the given socket, reusing a cached
     * processor when one is available.
     *
     * @param socketWrapper the wrapper to process; {@code null} is rejected
     * @param event         the event to process
     * @param dispatch      if {@code true} and an executor is available, the
     *                      processor runs on the executor; otherwise it runs
     *                      on the calling thread
     * @return {@code true} if the processor was run or handed to the executor;
     *         {@code false} if {@code socketWrapper} is {@code null} or any
     *         {@link Throwable} was raised
     */
    public boolean processSocket(SocketWrapperBase<S> socketWrapper,
                                 SocketEvent event, boolean dispatch) {
        try {
            if (socketWrapper == null) {
                return false;
            }
            // The cache only exists once the endpoint has been started; before
            // that, fall through to creating a fresh processor instead of
            // failing with a NullPointerException that is swallowed below.
            SocketProcessorBase<S> sc = null;
            if (processorCache != null) {
                sc = processorCache.pop();
            }
            if (sc == null) {
                sc = createSocketProcessor(socketWrapper, event);
            } else {
                sc.reset(socketWrapper, event);
            }
            Executor executor = getExecutor();
            if (dispatch && executor != null) {
                executor.execute(sc);
            } else {
                sc.run();
            }
        } catch (Throwable ree) {
            // NOTE(review): the failure (including a rejected execution) is
            // only reported via the return value; it should also be logged.
            return false;
        }
        return true;
    }
}
第四步:在NioEndpoint中实现createSocketProcessor抽象方法
/**
 * Creates a new {@code SocketProcessor} for the given wrapper and event.
 *
 * @param socketWrapper the wrapper to process
 * @param event         the event to process
 * @return a new processor bound to the given wrapper and event
 */
@Override
protected SocketProcessorBase<NioChannel> createSocketProcessor(
    SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
    return new SocketProcessor(socketWrapper, event);
}
第五步:新建ConnectionHandler类并实现Handler接口
public class ConnectionHandler <S> implements AbstractEndpoint.Handler<S>{
    @Override
    public SocketState process(SocketWrapperBase<S> socket, SocketEvent status) {
        Processor processor = createProcessor();
        processor.service(socket);
        return null;
    }

    @Override
    public Object getGlobal() {
        return null;
    }

    @Override
    public Set<S> getOpenSockets() {
        return null;
    }

    @Override
    public void release(SocketWrapperBase<S> socketWrapper) {

    }

    @Override
    public void pause() {

    }

    @Override
    public void recycle() {

    }

    protected Processor createProcessor() {
        Http11Processor processor = new Http11Processor();
        return processor;
    }
第六步:在NioEndpoint类中的startInternal方法中引用ConnectionHandler。初始化processorCache大小
/**
 * Starts the endpoint: initialises the processor cache and the handler,
 * starts one daemon thread per poller, then starts the acceptor thread.
 *
 * @throws IllegalStateException if any part of the start-up fails
 */
public void startInternal() {
    try {
        processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, 500);
        handler = new ConnectionHandler<>();
        pollers = new Poller[pollerThreadCount];
        for (int i = 0; i < pollers.length; i++) {
            pollers[i] = new Poller();
            Thread pollerThread = new Thread(pollers[i]);
            // Run the poller at normal thread priority.
            pollerThread.setPriority(Thread.NORM_PRIORITY);
            // Run the poller as a daemon thread.
            pollerThread.setDaemon(true);
            pollerThread.start();
        }

        Runnable acceptor = new Acceptor();
        new Thread(acceptor).start();
    } catch (Exception e) {
        // Surface the failure instead of leaving a half-started endpoint
        // that silently accepts nothing.
        throw new IllegalStateException("Failed to start NioEndpoint", e);
    }
}
第七步:新建SynchronizedStack。
/**
 * Array-backed stack whose operations are synchronized on the instance.
 *
 * <p>The backing array grows by doubling until the optional limit is reached;
 * once the limit is reached further pushes are rejected. Popped slots are
 * cleared so the stack does not retain references to removed objects.
 *
 * @param <T> the type of object held by the stack
 */
public class SynchronizedStack<T> {
    /** Initial capacity used by the no-argument constructor. */
    public static final int DEFAULT_SIZE = 128;
    /** Limit value meaning "no upper bound on capacity". */
    private static final int DEFAULT_LIMIT = -1;

    /** Current capacity of the backing array. */
    private int size;
    /** Maximum capacity, or {@code -1} for unbounded. */
    private final int limit;

    /** Index of the top element, or {@code -1} when the stack is empty. */
    private int index = -1;

    private Object[] stack;


    /** Creates an unbounded stack with the default initial capacity. */
    public SynchronizedStack() {
        this(DEFAULT_SIZE, DEFAULT_LIMIT);
    }

    /**
     * Creates a stack with the given initial capacity and limit.
     *
     * @param size  the initial capacity; capped at {@code limit} when a limit is set
     * @param limit the maximum capacity, or {@code -1} for unbounded
     */
    public SynchronizedStack(int size, int limit) {
        if (limit > -1 && size > limit) {
            this.size = limit;
        } else {
            this.size = size;
        }
        this.limit = limit;
        // Allocate from the capped capacity so the array length always equals this.size.
        stack = new Object[this.size];
    }


    /**
     * Pushes an object onto the stack, growing the backing array if allowed.
     *
     * @param obj the object to push
     * @return {@code true} if the object was stored; {@code false} if the
     *         stack is at its limit
     */
    public synchronized boolean push(T obj) {
        index++;
        if (index == size) {
            if (limit == -1 || size < limit) {
                expand();
            } else {
                index--;
                return false;
            }
        }
        stack[index] = obj;
        return true;
    }

    /**
     * Removes and returns the top object.
     *
     * @return the top object, or {@code null} if the stack is empty
     */
    public synchronized T pop() {
        if (index == -1) {
            return null;
        }
        // Safe: only push(T) ever stores into the array.
        @SuppressWarnings("unchecked")
        T result = (T) stack[index];
        stack[index--] = null;
        return result;
    }

    /** Removes every object from the stack; the capacity is kept. */
    public synchronized void clear() {
        if (index > -1) {
            for (int i = 0; i < index + 1; i++) {
                stack[i] = null;
            }
        }
        index = -1;
    }

    /** Doubles the capacity (at least to 1), capped at the limit when one is set. */
    private void expand() {
        // Doubling a capacity of 0 would stay at 0 and make the following
        // store in push() overflow the array, so grow to 1 in that case.
        int newSize = (size == 0) ? 1 : size * 2;
        if (limit != -1 && newSize > limit) {
            newSize = limit;
        }
        Object[] newStack = new Object[newSize];
        System.arraycopy(stack, 0, newStack, 0, size);
        stack = newStack;
        size = newSize;
    }
}
第八步:新建Processor接口
/**
 * Services a request arriving on a socket.
 */
public interface Processor {
    /**
     * Services the socket held by the given wrapper.
     *
     * @param socketWrapper the wrapper of the socket to service
     * @return the socket state reported by the implementation
     */
    AbstractEndpoint.Handler.SocketState service(SocketWrapperBase<?> socketWrapper);
}
第九步:新建Http11Processor类并实现Processor接口
/**
 * {@link Processor} that parses the request line and the request headers
 * through an {@code Http11InputBuffer}.
 */
public class Http11Processor implements Processor {

    /** Input buffer used to parse the request. */
    protected final Http11InputBuffer inputBuffer = new Http11InputBuffer();

    /** Creates a processor with its own input buffer. */
    public Http11Processor() {
    }

    /**
     * Binds the input buffer to the given socket wrapper.
     *
     * @param socketWrapper the wrapper the input buffer is initialised with
     */
    protected final void setSocketWrapper(SocketWrapperBase<?> socketWrapper) {
        this.inputBuffer.init(socketWrapper);
    }

    /**
     * Parses the request line and then the request headers of the given socket.
     *
     * @param socketWrapper the wrapper of the socket to service
     * @return always {@code null}
     */
    @Override
    public AbstractEndpoint.Handler.SocketState service(SocketWrapperBase<?> socketWrapper) {
        setSocketWrapper(socketWrapper);
        try {
            this.inputBuffer.parseRequestLine();
            this.inputBuffer.parseHeaders();
        } catch (Exception ignored) {
            // Parsing failures are ignored.
        }
        return null;
    }
}
第十步:在NioChannel中新建setPoller方法
/**
 * Stores the poller associated with this channel.
 *
 * @param poller the poller to associate with this channel
 */
public void setPoller(NioEndpoint.Poller poller) {
    this.poller = poller;
}
第十一步:在Poller类中的register方法中调用setPoller方法

 

 运行结果

 

 下面是本章程序中处理一次请求的时序图

 

 

 

结束!!!


标签:null,return,请求,void,socketWrapper,public,源码,接收,event
From: https://www.cnblogs.com/yishi-san/p/16971551.html

相关文章