首页 > 编程语言 >python学习笔记-异步非阻塞web框架

python学习笔记-异步非阻塞web框架

时间:2023-10-19 19:11:27浏览次数:42  
标签:__ web 异步 python self bytes tornado main def

一、异步非阻塞框架介绍

1、介绍

支持异步非阻塞web框架:tornado , node js

2、定义对比

异步IO模块:我们作为客户端向服务端“并发”请求

异步非阻塞web框架:针对服务端,希望一个线程处理更多的请求

二、tornado异步非阻塞

【要点提炼】

使用装饰器@gen.coroutine

模拟等待,使用特殊的形式等待5秒,写法

future=Future()
tornado.ioloop.IOLoop.current().add_timeout(time.time()+5,self.done)
yield future

1、场景实例1

程序1,之前写法,同步阻塞(先访问main会进入等待,此时访问index也到等main处理完了才有返回)

import tornado.ioloop
import tornado.web
import time

class MainHandler(tornado.web.RequestHandler):
    def get(self):
        time.sleep(10)
        self.write("main")

class IndexHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("index")

application=tornado.web.Application([
    (r"/main",MainHandler),
    (r"/index", IndexHandler),])

if __name__=="__main__":
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()

程序2,此时要用到future,实现异步不阻塞

from tornado import gen
from tornado.concurrent import Future

class MainHandler(tornado.web.RequestHandler):

    @gen.coroutine
    def get(self):
        future = Future()
        tornado.ioloop.IOLoop.current().add_timeout(time.time() + 5, self.done)
        yield future

    def done(self):
        self.write("Main")
        self.finish()

class IndexHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("index")

application=tornado.web.Application([
    (r"/main",MainHandler),
    (r"/index", IndexHandler),])

if __name__=="__main__":
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()

  

2、场景实例2 (请求外部url时引起的阻塞)

【要点提炼】

通过tornado发请求,实现异步非阻塞

from tornado import httpclient
http=httpclient.AsyncHTTPClient()
yield http.fetch("http://www.google.com",self.done)

future的作用

1、挂起当前请求,线程可以处理其他请求

2、future设置值,当前挂起的请求返回

future.set_result(None)

程序3,同步阻塞场景

import tornado.ioloop
import tornado.web

class MainHandler(tornado.web.RequestHandler):
    def get(self):
        import requests
        ret=requests.get("http://www.google.com")
        self.write("main")

class IndexHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("index")

application=tornado.web.Application([
    (r"/main",MainHandler),
    (r"/index", IndexHandler),])

if __name__=="__main__":
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()

程序4、异步非阻塞

import tornado.ioloop
import tornado.web
from tornado import gen

class MainHandler(tornado.web.RequestHandler):

    @gen.coroutine
    def get(self):
        from tornado import httpclient
        http=httpclient.AsyncHTTPClient()
        yield http.fetch("http://www.google.com",self.done)

    def done(self,*args,**kwargs):
        self.write("Main")
        self.finish()

class IndexHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("index")

application=tornado.web.Application([
    (r"/main",MainHandler),
    (r"/index", IndexHandler),])

if __name__=="__main__":
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()

  

3、实例3,(自己设置future的值,将挂起的请求释放)

future=None
class MainHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        global future
        future=Future()
        future.add_done_callback(self.done)
        yield future

    def done(self,*args,**kwargs):
        self.write("Main")
        self.finish()

class IndexHandler(tornado.web.RequestHandler):
    def get(self):
        global future
        future.set_result(None) #设置值,将挂起的请求释放
        self.write("index")

application=tornado.web.Application([
    (r"/main",MainHandler),
    (r"/index", IndexHandler),])

if __name__=="__main__":
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()

  

三、自定义web框架

1、使用socket实现同步的web框架

示例:

import socket
import select

