首页 > 系统相关 >Tornado实现多线程/多进程的HTTP服务

Tornado实现多线程/多进程的HTTP服务

时间:2023-10-30 16:05:14浏览次数:28  
标签:__ HTTP Tornado self server tornado import logger 多线程

用tornado web服务的基本流程

原文链接 1.实现处理请求的Handler,该类继承自tornado.web.RequestHandler,实现用于请求的对应方法如:get,post等。返回内容用self.write方法输出。 **2.实例化一个Application。**构造函数的参数是一个Handler列表,通过正则表达式,将请求与Handler对应起来。通过dict将Handler需要的其他对象以参数的方式传递给Handler的initialize方法。 3.初始化一个tornado.httpserver.HTTPServer对象,构造函数的参数是上一步的Application对象。 4.为HRRPServer对象绑定一个端口。5.开始IOLoop。

可以用到的特性

由于tornado的亮点是异步请求,所以这里首先想到的是将所有请求都改造为异步的。但是这里遇到一个问题,就是异步函数内一定不能有阻塞调用出现,否则整个IOLoop都会被卡住。这就要求彻底地区改造服务,将所有IO或是用时较长的请求都改造为异步函数。这个工程量是非常大的,需要区修改已有的代码。因此,我们考虑用线程池的方式去实现。当一个线程阻塞在某个请求或IO时,其他线程或IOLoop会继续执行。 另外一个瓶颈就是GIL限制CPU的并发数量,因此考虑用紫禁城的方式增加进程数,提高服务能力上限。 综合分析,大致用以下方案: 1.通过子进程的方式复制多个进程,使子进程中的只读页指向同一个物理页。 2.线程池。回避异步改造的工作量,增加IO的并发量。

测试代码 首先测试线程池,测试用例为: 对sleep页面同时发出两个请求:

1.在线程池中运行的函数(这里是self.block_task)能够同时执行。表现为在控制台交替打印出数字。 2.两个get请求几乎同时返回,在浏览器上显示返回的内容。

线程池的测试代码如下:

import os
import sys 
import time

import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
import tornado.gen
from tornado.concurrent import run_on_executor
from concurrent.futures import ThreadPoolExecutor
from tornado.options import define, options

class HasBlockTaskHandler(tornado.web.RequestHandler):
    executor = ThreadPoolExecutor(20)   #起线程池,由当前RequestHandler持有
    
    @tornado.gen.coroutine
    def get(self):
        strTime = time.strftime("%Y-%m-%d %H:%M:%S")
        print "in get before block_task %s" % strTime
        result = yield self.block_task(strTime)
        print "in get after block_task"
        self.write("%s" % (result))

    @run_on_executor
    def block_task(self, strTime):
        print "in block_task %s" % strTime
        for i in range(1, 16):
            time.sleep(1)
            print "step %d : %s" % (i, strTime)
        return "Finish %s" % strTime

if __name__ == "__main__":
    tornado.options.parse_command_line()
    app = tornado.web.Application(handlers=[(r"/sleep", HasBlockTaskHandler)], autoreload=False, debug=False)
    http_server = tornado.httpserver.HTTPServer(app)
    http_server.bind(8888)
    tornado.ioloop.IOLoop.instance().start()

整个代码里有几个位置值得关注:

  1. executor = ThreadPoolExecutor(20)。这是给Handler类初始化了一个线程池。其中concurrent.futures不属于tornado,是python的一个独立模块,在python3中是内置模块,python2.7需要自己安装。 2.修饰符@run_on_executor。这个修饰符将同步函数改造为在executor(这里是线程池)上运行的异步函数,内部实现是将被修饰的函数submit到executor,返回一个Future对象。 3.修饰符@tornado.gen.coroutine。被这个修饰符修饰的函数,是一个以同步函数方式编写的异步函数。原本通过callback方式编写的异步代码,有了这个修饰符,可以通过yield一个Future的方式来写。被修饰的函数在yield了一个Future对象后将会被挂起,Future对象的结果返回后继续执行。

运行代码后,在两个不同浏览器上访问sleep页面,得到了想要的效果。这里有一个小插曲,就是如果在同一浏览器的两个tab上进行测试,是无法看到想要的效果。第二个get请求会被block,直到第一个get请求返回,服务端才开始处理第二个get请求。这让我一度觉得多线程没有生效,用了半天时间查了很多资料,才看到是浏览器把相同的第二个请求block了,具体链接参考这里

由于tornado很方便地支持多进程模型,多进程的使用要简单很多,在以上例子中,只需要对启动部分稍作改动即可。具体代码如下所示:

