import telnetlib, re
# Tags: tn, ZTE, host, switch, telnetlib, interface, output, port, match
# From: https://www.cnblogs.com/norvell/p/18408391
from multiprocessing import Process
import pandas as pd
# Log in to the device
def telnetDevice(host,port,command):
    """Log in to a device over telnet, run one command and return its output.

    The credentials are read from the module-level names ``username`` and
    ``password``; both must be defined before this function is called.

    Args:
        host: Address of the device to connect to.
        port: Telnet port of the device.
        command: Command line sent after paging has been disabled.

    Returns:
        The text received after sending ``command``, up to and including
        the next ``#``. Bytes that are not valid ASCII are replaced
        instead of raising ``UnicodeDecodeError``.
    """
    # The context manager closes the connection even when a read or write
    # raises, so a failed session no longer leaks the socket.
    with telnetlib.Telnet(host, port, timeout=5) as tn:
        # Answer the login prompts.
        tn.read_until(b"Username:")
        tn.write(username.encode('ascii') + b"\n")
        tn.read_until(b"Password:")
        tn.write(password.encode('ascii') + b"\n")
        # Wait for the "#" prompt that follows a successful login.
        tn.read_until(b"#")
        # Disable paged output so the whole reply arrives in one piece.
        # tn.write(b"screen-length 0 temporary\n")  # huawei
        tn.write(b"terminal length 0 \n")  # zte
        tn.read_until(b"#")
        # Run the command and collect everything up to the next "#".
        tn.write(command.encode('ascii') + b"\n")
        output = tn.read_until(b"#")
    return output.decode('ascii', errors='replace')
def showIPInterfaces(host,port,command="show ip int bri"):
    """Run the brief IP-interface command and tabulate the reply.

    The reply is fetched with ``telnetDevice``. Only the last
    blank-line-separated section of it is parsed, and only lines that
    match the six-column row pattern contribute a row.

    Args:
        host: Address of the device.
        port: Telnet port of the device.
        command: Command to execute; defaults to ``"show ip int bri"``.

    Returns:
        A ``pandas.DataFrame`` with the columns ``interface``,
        ``ip_address``, ``mask``, ``admin_status``, ``phy_status`` and
        ``prot_status``. It is also printed before being returned.
    """
    columns = (
        'interface',
        'ip_address',
        'mask',
        'admin_status',
        'phy_status',
        'prot_status',
    )
    row_re = re.compile(
        r'(\S+)\s+(\S+)\s+(\S+)\s+(down|up)\s+(down|up)\s+(down|up)\s*'
    )

    raw = telnetDevice(host, port, command)
    last_section = re.split(r'\n\s*\n', raw.strip())[-1]

    rows = []
    for text in last_section.splitlines():
        found = row_re.search(text)
        if found is None:
            # Headers, prompts and other non-table lines are skipped.
            continue
        rows.append(dict(zip(columns, found.groups())))

    table = pd.DataFrame(rows)
    print(table)
    return table
def showInterfaces(host,port,command="show interface bri"):
    """Run the brief interface command and tabulate the reply.

    The reply is fetched with ``telnetDevice``. Only the last
    blank-line-separated section of it is parsed, and only lines that
    match the row pattern contribute a row.

    Args:
        host: Address of the device.
        port: Telnet port of the device.
        command: Command to execute; defaults to ``"show interface bri"``.

    Returns:
        A ``pandas.DataFrame`` with the columns ``interface``,
        ``portattribute``, ``mode``, ``bw(mbps)``, ``admin_status``,
        ``phy_status``, ``prot_status`` and ``description``. It is also
        printed before being returned.
    """
    # The description is optional and runs to the end of the line. The
    # previous pattern required whitespace after the last status column,
    # so rows that ended right after it were dropped, and it captured only
    # the first word of a multi-word description.
    row_re = re.compile(
        r'(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(down|up)\s+(down|up)\s+(down|up)'
        r'(?:\s+(\S.*?))?\s*$'
    )

    output = telnetDevice(host, port, command)
    lines = re.split(r'\n\s*\n', output.strip())[-1].splitlines()

    interfaces = []
    for line in lines:
        found = row_re.search(line)
        if found is None:
            # Headers, prompts and other non-table lines are skipped.
            continue
        interfaces.append({
            'interface': found.group(1),
            'portattribute': found.group(2),
            'mode': found.group(3),
            'bw(mbps)': found.group(4),
            'admin_status': found.group(5),
            'phy_status': found.group(6),
            'prot_status': found.group(7),
            # The optional group is None when the row has no description.
            'description': found.group(8) or "",
        })

    df = pd.DataFrame(interfaces)
    print(df)
    return df
