假设我有一组 20 个 CPU 繁重的任务(每个大约 1 小时,但有些需要更长的时间),这些任务是通过例如
Pool.apply_async(function, task_list)
等调用函数来运行的。 PC有12个核心。这样我就可以分散负载,并使用所有 12 个核心。
每个任务的结果可能需要运行一个新任务(某些任务可能需要 1 次新运行,其他任务可能需要 10 次)。
当需要新任务,我想将该任务生成到现有池中
task_list
,以始终充分优化 CPU 使用率。
当前我运行 20 个任务,等待完成,启动新的 ~18 个任务,等待完成,开始剩余的新任务,......,虽然一小时内只有 1 个核心被使用,而不是 12 个。这总计会损失几个小时到几天的计算时间。我可以在同一个工作线程中运行更新的任务,但这会导致更大的损失)
使用池,似乎不可能在启动时向池中添加更多任务。这是正确的吗?或者有一些我在到处搜索时错过的聪明方法吗?
(我看到的唯一选项是使用
process
而不是
pool
,并在 a 上进行 while 循环动态列表,将典型任务作为单个进程启动,同时只允许最多 12 个进程同时运行,并且对于每个任务或新任务,将它们放入动态列表中,并在发送任务时将其删除一个过程。)
你提到的问题是动态任务并行化的常见情况。你完全正确,使用
multiprocessing.Pool
在启动后无法向池中添加任务。
幸运的是,你不需要求助于手动管理单个进程。
concurrent.futures
模块(以及 Python 3.2 开始的
multiprocessing.Pool
中的后端)提供了一种更优雅且高效的解决方案:
ProcessPoolExecutor
和
submit
方法。
submit
方法允许你将任务提交到池,并返回一个
Future
对象,该对象表示异步执行的结果。你可以使用
Future.done()
方法检查任务是否完成,并使用
Future.result()
方法获取结果。
以下是如何使用它来实现你想要的目标:
from concurrent.futures import ProcessPoolExecutor, as_completed
import time
def task_function(task):
# 模拟任务执行
time.sleep(1)
# 根据任务结果,生成 0 到 3 个新任务
new_tasks = [i for i in range(task % 4)]
return new_tasks
def main():
num_workers = 12 # 根据你的 CPU 核心数设置
initial_tasks = [i for i in range(20)]
with ProcessPoolExecutor(max_workers=num_workers) as executor:
# 提交初始任务
futures = [executor.submit(task_function, task) for task in initial_tasks]
# 处理完成的任务并提交新任务
while futures:
completed_futures = as_completed(futures)
for future in completed_futures:
# 从完成的 Futures 列表中移除已完成的 Future
futures.remove(future)
try:
new_tasks = future.result()
# 添加新任务到执行队列
for new_task in new_tasks:
new_future = executor.submit(task_function, new_task)
futures.append(new_future)
except Exception as e:
# 处理任务执行过程中可能出现的异常
print(f"Task raised an exception: {e}")
print("All tasks completed.")
if __name__ == "__main__":
main()
这段代码做了以下事情:
-
创建
ProcessPoolExecutor
: 使用num_workers
参数指定要使用的worker进程数,并将其设置为你的 CPU 核心数以获得最佳性能。 -
提交初始任务:
使用
executor.submit()
方法将初始任务提交到池。 -
处理完成的任务:
使用
concurrent.futures.as_completed()
函数迭代已完成的任务。这将返回一个迭代器,该迭代器在有 Future 完成时产生 Future。 -
获取结果并提交新任务:
对于每个完成的任务,使用
future.result()
获取结果。根据结果生成新任务,并使用executor.submit()
方法将它们提交到池。 -
循环直到所有任务完成:
代码使用一个
while
循环持续运行,直到所有 Future 都完成。
这种方法允许你动态地向池中添加任务,并确保所有 CPU 核心都得到充分利用。
其他建议:
-
考虑使用
chunksize
参数将大型任务列表拆分成更小的块。这可以减少进程间通信的开销。 -
如果你需要对任务执行顺序进行更精细的控制,请查看
concurrent.futures.wait()
和concurrent.futures.FIRST_COMPLETED
等函数。 - 对于更复杂的用例,你可能需要考虑使用专门的第三方库,例如 Dask 或 Celery。