我是 Python 和协程的新手,我正在尝试利用 Python 的 asyncio 库来并行处理阻塞函数。我正在使用 python 3.8.6。我有一个阻塞函数,它从数组输入中接收不同的输入,我需要每个输入的阻塞函数同时运行。我已经尝试过,但它们似乎仍然按顺序运行:
async def main():
tasks = [asyncio.create_task(blocking_function(input)) for input in inputs]
result = await asyncio.gather(*tasks)
print(result)
你理解了使用
asyncio.create_task
和
asyncio.gather
并行运行任务的基本思想。但是,
asyncio
只在异步操作之间进行协作多任务处理。如果你的
blocking_function
包含同步阻塞代码,它将阻塞事件循环,从而阻止其他任务运行。
为了实现真正的并行,你需要将阻塞操作移到线程池或单独的进程中。这里有两个选择:
1. 使用
loop.run_in_executor
和线程池:
这允许你在一个单独的线程中运行阻塞函数,将事件循环释放出来处理其他任务。
```python import asyncio from concurrent.futures import ThreadPoolExecutor
async def main(): loop = asyncio.get_running_loop() with ThreadPoolExecutor() as pool: tasks = [loop.run_in_executor(pool, blocking_function, input) for input in inputs] result = await asyncio.gather(*tasks) print(result)
def blocking_function(input): # 模拟阻塞操作 time.sleep(1) return input * 2
asyncio.run(main()) ```
-
我们获取当前正在运行的事件循环 (
asyncio.get_running_loop()
). -
我们创建一个
ThreadPoolExecutor
来管理线程。 -
我们使用
loop.run_in_executor
在线程池中运行blocking_function
。这返回一个asyncio.Future
对象,我们可以像任何其他异步任务一样等待它。
2. 使用
asyncio.to_thread
(Python 3.9+):
Python 3.9 引入了一个更方便的函数
asyncio.to_thread
,它简化了将函数卸载到线程的过程。
```python import asyncio
async def main(): tasks = [asyncio.to_thread(blocking_function, input) for input in inputs] result = await asyncio.gather(*tasks) print(result)
def blocking_function(input): # 模拟阻塞操作 time.sleep(1) return input * 2
asyncio.run(main()) ```
-
asyncio.to_thread
在一个单独的线程中运行blocking_function
并返回一个asyncio.Future
。
选择哪种方法取决于你的 Python 版本以及你对线程池的控制程度。对于大多数情况,
asyncio.to_thread
更简单直接。如果需要更精细的控制线程池,可以使用
loop.run_in_executor
。
请记住,使用线程或进程会增加程序的复杂性。确保你理解并行编程的潜在问题,例如竞争条件,并在必要时使用适当的同步机制。
标签:python,multithreading,python-asyncio,coroutine,concurrent-processing From: 78849969