首页 > 编程问答 >通过套接字将子进程输出按时间线逐行发送到前端失败

通过套接字将子进程输出按时间线逐行发送到前端失败

时间:2024-08-04 15:11:55浏览次数:9  
标签:python flask websocket frontend subprocess

这是我的代码:

def enqueue_output(file, queue):
    for line in iter(file.readline, ''):
        queue.put(line)
    file.close()


def read_popen_pipes(p):

    with ThreadPoolExecutor(2) as pool:
        q_stdout, q_stderr = Queue(), Queue()

        pool.submit(enqueue_output, p.stdout, q_stdout)
        pool.submit(enqueue_output, p.stderr, q_stderr)

        while True:

            if p.poll() is not None and q_stdout.empty() and q_stderr.empty():
                break

            out_line = err_line = ''

            try:
                out_line = q_stdout.get_nowait()
            except Empty:
                pass
            try:
                err_line = q_stderr.get_nowait()
            except Empty:
                pass

            yield (out_line, err_line)

def handle_base():
    try:
        with subprocess.Popen(
            ['conda', 'run', '-n', 'information_security', 'python', '/data/lkl/front/snn_assess/resnet18.py'],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, 
            text=True
        ) as process:

            for out_line, err_line in read_popen_pipes(process):
                if(out_line != ''):
                    socketio.emit('task_output', {'output': out_line.strip()})
                # Do stuff with each line, e.g.:
                print(out_line, end='')
                print(err_line, end='')

            return process.poll()

我已经尝试了帖子中的每个热门解决方案: 逐行读取子进程标准输出

但它们都不起作用。

像:

for line in io.TextIOWrapper(process.stdout, encoding="utf-8"):  # or another encoding
            # line = process.stdout.readline()
            if not line:
                break
            print(line, end='')
            socketio.emit('task_output', {'output': line.strip()})

其他方法也没有不行。

似乎只有当子进程终止时,才能一次性收到所有输出,这不是预期的(逐行)。

我真的很困惑,想知道python的原理子流程,以及如何按时间线逐行获取输出。


遇到的问题在于,试图在一个循环中同时读取子进程的标准输出和标准错误,但这会导致阻塞,从而无法及时地将输出发送到前端。

以下是一种改进的解决方案,它使用异步 I/O 来避免阻塞:

import asyncio
import subprocess

async def read_stream(stream, queue):
    """异步读取流并将其放入队列中。"""
    while True:
        line = await stream.readline()
        if not line:
            break
        await queue.put(line.decode())

async def handle_base():
    try:
        process = await asyncio.create_subprocess_exec(
            'conda', 'run', '-n', 'information_security', 'python', '/data/lkl/front/snn_assess/resnet18.py',
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )

        stdout_queue = asyncio.Queue()
        stderr_queue = asyncio.Queue()

        # 创建两个任务,分别读取标准输出和标准错误
        asyncio.create_task(read_stream(process.stdout, stdout_queue))
        asyncio.create_task(read_stream(process.stderr, stderr_queue))

        while True:
            # 使用 asyncio.wait 同时等待两个队列
            done, pending = await asyncio.wait(
                [stdout_queue.get(), stderr_queue.get()],
                return_when=asyncio.FIRST_COMPLETED
            )

            # 处理已完成的任务
            for task in done:
                line = task.result()
                stream = 'stdout' if task.get_coro().__name__ == 'read_stream' and task._coro.args[0] == process.stdout else 'stderr'
                print(f"[{stream}] {line.strip()}")
                socketio.emit('task_output', {'output': line.strip(), 'stream': stream})

            # 如果子进程已结束,则退出循环
            if process.returncode is not None:
                break

        return process.returncode

    except Exception as e:
        print(f"Error: {e}")

# 在事件循环中运行 handle_base 函数
asyncio.run(handle_base())

代码解释:

  1. 异步函数 : 使用 async def 定义异步函数,使用 await 关键字等待异步操作完成。
  2. asyncio.create_subprocess_exec : 创建异步子进程,并获取标准输出和标准错误的流。
  3. asyncio.Queue : 使用异步队列来存储从流中读取的行。
  4. asyncio.create_task : 创建两个异步任务,分别读取标准输出和标准错误,并将读取到的内容放入相应的队列中。
  5. asyncio.wait : 使用 asyncio.wait 函数同时等待两个队列,并在其中任何一个队列中有数据时返回。
  6. 处理输出 : 从队列中获取数据,并将其发送到前端,同时区分标准输出和标准错误。
  7. 退出循环 : 当子进程结束时,退出循环。

