Background: I deployed an AI model's server side with Tornado. Since the model is slow, there is no way to return a result synchronously for each request, so the final design was batch processing with asynchronous callbacks.
With async callbacks, my approach is: accept all incoming requests in real time and write them to the database from multiple threads (using a database connection pool), then run a scheduled job that pulls the unprocessed rows out of the database and processes them in batches.
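The status-flag pattern described above (insert each request as "unprocessed", let the scheduled job pull a batch and mark it done) can be sketched as follows. Table and column names here are illustrative, not the actual schema, and sqlite3 stands in for MySQL to keep the sketch self-contained:

```python
import sqlite3

# In-memory database standing in for the real MySQL instance
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE requests (id INTEGER PRIMARY KEY, url TEXT, status TEXT)")

# 1. Each incoming request is written immediately with status='unprocessed'
conn.execute("INSERT INTO requests (url, status) VALUES (?, 'unprocessed')", ("a.mp4",))
conn.execute("INSERT INTO requests (url, status) VALUES (?, 'unprocessed')", ("b.mp4",))

# 2. The periodic job pulls a batch of unprocessed rows...
batch = conn.execute(
    "SELECT id, url FROM requests WHERE status='unprocessed' ORDER BY id LIMIT 10"
).fetchall()

# 3. ...and marks them processed once the model (and the callback) are done
for row_id, url in batch:
    conn.execute("UPDATE requests SET status='processed' WHERE id=?", (row_id,))

remaining = conn.execute(
    "SELECT COUNT(*) FROM requests WHERE status='unprocessed'").fetchone()[0]
```

Because the flag flips only after processing, a crashed batch run simply leaves its rows "unprocessed" for the next tick to pick up.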
Since the web framework is Tornado, the first implementation used Tornado's own timer, PeriodicCallback. The code is below (this approach has a problem: when the PeriodicCallback fires, the call into the AI model blocks the ioloop, so the server can no longer accept new requests):
```python
import tornado.escape
import tornado.httpserver
import tornado.ioloop
import tornado.web
from concurrent.futures import ThreadPoolExecutor
import requests
from pymysqlpool import ConnectionPool
import multiprocessing
import os
import re
import whisper
```
```python
class Database:
    """MySQL connection pool."""
    _instance = None

    def __init__(self, host, port, username, password, db_name, pool_size=5):
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.db_name = db_name
        self.pool = ConnectionPool(
            pool_name="mypool",
            host=self.host,
            port=self.port,
            user=self.username,
            password=self.password,
            db=self.db_name,
            max_pool_size=pool_size,
        )

    def execute_fetch(self, query, args=None):
        results = []
        with self.pool.cursor() as cursor:
            cursor.execute(query, args)
            for request in cursor:
                results.append(request)
        return results

    def execute_insert_or_update(self, query, args=None):
        with self.pool.cursor() as cursor:
            result = cursor.execute(query, args)
        return result


class TranscribeHandler(tornado.web.RequestHandler):
    executor = ThreadPoolExecutor(max_workers=20)

    def initialize(self, database):
        self.db = database

    async def post(self):
        body = tornado.escape.json_decode(self.request.body)
        aaa = body.get("aaa")
        bbb = body.get("bbb")
        ccc = body.get("ccc")
        try:
            with ThreadPoolExecutor(max_workers=10) as executor:
                insert_request_future = tornado.ioloop.IOLoop.current().run_in_executor(
                    executor, self.insert_request, aaa, bbb, ccc)
                insert_results_future = tornado.ioloop.IOLoop.current().run_in_executor(
                    executor, self.insert_results, aaa)
                await insert_request_future
                await insert_results_future
            self.write({"error_code": 0, "message": "ok"})
        except Exception as e:
            self.write({"error_code": 1, "message": str(e)})
            raise e

    def insert_request(self, aaa, bbb, ccc):
        # Each thread grabs a database connection from the pool, does its
        # work, and releases the connection
        db = self.db
        db.execute_insert_or_update(
            "INSERT INTO requests (aaa, bbb, ccc, status) VALUES (%s, %s, %s, %s)",
            (aaa, bbb, ccc, 'unprocessed'))

    def insert_results(self, aaa):
        db = self.db
        db.execute_insert_or_update(
            "INSERT INTO results (aaa) VALUES (%s)", (aaa,))


def batch_process(model, db, batch_size):
    # The actual batch-processing code is omitted here
    ...
```
```python
if __name__ == "__main__":
    # Initialize the Whisper model and MySQL connection
    print("loading model...")
    model = whisper.load_model("large")
    model.to('cuda:2')
    print("loading model done...")
    db_pool = Database(host='xxxxxx', port=3306, username='xxxx',
                       password='xxx', db_name="xxx", pool_size=20)
    # Create a Tornado application and HTTP server
    app = tornado.web.Application([
        (r"/transcribe", TranscribeHandler, dict(database=db_pool)),
    ])
    server = tornado.httpserver.HTTPServer(app)
    # Schedule a periodic task to process requests in batches
    scheduler = tornado.ioloop.PeriodicCallback(
        lambda: batch_process(model, db_pool, batch_size=10), 100000)
    scheduler.start()
    print("start batch_process job...")
    # Start the server
    server.listen(8000)
    tornado.ioloop.IOLoop.current().start()
```
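The failure mode is easy to reproduce without Tornado or the model: an event loop runs callbacks one at a time on a single thread, so any synchronous work inside a scheduled callback (PeriodicCallback included) stalls everything else on that loop. A minimal asyncio sketch of the effect, with time.sleep standing in for the model call:

```python
import asyncio
import time

def blocking_job():
    # Stand-in for batch_process: synchronous, slow work on the loop thread
    time.sleep(0.5)

async def main():
    loop = asyncio.get_running_loop()
    # Schedule the blocking job roughly the way PeriodicCallback fires it
    loop.call_later(0.1, blocking_job)
    start = loop.time()
    # This should take ~0.2 s, but the blocking callback holds the loop
    await asyncio.sleep(0.2)
    return loop.time() - start

elapsed = asyncio.run(main())
print(f"asyncio.sleep(0.2) actually took {elapsed:.2f}s")  # roughly 0.6 s: the loop was stalled
```

Every pending HTTP request in the real service is delayed the same way the sleep is here, which is why clients see timeouts while a batch runs.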
Asking ChatGPT:
When I run the code above, Tornado stops accepting incoming HTTP requests while the tornado.ioloop.PeriodicCallback object is periodically calling the batch_process function. How do I fix PeriodicCallback blocking the I/O loop and causing requests to time out or go unanswered?
I asked N times and tried every suggested fix, mostly turning batch_process into a coroutine, but no matter what, it still blocked the ioloop... so I gave up on PeriodicCallback! I remembered the apscheduler library I had used for scheduled jobs in another project, and asked ChatGPT how to switch to it. The answer:
Option 1: In Tornado, long-running compute work should be handled with coroutines to avoid blocking I/O. You can turn the batch_process function into a coroutine with Tornado's gen.coroutine decorator and use the await keyword to wait for it to finish. You can also run batch_process in a thread pool to keep it off the I/O loop.

Option 2: You can use the asyncio library to call batch_process asynchronously and avoid blocking the I/O loop.

Option 3: You can turn batch_process into a coroutine and launch it with asyncio.create_task so that it runs asynchronously. Likewise, make the periodic_task function a coroutine and, before starting the server, use asyncio.gather to register both the HTTP server and periodic_task on the event loop so the two run concurrently.

Option 4: You can convert batch_process into a coroutine and then use asyncio.gather to run batch_process concurrently with Tornado's event loop, so that Tornado can still handle new HTTP requests.

Option 5: In Tornado, PeriodicCallback is a periodically executed callback. Here, your periodic task runs on the I/O loop, which can block it and leave HTTP requests unanswered. To solve this, schedule the periodic task on asyncio's event loop and run the batch job with asyncio.create_task(). That way Tornado's I/O loop will not be blocked. ...
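For reference, the idea behind Options 1 and 2 (hand the blocking call to a thread pool and await it) can be sketched in plain asyncio. Here batch_process_stub is a hypothetical stand-in for the real model call, not code from the service above:

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=1)

def batch_process_stub():
    # Stand-in for the blocking AI-model call; time.sleep releases the GIL,
    # much as real I/O- or GPU-bound work largely does
    time.sleep(0.5)
    return "done"

async def main():
    loop = asyncio.get_running_loop()
    # Hand the blocking work to a worker thread; the loop stays free
    fut = loop.run_in_executor(executor, batch_process_stub)
    start = loop.time()
    await asyncio.sleep(0.1)   # runs on time while the thread is busy
    responsiveness = loop.time() - start
    result = await fut         # collect the batch result when it is ready
    return responsiveness, result

responsiveness, result = asyncio.run(main())
```

If the real model holds the GIL for long stretches of pure-Python work, a thread helps less, which may be one reason these suggestions did not pan out in practice.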
```python
from apscheduler.schedulers.blocking import BlockingScheduler

if __name__ == "__main__":
    # Initialize the Whisper model and MySQL connection
    print("loading model...")
    model = whisper.load_model("large")
    model.to('cuda:2')
    print("loading model done...")
    db_pool = Database(host='xxx', port=3306, username='xxx', password='xxx',
                       db_name="xxx", pool_size=20)
    # Create a Tornado application and HTTP server
    app = tornado.web.Application([
        (r"/transcribe", TranscribeHandler, dict(database=db_pool)),
    ])
    server = tornado.httpserver.HTTPServer(app)
    # Schedule a periodic task to process requests in batches
    print("start batch_process job...")
    # Start the server
    server.listen(8000)
    tornado.ioloop.IOLoop.current().start()
    bscheduler = BlockingScheduler()
    # bscheduler.add_job(push_seeds, 'cron', kwargs={'spider_name': 'YqSpider'}, minute='0', hour='*/2', name='tasks')
    bscheduler.add_job(batch_process(model, db_pool, batch_size=10), 'cron',
                       minute='*', hour='*', name='task1')
```

Is there anything wrong with the code above?
Yes, there is. The first argument to bscheduler.add_job should be the function itself, not the function's return value. You therefore need to define batch_process as a standalone function and pass it to add_job. Also, if you want the periodic task to run in the background without blocking HTTP request handling, you need to start the periodic task before server.listen. Here is example code showing how to do this:
```python
import asyncio
import multiprocessing
import tornado.httpserver
import tornado.ioloop
import tornado.web
from database import Database
import whisper
from transcribe_handler import TranscribeHandler
from apscheduler.schedulers.background import BackgroundScheduler


def batch_process(model, db, batch_size):
    max_fail_count = 3  # maximum number of retries
    frequests = db.execute_fetch(
        "SELECT request_id, url, footage_type FROM requests "
        "WHERE status='unprocessed' ORDER BY id ASC LIMIT %s",
        batch_size)
    print("start batch_process, unprocessed:", len(frequests))


if __name__ == "__main__":
    multiprocessing.set_start_method("spawn")
    # Initialize the Whisper model and MySQL connection
    print("loading model...")
    model = whisper.load_model("large")
    model.to('cuda:2')
    print("loading model done...")
    db_pool = Database(host='xxx', port=3306, username='xxx', password='xxx',
                       db_name="xxx", pool_size=20)
    # Create a Tornado application and HTTP server
    app = tornado.web.Application([
        (r"/transcribe", TranscribeHandler, dict(database=db_pool)),
    ])
    server = tornado.httpserver.HTTPServer(app)
    # Start the server
    server.listen(8000)
    # Schedule a periodic task to process requests in batches
    bscheduler = BackgroundScheduler()
    bscheduler.add_job(batch_process, 'cron', args=[model, db_pool, 10],
                       minute='*', hour='*', name='task1')
    bscheduler.start()
    # Start the IOLoop
    tornado.ioloop.IOLoop.current().start()
```
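Why this version behaves: BackgroundScheduler runs its jobs on worker threads of its own, so even a fully blocking batch_process never occupies the thread running Tornado's IOLoop. The same principle, sketched with only the standard library (threading.Timer standing in for the scheduler):

```python
import threading
import time

events = []

def blocking_job():
    # Stand-in for a fully blocking batch_process run
    time.sleep(0.3)
    events.append("job done")

# Emulate the scheduler firing the job on its own worker thread
timer = threading.Timer(0.05, blocking_job)
timer.start()

# Meanwhile the main thread (Tornado's IOLoop in the real service) stays free
start = time.monotonic()
time.sleep(0.1)
main_delay = time.monotonic() - start
timer.join()
```

The trade-off is that a threaded job must not touch the IOLoop directly, and shared state like the connection pool has to be thread-safe, which the pooled Database class already is.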
Tested it: problem solved!
Tags: ioloop, PeriodicCallback, tornado, self, db, batch, process, model. From: https://www.cnblogs.com/jimmy996/p/17346513.html