def showIPOspfNeighbor(host,port,command="show ip ospf neighbor"):
    """Run the OSPF neighbor command and tabulate the reply.

    The reply is fetched with ``telnetDevice``. Only the last
    blank-line-separated section of it is parsed, and only lines that
    match the six-column row pattern (second column numeric) contribute
    a row.

    Args:
        host: Address of the device.
        port: Telnet port of the device.
        command: Command to execute; defaults to
            ``"show ip ospf neighbor"``.

    Returns:
        A ``pandas.DataFrame`` with the columns ``neighborid``, ``pri``,
        ``state``, ``deadtime``, ``address`` and ``interface``. It is also
        printed before being returned.
    """
    fields = ('neighborid', 'pri', 'state', 'deadtime', 'address', 'interface')
    neighbor_re = re.compile(r'(\S+)\s+(\d+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)')

    reply = telnetDevice(host, port, command)
    section = re.split(r'\n\s*\n', reply.strip())[-1]

    # Search every line once; lines that do not look like a table row
    # produce None and are filtered out below.
    hits = (neighbor_re.search(text) for text in section.splitlines())
    neighbors = [dict(zip(fields, hit.groups())) for hit in hits if hit is not None]

    frame = pd.DataFrame(neighbors)
    print(frame)
    return frame
def showInterfacedetail(host,port,interface):
    """Fetch the byte counters of one interface and print them.

    Runs ``show interface <interface>`` through ``telnetDevice`` and reads
    the values that follow the ``In_Bytes`` and ``E_Bytes`` labels in the
    reply.

    Args:
        host: Address of the device.
        port: Telnet port of the device.
        interface: Name of the interface to query.

    Returns:
        A dict with the keys ``host``, ``interface``, ``In_Bytes`` and
        ``E_Bytes``. A counter is ``None`` when its label is missing from
        the reply or its value is not an integer. The dict is also printed.
    """
    # telnetDevice already terminates the command with a newline, so no
    # extra "\n" is appended here.
    output = telnetDevice(host, port, "show interface " + interface)

    msg = {"host": host, "interface": interface}
    for counter in ("In_Bytes", "E_Bytes"):
        found = re.search(counter + r'\s+(\S+)\s+', output)
        value = None
        if found is not None:
            try:
                value = int(found.group(1))
            except ValueError:
                # Non-numeric counter text is reported as None.
                value = None
        msg[counter] = value

    print(msg)
    return msg
def runcommands(host,port,command="show running"):
    """Log in over telnet and run one or more newline-separated commands.

    The credentials are read from the module-level names ``username`` and
    ``password``; both must be defined before this function is called.

    Args:
        host: Address of the device.
        port: Telnet port of the device.
        command: One command, or several separated by ``"\\n"``; defaults
            to ``"show running"``.

    Returns:
        The concatenated replies of all commands as ``str``. Each reply is
        read up to the next ``#`` or until 5 seconds have passed. Bytes
        that are not valid ASCII are replaced instead of raising. The text
        is also printed.
    """
    commands = command.split("\n")
    replies = []
    # The context manager closes the connection even if a step raises;
    # the connect timeout matches the one used by telnetDevice.
    with telnetlib.Telnet(host, port, timeout=5) as tn:
        # Answer the login prompts.
        tn.read_until(b"Username:")
        tn.write(username.encode('ascii') + b"\n")
        tn.read_until(b"Password:")
        tn.write(password.encode('ascii') + b"\n")
        # Wait for the "#" prompt that follows a successful login.
        tn.read_until(b"#")
        # Disable paged output.
        # NOTE(review): this is the Huawei syntax while telnetDevice sends
        # the ZTE one - confirm which vendor this function targets.
        tn.write(b"screen-length 0 temporary\n")  # huawei
        # tn.write(b"terminal length 0 \n")  # zte
        tn.read_until(b"#")
        for cmd in commands:
            tn.write(cmd.encode('ascii') + b"\n")
            # Read after every command so that no reply is lost when
            # several commands are given.
            replies.append(tn.read_until(b"#", 5))
    output = b"".join(replies).decode('ascii', errors='replace')
    print(output)
    return output
def process_print():
    """Print the current time as returned by ``time.time()``."""
    # Imported here because the module-level imports visible in this file
    # do not include ``time``; without it the call raised NameError.
    import time

    print(time.time())
if __name__ == '__main__':
    # Devices to query: one worker process is started per entry.
    login_msgs = [{"ip":"129.60.161.169","port":"23"},{"ip":"129.60.161.170","port":"23"}]

    # Sequential alternative, kept for reference:
    # ip_interfaces = pd.DataFrame()
    # for login_msg in login_msgs:
    #     host = login_msg["ip"]
    #     port = login_msg["port"]
    #     print(f"host = {host} , port = {port}")
    #     interfaces = showInterfaces(host, port)
    #     for interface in interfaces["interface"]:
    #         showInterfacedetail(host, port, interface)
    #     ip_interface = showIPOspfNeighbor(host,port)
    #     ip_interface["host"] = host
    #     print(ip_interface)
    #     ip_interfaces = pd.concat([ip_interfaces, ip_interface],ignore_index=True)
    #     print("-------------------")
    # print(ip_interfaces)

    processes = []
    for login_msg in login_msgs:
        host = login_msg["ip"]
        port = login_msg["port"]
        # Hand over the callable and its arguments separately. Writing
        # target=showInterfaces(host, port) would run the query in this
        # process and pass its DataFrame result as the target.
        p = Process(target=showInterfaces, args=(host, port))
        p.start()
        processes.append(p)

    # Wait for all worker processes to finish.
    for p in processes:
        p.join()