我有一个第 3 方 cli 可执行文件,需要从 python 代码中调用。这些都是繁重的计算(CPU),我需要调用它大约 50-100 次。可执行文件本身在某种程度上是多线程的,但不是所有步骤,而且我有很多可用的核心。
这意味着我希望同时运行多个子进程,但不是全部。因此,我需要提交其中一些,然后跟踪其中一个完成后,启动一个新的以优化 cpu 使用率。
我有一个工作版本,但它非常幼稚,但它有效。它只是等待第一个提交的进程完成。但它依赖于数据,因此在某些时候,一个运行时间非常长的进程是其余进程中提交的“第一个”进程,并且它只会阻止任何其他进程提交,直到该进程完成为止。还有长顺序/IO?阶段,这个长时间运行的阶段可能会占用 1% 的 cpu 数小时,在此期间 cpu 大部分处于空闲状态。
因此我需要优化我的进程提交代码:
num_concurrent_processes = 6
for path in files:
# Building cmd omitted
rs_process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True,
bufsize=1, creationflags=0x00000008)
rs_processes.append(rs_process)
if len(rs_processes) >= num_concurrent_processes:
# wait until at least one process has completed before submitting the next one
continue_polling = True
while continue_polling:
for idx, proc in enumerate(rs_processes):
poll = proc.poll()
if poll is not None:
# process complete
stdout_data, stderr_data = proc.communicate()
# removed some logging in case of error
#remove completed process from list
del rs_processes[idx]
# exit polling loop as a new subprocess can be submitted
continue_polling = False
break
if continue_polling:
# Put some breaks on the polling
time.sleep(10)
如果已达到限制。然后我进行轮询,直到找到一个已完成的进程(根据 python 文档, 轮询不是 None )。我通过通信获取进程的输出,并对其执行一些操作(记录,未显示)。然后我从“跟踪列表”中删除该进程并退出轮询循环。
但是代码中存在某种逻辑缺陷,因为运行它时“陈旧”进程会累积。他们的支持率都为 0%,但不知何故,民意调查并不认为他们完成了任务。一旦出现
num_concurrent_processes
过时的进程,进度就会完全停止。这里出了什么问题?使用 poll() 是否意味着我现在需要手动终止该进程?使用旧方法,所有进程都运行良好并自行停止,我在新方法中使用相同的数据。
问题在于,当从
rs_processes
中删除进程时,正在修改迭代的列表。当删除元素时,列表的后续元素会向上移动,这可能会导致跳过进程或多次处理同一个进程。
此外,
communicate()
在进程仍在运行时阻塞。如果进程的输出缓冲区已满,则会发生这种情况,并且它正在等待有人读取其输出以继续运行。
这是改进的代码版本,它解决了这些问题:
import subprocess
import time
num_concurrent_processes = 6
rs_processes = []
for path in files:
# Building cmd omitted
rs_process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
creationflags=0x00000008
)
rs_processes.append(rs_process)
while len(rs_processes) >= num_concurrent_processes:
for proc in rs_processes[:]: # 迭代列表的副本
poll = proc.poll()
if poll is not None:
# 读取输出(非阻塞)
for line in proc.stdout:
print(line.strip())
# 处理完成
rs_processes.remove(proc)
time.sleep(10)
改进:
-
迭代列表副本:
我们现在迭代
rs_processes[:]
,它是列表的副本。这意味着我们可以安全地从原始列表中删除元素,而不会影响迭代。 -
非阻塞输出读取:
我们不再使用
communicate()
,因为它可能会阻塞。我们改为迭代proc.stdout
,这允许我们逐行读取输出,而不会阻塞。 - 简化的逻辑: 代码更清晰、更简洁。
其他建议:
-
考虑使用线程池或进程池来管理子进程。这可以简化代码并提高性能。Python 的
concurrent.futures
模块提供了一个高级接口来实现这一点。 - 如果正在处理大量数据,请考虑使用文件或管道将输出从子进程重定向到主进程。这可以帮助防止输出缓冲区填满并导致死锁。
希望这些改进可以解决的问题!
标签:python,popen From: 78777096