if __name__ == "__main__":
    tornado.options.parse_command_line()
    app = tornado.web.Application(handlers=[(r"/sleep", HasBlockTaskHandler)], autoreload=False, debug=False)
    http_server = tornado.httpserver.HTTPServer(app)
    http_server.bind(8888)
    print tornado.ioloop.IOLoop.initialized()
    http_server.start(5)
    tornado.ioloop.IOLoop.instance().start()

需要注意的地方有两点:

1.app = tornado.web.Application(handlers=[(r"/sleep", HasBlockTaskHandler)], autoreload=False, debug=False),在生成Application对象时,要将autoreload和debug两个参数至为False。也就是需要保证在fork子进程之前IOLoop是未被初始化的。这个可以通过tornado.ioloop.IOLoop.initialized()函数来跟。 2.http_server.start(5)在启动IOLoop之前通过start函数设置进程数量,如果设置为0表示每个CPU都启动一个进程。

最后的效果是可以看到n+1个进程在运行,且公用同一个端口。

实际代码

大部分逻辑代码是封装好的,服务的代码如下:

import os
import sys
import json

import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.httpclient
import tornado.web
import tornado.gen
from tornado.concurrent import run_on_executor
from concurrent.futures import ThreadPoolExecutor
from tornado.options import define, options

import rela_baike_server
from rela_baike_server import RelaBaikeRequest, RelaBaikeResult, RelaBaikeServer

import logging
from logging.handlers import TimedRotatingFileHandler
logging.basicConfig()

import pdb

g_log_prefix = '../log/rela_baike_tornado.'

def getLogger(strPrefixBase):
    strPrefix = "%s%d" % (strPrefixBase, os.getpid())
    logger = logging.getLogger("RELA_BAIKE")
    logger.propagate = False
    handler = TimedRotatingFileHandler(strPrefix, 'H', 1)
    handler.suffix = "%Y%m%d_%H%M%S.log"
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger

def makeResponseBody(retCode, errReason, dicSummary):
    dicRes = {}
    dicRes['retCode'] = retCode
    if retCode != 0:
        dicRes['error'] = errReason
    else:
        dicRes['data'] = dicSummary
    return json.dumps(dicRes)

class RelaBaikeHandler(tornado.web.RequestHandler):
    executor = ThreadPoolExecutor(50)
    def initialize(self, relaServer, logger):
        self.__serverRelaBaike = relaServer
        self.__logger = logger

    @tornado.gen.coroutine
    def get(self):
        lstSummary = []
        retCode = 0
        errReason = ""
        try:
            utfQuery = self.get_argument('query').encode('utf8').strip()
        except:
            errorReason = 'Query encoding not utf-8.'
            strRes = makeResponseBody(-1, errorReason, lstSummary)
            self.write(strRes)
            return
        if utfQuery == "":
            strRes = makeResponseBody(0, '', lstSummary)
            self.write(strRes)
            return

        error, errReason, lstSummary = yield self.getRelaBaike(utfQuery)
        strRes = makeResponseBody(error, errReason, lstSummary)
        self.write(strRes)

    def __logResponse(self, utfQuery, relaResult):
        succ = relaResult.isSuccess()
        if succ:
            self.__logger.info("%s\tSucc\t%s" % (utfQuery, "|".join([str(item[0]) for item in relaResult])))
        else:
            self.__logger.info("%s\tError:%d" % (utfQuery, relaResult.getError()))

    @run_on_executor
    def getRelaBaike(self, utfQuery):
        error = 0
        lstSummary = []
        relaBaikeRequest = RelaBaikeRequest(content=utfQuery)
        relaBaikeResult = self.__serverRelaBaike.getRelaBaike(relaBaikeRequest)
        self.__logResponse(utfQuery, relaBaikeResult)
        if relaBaikeResult.isSuccess():
            for item in relaBaikeResult:
                baikeid = item[0]
                try:
                    dicSummary = json.loads(item[1])
                except:
                    return -2, 'summary format error' ,lstSummary
                lstSummary.append(dicSummary)
        else:
            return relaBaikeResult.getError(), rela_baike_server.g_dic_error.get(relaBaikeResult.getError(), 'other error') ,lstSumm
ary
        return 0, 'success',lstSummary

def start():
    port = int(sys.argv[1])

    serverRelaBaike = rela_baike_server.getRelaBaikeServer()
    logger = getLogger(g_log_prefix)

    app = tornado.web.Application(handlers=[(r"/rela_baike", RelaBaikeHandler,  dict(relaServer=serverRelaBaike, logger=logger))])
    http_server = tornado.httpserver.HTTPServer(app)
    http_server.bind(port)
    http_server.start(2)
    tornado.ioloop.IOLoop.instance().start()

