首页 > 其他分享 >tornado服务端+tornado.ioloop.PeriodicCallback定时任务踩坑记录及解决方案

tornado服务端+tornado.ioloop.PeriodicCallback定时任务踩坑记录及解决方案

时间:2023-05-02 19:01:02浏览次数:48  
标签:ioloop PeriodicCallback tornado self db batch process model

背景:用tornado部署一个AI模型的服务端,由于AI模型较慢,收到请求肯定没办法同步返回结果,所以最后定的方案是批处理并异步回调。
异步回调下,我这边的处理方式是:实时接收所有请求并多线程落库(使用数据库连接池),再启动一个定时任务取出库中(未处理过的)数据进行批处理。
因为web框架用的是tornado,所以最开始的实现方式就是使用tornado的定时任务PeriodicCallback,代码如下(该方案有问题:PeriodicCallback任务启动时,调用AI模型处理数据会阻塞ioloop,导致服务端无法接收新的请求):

import tornado.ioloop
import tornado.web
from concurrent.futures import ThreadPoolExecutor
import requests
from pymysqlpool import ConnectionPool
import multiprocessing
import os
import re
import whisper

class Database: """ MySQL 连接池 """ _instance = None def __init__(self, host, port, username, password, db_name, pool_size=5): self.host = host self.port = port self.username = username self.password = password self.db_name = db_name self.pool = ConnectionPool( pool_name="mypool", host=self.host, port=self.port, user=self.username, password=self.password, db=self.db_name, max_pool_size=pool_size, ) def execute_fetch(self, query, args=None): results = [] with self.pool.cursor() as cursor: cursor.execute(query, args) for request in cursor: results.append(request) return results def execute_insert_or_update(self, query, args=None): with self.pool.cursor() as cursor: result = cursor.execute(query, args) return result class TranscribeHandler(tornado.web.RequestHandler): executor = ThreadPoolExecutor(max_workers=20) def initialize(self, database): self.db = database async def post(self): body = tornado.escape.json_decode(self.request.body) aaa= body.get("aaa") bbb = body.get("bbb") ccc= body.get("ccc") try: with ThreadPoolExecutor(max_workers=10) as executor: insert_request_future = tornado.ioloop.IOLoop.current().run_in_executor(executor, self.insert_request, aaa, bbb, ccc) insert_results_future = tornado.ioloop.IOLoop.current().run_in_executor(executor, self.insert_results, aaa) await insert_request_future await insert_results_future self.write({"error_code": 0, "message": "ok"}) except Exception as e: self.write({"error_code": 1, "message": str(e)}) raise e def insert_request(self, aaa, bbb, ccc): # 在每个线程中创建一个数据库连接,执行操作,然后关闭连接 db = self.db db.execute_insert_or_update( "INSERT INTO requests (aaa, bbb, ccc, status) VALUES (%s, %s, %s, %s)", (aaa, bbb, ccc, 'unprocessed') ) def insert_results(self, aaa): db = self.db db.execute_insert_or_update( "INSERT INTO results (aaa) VALUES (%s)", (aaa,)) def batch_process(model, db, batch_size):
#此处为具体批处理代码,略过


if __name__ == "__main__": # Initialize the Whisper model and MySQL connection print("loading model```") model = whisper.load_model("large") model.to('cuda:2') print("loading model done```") db_pool = Database(host='xxxxxx', port=3306, username='xxxx', password='xxx', db_name="xxx", pool_size=20) a # Create a Torndo application and HTTP server app = tornado.web.Application([ (r"/transcribe", TranscribeHandler, dict(database=db_pool)), ]) server = tornado.httpserver.HTTPServer(app) # Schedule a periodic task to process requests in batches scheduler = tornado.ioloop.PeriodicCallback( lambda: batch_process(model, db_pool, batch_size=10), 100000) scheduler.start() print("start batch_process job```") # Start the server server.listen(8000) tornado.ioloop.IOLoop.current().start()

 

询问chatGPT:

 我在执行上述代码tornado.ioloop.PeriodicCallback对象在定期调用batch_process函数时,Tornado就不能接收传入的HTTP请求了,应该怎么解决PeriodicCallback阻塞I/O循环并导致请求超时或无法响应的问题?

