首页 > 编程语言 >WSGI服务器源码分析

WSGI服务器源码分析

时间:2022-12-31 16:23:44浏览次数:57  
标签:WSGI self request server 源码 address 服务器 class def

一、wsgiref模块

在web开发中,后端服务分为服务器和应用程序:

  • 服务器本质就是通过socket进行请求的接收
  • 应用程序就是对请求进行相应业务处理

为了开发方便,将上述两部分分开实现,只要它们之间符合一定的规范要求即可,至于如何实现无需操心,而这个规范就是WSGI协议。

符合第一部分就是WSGI服务器,wsgiref模块是WSGI协议规范的参考实现,通过wsgiref可以实现一个WSGI服务器,所以在第一部分中,我们可以使用已经现有很多支持WSGI协议服务器,比如:wsgiref、nginx、gunicorn。

我们主要做的部分就是第二部分对请求的处理,也就是通过框架来进行业务处理。

通过wsgiref实现一个简单的wsgi协议的应用如下所示:

from wsgiref.simple_server import make_server


def index():
    return b"index"


def home():
    return b"home"


urlpattern = [
    ("/index", index),
    ("/home", home)
]


def simple_app(environ, start_response):
    status = "200 OK"
    response_headers = [('Content-Type', 'text/plain')]
    start_response(status, response_headers)
    current_path = environ.get("PATH_INFO")
    func = None
    for path_tuple in urlpattern:
        if path_tuple[0] == current_path:
            func = path_tuple[1]
            break
    if func:
        response = func()
    else:
        response = b"404 not found!"
    return [response, ]


if __name__ == '__main__':
    srv = make_server("127.0.0.1", 9000, simple_app)
    srv.serve_forever()

  • srv启动一个WSGI服务器进行监听,接收请求
  • simple_app是需要做的主要工作,根据请求进行业务处理

二、WSGI服务器源码剖析

对于第二部分我们已经很清楚,这时我们自己写的,但是对于第一部分我们去了解一些,因为借助于现有的模块。

(一)srv = make_server("127.0.0.1", 9000, simple_app)

首先通过make_server方法创建一个服务器:

def make_server(
    host, port, app, server_class=WSGIServer, handler_class=WSGIRequestHandler
):
    """Create a new WSGI server listening on `host` and `port` for `app`"""
    server = server_class((host, port), handler_class)
    server.set_app(app)
    return server
  • app指的就是simple_app

1、server = server_class((host, port), handler_class)

通过server_class执行WSGIServer构造方法,最终执行的是socketserver.TCPServer类的构造方法:

class TCPServer(BaseServer):

    address_family = socket.AF_INET

    socket_type = socket.SOCK_STREAM

    request_queue_size = 5

    allow_reuse_address = False

    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
        """Constructor.  May be extended, do not override."""
        BaseServer.__init__(self, server_address, RequestHandlerClass)
        self.socket = socket.socket(self.address_family,
                                    self.socket_type)
        if bind_and_activate:
            try:
                self.server_bind()
                self.server_activate()
            except:
                self.server_close()
                raise

在这里执行了socketserver.BaseServer类的构造方法:

class BaseServer:

    timeout = None

    def __init__(self, server_address, RequestHandlerClass):
        """Constructor.  May be extended, do not override."""
        self.server_address = server_address
        self.RequestHandlerClass = RequestHandlerClass
        self.__is_shut_down = threading.Event()
        self.__shutdown_request = False

socketserver.TCPServer类的构造方法中后续的主要步骤就是建立socket、进行ip和端口的绑定、监听端口,不过进行方法查找一定要注意查找顺序,对于self.server_bind()self.server_activate()方法:

simple_server.WSGIServer-->server.HTTPServer-->socketServer.TCPServer-->socketserver.BaseServer

所以socketServer.TCPServer类的构造方法中self.server_bind()先在simple_server.WSGIServer中查找:

class WSGIServer(HTTPServer):

    """BaseHTTPServer that implements the Python WSGI protocol"""

    application = None

    def server_bind(self):
        """Override server_bind to store the server name."""
        HTTPServer.server_bind(self)
        self.setup_environ()

        
class HTTPServer(socketserver.TCPServer):

    allow_reuse_address = 1    # Seems to make sense in testing environment

    def server_bind(self):
        """Override server_bind to store the server name."""
        socketserver.TCPServer.server_bind(self)
        host, port = self.server_address[:2]
        self.server_name = socket.getfqdn(host)
        self.server_port = port
        
        