class HttpRequest(object):
    """
    用来封装用户请求信息
    """
    def __init__(self, content):
        self.content = content #用户发来的请求数据,请求头和请求体
        self.header_bytes = bytes()
        self.header_dict = {}
        self.body_bytes = bytes()

        self.method = ""
        self.url = ""
        self.protocol = ""

        self.initialize()
        self.initialize_headers()

    def initialize(self):
        temp = self.content.split(b'\r\n\r\n', 1)
        if len(temp) == 1:
            self.header_bytes += temp
        else:
            h, b = temp
            self.header_bytes += h
            self.body_bytes += b
            header_flag = True

    @property
    def header_str(self):
        return str(self.header_bytes, encoding='utf-8')

    def initialize_headers(self):
        headers = self.header_str.split('\r\n')
        first_line = headers[0].split(' ')
        if len(first_line) == 3:
            self.method, self.url, self.protocol = headers[0].split(' ')
            for line in headers:
                kv = line.split(':')
                if len(kv) == 2:
                    k, v = kv
                    self.header_dict[k] = v

def index(requet):
    return "index"

def main(request):
    return 'main'

routers=[('/index',index),
         ('/main',main),
         ]

def run():
    sock=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
    sock.bind(('127.0.0.1',8002,))
    sock.listen(128)

    inputs=[]
    inputs.append(sock)

    while True:
        rlist,wlist,elist=select.select(inputs,[],[],0.05)
        for r in rlist:
            if r==sock:
                '''新请求到来'''
                conn,addr=sock.accept()
                conn.setblocking(False)
                inputs.append(conn)
            else:
                '''客户端发来数据'''
                data=b""
                while True:
                    try:
                        chunk=r.recv(1024)
                        data=data+chunk
                    except Exception as e:
                        chunk=None
                    if not chunk:
                        break
                #dada进行处理:请求头和请求体
                #1、请求头中获取url
                #2、去路由中匹配,获取指定的函数
                #3、执行函数,获取返回值
                #4、发送给客户端
                request=HttpRequest(data)
                import re
                flag=False
                func=None
                for route in routers:
                    if re.match(route[0],request.url):
                        flag=True
                        func=route[1]
                        break
                if flag:
                    result=func(request)
                    r.sendall(bytes("HTTP/1.1 201 OK\r\n\r\n", 'utf8'))
                    r.sendall(bytes(result, 'utf8'))
                else:
                    r.sendall(bytes("HTTP/1.1 201 OK\r\n\r\n", 'utf8'))
                    r.sendall(b'404')

                inputs.remove(r)
                r.close()

if __name__=="__main__":
    run()

  

2、自定义web框架支持异步

【要点提炼】

编写实现future功能,如访问的main,不执行操作

》示例2

import socket
import select

class HttpRequest(object):
    """
    用来封装用户请求信息
    """
    def __init__(self, content):
        self.content = content #用户发来的请求数据,请求头和请求体
        self.header_bytes = bytes()
        self.header_dict = {}
        self.body_bytes = bytes()

        self.method = ""
        self.url = ""
        self.protocol = ""

        self.initialize()
        self.initialize_headers()

    def initialize(self):
        temp = self.content.split(b'\r\n\r\n', 1)
        if len(temp) == 1:
            self.header_bytes += temp
        else:
            h, b = temp
            self.header_bytes += h
            self.body_bytes += b
            header_flag = True

    @property
    def header_str(self):
        return str(self.header_bytes, encoding='utf-8')

    def initialize_headers(self):
        headers = self.header_str.split('\r\n')
        first_line = headers[0].split(' ')
        if len(first_line) == 3:
            self.method, self.url, self.protocol = headers[0].split(' ')
            for line in headers:
                kv = line.split(':')
                if len(kv) == 2:
                    k, v = kv
                    self.header_dict[k] = v