优点 :

  • 使用异步 I/O,避免阻塞,提高效率。
  • 使用 asyncio.Queue 存储数据,保证数据的顺序。
  • 同时读取标准输出和标准错误,并区分输出。

注意 :

  • 需要安装 python-socketio 包。
  • 需要根据实际情况修改 Socket.IO 事件名称和数据格式。

希望这段代码能够帮助解决问题,并让更好地理解 Python 的异步编程和子进程操作。

标签:python,flask,websocket,frontend,subprocess
From: 78830475

相关文章

  • Flask 用户身份验证包括登录、注册和谷歌登录未集成在一起
    https://www.freecodecamp.org/news/how-to-setup-user-authentication-in-flask/我已经使用上面的链接进行了基本的用户身份验证,但我无法添加谷歌登录。我已经尝试了很多次,但我无法添加谷歌登录。任何人都可以帮我做这个吗?问题是有src_init_.py和man......
  • 如何在python中使用xarray打开grib2文件?
    将xarray导入为xr导入cfgrib导入生态码将pandas导入为pddata=xr.open_dataset(r"C:\Users\new\forecast_data.grib2",engine="cfgrib")这是我的代码。我只想使用xarray读取这个文件。错误是:无法识别的引擎cfgrib必须是以下之一:['netcdf4'、'scipy'、'......
  • 如何在 java 或 python 中使用 HTTP(S) 解决无法解析的主机名或无法识别的名称错误?
    我尝试以编程方式访问网站的信息,但在Java和Python上都无法解析主机名。如果我指定IP地址,则会将错误更改为TLSV1_UNRECOGNIZED_NAME。不过,这个网站无需任何额外的工作就可以通过任何浏览器解决。我在这里浏览了很多潜在的解决方案,但对于Python,它说这个问题应该在2.7......
  • Python 请求 POST 请求与 websockets 库一起使用时挂起
    我使用Python中的requests库发送POST请求,同时维护与websockets库的WebSocket连接:importasyncioimportrequestsimportwebsocketsasyncdefwebsocket_handler(uri):asyncwithwebsockets.connect(uri)aswebsocket:whileTrue:me......
  • 在Python中,list1[::] = list2的空间复杂度是多少?
    此代码首先迭代列表nums,更新整数0、1、2(也分别称为红色、白色和蓝色)的计数。nums保证只有整数0、1和/或2。找到计数后,代码使用[::],这是一种就地修改列表的技巧,以排序numsdefsortColors(self,nums:List[int])->None:re......
  • [附开题]flask框架高校资产管理系统d8y3s(源码+论文+python)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容研究背景随着高等教育事业的快速发展,高校资产规模日益庞大,种类繁多,管理难度显著增加。传统的资产管理方式往往依赖于手工记录和纸质档案,不仅效率低......
  • [附开题]flask框架贺州图特产管理系统uuy79(源码+论文+python)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容研究背景贺州,这座历史悠久、文化底蕴深厚的城市,以其丰富的自然资源和独特的地理位置孕育了众多令人瞩目的特产。然而,在信息化快速发展的今天,贺州特......
  • [附开题]flask框架红枫超市会员管理系统ew5iq(源码+论文+python)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容研究背景随着零售行业的快速发展与消费者需求的日益多样化,超市作为人们日常生活中不可或缺的一部分,其管理效率和服务质量直接影响着顾客的购物体验......
  • PYTHON专题-(4)python叫你搞对象
    什么是面向过程编程?面向过程的程序设计把计算机程序视为一系列的命令集合,即一组函数的顺序执行。为了简化程序设计,面向过程把函数继续切分为子函数,即把大块函数通过切割成小块函数来降低系统的复杂度。什么是面向对象编程?面向对象编程——ObjectOrientedProgramming,简......
  • Python 基础教学:中文编码处理
    《Python基础教学:中文编码处理》在编程中,处理中文字符时经常会遇到编码问题。Python3默认使用UTF-8编码,但在处理文件、网络数据或与旧系统交互时,可能需要处理GBK、GB2312等其他编码。1.字符串的编码和解码在Python中,字符串(str)默认是Unicode编码。当你需要将......