首页 > 其他分享 >asyncio的subprocess使用

asyncio的subprocess使用

时间:2023-08-03 16:11:24浏览次数:75  
标签:__ stdout cmd subprocess stderr 使用 asyncio

1.asyncio的subprocess

asyncio提供了两个开箱即用的协程函数来创建子进程,这些协程函数都返回一个Process实例
  1.asyncio.create_subprocess_exec(),用于直接运行命令(如ls、pwd、who、python3、go等)。
  2.asyncio.create_subprocess_shell(),用于通过 shell 运行命令。
  
create_subprocess_exec相比create_subprocess_shell更安全,应用程序有责任确保所有空格和特殊字符都被适当地引用,以避免 shell injection 漏洞。这个 shlex.quote() 函数可用于正确转义将用于构造shell命令的字符串中的空白字符和特殊shell字符。

2.使用create_subprocess_exec

2-1.FileNotFoundError报错

create_subprocess_exec会把第一个参数当做命令(如bash、python、go这些),后面更命令的参数

2-1-1.报错原因

# 1.源码参数
async def create_subprocess_exec(
  program, 
  *args, 
  stdin=None, 
  stdout=None,
  stderr=None, loop=None,
  limit=streams._DEFAULT_LIMIT, **kwds
)

# 2.调用时入参
asyncio.create_subprocess_exec('ls -l', 
                               stdout=asyncio.subprocess.PIPE, 
                               stdin=asyncio.subprocess.PIPE,
                               stderr=asyncio.subprocess.PIPE)

# 3.相当于program 指向了 'ls -l'命令
# 4.系统中只有ls、pwd等这些命令,所以'ls -l'这个命令会报错说找不到这个命令文件(FileNotFoundError)
# 5.正确的解法应该是asyncio.create_subprocess_exec('ls', '-l', xxx),把命令和参数分开。

2-1-2.案列

import shlex
import asyncio

async def run_asyncio_subprocess_exec(cmd):
    proc = await asyncio.create_subprocess_exec(cmd, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)
    stdout, stderr = await proc.communicate()
    # 获取返回值
    print(proc.returncode)
    # 输出内容
    print(f'stdout: {repr(stdout.decode())}')
    # 输出错误信息
    print(f'stderr: {repr(stderr.decode())}')

if __name__ == "__main__":
    cmd = "ls -l"
    asyncio.run(run_asyncio_subprocess_exec(cmd))
FileNotFoundError: [Errno 2] No such file or directory: 'ls -l'

2-2.单个命令

import shlex
import asyncio

async def run_asyncio_subprocess_exec(cmd):
    proc = await asyncio.create_subprocess_exec(cmd, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)
    stdout, stderr = await proc.communicate()
    # 获取返回值
    print(proc.returncode)
    # 输出内容
    print(f'stdout: {repr(stdout.decode())}')
    # 输出错误信息
    print(f'stderr: {repr(stderr.decode())}')

if __name__ == "__main__":
    cmd = "ls"
    asyncio.run(run_asyncio_subprocess_exec(cmd))
0
stdout: '1.html\nPie.html\n'
stderr: ''

2-3.命令+参数

通过*shlex.split(cmd)解决(类似于create_subprocess_exec('ls', '-l'))

import shlex
import asyncio

async def run_asyncio_subprocess_exec(cmd):
  	# 解决方法
    proc = await asyncio.create_subprocess_exec(*shlex.split(cmd), stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)
    stdout, stderr = await proc.communicate()
    # 获取返回值
    print(proc.returncode)
    # 输出内容
    print(f'stdout: {repr(stdout.decode())}')
    # 输出错误信息
    print(f'stderr: {repr(stderr.decode())}')

if __name__ == "__main__":
    cmd = "ls -l"
    asyncio.run(run_asyncio_subprocess_exec(cmd))