class Future(object): def __init__(self): self.result=None F=None def stop(request): global F F.result=b'future-xxxxx' return 'stop' def index(requet): return "index" def main(request): global F F=Future() return F routers=[('/index',index), ('/main',main), ('/stop',stop), ] def run(): sock=socket.socket(socket.AF_INET,socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) sock.bind(('127.0.0.1',8002,)) sock.listen(128) inputs=[] inputs.append(sock) async_request_dict={} while True: rlist,wlist,elist=select.select(inputs,[],[],0.05) for r in rlist: if r==sock: '''新请求到来''' conn,addr=sock.accept() conn.setblocking(False) inputs.append(conn) else: '''客户端发来数据''' data=b"" while True: try: chunk=r.recv(1024) data=data+chunk except Exception as e: chunk=None if not chunk: break request=HttpRequest(data) import re flag=False func=None for route in routers: if re.match(route[0],request.url): flag=True func=route[1] break if flag: result=func(request) if isinstance(result,Future): async_request_dict[r]=result else: r.sendall(bytes("HTTP/1.1 201 OK\r\n\r\n", 'utf8')) r.sendall(bytes(result,encoding='utf8')) inputs.remove(r) r.close() else: r.sendall(bytes("HTTP/1.1 201 OK\r\n\r\n", 'utf8')) r.sendall(b'404') inputs.remove(r) r.close() for conn in list(async_request_dict.keys()): future=async_request_dict[conn] if future.result: conn.sendall(bytes("HTTP/1.1 201 OK\r\n\r\n", 'utf8')) conn.sendall(future.result) conn.close() del async_request_dict[conn] inputs.remove(conn) if __name__=="__main__": run()

 

示例2是手动写的方法停止阻塞,可以实现利用超时时间自动停止

》示例3

import socket
import select
import time

class HttpRequest(object):
    """
    用来封装用户请求信息
    """
    def __init__(self, content):
        self.content = content #用户发来的请求数据,请求头和请求体
        self.header_bytes = bytes()
        self.header_dict = {}
        self.body_bytes = bytes()

        self.method = ""
        self.url = ""
        self.protocol = ""

        self.initialize()
        self.initialize_headers()

    def initialize(self):
        temp = self.content.split(b'\r\n\r\n', 1)
        if len(temp) == 1:
            self.header_bytes += temp
        else:
            h, b = temp
            self.header_bytes += h
            self.body_bytes += b
            header_flag = True

    @property
    def header_str(self):
        return str(self.header_bytes, encoding='utf-8')

    def initialize_headers(self):
        headers = self.header_str.split('\r\n')
        first_line = headers[0].split(' ')
        if len(first_line) == 3:
            self.method, self.url, self.protocol = headers[0].split(' ')
            for line in headers:
                kv = line.split(':')
                if len(kv) == 2:
                    k, v = kv
                    self.header_dict[k] = v

class Future(object):
    def __init__(self,timeout=0):
        self.result=None
        self.timeout=timeout
        self.start=time.time()
F=None
def index(requet):
    return "index"

def main(request):
    global F
    F=Future(5)
    return F

routers=[('/index',index),
         ('/main',main),
         ]

def run():
    sock=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
    sock.bind(('127.0.0.1',8002,))
    sock.listen(128)

    inputs=[]
    inputs.append(sock)

    async_request_dict={}

    while True:
        rlist,wlist,elist=select.select(inputs,[],[],0.05)
        for r in rlist:
            if r==sock:
                '''新请求到来'''
                conn,addr=sock.accept()
                conn.setblocking(False)
                inputs.append(conn)
            else:
                '''客户端发来数据'''
                data=b""
                while True:
                    try:
                        chunk=r.recv(1024)
                        data=data+chunk
                    except Exception as e:
                        chunk=None
                    if not chunk:
                        break
                request=HttpRequest(data)
                import re
                flag=False
                func=None
                for route in routers:
                    if re.match(route[0],request.url):
                        flag=True
                        func=route[1]
                        break
                if flag:
                    result=func(request)
                    if isinstance(result,Future):
                        async_request_dict[r]=result
                    else:
                        r.sendall(bytes("HTTP/1.1 201 OK\r\n\r\n", 'utf8'))
                        r.sendall(bytes(result,encoding='utf8'))
                        inputs.remove(r)
                        r.close()
                else:
                    r.sendall(bytes("HTTP/1.1 201 OK\r\n\r\n", 'utf8'))
                    r.sendall(b'404')

                    inputs.remove(r)
                    r.close()

        for conn in list(async_request_dict.keys()):
            future=async_request_dict[conn]
            start=future.start
            timeout=future.timeout
            ctime=time.time()
            if (start+timeout)<=ctime:
                future.result=b'timeout'
            if future.result:
                conn.sendall(bytes("HTTP/1.1 201 OK\r\n\r\n", 'utf8'))
                conn.sendall(future.result)
                conn.close()
                del async_request_dict[conn]
                inputs.remove(conn)


if __name__=="__main__":
    run()

  

 

