首页 > 编程问答 >如何根据工作结果向正在运行的多处理池添加额外的任务?

如何根据工作结果向正在运行的多处理池添加额外的任务?

时间:2024-07-26 06:20:49浏览次数:4  
标签:python multiprocessing python-multiprocessing

假设我有一组 20 个 CPU 繁重的任务(每个大约 1 小时,但有些需要更长的时间),这些任务是通过例如 Pool.apply_async(function, task_list) 等调用函数来运行的。 PC有12个核心。这样我就可以分散负载,并使用所有 12 个核心。

每个任务的结果可能需要运行一个新任务(某些任务可能需要 1 次新运行,其他任务可能需要 10 次)。

当需要新任务,我想将该任务生成到现有池中 task_list ,以始终充分优化 CPU 使用率。

当前我运行 20 个任务,等待完成,启动新的 ~18 个任务,等待完成,开始剩余的新任务,......,虽然一小时内只有 1 个核心被使用,而不是 12 个。这总计会损失几个小时到几天的计算时间。我可以在同一个工作线程中运行更新的任务,但这会导致更大的损失)

使用池,似乎不可能在启动时向池中添加更多任务。这是正确的吗?或者有一些我在到处搜索时错过的聪明方法吗?

(我看到的唯一选项是使用 process 而不是 pool ,并在 a 上进行 while 循环动态列表,将典型任务作为单个进程启动,同时只允许最多 12 个进程同时运行,并且对于每个任务或新任务,将它们放入动态列表中,并在发送任务时将其删除一个过程。)


你提到的问题是动态任务并行化的常见情况。你完全正确,使用 multiprocessing.Pool 在启动后无法向池中添加任务。

幸运的是,你不需要求助于手动管理单个进程。 concurrent.futures 模块(以及 Python 3.2 开始的 multiprocessing.Pool 中的后端)提供了一种更优雅且高效的解决方案: ProcessPoolExecutor submit 方法。

submit 方法允许你将任务提交到池,并返回一个 Future 对象,该对象表示异步执行的结果。你可以使用 Future.done() 方法检查任务是否完成,并使用 Future.result() 方法获取结果。

以下是如何使用它来实现你想要的目标:

from concurrent.futures import ProcessPoolExecutor, as_completed
import time

def task_function(task):
    # 模拟任务执行
    time.sleep(1)
    # 根据任务结果,生成 0 到 3 个新任务
    new_tasks = [i for i in range(task % 4)]
    return new_tasks

def main():
    num_workers = 12  # 根据你的 CPU 核心数设置
    initial_tasks = [i for i in range(20)]

    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        # 提交初始任务
        futures = [executor.submit(task_function, task) for task in initial_tasks]

        # 处理完成的任务并提交新任务
        while futures:
            completed_futures = as_completed(futures)
            for future in completed_futures:
                # 从完成的 Futures 列表中移除已完成的 Future
                futures.remove(future)
                try:
                    new_tasks = future.result()
                    # 添加新任务到执行队列
                    for new_task in new_tasks:
                        new_future = executor.submit(task_function, new_task)
                        futures.append(new_future)
                except Exception as e:
                    # 处理任务执行过程中可能出现的异常
                    print(f"Task raised an exception: {e}")

    print("All tasks completed.")

if __name__ == "__main__":
    main()

这段代码做了以下事情:

  1. 创建 ProcessPoolExecutor 使用 num_workers 参数指定要使用的worker进程数,并将其设置为你的 CPU 核心数以获得最佳性能。
  2. 提交初始任务: 使用 executor.submit() 方法将初始任务提交到池。
  3. 处理完成的任务: 使用 concurrent.futures.as_completed() 函数迭代已完成的任务。这将返回一个迭代器,该迭代器在有 Future 完成时产生 Future。
  4. 获取结果并提交新任务: 对于每个完成的任务,使用 future.result() 获取结果。根据结果生成新任务,并使用 executor.submit() 方法将它们提交到池。
  5. 循环直到所有任务完成: 代码使用一个 while 循环持续运行,直到所有 Future 都完成。

这种方法允许你动态地向池中添加任务,并确保所有 CPU 核心都得到充分利用。

其他建议:

  • 考虑使用 chunksize 参数将大型任务列表拆分成更小的块。这可以减少进程间通信的开销。
  • 如果你需要对任务执行顺序进行更精细的控制,请查看 concurrent.futures.wait() concurrent.futures.FIRST_COMPLETED 等函数。
  • 对于更复杂的用例,你可能需要考虑使用专门的第三方库,例如 Dask 或 Celery。