0
stdout: 'total 30272\n-rw-r--r--@  1 lxd670  staff  3704145  7 25  2022 1.html\n-rw-r--r--   1 lxd670  staff  3690539  7 26  2022 Pie.html\n'
stderr: ''

2-4.wait和communicate

2-4-1.使用wait获取内容

等待子进程终结。设置并返回 returncode属性。

当使用 stdout=PIPEstderr=PIPE 并且子进程产生了足以阻塞 OS 管道缓冲区等待接收更多的数据的输出时,此方法会发生死锁。 当使用管道时请使用 communicate() 方法来避免这种情况。

import shlex
import asyncio

async def run_asyncio_subprocess_exec(cmd):
    proc = await asyncio.create_subprocess_exec(cmd, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)
   
    # 获取输出内容
    stdout = await proc.stdout.read()
    stderr = await proc.stderr.read()
    
    # 等待子进程终结。设置并返回returncode属性。
    returncode = await proc.wait()
    print(returncode)
    print(stdout)
    print(stderr)

if __name__ == "__main__":
    cmd = "ls"
    asyncio.run(run_asyncio_subprocess_exec(cmd))
0
b'1.html\nPie.html\n'
b''

2-4-2.使用communicate获取内容

与进程交互:

  1. 发送数据到 stdin (如果 input 不为 None);
  2. stdoutstderr 读取数据,直至到达 EOF;
  3. 等待进程终结。

可选的 input 参数为将被发送到子进程的数据 (bytes 对象)。

返回一个元组 (stdout_data, stderr_data)

import shlex
import asyncio

async def run_asyncio_subprocess_exec(cmd):
    proc = await asyncio.create_subprocess_exec(cmd, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)
   	# 使用communicate就可以获取输出内容
    stdout, stderr = await proc.communicate()
    # 获取返回值
    print(proc.returncode)
    # 输出内容
    print(f'stdout: {repr(stdout.decode())}')
    # 输出错误信息
    print(f'stderr: {repr(stderr.decode())}')

if __name__ == "__main__":
    cmd = "ls"
    asyncio.run(run_asyncio_subprocess_exec(cmd))
0
stdout: '1.html\nPie.html\n'
stderr: ''

命令执行失败

import shlex
import asyncio

async def run_asyncio_subprocess_exec(cmd):
    proc = await asyncio.create_subprocess_exec(*shlex.split(cmd), stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)
    stdout, stderr = await proc.communicate()
    # 获取返回值
    print(proc.returncode)
    # 输出内容
    print(f'stdout: {repr(stdout.decode())}')
    # 输出错误信息
    print(f'stderr: {repr(stderr.decode())}')

if __name__ == "__main__":
    cmd = "ls -l /asdad"
    asyncio.run(run_asyncio_subprocess_exec(cmd))
1
stdout: ''
stderr: 'ls: /asdad: No such file or directory\n'

3.使用create_subprocess_shell

3-1.初步使用

使用communicate来获取信息

import shlex
import asyncio

async def run_asyncio_subprocess_shell(cmd):
    proc = await asyncio.create_subprocess_shell(cmd, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)
    # 使用communicate接收返回值
    stdout, stderr = await proc.communicate()
    # 获取返回值状态
    print(proc.returncode)
    # 输出内容
    print(f'stdout: {repr(stdout.decode())}')
    # 输出错误信息
    print(f'stderr: {repr(stderr.decode())}')

if __name__ == "__main__":
    cmd = "cd $HOME && pwd"
    asyncio.run(run_asyncio_subprocess_shell(cmd))
0
stdout: '/Users/lxd670\n'
stderr: ''

3-2.命令执行失败


import shlex
import asyncio

async def run_asyncio_subprocess_shell(cmd):
    proc = await asyncio.create_subprocess_shell(cmd, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)
    # 使用communicate接收返回值
    stdout, stderr = await proc.communicate(input="yes\nno".encode('utf-8'))
    
    # 获取返回值
    print(proc.returncode)
    # 输出内容
    print(f'stdout: {repr(stdout.decode())}')
    # 输出错误信息
    print(f'stderr: {repr(stderr.decode())}')