标签:__,web,异步,python,self,bytes,tornado,main,def
From: https://www.cnblogs.com/steven223-z/p/17775414.html

相关文章

  • CapacityScheduler异步调度功能实践
    1.背景在https://blog.51cto.com/u_15327484/7920197文章中,将调度器从FairScheduler迁移到CapacityScheduler。CapacityScheduler在默认情况下,当接受到NodeManager心跳请求时,会调用nodeUpdate方法开始进行资源调度,这种调度方式称为心跳调度,也称同步调度。心跳调度实现起来简单,......
  • 基于Java Web的多功能旅游网站的设计与实现-计算机毕业设计源码+LW文档
    摘 要 随着时代的发展,人们对旅游也越来越重视,近些年来我国的旅游产业也发生了翻天覆地的变化,但是很多人在出去旅游的时候不知道去哪里旅游,在预订酒店和机票的时候也没有一个综合性的旅游网站,为了让人们的旅游变的更加的方便,为此我开发了本基于JavaWeb的多功能旅游网站本基于......
  • 无代码玩转web UI自动化
    可以直接进入官网下载开源版或点击右上角体验企业版体验RunnerGoUI自动化平台RunnerGo提供从API管理到API性能再到可视化的API自动化、UI自动化测试功能模块,覆盖了整个产品测试周期。RunnerGoUI自动化基于Selenium浏览器自动化方案构建,内嵌高度可复用的测试脚本,测试团队无需复杂......
  • 基于Python的《计算机组成原理》在线学习平台-计算机毕业设计源码+LW文档
    摘 要随着互联网的发展,通过计算机来学习是当前非常流行的一种学习方式。通过课程虽然可以面对面的进行交流和学习,但是很多时候因为地区和空间的限制会受到很多的影响但是通过网络来进行学习可以打破这一局限性,为此我开发了本基于Python的《计算机组成原理》在线学习平台网站本......
  • 基于Python的招聘网站爬虫及可视化的实现-计算机毕业设计源码+LW文档
    一、内容框架(一)主要内容论文按照项目的研究内容及技术路线,分为六章进行论述:第一章为概述,介绍了数据可视化和招聘网站的研究背景以及研究意义,分析了数据可视化和招聘技术的发展,对论文的研究内容做出阐述,最后简述了本文的章节安排。第二章为招聘数据可视化以及相关职位推荐系统使......
  • 基于python的旅游网站-计算机毕业设计源码+LW文档
    摘 要 随着时代的发展,人们对旅游也越来越重视,近些年来我国的旅游产业也发生了翻天覆地的变化,但是很多人在出去旅游的时候不知道去哪里旅游,在预订酒店和机票的时候也没有一个综合性的旅游网站,为了让人们的旅游变的更加的方便,为此我开发了本基于python的旅游网站本基于python的......
  • 无代码玩转web UI自动化
    可以直接进入官网下载开源版或点击右上角体验企业版体验 RunnerGoUI自动化平台RunnerGo提供从API管理到API性能再到可视化的API自动化、UI自动化测试功能模块,覆盖了整个产品测试周期。RunnerGoUI自动化基于Selenium浏览器自动化方案构建,内嵌高度可复用的测试脚本,测试团队......
  • celery包结构、celery延迟任务和定时任务、django中使用celery、接口缓存、双写一致性
    celery包结构project├──celery_task #celery包  这个包可以放在任意位置│├──__init__.py#包文件│├──celery.py#celery连接和配置相关文件,且名字必须叫celery.py│└──tasks.py#所有任务函数│├──add_task.p......
  • python调用企业微信发送消息
    #-*-coding:utf-8-*-importosfromrequestsimportrequestfromloguruimportloggerimportbase64importhashlibimportreclassWechatBot:"""企业微信机器人当前自定义机器人支持文本(text)、markdown(markdown)、图片(image)、图文(news),文件(file)五种消息类型。......
  • 在Python中range()的用法:
    在Python中,range()是一个内置函数,用于生成一个整数序列,通常用于循环遍历。以下是range()函数的一些常见用法:1.默认情况当你调用range()函数时,它会生成一个从0开始到给定数字(不包括该数字)的整数序列。foriinrange(5):print(i)#输出:0,1,2,3,42.指定开始和结......