答:

方案一:在Tornado中,当处理长时间的计算任务时,应该使用协程来避免I/O阻塞。可以使用Tornado的gen.coroutine装饰器将batch_process函数转化为协程。并使用await关键字等待函数执行完毕。另外,还可以将batch_process函数放在一个线程池中执行,以避免阻塞I/O循环。

方案二:可以使用asyncio库来异步调用batch_process函数,避免阻塞I/O循环。

方案三:您可以将 batch_process 函数转换为协程,然后使用 asyncio.gather 并发运行 batch_process 函数和 Tornado 的事件循环,以避免 Tornado 无法处理新的 HTTP 请求的问题。

方案四:您可以将batch_process函数改成一个协程,并用asyncio.create_task来调用它,这样就可以将它异步地运行。同时,您可以将periodic_task函数变成一个协程,然后在启动服务器前运行asyncio.gather将HTTP服务器和periodic_task都注册到事件循环中,这样两者就可以并发运行了。 方案五:在 Tornado 中,PeriodicCallback 是一个周期性执行的回调函数。在这里,你的周期性任务是在 I/O 循环中执行,这可能会导致阻塞,从而导致无法响应 HTTP 请求。为了解决这个问题,可以将周期性任务设置为使用 asyncio 的事件循环,并使用 asyncio.create_task() 运行批量处理任务。这样,Tornado 的 I/O 循环将不会被阻塞。 ...
问了N次,各种解决方案都试了,主要是batch_process改成协程,但无论如何还是会阻塞ioloop...遂放弃使用PeriodicCallback!   想起之前另一个项目用到过的apscheduler库来执行定时任务,来问问chatGPT怎么修改。
from apscheduler.schedulers.blocking import BlockingScheduler


if __name__ == "__main__":
# Initialize the Whisper model and MySQL connection
    print("loading model```")
    model = whisper.load_model("large")
    model.to('cuda:2')
    print("loading model done```")
    db_pool = Database(host='xxx',
                       port=3306,
                       username='xxx',
                       password='xxx',
                       db_name="xxx",
                       pool_size=20)

    # Create a Tornado application and HTTP server
    app = tornado.web.Application([
        (r"/transcribe", TranscribeHandler, dict(database=db_pool)),
    ])
    server = tornado.httpserver.HTTPServer(app)

    # Schedule a periodic task to process requests in batches
    print("start batch_process job```")
    # Start the server
    server.listen(8000)
    tornado.ioloop.IOLoop.current().start()
    bscheduler = BlockingScheduler()
    # bscheduler.add_job(push_seeds, 'cron', kwargs={'spider_name': 'YqSpider'}, minute='0', hour='*/2', name='tasks')
    bscheduler.add_job(batch_process(model, db_pool, batch_size=10), 'cron', minute='*', hour='*', name='task1')
以上代码有问题吗?

有问题。在bscheduler.add_job函数中,第一个参数应该是一个函数名,而不是函数的返回值。因此,您需要将batch_process函数定义为一个单独的函数,然后将其传递给add_job函数。此外,如果您想让定期任务在后台运行并不阻塞HTTP请求处理,您需要在server.listen之前启动定期任务。

以下是一个示例代码,可以帮助您实现这一点:

import asyncio
import tornado.httpserver
import tornado.ioloop
import tornado.web
from database import Database
import whisper
from transcribe_handler import TranscribeHandler
from apscheduler.schedulers.background import BackgroundScheduler

def batch_process(model, db, batch_size):
    max_fail_count = 3  # 最大重试次数
    frequests = db.execute_fetch(
        "SELECT request_id, url, footage_type FROM requests WHERE status='unprocessed' ORDER BY id ASC LIMIT %s",
        batch_size)
    print("start batch_process, unprocessed:", len(frequests))

