1.asyncio的subprocess
asyncio提供了两个开箱即用的协程函数来创建子进程,这些协程函数都返回一个Process实例
1.asyncio.create_subprocess_exec(),用于直接运行命令(如ls、pwd、who、python3、go等)。
2.asyncio.create_subprocess_shell(),用于通过 shell 运行命令。
create_subprocess_exec相比create_subprocess_shell更安全,应用程序有责任确保所有空格和特殊字符都被适当地引用,以避免 shell injection 漏洞。这个 shlex.quote() 函数可用于正确转义将用于构造shell命令的字符串中的空白字符和特殊shell字符。
2.使用create_subprocess_exec
2-1.FileNotFoundError报错
create_subprocess_exec
会把第一个参数当做命令(如bash、python、go这些),后面更命令的参数
2-1-1.报错原因
# 1.源码参数
async def create_subprocess_exec(
program,
*args,
stdin=None,
stdout=None,
stderr=None, loop=None,
limit=streams._DEFAULT_LIMIT, **kwds
)
# 2.调用时入参
asyncio.create_subprocess_exec('ls -l',
stdout=asyncio.subprocess.PIPE,
stdin=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
# 3.相当于program 指向了 'ls -l'命令
# 4.系统中只有ls、pwd等这些命令,所以'ls -l'这个命令会报错说找不到这个命令文件(FileNotFoundError)
# 5.正确的解法应该是asyncio.create_subprocess_exec('ls', '-l', xxx),把命令和参数分开。
2-1-2.案列
import shlex
import asyncio
async def run_asyncio_subprocess_exec(cmd):
proc = await asyncio.create_subprocess_exec(cmd, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
stdout, stderr = await proc.communicate()
# 获取返回值
print(proc.returncode)
# 输出内容
print(f'stdout: {repr(stdout.decode())}')
# 输出错误信息
print(f'stderr: {repr(stderr.decode())}')
if __name__ == "__main__":
cmd = "ls -l"
asyncio.run(run_asyncio_subprocess_exec(cmd))
FileNotFoundError: [Errno 2] No such file or directory: 'ls -l'
2-2.单个命令
import shlex
import asyncio
async def run_asyncio_subprocess_exec(cmd):
proc = await asyncio.create_subprocess_exec(cmd, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
stdout, stderr = await proc.communicate()
# 获取返回值
print(proc.returncode)
# 输出内容
print(f'stdout: {repr(stdout.decode())}')
# 输出错误信息
print(f'stderr: {repr(stderr.decode())}')
if __name__ == "__main__":
cmd = "ls"
asyncio.run(run_asyncio_subprocess_exec(cmd))
0
stdout: '1.html\nPie.html\n'
stderr: ''
2-3.命令+参数
通过
*shlex.split(cmd)
解决(类似于create_subprocess_exec('ls', '-l')
)
import shlex
import asyncio
async def run_asyncio_subprocess_exec(cmd):
# 解决方法
proc = await asyncio.create_subprocess_exec(*shlex.split(cmd), stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
stdout, stderr = await proc.communicate()
# 获取返回值
print(proc.returncode)
# 输出内容
print(f'stdout: {repr(stdout.decode())}')
# 输出错误信息
print(f'stderr: {repr(stderr.decode())}')
if __name__ == "__main__":
cmd = "ls -l"
asyncio.run(run_asyncio_subprocess_exec(cmd))
0
stdout: 'total 30272\n-rw-r--r--@ 1 lxd670 staff 3704145 7 25 2022 1.html\n-rw-r--r-- 1 lxd670 staff 3690539 7 26 2022 Pie.html\n'
stderr: ''
2-4.wait和communicate
2-4-1.使用wait获取内容
等待子进程终结。设置并返回
returncode
属性。当使用
stdout=PIPE
或stderr=PIPE
并且子进程产生了足以阻塞 OS 管道缓冲区等待接收更多的数据的输出时,此方法会发生死锁。 当使用管道时请使用communicate()
方法来避免这种情况。
import shlex
import asyncio
async def run_asyncio_subprocess_exec(cmd):
proc = await asyncio.create_subprocess_exec(cmd, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
# 获取输出内容
stdout = await proc.stdout.read()
stderr = await proc.stderr.read()
# 等待子进程终结。设置并返回returncode属性。
returncode = await proc.wait()
print(returncode)
print(stdout)
print(stderr)
if __name__ == "__main__":
cmd = "ls"
asyncio.run(run_asyncio_subprocess_exec(cmd))
0
b'1.html\nPie.html\n'
b''
2-4-2.使用communicate获取内容
与进程交互:
- 发送数据到 stdin (如果 input 不为
None
);- 从 stdout 和 stderr 读取数据,直至到达 EOF;
- 等待进程终结。
可选的 input 参数为将被发送到子进程的数据 (
bytes
对象)。返回一个元组
(stdout_data, stderr_data)
。
import shlex
import asyncio
async def run_asyncio_subprocess_exec(cmd):
proc = await asyncio.create_subprocess_exec(cmd, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
# 使用communicate就可以获取输出内容
stdout, stderr = await proc.communicate()
# 获取返回值
print(proc.returncode)
# 输出内容
print(f'stdout: {repr(stdout.decode())}')
# 输出错误信息
print(f'stderr: {repr(stderr.decode())}')
if __name__ == "__main__":
cmd = "ls"
asyncio.run(run_asyncio_subprocess_exec(cmd))
0
stdout: '1.html\nPie.html\n'
stderr: ''
命令执行失败
import shlex
import asyncio
async def run_asyncio_subprocess_exec(cmd):
proc = await asyncio.create_subprocess_exec(*shlex.split(cmd), stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
stdout, stderr = await proc.communicate()
# 获取返回值
print(proc.returncode)
# 输出内容
print(f'stdout: {repr(stdout.decode())}')
# 输出错误信息
print(f'stderr: {repr(stderr.decode())}')
if __name__ == "__main__":
cmd = "ls -l /asdad"
asyncio.run(run_asyncio_subprocess_exec(cmd))
1
stdout: ''
stderr: 'ls: /asdad: No such file or directory\n'
3.使用create_subprocess_shell
3-1.初步使用
使用
communicate
来获取信息
import shlex
import asyncio
async def run_asyncio_subprocess_shell(cmd):
proc = await asyncio.create_subprocess_shell(cmd, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
# 使用communicate接收返回值
stdout, stderr = await proc.communicate()
# 获取返回值状态
print(proc.returncode)
# 输出内容
print(f'stdout: {repr(stdout.decode())}')
# 输出错误信息
print(f'stderr: {repr(stderr.decode())}')
if __name__ == "__main__":
cmd = "cd $HOME && pwd"
asyncio.run(run_asyncio_subprocess_shell(cmd))
0
stdout: '/Users/lxd670\n'
stderr: ''
3-2.命令执行失败
import shlex
import asyncio
async def run_asyncio_subprocess_shell(cmd):
proc = await asyncio.create_subprocess_shell(cmd, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
# 使用communicate接收返回值
stdout, stderr = await proc.communicate(input="yes\nno".encode('utf-8'))
# 获取返回值
print(proc.returncode)
# 输出内容
print(f'stdout: {repr(stdout.decode())}')
# 输出错误信息
print(f'stderr: {repr(stderr.decode())}')
if __name__ == "__main__":
cmd = "cd /Users/lxd670/ccc && lsx"
asyncio.run(run_asyncio_subprocess_shell(cmd))
127
stdout: ''
stderr: '/bin/sh: lsx: command not found\n'
4.执行操作异步
在async中使用
subprocess.Popen
执行会是同步效果
4-1.被操作的文件test_cmd_1.py
import time
for i in range(2):
res = input(f"请输入内容[{i}]:\n")
print(f"你选择了[{i}]: {res}")
time.sleep(1)
print("完成选择")
4-2.使用create_subprocess_shell
import time
import shlex
import asyncio
from functools import wraps
def totle_time(func):
@wraps(func)
async def wrapper(*args, **kwargs):
start = time.time()
res = await func(*args, **kwargs)
print(f'RUN TIME: {time.time() - start}')
return res
return wrapper
async def run_asyncio_subprocess_shell(cmd):
proc = await asyncio.create_subprocess_shell(cmd, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
stdout, stderr = await proc.communicate(input="yes\nno".encode('utf-8'))
print(proc.returncode)
print(f'stdout: {repr(stdout.decode())}')
print(f'stderr: {repr(stderr.decode())}')
@totle_time
async def run():
cmd = "python3 test_cmd_1.py"
task = []
# 循环10个
for i in range(10):
t = asyncio.create_task(run_asyncio_subprocess_shell(cmd=cmd))
task.append(t)
await asyncio.gather(*task)
if __name__ == "__main__":
asyncio.run(run())
0
stdout: '请输入内容[0]:\n你选择了[0]: yes\n请输入内容[1]:\n你选择了[1]: no\n完成选择\n'
stderr: ''
...
...
RUN TIME: 2.062711000442505
4-3.使用create_subprocess_exec
import time
import shlex
import asyncio
from functools import wraps
def totle_time(func):
@wraps(func)
async def wrapper(*args, **kwargs):
start = time.time()
res = await func(*args, **kwargs)
print(f'RUN TIME: {time.time() - start}')
return res
return wrapper
async def run_asyncio_subprocess_exec(cmd):
proc = await asyncio.create_subprocess_exec(*shlex.split(cmd), stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
stdout, stderr = await proc.communicate(input="yes\nno".encode('utf-8'))
print(proc.returncode)
print(f'stdout: {repr(stdout.decode())}')
print(f'stderr: {repr(stderr.decode())}')
@totle_time
async def run():
cmd = "python3 test_cmd_1.py"
task = []
# 循环10个
for i in range(10):
t = asyncio.create_task(run_asyncio_subprocess_exec(cmd=cmd))
task.append(t)
await asyncio.gather(*task)
if __name__ == "__main__":
asyncio.run(run())
0
stdout: '请输入内容[0]:\n你选择了[0]: yes\n请输入内容[1]:\n你选择了[1]: no\n完成选择\n'
stderr: ''
...
...
RUN TIME: 2.0569019317626953
4-4.使用subprocess(同步)
如果参数
shell
设为true,程序将通过shell来执行,且args输入应当是字符串形式,同shell窗口执行的命令 如果不设置,默认为false,则输入的args应当是字符串列表
import time
import shlex
import asyncio
import subprocess
from functools import wraps
def totle_time(func):
@wraps(func)
async def wrapper(*args, **kwargs):
start = time.time()
res = await func(*args, **kwargs)
print(f'RUN TIME: {time.time() - start}')
return res
return wrapper
async def run_subprocess_shell(cmd):
p = subprocess.Popen(cmd, text=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
stdout, stderr = p.communicate(input="yes\nno")
print(p.returncode)
print(repr(stdout))
print(repr(stderr))
@totle_time
async def run():
cmd = "python3 test_cmd_1.py"
task = []
for i in range(2):
t = asyncio.create_task(run_subprocess_shell(cmd=cmd))
task.append(t)
await asyncio.gather(*task)
if __name__ == "__main__":
asyncio.run(run())
0
'请输入内容[0]:\n你选择了[0]: yes\n请输入内容[1]:\n你选择了[1]: no\n完成选择\n'
''
0
'请输入内容[0]:\n你选择了[0]: yes\n请输入内容[1]:\n你选择了[1]: no\n完成选择\n'
''
RUN TIME: 4.050457000732422
5.使用同步锁
5-1.使用create_subprocess_shell
import time
import shlex
import asyncio
from functools import wraps
def totle_time(func):
@wraps(func)
async def wrapper(*args, **kwargs):
start = time.time()
res = await func(*args, **kwargs)
print(f'RUN TIME: {time.time() - start}')
return res
return wrapper
async def run_asyncio_subprocess_shell(cmd, lock):
async with lock:
proc = await asyncio.create_subprocess_shell(cmd, stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
stdout, stderr = await proc.communicate(input="yes\nno".encode('utf-8'))
print(proc.returncode)
print(f'stdout: {repr(stdout.decode())}')
print(f'stderr: {repr(stderr.decode())}')
@totle_time
async def run():
# 使用lock锁
lock = asyncio.Lock()
cmd = "python3 test_cmd_1.py"
task = []
for i in range(2):
t = asyncio.create_task(run_asyncio_subprocess_shell(cmd=cmd, lock=lock))
task.append(t)
await asyncio.gather(*task)
if __name__ == "__main__":
asyncio.run(run())
0
stdout: '请输入内容[0]:\n你选择了[0]: yes\n请输入内容[1]:\n你选择了[1]: no\n完成选择\n'
stderr: ''
0
stdout: '请输入内容[0]:\n你选择了[0]: yes\n请输入内容[1]:\n你选择了[1]: no\n完成选择\n'
stderr: ''
RUN TIME: 4.0933310985565186
5-2.使用run_asyncio_subprocess_exec
import time
import shlex
import asyncio
from functools import wraps
def totle_time(func):
@wraps(func)
async def wrapper(*args, **kwargs):
start = time.time()
res = await func(*args, **kwargs)
print(f'RUN TIME: {time.time() - start}')
return res
return wrapper
async def run_asyncio_subprocess_exec(cmd, lock):
async with lock:
proc = await asyncio.create_subprocess_exec(*shlex.split(cmd), stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
stdout, stderr = await proc.communicate(input="yes\nno".encode('utf-8'))
print(proc.returncode)
print(f'stdout: {repr(stdout.decode())}')
print(f'stderr: {repr(stderr.decode())}')
@totle_time
async def run():
# 使用Lock锁
lock = asyncio.Lock()
cmd = "python3 test_cmd_1.py"
task = []
for i in range(2):
t = asyncio.create_task(run_asyncio_subprocess_exec(cmd=cmd, lock=lock))
task.append(t)
await asyncio.gather(*task)
if __name__ == "__main__":
asyncio.run(run())
0
stdout: '请输入内容[0]:\n你选择了[0]: yes\n请输入内容[1]:\n你选择了[1]: no\n完成选择\n'
stderr: ''
0
stdout: '请输入内容[0]:\n你选择了[0]: yes\n请输入内容[1]:\n你选择了[1]: no\n完成选择\n'
stderr: ''
RUN TIME: 4.0468339920043945
标签:__,stdout,cmd,subprocess,stderr,使用,asyncio
From: https://www.cnblogs.com/lxd670/p/17603616.html