这是我的代码:
def enqueue_output(file, queue):
for line in iter(file.readline, ''):
queue.put(line)
file.close()
def read_popen_pipes(p):
with ThreadPoolExecutor(2) as pool:
q_stdout, q_stderr = Queue(), Queue()
pool.submit(enqueue_output, p.stdout, q_stdout)
pool.submit(enqueue_output, p.stderr, q_stderr)
while True:
if p.poll() is not None and q_stdout.empty() and q_stderr.empty():
break
out_line = err_line = ''
try:
out_line = q_stdout.get_nowait()
except Empty:
pass
try:
err_line = q_stderr.get_nowait()
except Empty:
pass
yield (out_line, err_line)
def handle_base():
try:
with subprocess.Popen(
['conda', 'run', '-n', 'information_security', 'python', '/data/lkl/front/snn_assess/resnet18.py'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
) as process:
for out_line, err_line in read_popen_pipes(process):
if(out_line != ''):
socketio.emit('task_output', {'output': out_line.strip()})
# Do stuff with each line, e.g.:
print(out_line, end='')
print(err_line, end='')
return process.poll()
我已经尝试了帖子中的每个热门解决方案: 逐行读取子进程标准输出
但它们都不起作用。
像:
for line in io.TextIOWrapper(process.stdout, encoding="utf-8"): # or another encoding
# line = process.stdout.readline()
if not line:
break
print(line, end='')
socketio.emit('task_output', {'output': line.strip()})
其他方法也没有不行。
似乎只有当子进程终止时,才能一次性收到所有输出,这不是预期的(逐行)。
我真的很困惑,想知道python的原理子流程,以及如何按时间线逐行获取输出。
遇到的问题在于,试图在一个循环中同时读取子进程的标准输出和标准错误,但这会导致阻塞,从而无法及时地将输出发送到前端。
以下是一种改进的解决方案,它使用异步 I/O 来避免阻塞:
import asyncio
import subprocess
async def read_stream(stream, queue):
"""异步读取流并将其放入队列中。"""
while True:
line = await stream.readline()
if not line:
break
await queue.put(line.decode())
async def handle_base():
try:
process = await asyncio.create_subprocess_exec(
'conda', 'run', '-n', 'information_security', 'python', '/data/lkl/front/snn_assess/resnet18.py',
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout_queue = asyncio.Queue()
stderr_queue = asyncio.Queue()
# 创建两个任务,分别读取标准输出和标准错误
asyncio.create_task(read_stream(process.stdout, stdout_queue))
asyncio.create_task(read_stream(process.stderr, stderr_queue))
while True:
# 使用 asyncio.wait 同时等待两个队列
done, pending = await asyncio.wait(
[stdout_queue.get(), stderr_queue.get()],
return_when=asyncio.FIRST_COMPLETED
)
# 处理已完成的任务
for task in done:
line = task.result()
stream = 'stdout' if task.get_coro().__name__ == 'read_stream' and task._coro.args[0] == process.stdout else 'stderr'
print(f"[{stream}] {line.strip()}")
socketio.emit('task_output', {'output': line.strip(), 'stream': stream})
# 如果子进程已结束,则退出循环
if process.returncode is not None:
break
return process.returncode
except Exception as e:
print(f"Error: {e}")
# 在事件循环中运行 handle_base 函数
asyncio.run(handle_base())
代码解释:
-
异步函数
: 使用
async def
定义异步函数,使用await
关键字等待异步操作完成。 - asyncio.create_subprocess_exec : 创建异步子进程,并获取标准输出和标准错误的流。
- asyncio.Queue : 使用异步队列来存储从流中读取的行。
- asyncio.create_task : 创建两个异步任务,分别读取标准输出和标准错误,并将读取到的内容放入相应的队列中。
-
asyncio.wait
: 使用
asyncio.wait
函数同时等待两个队列,并在其中任何一个队列中有数据时返回。 - 处理输出 : 从队列中获取数据,并将其发送到前端,同时区分标准输出和标准错误。
- 退出循环 : 当子进程结束时,退出循环。
优点 :
- 使用异步 I/O,避免阻塞,提高效率。
- 使用 asyncio.Queue 存储数据,保证数据的顺序。
- 同时读取标准输出和标准错误,并区分输出。
注意 :
-
需要安装
python-socketio
包。 - 需要根据实际情况修改 Socket.IO 事件名称和数据格式。
希望这段代码能够帮助解决问题,并让更好地理解 Python 的异步编程和子进程操作。
标签:python,flask,websocket,frontend,subprocess From: 78830475