if __name__ == "__main__":
    start()

代码所涉及的特性基本上不超过前面的测试例子,除了下两几点:

1.在*Handler类里增加了一个def initialize(self, relaServer, logger)函数。这是为了把一些初始化好的对象传到Handler类里。 2.app = tornado.web.Application(handlers=[(r"/rela_baike", RelaBaikeHandler, dict(relaServer=serverRelaBaike, logger=logger))])。前面handler的initialize函数参数,对应于Application初始化时,每个handler对应的dict。

标签:__,HTTP,Tornado,self,server,tornado,import,logger,多线程
From: https://blog.51cto.com/u_16105058/8089568

相关文章

  • python爬虫知识体系80页md笔记,0基础到scrapy项目高手,第(2)篇:http协议复习精讲
    本文主要学习一下关于爬虫的相关前置知识和一些理论性的知识,通过本文我们能够知道什么是爬虫,都有那些分类,爬虫能干什么等,同时还会站在爬虫的角度复习一下http协议。完整体系笔记直接地址:请移步这里共8章,37子模块,总计5.6w+字今天这一篇主讲:爬虫基础本阶段本文主要学......
  • XMLHttpRequest拦截请求和响应
    环境:angular实现:拦截请求向请求信息增加字段           拦截响应过滤返回值响应拦截:根据angular使用的XMLHttpRequest将对原本的请求转移到另一个将监听返回事件挂载到另一个世纪发送请求的xml上使用getset将客户端获取的responseText和response按照自己的意......
  • 流畅的Flurl.Http[转]
    流畅的Flurl.Http https://flurl.dev/docs/testable-http/注意:除了URL构建和解析之外的所有内容都需要安装Flurl.Http而不是基本的Flurl包。考虑与HTTP服务交互的一种非常常见的方式是“我想构建一个URL,然后调用它”。Flurl.Http允许您非常简洁地表达:usingFlurl;u......
  • 多线程读取多个摄像头并把画面显示到Tkinter 的 label上面
    importcv2importthreadingimporttkinterastkfromPILimportImage,ImageTkclassCameraThread(threading.Thread):def__init__(self,camera_index,label):threading.Thread.__init__(self)self.camera_index=camera_indexs......
  • 解决使用 OkHttp 库出现 java.lang.NoSuchMethodError: okhttp3.internal.platform.Pl
    报错:Exceptioninthread"main"java.lang.NoSuchMethodError:okhttp3.internal.platform.Platform.log(ILjava/lang/String;Ljava/lang/Throwable;)Vatokhttp3.logging.HttpLoggingInterceptor$Logger.lambda$static$0(HttpLoggingInterceptor.java:112)......
  • C++多线程编程——线程的基本概念和使用方法
    什么是线程?在计算机科学中,线程是进程中的一个执行控制单元,也被称为执行路径。每个进程可以包含多个线程,每条线程并行执行不同的任务。线程是操作系统可识别的最小执行和调度单位。进程和线程的区别进程是程序在某个数据集合上的一次运行活动,也是操作系统进行资源分配和保护的......
  • java实现多线程下载器
    前言:......
  • Failed to start The nginx HTTP and reverse proxy server.
    本章教程主要分享一下,当nginx启动时,遇到报这个错误时的一个解决思路。 目录 1、观察报错信息 2、尝试性解决 1、观察报错信息根据日志的信息,我们至少可以知道2个比较关键的信息。1、操作用户执行命令是在非root权限下进行操作的。2、Addressalreadyinuse这个很明显:意思就......
  • 【Asp.net】Asp.net core中IIS配置注意事项一、提示:关于IIS上运行ASP.NET Core 站点的
    1、应用地址池设为无托管代码一、提示:关于IIS上运行ASP.NETCore站点的“HTTP500.19”错误安装dotnet-hosting-3.1.2-win.exeASP.NETCore3.1Runtime(v3.1.2)下载地址:https://download.visualstudio.microsoft.com/download/pr/dd119832-dc46-4ccf-bc12-69e7bfa61b18/990843c6......
  • [Java]Java初学之多线程05--Lock锁
    Intro除了synchronized关键字,从JDK5.0开始,Java提供了更强大的线程同步机制--通过显式定义同步锁对象来实现同步。同步锁使用Lock对象充当。本文简单讲一下Lock锁的概念以及简单应用。正文其实理解了synchronized关键字后,Lock锁的理解就会变得简单起来。Lock锁实际上是使用了j......