if __name__ == "__main__":
    cmd = "cd /Users/lxd670/ccc && lsx"
    asyncio.run(run_asyncio_subprocess_shell(cmd))
127
stdout: ''
stderr: '/bin/sh: lsx: command not found\n'

4.执行操作异步

在async中使用subprocess.Popen执行会是同步效果

4-1.被操作的文件test_cmd_1.py

import time
for i in range(2):
    res = input(f"请输入内容[{i}]:\n")
    print(f"你选择了[{i}]: {res}")
    time.sleep(1)
print("完成选择")

4-2.使用create_subprocess_shell

import time
import shlex
import asyncio
from functools import wraps

def totle_time(func):
    @wraps(func)
    async def wrapper(*args, **kwargs):
        start = time.time()
        res = await func(*args, **kwargs)
        print(f'RUN TIME: {time.time() - start}')
        return res
    return wrapper

async def run_asyncio_subprocess_shell(cmd):
    proc = await asyncio.create_subprocess_shell(cmd, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)
    stdout, stderr = await proc.communicate(input="yes\nno".encode('utf-8'))
    print(proc.returncode)
    print(f'stdout: {repr(stdout.decode())}')
    print(f'stderr: {repr(stderr.decode())}')

@totle_time
async def run():
    cmd = "python3 test_cmd_1.py"
    task = []
    # 循环10个
    for i in range(10):
        t = asyncio.create_task(run_asyncio_subprocess_shell(cmd=cmd))
        task.append(t)
    await asyncio.gather(*task)

if __name__ == "__main__":
    asyncio.run(run())
0
stdout: '请输入内容[0]:\n你选择了[0]: yes\n请输入内容[1]:\n你选择了[1]: no\n完成选择\n'
stderr: ''
...
...
RUN TIME: 2.062711000442505

4-3.使用create_subprocess_exec

import time
import shlex
import asyncio
from functools import wraps

def totle_time(func):
    @wraps(func)
    async def wrapper(*args, **kwargs):
        start = time.time()
        res = await func(*args, **kwargs)
        print(f'RUN TIME: {time.time() - start}')
        return res
    return wrapper

async def run_asyncio_subprocess_exec(cmd):
    proc = await asyncio.create_subprocess_exec(*shlex.split(cmd), stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)
    stdout, stderr = await proc.communicate(input="yes\nno".encode('utf-8'))
    print(proc.returncode)
    print(f'stdout: {repr(stdout.decode())}')
    print(f'stderr: {repr(stderr.decode())}')

@totle_time
async def run():
    cmd = "python3 test_cmd_1.py"
    task = []
    # 循环10个
    for i in range(10):
        t = asyncio.create_task(run_asyncio_subprocess_exec(cmd=cmd))
        task.append(t)
    await asyncio.gather(*task)

if __name__ == "__main__":
    asyncio.run(run())
0
stdout: '请输入内容[0]:\n你选择了[0]: yes\n请输入内容[1]:\n你选择了[1]: no\n完成选择\n'
stderr: ''
...
...
RUN TIME: 2.0569019317626953

4-4.使用subprocess(同步)

如果参数shell设为true,程序将通过shell来执行,且args输入应当是字符串形式,同shell窗口执行的命令 如果不设置,默认为false,则输入的args应当是字符串列表

import time
import shlex
import asyncio
import subprocess
from functools import wraps

def totle_time(func):
    @wraps(func)
    async def wrapper(*args, **kwargs):
        start = time.time()
        res = await func(*args, **kwargs)
        print(f'RUN TIME: {time.time() - start}')
        return res
    return wrapper