class TCPServer(BaseServer):
    
        def server_bind(self):
        """Called by constructor to bind the socket.

        May be overridden.

        """
        if self.allow_reuse_address:
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.server_address)
        self.server_address = self.socket.getsockname()

socketServer.TCPServer类的构造方法中self.server_activate()self.server_close()方法亦是如此:

class TCPServer(BaseServer):
    
    def server_activate(self):
        """Called by constructor to activate the server.

        May be overridden.

        """
        self.socket.listen(self.request_queue_size)

    def server_close(self):
        """Called to clean-up the server.

        May be overridden.

        """
        self.socket.close()

2、server.set_app(app)

这里的server就是WSGIServer,所以在simple_server.WSGIServer类中:

class WSGIServer(HTTPServer):

    """BaseHTTPServer that implements the Python WSGI protocol"""

    application = None

    def set_app(self,application):
        self.application = application

(二)srv.server_forever()

对于server_forever()的查找,通过simple_server.WSGIServer-->server.HTTPServer-->socketServer.TCPServer-->socketserver.BaseServer

所以在socketserver.BaseServer类中:

class BaseServer:
    
    def serve_forever(self, poll_interval=0.5):
        """Handle one request at a time until shutdown.

        Polls for shutdown every poll_interval seconds. Ignores
        self.timeout. If you need to do periodic tasks, do them in
        another thread.
        """
        self.__is_shut_down.clear()
        try:
            # XXX: Consider using another file descriptor or connecting to the
            # socket to wake this up instead of polling. Polling reduces our
            # responsiveness to a shutdown request and wastes cpu at all other
            # times.
            with _ServerSelector() as selector:
                selector.register(self, selectors.EVENT_READ)

                while not self.__shutdown_request:
                    ready = selector.select(poll_interval)
                    # bpo-35017: shutdown() called during select(), exit immediately.
                    if self.__shutdown_request:
                        break
                    if ready:
                        self._handle_request_noblock()

                    self.service_actions()
        finally:
            self.__shutdown_request = False
            self.__is_shut_down.set()

通过select来实现监听socket的变化,从而来处理请求,一旦接收到请求就通过self._handle_request_noblock()self.process_request(request, client_address)self.finish_request(request, client_address)进行处理。

class BaseServer:
    
    def _handle_request_noblock(self):
        """Handle one request, without blocking.

        I assume that selector.select() has returned that the socket is
        readable before this function was called, so there should be no risk of
        blocking in get_request().
        """
        try:
            request, client_address = self.get_request()
        except OSError:
            return
        if self.verify_request(request, client_address):
            try:
                self.process_request(request, client_address)
            except Exception:
                self.handle_error(request, client_address)
                self.shutdown_request(request)
            except:
                self.shutdown_request(request)
                raise
        else:
            self.shutdown_request(request)
            
    def process_request(self, request, client_address):
        """Call finish_request.

        Overridden by ForkingMixIn and ThreadingMixIn.

        """
        self.finish_request(request, client_address)
        self.shutdown_request(request)
        
        
    def finish_request(self, request, client_address):
        """Finish one request by instantiating RequestHandlerClass."""
        self.RequestHandlerClass(request, client_address, self)

最后调用的就是处理请求的类self.RequestHandlerClass(self, request, client_address),这个类指代的就是simple_server.RequestHandlerClass,这时在服务器实例化过程中默认传入的参数。

现在查看这个类的继承关系:simple_server.RequestHandlerClass-->server.BaseHTTPRequestHandler-->socketserver.StreamRequestHandler-->socketserver.BaseRequestHandler,对其进行实例化按照以上顺序执行构造方法:

class BaseRequestHandler:

    """Base class for request handler classes.

    This class is instantiated for each request to be handled.  The
    constructor sets the instance variables request, client_address
    and server, and then calls the handle() method.  To implement a
    specific service, all you need to do is to derive a class which
    defines a handle() method.

    The handle() method can find the request as self.request, the
    client address as self.client_address, and the server (in case it
    needs access to per-server information) as self.server.  Since a
    separate instance is created for each request, the handle() method
    can define other arbitrary instance variables.

    """

    def __init__(self, request, client_address, server):
        self.request = request
        self.client_address = client_address
        self.server = server
        self.setup()
        try:
            self.handle()
        finally:
            self.finish()

最终执行的是socketserver.BaseRequestHandler类的构造方法,在构造方法中执行self.handle()

