在第一章笔者提到在NioEndpoint类中会使用五个内部类来处理请求。在之前的文章中为了学习的方便只使用了四个类,在本章中就让我们使用 SocketProcessor处理请求。
本章程序目录结构
第一步:新建SocketProcessorBase类
public abstract class SocketProcessorBase<S> implements Runnable { protected SocketWrapperBase<S> socketWrapper; protected SocketEvent event; public SocketProcessorBase(SocketWrapperBase<S> socketWrapper, SocketEvent event) { reset(socketWrapper, event); } public void reset(SocketWrapperBase<S> socketWrapper, SocketEvent event) { Objects.requireNonNull(event); this.socketWrapper = socketWrapper; this.event = event; } @Override public final void run() { synchronized (socketWrapper) { // It is possible that processing may be triggered for read and // write at the same time. The sync above makes sure that processing // does not occur in parallel. The test below ensures that if the // first event to be processed results in the socket being closed, // the subsequent events are not processed. if (socketWrapper.isClosed()) { return; } doRun(); } } protected abstract void doRun(); }
第二步:在NioEndpoint类中新建SocketProcessor类并继承SocketProcessorBase类,在SocketProcessor类中实现doRun方法。
protected class SocketProcessor extends SocketProcessorBase<NioChannel> { public SocketProcessor(SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) { super(socketWrapper, event); } @Override protected void doRun() { NioChannel socket = socketWrapper.getSocket(); SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); try { int handshake = -1; if (key != null) { handshake = 0; } if (handshake == 0) { Handler.SocketState state = Handler.SocketState.OPEN; if (event == null) { state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ); } else { state = getHandler().process(socketWrapper, event); } if (state == Handler.SocketState.CLOSED) { //close(socket, key); } } } catch (Exception e) { // ignore } finally { socketWrapper = null; event = null; } } }
第三步:新建AbstractEndpoint类,让NioEndpoint类继承AbstractEndpoint类。AbstractEndpoint中定义了一个Handler接口,该接口中的process方法是我们当下关注的重点。
AbstractEndpoint还定义了一个栈类型的属性processorCache,用来保存SocketProcessorBase。
SocketProcessorBase用来处理请求,处理完请求后并不会被销毁,而是保存在processorCache中以便复用,从而减少GC。
/**
 * Base class of an endpoint: owns the handler, the executor and the cache of
 * reusable socket processors, and turns socket events into processor runs.
 *
 * @param <S> the socket type used by the concrete endpoint
 */
public abstract class AbstractEndpoint<S> {

    /**
     * Callback that processes the events of a socket.
     *
     * @param <S> the socket type
     */
    public interface Handler<S> {

        /** Socket states a handler can report after processing an event. */
        enum SocketState {
            OPEN, CLOSED, LONG, ASYNC_END, SENDFILE, UPGRADING, UPGRADED, SUSPENDED
        }

        /**
         * Processes the given event for the given socket.
         *
         * @param socket wrapper of the socket to process
         * @param status event that triggered the processing
         * @return the state reported for the socket after processing
         */
        SocketState process(SocketWrapperBase<S> socket, SocketEvent status);

        Object getGlobal();

        Set<S> getOpenSockets();

        void release(SocketWrapperBase<S> socketWrapper);

        void pause();

        void recycle();
    }

    /** Handler that receives the socket events; set by the concrete endpoint. */
    protected Handler<S> handler = null;

    public Handler<S> getHandler() {
        return handler;
    }

    /**
     * Cache of processors available for reuse. Not initialised by this
     * class, so it may be {@code null} until a subclass assigns it.
     */
    protected SynchronizedStack<SocketProcessorBase<S>> processorCache;

    /**
     * Creates a processor for the given wrapper/event pair; used when the
     * cache has no instance to reuse.
     */
    protected abstract SocketProcessorBase<S> createSocketProcessor(
            SocketWrapperBase<S> socketWrapper, SocketEvent event);