标签:python,multiprocessing,python-multiprocessing
From: 54351828

相关文章

  • 84 我正在使用 Python 开发 selenium 自动化项目。我收到错误 .NoSuchElementExceptio
    场景是这样的,我将打开一个网页,在使用selenium单击该网页后,它会要求位置访问权限,屏幕上会出现一堆按钮,我正在尝试定位其中一个按钮,但即使正确给出了Xpath地址,我得到.NoSuchElementException:错误能够单击目标按钮你正在使用Selenium在Python中开发自动化项目,并遇到......
  • 在Python 3中删除两个指定字符串之间的字符串
    我正在从事一个NLP项目,该项目要求我从一段文本中删除计算机代码。代码包含在标签<pre><code>和</code></pre>之间。现在我可以做一个简单的正则表达式匹配,但我想概括这个函数,以便它可以删除任何两个指定字符串之间的文本,即使它们是嵌套的。例如,如果我有一个......
  • Azure Open AI - Python 和 Java API 之间 gpt4o 的结果截然不同
    我使用Java和PythonAPI对AzureOpenAI进行相同的调用,但收到截然不同的结果:相同的系统提示相同的用户提示适用于Java和Python的azureai包的相同(最新)版本尽管输入的用户和系统提示完全相同,但响应却非常不同-python提示是“正确的”并......
  • leetcode 输出错误? (Python)
    我的VSCode/本地终端给出了[1,4,1,5,1,6]的正确输出,但不知何故leetcode给了我完全不同的输出。我在这里错过了什么吗?这怎么可能?顺便说一下,这是wigglesort2将我的本地代码复制粘贴到leetcode中给出了不同的输出数组很难在没有看到你的代码的情况下......
  • 当 python 窗口的一部分不在屏幕上时,如何让它自己被记录?
    在Windows10中,大多数应用程序窗口都可以使用OBS等程序进行记录。当窗口被拖动以致其部分内容在显示屏上不可见时,通常OBS仍会接收窗口的内容,即使它在屏幕上不可见。但是,在编写python应用程序时,这似乎不以相同的方式工作。我尝试了几种不同的类似GUI的模块......
  • 使用 aws cdk 设置用户池客户端属性以具有读/写访问权限 - Python
    我试图根据属性给予一些自定义属性特定的读/写访问权限。我收到此错误。资源处理程序返回消息:“无效写入创建客户端时指定的属性(服务:CognitoIdentityProvider,状态代码:400,请求ID:<request_id>)”(RequestToken:<request_token>,HandlerErrorCode:InvalidRequest)任何人都可以为......
  • 试图找出此页面的逻辑:存储了大约 ++ 100 个结果 - 并使用 Python 和 BS4 进行了解析
    试图找出此页面背后的逻辑:我们已将一些结果存储在以下数据库中:https://www.raiffeisen.ch/rch/de/ueber-uns/raiffeisen-gruppe/Organization/raiffeisenbanken/deutsche-schweiz.html#accordionitem_18104049731620873397从a到z大约:120个结果或更多:......
  • 如何在 Numpy Python 中将 4 维数组的下三角形复制到上三角形?
    目标是将下三角形复制到上三角形。根据OP中提出的建议,起草了以下代码。importnumpyasnplw_up_pair=np.tril_indices(4,-1)arr=np.zeros((4,4,1,1))arr[1,:1,:,0]=1arr[2,:2,0,0]=2arr[3,:3,0,0]=3arr=arr+arr.T-np.diag(np.diag(arr))但是,它......
  • 如何在 Python 中对多行使用单个 INSERT INTO 语句?
    我目前正在开发一个DiscordPython机器人,我在其中循环遍历ForumTags列表,并为每个对象生成INSERTINTOSQL语句以将数据插入MySQL数据库。但是,我想要通过将所有这些单独的INSERTINTO语句组合到单个查询中来优化我的代码,如下所示:INSERTINTO......
  • 双 for 循环的 Pythonic 方式
    我有以下代码:importnumpyasnpepsilon=np.array([[0.,0.00172667,0.00071437,0.00091779,0.00154501],[0.00128983,0.,0.00028139,0.00215905,0.00094862],[0.00035811,0.00018714,0.,0.00029365,0.00036993......