async def run_subprocess_shell(cmd):
    p = subprocess.Popen(cmd, text=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
    stdout, stderr = p.communicate(input="yes\nno")
    print(p.returncode)
    print(repr(stdout))
    print(repr(stderr))

@totle_time
async def run():
    cmd = "python3 test_cmd_1.py"
    task = []
    for i in range(2):
        t = asyncio.create_task(run_subprocess_shell(cmd=cmd))
        task.append(t)
    await asyncio.gather(*task)

if __name__ == "__main__":
    asyncio.run(run())
0
'请输入内容[0]:\n你选择了[0]: yes\n请输入内容[1]:\n你选择了[1]: no\n完成选择\n'
''
0
'请输入内容[0]:\n你选择了[0]: yes\n请输入内容[1]:\n你选择了[1]: no\n完成选择\n'
''
RUN TIME: 4.050457000732422

5.使用同步锁

5-1.使用create_subprocess_shell

import time
import shlex
import asyncio
from functools import wraps

def totle_time(func):
    @wraps(func)
    async def wrapper(*args, **kwargs):
        start = time.time()
        res = await func(*args, **kwargs)
        print(f'RUN TIME: {time.time() - start}')
        return res
    return wrapper

async def run_asyncio_subprocess_shell(cmd, lock):
    async with lock:
        proc = await asyncio.create_subprocess_shell(cmd, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE)
        stdout, stderr = await proc.communicate(input="yes\nno".encode('utf-8'))
    print(proc.returncode)
    print(f'stdout: {repr(stdout.decode())}')
    print(f'stderr: {repr(stderr.decode())}')
    
@totle_time
async def run():
  	# 使用lock锁
    lock = asyncio.Lock()
    cmd = "python3 test_cmd_1.py"
    task = []
    for i in range(2):
        t = asyncio.create_task(run_asyncio_subprocess_shell(cmd=cmd, lock=lock))
        task.append(t)
    await asyncio.gather(*task)

if __name__ == "__main__":
    asyncio.run(run())
0
stdout: '请输入内容[0]:\n你选择了[0]: yes\n请输入内容[1]:\n你选择了[1]: no\n完成选择\n'
stderr: ''
0
stdout: '请输入内容[0]:\n你选择了[0]: yes\n请输入内容[1]:\n你选择了[1]: no\n完成选择\n'
stderr: ''
RUN TIME: 4.0933310985565186

5-2.使用run_asyncio_subprocess_exec

import time
import shlex
import asyncio
from functools import wraps

def totle_time(func):
    @wraps(func)
    async def wrapper(*args, **kwargs):
        start = time.time()
        res = await func(*args, **kwargs)
        print(f'RUN TIME: {time.time() - start}')
        return res
    return wrapper

async def run_asyncio_subprocess_exec(cmd, lock):
    async with lock:
        proc = await asyncio.create_subprocess_exec(*shlex.split(cmd), stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE)
        stdout, stderr = await proc.communicate(input="yes\nno".encode('utf-8'))
    print(proc.returncode)
    print(f'stdout: {repr(stdout.decode())}')
    print(f'stderr: {repr(stderr.decode())}')

@totle_time
async def run():
    # 使用Lock锁
    lock = asyncio.Lock()
    cmd = "python3 test_cmd_1.py"
    task = []
    for i in range(2):
        t = asyncio.create_task(run_asyncio_subprocess_exec(cmd=cmd, lock=lock))
        task.append(t)
    await asyncio.gather(*task)

if __name__ == "__main__":
    asyncio.run(run())
0
stdout: '请输入内容[0]:\n你选择了[0]: yes\n请输入内容[1]:\n你选择了[1]: no\n完成选择\n'
stderr: ''
0
stdout: '请输入内容[0]:\n你选择了[0]: yes\n请输入内容[1]:\n你选择了[1]: no\n完成选择\n'
stderr: ''
RUN TIME: 4.0468339920043945

标签:__,stdout,cmd,subprocess,stderr,使用,asyncio
From: https://www.cnblogs.com/lxd670/p/17603616.html

相关文章

  • 【亲测】解决使用super_gradients库预测图片无法获取预测结果图片
    问题在使用super_gradients库中的Yolo-nas预测图片时,想要获取预测好的图片,但执行out=model.predict("camera01.png",conf=0.6,batch_size=None)之后只能out.show()和out.save(),无法返回预测结果图片。解决方法importcv2importtorchfromsuper_gradients.trainingimportmo......
  • 使用 OpenTelemetry 监控 Spring Boot
    通过将OpenTelemetry与SpringBoot集成,您可以从应用程序捕获分布式跟踪和其他遥测数据,从而提供有关其在分布式环境中的性能和行为的宝贵见解。什么是开放遥测?OpenTelemetry定义了用于收集遥测数据(例如指标、跟踪和日志)的API和协议,并为流行的编程语言和技术提供了各种库、代理......
  • abp使用动态api客户端注意事项
    步骤按照官方的来就行API/DynamicCSharpAPIClients|DocumentationCenter|ABP.IO但有一点要注意,这也是官方文档没提及的,比如你在application这一层调用另一个项目的api客户端则要在application层的module里加上依赖,这个容易忘记。[DependsOn(typeof(Bank......
  • pycharm使用技巧汇总
    pycharm使用技巧汇总0、安装pycharm略。1、指定虚拟环境 2、创建代码块pycharm自定义代码片段https://www.cnblogs.com/andy9468/p/8988501.htmlpycharm中python模板代码自动生成https://www.cnblogs.com/andy9468/p/8302701.html 3、远程调试远程调试忽略上传文件......
  • minio python sdk使用
    如下fromminioimportMiniofromminio.errorimportS3Errorimportlogginglogging.basicConfig(filename='logs/myProgramLog.log',level=logging.INFO,format='%(asctime)s-%(levelname)s-%(message)s')classBucket......
  • uniapp中使用微信支付
     超简单wx.requestPayment({ timeStamp:zhifu.timeStamp,//需要的参数,由后端返回 nonceStr:zhifu.nonceStr,//需要的参数,由后端返回 package:zhifu.prepayId,//需要的参数,由后端返回 signType:zhifu.signType,//需要的参数,由后端返回 paySign:zh......
  • nsenter的使用
    nsenter的使用$nsenter--help用法:nsenter[选项][<程序>[<参数>...]]以其他程序的名字空间运行某个程序。选项:-a,--allenterallnamespaces-t,--target<pid>要获取名字空间的目标进程-m,--mount[=<文件>]进入mount名字空间-u,......
  • 免费IP代理使用效果实测
    经常在网上冲浪的人,对代理ip一定不陌生,而在选择代理ip的时候,“免费”一定是最吸引人的关键词,毕竟能免费谁想花钱啊?在今天的文章中,我们要一起来浅析免费IP代理的使用效果,帮助大家更好地了解和选择最合适的代理方案。我们将分析免费IP代理的优势和限制,并提供一些实际的例子和操作......
  • PyYAML的使用
    YAML是一个被广泛使用的数据序列化和配置语言,作为一个开发者,总是不免和它打交道。但处理YAML文档,尤其是使用PyYAML的过程总是非常痛苦。这篇文章分享我在Python下使用PyYAML的技巧和代码片段,并介绍几个相关的库。注意:本文中的代码仅保证在Python3下正常工作总是使用 s......
  • Chat GPT是什么,初学者使用Chat GPT,需要注意些什么
    ChatGPT是什么ChatGPT是由OpenAI开发的一种大型语言模型,它基于GPT(GenerativePre-trainedTransformer)架构。GPT是一种基于深度学习的预训练模型,通过在大规模文本数据上进行训练,学习了语言的统计规律和语义信息。ChatGPT专注于对话式交互,它可以接收用户的输入,并生成相应的回复......