    /** Fixed pool of five threads that runs dispatched processors. */
    private final Executor executor = Executors.newFixedThreadPool(5);

    public Executor getExecutor() {
        return executor;
    }

    /**
     * Processes the given event for the given socket, reusing a cached
     * processor when one is available.
     *
     * @param socketWrapper wrapper of the socket to process
     * @param event         event to process
     * @param dispatch      {@code true} to run the processor on the executor,
     *                      {@code false} to run it on the calling thread
     * @return {@code true} if the processor was run or handed to the
     *         executor; {@code false} if {@code socketWrapper} is
     *         {@code null} or anything was thrown along the way
     */
    public boolean processSocket(SocketWrapperBase<S> socketWrapper, SocketEvent event,
            boolean dispatch) {
        try {
            if (socketWrapper == null) {
                return false;
            }
            // The cache may not have been created yet; fall back to a fresh
            // processor instead of failing with a NullPointerException that
            // the catch below would turn into a silently dropped event.
            SocketProcessorBase<S> sc = null;
            if (processorCache != null) {
                sc = processorCache.pop();
            }
            if (sc == null) {
                sc = createSocketProcessor(socketWrapper, event);
            } else {
                sc.reset(socketWrapper, event);
            }
            Executor executor = getExecutor();
            if (dispatch && executor != null) {
                executor.execute(sc);
            } else {
                sc.run();
            }
        } catch (Throwable ree) {
            // Any failure (including a rejected execution) is reported to the
            // caller only through the return value.
            return false;
        }
        return true;
    }
}
第四步:在NioEndpoint中实现createSocketProcessor抽象方法
/**
 * Builds a new {@code SocketProcessor} armed with the given wrapper and event.
 *
 * @param socketWrapper wrapper of the channel to process
 * @param event         event to process
 * @return a newly created processor
 */
protected SocketProcessorBase<NioChannel> createSocketProcessor(
        SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
    SocketProcessorBase<NioChannel> processor = new SocketProcessor(socketWrapper, event);
    return processor;
}
第五步:新建ConnectionHandler类并实现Handler接口
public class ConnectionHandler <S> implements AbstractEndpoint.Handler<S>{ @Override public SocketState process(SocketWrapperBase<S> socket, SocketEvent status) { Processor processor = createProcessor(); processor.service(socket); return null; } @Override public Object getGlobal() { return null; } @Override public Set<S> getOpenSockets() { return null; } @Override public void release(SocketWrapperBase<S> socketWrapper) { } @Override public void pause() { } @Override public void recycle() { } protected Processor createProcessor() { Http11Processor processor = new Http11Processor(); return processor; }
第六步:在NioEndpoint类中的startInternal方法中引用ConnectionHandler。初始化processorCache大小
public void startInternal() { try { processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, 500); handler = new ConnectionHandler<>(); pollers = new Poller[pollerThreadCount]; for (int i=0; i<pollers.length; i++) { pollers[i] = new Poller(); Thread pollerThread = new Thread(pollers[i]); // 设置线程优先级 pollerThread.setPriority(5); // 设置守护线程 pollerThread.setDaemon(true); pollerThread.start(); } Runnable acceptor = new Acceptor(); new Thread(acceptor).start(); } catch (Exception e) { } }
第七步:新建SynchronizedStack。
/**
 * Array-backed LIFO stack whose public operations are synchronized on the
 * instance. The backing array doubles on demand up to an optional limit.
 *
 * @param <T> the element type
 */
public class SynchronizedStack<T> {

    /** Capacity used by the no-argument constructor. */
    public static final int DEFAULT_SIZE = 128;

    /** Limit value meaning "unbounded". */
    private static final int DEFAULT_LIMIT = -1;

    /** Current capacity of the backing array. */
    private int size;

    /** Maximum capacity, or {@code -1} for no limit. */
    private final int limit;

    /** Index of the top element; {@code -1} when the stack is empty. */
    private int index = -1;

    private Object[] stack;

    /** Creates an unbounded stack with {@link #DEFAULT_SIZE} initial capacity. */
    public SynchronizedStack() {
        this(DEFAULT_SIZE, DEFAULT_LIMIT);
    }

    /**
     * Creates a stack with the given initial capacity and limit.
     *
     * @param size  initial capacity; clamped to {@code limit} when a limit
     *              is set and {@code size} exceeds it
     * @param limit maximum capacity, or {@code -1} for no limit
     */
    public SynchronizedStack(int size, int limit) {
        if (limit > -1 && size > limit) {
            this.size = limit;
        } else {
            this.size = size;
        }
        this.limit = limit;
        // Allocate from the clamped capacity, not the raw argument, so the
        // array never exceeds the limit.
        stack = new Object[this.size];
    }

    /**
     * Pushes an element, growing the backing array if necessary.
     *
     * @param obj the element to push
     * @return {@code true} if the element was stored, {@code false} if the
     *         stack is at its limit
     */
    public synchronized boolean push(T obj) {
        index++;
        if (index == size) {
            if (limit == -1 || size < limit) {
                expand();
            } else {
                // Full and not allowed to grow: undo the increment.
                index--;
                return false;
            }
        }
        stack[index] = obj;
        return true;
    }

    /**
     * Removes and returns the top element.
     *
     * @return the top element, or {@code null} if the stack is empty
     */
    public synchronized T pop() {
        if (index == -1) {
            return null;
        }
        // Safe: push(T) is the only method that stores elements.
        @SuppressWarnings("unchecked")
        T result = (T) stack[index];
        // Clear the slot so the stack does not keep the element reachable.
        stack[index--] = null;
        return result;
    }

    /** Removes all elements. */
    public synchronized void clear() {
        if (index > -1) {
            for (int i = 0; i < index + 1; i++) {
                stack[i] = null;
            }
        }
        index = -1;
    }

    /**
     * Doubles the capacity, capped at the limit. A capacity of zero grows to
     * one, because doubling zero would leave no room for the pending push.
     */
    private void expand() {
        int newSize = (size == 0) ? 1 : size * 2;
        if (limit != -1 && newSize > limit) {
            newSize = limit;
        }
        Object[] newStack = new Object[newSize];
        System.arraycopy(stack, 0, newStack, 0, size);
        stack = newStack;
        size = newSize;
    }
}
第八步:新建Processor接口
/**
 * Component that services a socket on behalf of a handler.
 */
public interface Processor {

    /**
     * Services the socket held by the given wrapper.
     *
     * @param socketWrapper wrapper of the socket to service
     * @return the socket state reported after servicing
     */
    AbstractEndpoint.Handler.SocketState service(SocketWrapperBase<?> socketWrapper);
}
第九步:新建Http11Processor类并实现Processor接口
public class Http11Processor implements Processor { protected final Http11InputBuffer inputBuffer; public Http11Processor() { inputBuffer = new Http11InputBuffer(); } /** * 在这里解析请求行和请求头 */ @Override public AbstractEndpoint.Handler.SocketState service(SocketWrapperBase<?> socketWrapper) { setSocketWrapper(socketWrapper); try { inputBuffer.parseRequestLine(); inputBuffer.parseHeaders(); } catch (Exception e) { // ignore } return null; } protected final void setSocketWrapper(SocketWrapperBase<?> socketWrapper) { inputBuffer.init(socketWrapper); } }
第十步:在NioChannel中新建setPoller方法
/**
 * Stores the poller associated with this channel.
 *
 * @param poller the poller to store
 */
public void setPoller(NioEndpoint.Poller poller) {
    this.poller = poller;
}
第十一步:在Poller类中的register方法中调用setPoller方法
运行结果
下面是本章程序中处理一次请求的时序图
结束!!!
标签:null,return,请求,void,socketWrapper,public,源码,接收,event From: https://www.cnblogs.com/yishi-san/p/16971551.html