def hioadm_shell(self, start_cmd, cmd, wait_str='Press CTRL+C', wait_time=2, record_size=10240): """进入盘内命令 hioadmshell + 控制盘 Args: start_cmd: hioadmshell + device cmd: next cmd Returns: """ std = self.con.exec_command_with_confirm(start_cmd+'\n', next_cmd=cmd, confirm=False, wait_str=wait_str, wait_time=wait_time, record_size=record_size) self.con.invoke.send('\x03\n') return std
def exec_command_with_confirm(self, command,next_cmd="", end_words='\n', confirm=True, wait_str='', wait_time=10, record_size=10240):
"""
SSH执行命令并确认
:param wait_str:
:param confirm:
:param wait_time: 执行命令后,等待时间
:param command: 执行命令
:return: 命令执行结果
"""
self.alive()
log.debug(f'{self.ip} 执行了命令 {command}{end_words}')
self.invoke.send(f'{command}{end_words}')
result = ''
out = ''
try:
if confirm and wait_str:
for i in range(0, wait_time):
result = self.invoke.recv(1024).decode('utf-8',errors='ignore')
print(result)
if 'y/n' in result or 'Y/N' in result:
result = self.exec_command_with_confirm('Y', confirm=False, wait_str=wait_str)
break
else:
sleep(1)
else:
for i in range(0, wait_time):
result += self.invoke.recv(1024).decode('utf-8', errors='ignore')
result = result.replace('\r\r\n', '').replace('\r\n','')
if wait_str in result:
log.info(result)
self.invoke.send('\n')
if next_cmd:
sleep(1)
self.invoke.send(f'{next_cmd}\n')
while True:
sleep(2)
if self.invoke.recv_ready():
out= self.invoke.recv(record_size).decode('utf-8', errors='ignore')
out = out.replace('\r\r\n','').replace('\r\n','')
log.info(out)
self.invoke.send('\n')
if 'CORE0->' in out:
break
return out.replace('CORE0->','')
# log.info('执行结果如下:\n%s\n\n' % result)
break
else:
log.debug('数据未收全,等待1s继续查收')
sleep(1)
# log.info(result)
except Exception as e:
raise Exception(f'命令执行错误{e}')
self.invoke.close()
return result
标签:cmd,invoke,self,串口,result,str,带内,输入,wait From: https://www.cnblogs.com/Tanwheey/p/18515922