if __name__ == "__main__":
    multiprocessing.set_start_method("spawn")
    # Initialize the Whisper model and MySQL connection
    print("loading model```")
    model = whisper.load_model("large")
    model.to('cuda:2')
    print("loading model done```")
    db_pool = Database(host='xxx',
                       port=3306,
                       username='xxx',
                       password='xxx',
                       db_name="xxx",
                       pool_size=20)

    # Create a Tornado application and HTTP server
    app = tornado.web.Application([
        (r"/transcribe", TranscribeHandler, dict(database=db_pool)),
    ])
    server = tornado.httpserver.HTTPServer(app)

    # Start the server
    server.listen(8000)

    # Schedule a periodic task to process requests in batches
    bscheduler = BackgroundScheduler()
    bscheduler.add_job(batch_process, 'cron', args=[model, db_pool, 10], minute='*', hour='*', name='task1')
    bscheduler.start()

    # Start the IOLoop
    tornado.ioloop.IOLoop.current().start()

测试了一下,搞定!

标签:ioloop,PeriodicCallback,tornado,self,db,batch,process,model
From: https://www.cnblogs.com/jimmy996/p/17346513.html

相关文章

  • Django&Tornado&Flask比较
    1.DjangoDjango概述Django太重,除了web框架,自带ORM和模板引擎,灵活和自由度不够高。Django能开发小应用,但总会有“杀鸡焉用牛刀”的感觉。Django的自带ORM非常优秀,综合评价略高于SQLAlchemyDjango自带的模板引擎简单好用,但其强大程度和综合评价略低于Jinja。Django自带ORM也使D......
  • 第四篇:白话tornado源码之褪去模板外衣的前戏
    原笔记博客链接:https://www.cnblogs.com/wupeiqi/p/4592637.html 执行字符串表示的函数,并为该函数提供全局变量本篇的内容从题目中就可以看出来,就是为之后剖析tornado模板做准备,也是由于该知识点使用的巧妙,所有就单独用一篇来介绍了。废话不多说,直接上代码:#!u......
  • [护网杯 2018]easy_tornado_wp(finish)
    buuctfeasy_tornado解题思路打开题目链接可以看到web里有3条链接分别是分析/flag.txt里的可以看到我们想要的存在”flagin/fllllllllllllag“fllllllllllllag这......
  • 解决Python Tornado的某个页面不需要进行xsrf的检查
    错误信息:‘_xsrf’argumentmissingfrompost我们总是对全站开启xsrf的功能,但是有时候想对单个页面不希望启用该功能的验证,那么我们可以进行重写​​check_xsrf_cookie()......
  • tornado异步非阻塞测试
    测试不同的异步实现方式:协程+第三方库;线程池测试工具最近使用tornado6.1版本进行服务开发,毕竟纸上得来终觉浅,于是针对tornado异步非阻塞的功能进行了详细的验证和测试,并......
  • tornado原理介绍及异步非阻塞实现方式
    tornado原理介绍及异步非阻塞实现方式以下内容根据自己实操和理解进行的整理,欢迎交流~在tornado的开发中,我们一般会见到以下四个组成部分。ioloop:同一个ioloop实例运......
  • TornadoFx的TableView组件使用
    原文:TornadoFx的TableView组件使用-Stars-One的杂货小窝最近慢慢地接触了JavaFx中的TableView的使用,记下笔记总结使用1.基本使用TornadoFx中对tableView进行了封......
  • BUU 刷题之 护网杯 2018]easy_tornado 1
    今天打靶场第一次碰到这个题,就想记录下。虽然做题的过程基本都是照着WP做的。。。。。。首先我们进入这个题目,上面有三个文件链接,指出了flag的位置以及加密方式。还有rend......
  • tornado 4.2.1 移植指南(openEuler 20.03 LTS SP1)
    tornado4.2.1移植指南(openEuler20.03LTSSP1)介绍简要介绍Tornado是一个PythonWeb框架和异步网络库,最初由FriendFeed开发。通过使用非阻塞网络I/O,Tornado可以扩展......
  • 异步框架tornado下使用pyppeteer将动态html转化为pdf
    项目背景:云上服务器存储html,前端通过传递给后端html_url,由后端服务器获取html文件进行渲染,生成pdf,然后将pdf上传云上服务器。 使用的框架/库:tornado/pyppeteer/......