我有以下代码:
import time
from fastapi import FastAPI, Request
app = FastAPI()
@app.get("/ping")
async def ping(request: Request):
print("Hello")
time.sleep(5)
print("bye")
return {"ping": "pong!"}
如果我在本地主机上运行我的代码 - 例如,
http://localhost:8501/ping
- 在同一浏览器窗口的不同选项卡中,我得到:
Hello
bye
Hello
bye
而不是:
Hello
Hello
bye
bye
我已阅读有关使用|的内容||,但我仍然无法实现真正的并行化。有什么问题?
httpx
, but still, I cannot have a true parallelization. What's the problem?
问题在于,尽管你使用了
async
和
await
,但
time.sleep(5)
仍然是一个阻塞操作。在 FastAPI 中,
async
仅仅意味着函数可以被挂起,以便在等待 I/O 操作(例如网络请求)完成时,其他请求可以被处理。然而,
time.sleep(5)
会阻塞事件循环,阻止其他请求被处理,直到睡眠时间结束。
为了实现真正的并行化,你需要使用非阻塞操作来模拟长时间的任务。以下是一些选项:
1. 使用
asyncio.sleep()
:
asyncio.sleep()
是
time.sleep()
的异步版本,它不会阻塞事件循环。
import asyncio
from fastapi import FastAPI, Request
app = FastAPI()
@app.get("/ping")
async def ping(request: Request):
print("Hello")
await asyncio.sleep(5) # 使用 asyncio.sleep()
print("bye")
return {"ping": "pong!"}
2. 使用后台任务:
如果你需要执行 CPU 密集型任务,可以将它们卸载到后台任务中,以避免阻塞主线程。FastAPI 提供了
BackgroundTasks
类来实现这一点。
from fastapi import FastAPI, Request, BackgroundTasks
import time
app = FastAPI()
def background_task():
print("Hello")
time.sleep(5)
print("bye")
@app.get("/ping")
async def ping(request: Request, background_tasks: BackgroundTasks):
background_tasks.add_task(background_task) # 在后台运行任务
return {"ping": "pong!"}
3. 使用线程或进程池:
对于 CPU 密集型任务,你还可以使用线程池或进程池来实现并行化。Python 的
concurrent.futures
模块提供了一些工具来管理线程和进程池。
from fastapi import FastAPI, Request
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=2) # 创建一个线程池
app = FastAPI()
def blocking_task():
print("Hello")
time.sleep(5)
print("bye")
@app.get("/ping")
async def ping(request: Request):
loop = asyncio.get_event_loop()
await loop.run_in_executor(executor, blocking_task) # 在线程池中运行任务
return {"ping": "pong!"}
请记住,选择哪种方法取决于你的具体需求和任务类型。如果你的任务是 I/O 密集型的,那么使用
asyncio.sleep()
就足够了。如果你的任务是 CPU 密集型的,那么你可能需要使用后台任务、线程池或进程池。