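I. The wsgiref module
In web development, a backend service is split into a server and an application:
- The server's job is essentially to receive requests over a socket.
- The application performs the business logic for each request.
To simplify development, the two parts are implemented separately. As long as both sides follow an agreed interface, neither has to care how the other is implemented. That interface is the WSGI specification (PEP 3333).
The first part is the WSGI server. The wsgiref module is the reference implementation of the WSGI specification, and it can be used to build a WSGI server. For this part we can rely on one of the many existing servers that support WSGI, such as wsgiref, gunicorn, or uWSGI (nginx is usually deployed in front of them as a reverse proxy and does not speak WSGI itself).
Our main work is the second part, handling the request, which is normally done through a framework.
A simple WSGI application served with wsgiref looks like this: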
from wsgiref.simple_server import make_server


def index():
    return b"index"


def home():
    return b"home"


# Routing table: request path -> view function.
urlpattern = [
    ("/index", index),
    ("/home", home),
]


def simple_app(environ, start_response):
    # Look up the view function registered for the requested path.
    current_path = environ.get("PATH_INFO")
    func = None
    for path, view in urlpattern:
        if path == current_path:
            func = view
            break

    if func:
        status = "200 OK"
        response = func()
    else:
        # Report unknown paths with a real 404 status instead of 200.
        status = "404 Not Found"
        response = b"404 not found!"

    response_headers = [("Content-Type", "text/plain")]
    start_response(status, response_headers)
    return [response]


if __name__ == "__main__":
    srv = make_server("127.0.0.1", 9000, simple_app)
    srv.serve_forever()
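- srv: starts a WSGI server that listens on the port and accepts requests.
- simple_app: the part we actually have to write; it performs the business logic for each request.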
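Because a WSGI application is just a callable taking environ and start_response, it can be exercised without any server. The following is a minimal sketch, not part of the original article: the stand-in simple_app and the helper call_app are hypothetical names, and setup_testing_defaults from wsgiref.util is used only to fill in the environ keys a real server would provide.

```python
from wsgiref.util import setup_testing_defaults


def simple_app(environ, start_response):
    """Stand-in app (assumption): 200 for /index, 404 for anything else."""
    if environ.get("PATH_INFO") == "/index":
        status, body = "200 OK", b"index"
    else:
        status, body = "404 Not Found", b"404 not found!"
    start_response(status, [("Content-Type", "text/plain")])
    return [body]


def call_app(app, path):
    """Invoke a WSGI app the way a server would, but without a socket."""
    environ = {"PATH_INFO": path}
    setup_testing_defaults(environ)  # adds REQUEST_METHOD, SERVER_NAME, ...
    captured = {}

    def start_response(status, headers, exc_info=None):
        # Record what the app reports instead of writing to a client.
        captured["status"] = status
        captured["headers"] = headers

    body = b"".join(app(environ, start_response))
    return captured["status"], body


if __name__ == "__main__":
    print(call_app(simple_app, "/index"))
    print(call_app(simple_app, "/missing"))
```

This is the same contract the server fulfils later in the walkthrough: it builds environ, supplies start_response, and iterates over the returned body.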
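II. WSGI server source walkthrough
The second part is already clear, since we wrote it ourselves. The first part relies on an existing module, so it is worth looking at how it works.
(1) srv = make_server("127.0.0.1", 9000, simple_app)
First, the make_server function creates a server: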
def make_server(
    host, port, app, server_class=WSGIServer, handler_class=WSGIRequestHandler
):
    """Create a new WSGI server listening on `host` and `port` for `app`"""
    server = server_class((host, port), handler_class)
    server.set_app(app)
    return server
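Here app is simple_app.
1. server = server_class((host, port), handler_class)
server_class defaults to WSGIServer, so this line runs the WSGIServer constructor. WSGIServer does not define __init__ itself, so the constructor that actually runs is the one in socketserver.TCPServer: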
class TCPServer(BaseServer):
    address_family = socket.AF_INET
    socket_type = socket.SOCK_STREAM
    request_queue_size = 5
    allow_reuse_address = False

    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
        """Constructor.  May be extended, do not override."""
        BaseServer.__init__(self, server_address, RequestHandlerClass)
        self.socket = socket.socket(self.address_family,
                                    self.socket_type)
        if bind_and_activate:
            try:
                self.server_bind()
                self.server_activate()
            except:
                self.server_close()
                raise
This in turn calls the constructor of socketserver.BaseServer:
class BaseServer:
    timeout = None

    def __init__(self, server_address, RequestHandlerClass):
        """Constructor.  May be extended, do not override."""
        self.server_address = server_address
        self.RequestHandlerClass = RequestHandlerClass
        self.__is_shut_down = threading.Event()
        self.__shutdown_request = False
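The remaining steps in the socketserver.TCPServer constructor create the socket, bind it to the IP address and port, and start listening. When following the method calls, pay attention to the lookup order. For self.server_bind() and self.server_activate(), methods are resolved along the inheritance chain:
simple_server.WSGIServer
--> http.server.HTTPServer
--> socketserver.TCPServer
--> socketserver.BaseServer
So the self.server_bind() call in the socketserver.TCPServer constructor is looked up in simple_server.WSGIServer first: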
class WSGIServer(HTTPServer):
    """BaseHTTPServer that implements the Python WSGI protocol"""
    application = None

    def server_bind(self):
        """Override server_bind to store the server name."""
        HTTPServer.server_bind(self)
        self.setup_environ()


class HTTPServer(socketserver.TCPServer):
    allow_reuse_address = 1    # Seems to make sense in testing environment

    def server_bind(self):
        """Override server_bind to store the server name."""
        socketserver.TCPServer.server_bind(self)
        host, port = self.server_address[:2]
        self.server_name = socket.getfqdn(host)
        self.server_port = port


class TCPServer(BaseServer):
    def server_bind(self):
        """Called by constructor to bind the socket.
        May be overridden.
        """
        if self.allow_reuse_address:
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.server_address)
        self.server_address = self.socket.getsockname()
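The lookup order described above can be checked directly from the interpreter. This is a small sketch, with defining_class being a hypothetical helper written for illustration; it only relies on the standard __mro__ attribute and vars().

```python
from wsgiref.simple_server import WSGIServer


def defining_class(cls, name):
    """Return the first class in cls's MRO whose own namespace defines name."""
    for klass in cls.__mro__:
        if name in vars(klass):
            return klass
    return None


# Class names in the order Python searches them for attributes.
mro_names = [klass.__name__ for klass in WSGIServer.__mro__]

if __name__ == "__main__":
    print(" --> ".join(mro_names))
    for method in ("server_bind", "server_activate", "serve_forever"):
        print(method, "is found in", defining_class(WSGIServer, method).__name__)
```

server_bind is found in WSGIServer first, and each override then calls its parent explicitly, which is why all three server_bind bodies run in turn.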
The self.server_activate() and self.server_close() calls in the socketserver.TCPServer constructor are resolved the same way, and both are found in TCPServer itself:
class TCPServer(BaseServer):
    def server_activate(self):
        """Called by constructor to activate the server.
        May be overridden.
        """
        self.socket.listen(self.request_queue_size)

    def server_close(self):
        """Called to clean-up the server.
        May be overridden.
        """
        self.socket.close()
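Taken together, the constructor amounts to the usual socket, bind, listen sequence. A rough sketch of the same steps with the plain socket module follows; open_listening_socket is a hypothetical helper, and port 0 (let the OS pick a free port) is an assumption made so the example does not collide with a running server.

```python
import socket


def open_listening_socket(host, port, backlog=5, reuse_address=True):
    """Create, bind and activate a TCP socket, mirroring TCPServer.__init__."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        if reuse_address:
            # Same option server_bind() sets when allow_reuse_address is true.
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((host, port))   # server_bind()
        sock.listen(backlog)      # server_activate()
    except OSError:
        sock.close()              # server_close() on failure
        raise
    return sock


if __name__ == "__main__":
    listener = open_listening_socket("127.0.0.1", 0)
    print("listening on", listener.getsockname())
    listener.close()
```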
2. server.set_app(app)
Here server is the WSGIServer instance, so the method is found in simple_server.WSGIServer:
class WSGIServer(HTTPServer):
    """BaseHTTPServer that implements the Python WSGI protocol"""
    application = None

    def set_app(self,application):
        self.application = application
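A quick way to see the effect of set_app is to build a server and read the application back with get_app(), the accessor the request handler uses later. This is only a sketch: hello_app and build_server are hypothetical names, and binding to port 0 is an assumption made so that any free port is used.

```python
from wsgiref.simple_server import make_server


def hello_app(environ, start_response):
    """Trivial WSGI app used only for this demonstration."""
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"ok"]


def build_server():
    # make_server() instantiates WSGIServer and then calls set_app(app).
    return make_server("127.0.0.1", 0, hello_app)


if __name__ == "__main__":
    server = build_server()
    try:
        print(server.get_app() is hello_app)
    finally:
        server.server_close()
```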
(2) srv.serve_forever()
serve_forever() is looked up along the same chain: simple_server.WSGIServer
--> http.server.HTTPServer
--> socketserver.TCPServer
--> socketserver.BaseServer
and it is found in socketserver.BaseServer:
class BaseServer:
    def serve_forever(self, poll_interval=0.5):
        """Handle one request at a time until shutdown.
        Polls for shutdown every poll_interval seconds. Ignores
        self.timeout. If you need to do periodic tasks, do them in
        another thread.
        """
        self.__is_shut_down.clear()
        try:
            # XXX: Consider using another file descriptor or connecting to the
            # socket to wake this up instead of polling. Polling reduces our
            # responsiveness to a shutdown request and wastes cpu at all other
            # times.
            with _ServerSelector() as selector:
                selector.register(self, selectors.EVENT_READ)

                while not self.__shutdown_request:
                    ready = selector.select(poll_interval)
                    # bpo-35017: shutdown() called during select(), exit immediately.
                    if self.__shutdown_request:
                        break
                    if ready:
                        self._handle_request_noblock()

                    self.service_actions()
        finally:
            self.__shutdown_request = False
            self.__is_shut_down.set()
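The loop uses a selector to watch the listening socket for activity. As soon as a connection arrives, it is handled by self._handle_request_noblock(), which calls self.process_request(request, client_address), which in turn calls self.finish_request(request, client_address).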
class BaseServer:
    def _handle_request_noblock(self):
        """Handle one request, without blocking.
        I assume that selector.select() has returned that the socket is
        readable before this function was called, so there should be no risk of
        blocking in get_request().
        """
        try:
            request, client_address = self.get_request()
        except OSError:
            return
        if self.verify_request(request, client_address):
            try:
                self.process_request(request, client_address)
            except Exception:
                self.handle_error(request, client_address)
                self.shutdown_request(request)
            except:
                self.shutdown_request(request)
                raise
        else:
            self.shutdown_request(request)

    def process_request(self, request, client_address):
        """Call finish_request.
        Overridden by ForkingMixIn and ThreadingMixIn.
        """
        self.finish_request(request, client_address)
        self.shutdown_request(request)

    def finish_request(self, request, client_address):
        """Finish one request by instantiating RequestHandlerClass."""
        self.RequestHandlerClass(request, client_address, self)
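To watch this loop handle a real request, one option is to run serve_forever() in a background thread, send a single request, and then call shutdown(), which sets the flag the loop polls for. The sketch below makes several assumptions: fetch_once, index_app and QuietHandler are hypothetical names, port 0 lets the OS choose a free port, and the short poll_interval only keeps shutdown fast.

```python
import http.client
import threading
from wsgiref.simple_server import WSGIRequestHandler, make_server


class QuietHandler(WSGIRequestHandler):
    """Request handler that suppresses the per-request log line."""

    def log_message(self, format, *args):
        pass


def index_app(environ, start_response):
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"index"]


def fetch_once(path):
    """Serve in a thread, issue one GET, then stop the serve_forever loop."""
    server = make_server("127.0.0.1", 0, index_app, handler_class=QuietHandler)
    port = server.server_address[1]
    worker = threading.Thread(
        target=server.serve_forever, kwargs={"poll_interval": 0.05}
    )
    worker.start()
    try:
        conn = http.client.HTTPConnection("127.0.0.1", port, timeout=5)
        try:
            conn.request("GET", path)
            response = conn.getresponse()
            return response.status, response.read()
        finally:
            conn.close()
    finally:
        server.shutdown()      # makes the polling loop exit
        server.server_close()  # releases the listening socket
        worker.join()


if __name__ == "__main__":
    print(fetch_once("/index"))
```

shutdown() must be called from a different thread than the one running serve_forever(), since it blocks until the loop has exited.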
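The last call instantiates the request handler class: self.RequestHandlerClass(request, client_address, self). This class is simple_server.WSGIRequestHandler, the default value of the handler_class argument that make_server passed in when the server was created.
Its inheritance chain is simple_server.WSGIRequestHandler
--> http.server.BaseHTTPRequestHandler
--> socketserver.StreamRequestHandler
--> socketserver.BaseRequestHandler
and instantiating it looks up the constructor in that order: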
class BaseRequestHandler:
    """Base class for request handler classes.
    This class is instantiated for each request to be handled.  The
    constructor sets the instance variables request, client_address
    and server, and then calls the handle() method.  To implement a
    specific service, all you need to do is to derive a class which
    defines a handle() method.
    The handle() method can find the request as self.request, the
    client address as self.client_address, and the server (in case it
    needs access to per-server information) as self.server.  Since a
    separate instance is created for each request, the handle() method
    can define other arbitrary instance variables.
    """

    def __init__(self, request, client_address, server):
        self.request = request
        self.client_address = client_address
        self.server = server
        self.setup()
        try:
            self.handle()
        finally:
            self.finish()
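The setup, handle, finish sequence in this constructor can be observed without any network traffic by subclassing BaseRequestHandler and instantiating it by hand. This is an illustrative sketch; trace_lifecycle and TracingHandler are hypothetical names, and the None placeholders stand in for the socket and server a real request would supply.

```python
import socketserver


def trace_lifecycle():
    """Instantiate a handler directly and record the order of its hooks."""
    calls = []

    class TracingHandler(socketserver.BaseRequestHandler):
        def setup(self):
            calls.append("setup")

        def handle(self):
            calls.append("handle")

        def finish(self):
            calls.append("finish")

    # Instantiation alone drives the whole lifecycle; nothing else is called.
    TracingHandler(None, ("127.0.0.1", 0), None)
    return calls


if __name__ == "__main__":
    print(trace_lifecycle())
```

This is why finish_request() in the server only needs to create the handler object: the constructor does all the work.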
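None of the subclasses defines __init__, so the constructor that runs is the one in socketserver.BaseRequestHandler. It calls self.handle(), which resolves to WSGIRequestHandler.handle():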
class WSGIRequestHandler(BaseHTTPRequestHandler):
    def handle(self):
        """Handle a single HTTP request"""
        self.raw_requestline = self.rfile.readline(65537)
        if len(self.raw_requestline) > 65536:
            self.requestline = ''
            self.request_version = ''
            self.command = ''
            self.send_error(414)
            return

        if not self.parse_request(): # An error code has been sent, just exit
            return

        handler = ServerHandler(
            self.rfile, self.wfile, self.get_stderr(), self.get_environ(),
            multithread=False,
        )
        handler.request_handler = self      # backpointer for logging
        handler.run(self.server.get_app())
This method instantiates simple_server.ServerHandler, whose inheritance chain is simple_server.ServerHandler
--> handlers.SimpleHandler
--> handlers.BaseHandler
class SimpleHandler(BaseHandler):
    """Handler that's just initialized with streams, environment, etc.
    This handler subclass is intended for synchronous HTTP/1.0 origin servers,
    and handles sending the entire response output, given the correct inputs.
    Usage::
        handler = SimpleHandler(
            inp,out,err,env, multithread=False, multiprocess=True
        )
        handler.run(app)"""

    def __init__(self,stdin,stdout,stderr,environ,
        multithread=True, multiprocess=False
    ):
        self.stdin = stdin
        self.stdout = stdout
        self.stderr = stderr
        self.base_env = environ
        self.wsgi_multithread = multithread
        self.wsgi_multiprocess = multiprocess
Instantiating simple_server.ServerHandler therefore runs the constructor found in handlers.SimpleHandler.
Next, handler.run(self.server.get_app()) is called. run() is implemented in handlers.BaseHandler:
class BaseHandler:
    """Manage the invocation of a WSGI application"""

    def run(self, application):
        """Invoke the application"""
        # Note to self: don't move the close()!  Asynchronous servers shouldn't
        # call close() from finish_response(), so if you close() anywhere but
        # the double-error branch here, you'll break asynchronous servers by
        # prematurely closing.  Async servers must return from 'run()' without
        # closing if there might still be output to iterate over.
        try:
            self.setup_environ()
            self.result = application(self.environ, self.start_response)
            self.finish_response()
        except (ConnectionAbortedError, BrokenPipeError, ConnectionResetError):
            # We expect the client to close the connection abruptly from time
            # to time.
            return
        except:
            try:
                self.handle_error()
            except:
                # If we get an error handling an error, just give up already!
                self.close()
                raise   # ...and let the actual server figure it out.

    def start_response(self, status, headers,exc_info=None):
        """'start_response()' callable as specified by PEP 3333"""
        if exc_info:
            try:
                if self.headers_sent:
                    # Re-raise original exception if headers sent
                    raise exc_info[0](exc_info[1]).with_traceback(exc_info[2])
            finally:
                exc_info = None        # avoid dangling circular ref
        elif self.headers is not None:
            raise AssertionError("Headers already set!")

        self.status = status
        self.headers = self.headers_class(headers)
        status = self._convert_string_type(status, "Status")
        assert len(status)>=4,"Status must be at least 4 characters"
        assert status[:3].isdigit(), "Status message must begin w/3-digit code"
        assert status[3]==" ", "Status message must have a space after code"

        if __debug__:
            for name, val in headers:
                name = self._convert_string_type(name, "Header name")
                val = self._convert_string_type(val, "Header value")
                assert not is_hop_by_hop(name),\
                       f"Hop-by-hop header, '{name}: {val}', not allowed"

        return self.write

    def finish_response(self):
        """Send any iterable data, then close self and the iterable
        Subclasses intended for use in asynchronous servers will
        want to redefine this method, such that it sets up callbacks
        in the event loop to iterate over the data, and to call
        'self.close()' once the response is finished.
        """
        try:
            if not self.result_is_file() or not self.sendfile():
                for data in self.result:
                    self.write(data)
                self.finish_content()
        except:
            # Call close() on the iterable returned by the WSGI application
            # in case of an exception.
            if hasattr(self.result, 'close'):
                self.result.close()
            raise
        else:
            # We only call close() when no exception is raised, because it
            # will set status, result, headers, and environ fields to None.
            # See bpo-29183 for more details.
            self.close()
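The checks inside start_response() can be tried out on a handler that is not attached to any socket. The sketch below is an assumption-laden illustration: try_start_response is a hypothetical helper, in-memory streams replace the real connection, and because the checks are plain assert statements they are skipped when Python runs with -O.

```python
import io
from wsgiref.handlers import SimpleHandler
from wsgiref.util import is_hop_by_hop


def try_start_response(status, headers):
    """Call start_response() on a fresh handler and report the outcome."""
    handler = SimpleHandler(io.BytesIO(), io.BytesIO(), io.StringIO(), {})
    try:
        handler.start_response(status, headers)
    except AssertionError as exc:
        return f"rejected: {exc}"
    return "accepted"


if __name__ == "__main__":
    print(try_start_response("200 OK", [("Content-Type", "text/plain")]))
    print(try_start_response("200", []))                          # too short
    print(try_start_response("200 OK", [("Connection", "close")]))  # hop-by-hop
    print(is_hop_by_hop("Connection"))
```

Hop-by-hop headers such as Connection describe a single transport link, so the application is not allowed to set them; the server manages them itself.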
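Here application is the simple_app we passed in. The call self.result = application(self.environ, self.start_response) runs the application and stores its return value, which is an iterable of bytes.
Finally, self.finish_response() iterates over that result and calls write() for each chunk. Every call to write() checks whether the headers have already been sent; if not, it sends the headers first and then writes the data to the client. self.finish_content() then makes sure the headers are sent even when the application produced no body.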
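The whole run(), start_response(), finish_response(), write() sequence can be reproduced in memory by giving wsgiref.handlers.SimpleHandler a BytesIO object in place of the client socket. This is a sketch under stated assumptions: render and index_app are hypothetical names, and setup_testing_defaults supplies the environ values (such as SERVER_PROTOCOL) that WSGIRequestHandler.get_environ() would normally provide.

```python
import io
from wsgiref.handlers import SimpleHandler
from wsgiref.util import setup_testing_defaults


def index_app(environ, start_response):
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"index"]


def render(app, path="/"):
    """Run app through SimpleHandler and return the raw bytes it would send."""
    environ = {"PATH_INFO": path}
    setup_testing_defaults(environ)
    stdout = io.BytesIO()  # plays the role of the client connection (wfile)
    handler = SimpleHandler(
        io.BytesIO(), stdout, io.StringIO(), environ, multithread=False
    )
    handler.run(app)       # setup_environ -> app(...) -> finish_response
    return stdout.getvalue()


if __name__ == "__main__":
    print(render(index_app, "/index").decode("iso-8859-1"))
```

The captured output starts with the status line and headers, written on the first write() call, followed by a blank line and the body.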