class WSGIRequestHandler(BaseHTTPRequestHandler):

    def handle(self):
        """Handle a single HTTP request"""

        self.raw_requestline = self.rfile.readline(65537)
        if len(self.raw_requestline) > 65536:
            self.requestline = ''
            self.request_version = ''
            self.command = ''
            self.send_error(414)
            return

        if not self.parse_request(): # An error code has been sent, just exit
            return

        handler = ServerHandler(
            self.rfile, self.wfile, self.get_stderr(), self.get_environ(),
            multithread=False,
        )
        handler.request_handler = self      # backpointer for logging
        handler.run(self.server.get_app())

在这个方法中实例化执行了simple_server.ServerHandler类,该类的继承关系:simple_server.ServerHandler-->handlers.SimpleHandler-->handlers.BaseHandler

class SimpleHandler(BaseHandler):
    """Handler that's just initialized with streams, environment, etc.

    This handler subclass is intended for synchronous HTTP/1.0 origin servers,
    and handles sending the entire response output, given the correct inputs.

    Usage::

        handler = SimpleHandler(
            inp,out,err,env, multithread=False, multiprocess=True
        )
        handler.run(app)"""

    def __init__(self,stdin,stdout,stderr,environ,
        multithread=True, multiprocess=False
    ):
        self.stdin = stdin
        self.stdout = stdout
        self.stderr = stderr
        self.base_env = environ
        self.wsgi_multithread = multithread
        self.wsgi_multiprocess = multiprocess

然后执行simple_server.ServerHandler类的实例化方法,构造方法在handlers.SimpleHandler类中找到。

紧接着就执行 handler.run(self.server.get_app())方法,在handlers.BaseHandler中实现:

class BaseHandler:
    """Manage the invocation of a WSGI application"""

    def run(self, application):
        """Invoke the application"""
        # Note to self: don't move the close()!  Asynchronous servers shouldn't
        # call close() from finish_response(), so if you close() anywhere but
        # the double-error branch here, you'll break asynchronous servers by
        # prematurely closing.  Async servers must return from 'run()' without
        # closing if there might still be output to iterate over.
        try:
            self.setup_environ()
            self.result = application(self.environ, self.start_response)
            self.finish_response()
        except (ConnectionAbortedError, BrokenPipeError, ConnectionResetError):
            # We expect the client to close the connection abruptly from time
            # to time.
            return
        except:
            try:
                self.handle_error()
            except:
                # If we get an error handling an error, just give up already!
                self.close()
                raise   # ...and let the actual server figure it out.
   
     def start_response(self, status, headers,exc_info=None):
        """'start_response()' callable as specified by PEP 3333"""

        if exc_info:
            try:
                if self.headers_sent:
                    # Re-raise original exception if headers sent
                    raise exc_info[0](exc_info[1]).with_traceback(exc_info[2])
            finally:
                exc_info = None        # avoid dangling circular ref
        elif self.headers is not None:
            raise AssertionError("Headers already set!")

        self.status = status
        self.headers = self.headers_class(headers)
        status = self._convert_string_type(status, "Status")
        assert len(status)>=4,"Status must be at least 4 characters"
        assert status[:3].isdigit(), "Status message must begin w/3-digit code"
        assert status[3]==" ", "Status message must have a space after code"

        if __debug__:
            for name, val in headers:
                name = self._convert_string_type(name, "Header name")
                val = self._convert_string_type(val, "Header value")
                assert not is_hop_by_hop(name),\
                       f"Hop-by-hop header, '{name}: {val}', not allowed"

        return self.write
    
    def finish_response(self):
        """Send any iterable data, then close self and the iterable

        Subclasses intended for use in asynchronous servers will
        want to redefine this method, such that it sets up callbacks
        in the event loop to iterate over the data, and to call
        'self.close()' once the response is finished.
        """
        try:
            if not self.result_is_file() or not self.sendfile():
                for data in self.result:
                    self.write(data)
                self.finish_content()
        except:
            # Call close() on the iterable returned by the WSGI application
            # in case of an exception.
            if hasattr(self.result, 'close'):
                self.result.close()
            raise
        else:
            # We only call close() when no exception is raised, because it
            # will set status, result, headers, and environ fields to None.
            # See bpo-29183 for more details.
            self.close()

这里的application就是传入的simple_app,通过self.result = application(self.environ, self.start_response)触发应用执行,获取到业务执行的结果response,它是一个可迭代对象。

最后通过self.finish_response()调用了write()函数,write()函数每次调用都会检查headers是否已经发送,否则先发送headers,再发送data,往客户端写入内容即可。而self.finish_content()就是确保响应头的输出。

标签:WSGI,self,request,server,源码,address,服务器,class,def
From: https://www.cnblogs.com/shenjianping/p/16945731.html

相关文章